Issues with Real-Time RTSP Streaming and ML Model Integration

I’m currently working with a Jetson AGX Orin 64 GB Developer Kit running JetPack 6.0 rev2 (R36.3). My camera source is a ZED X One, and I’m trying to achieve a setup where the video feed is simultaneously processed by an ML model (YOLOv5 with StrongSORT/OC-SORT tracking) and streamed via RTSP.

Here is my Python file, which builds the GStreamer pipeline I’m experimenting with:
dataloader.txt (5.2 KB)

And here is a sample gst-launch pipeline for testing file saving together with RTSP streaming:

gst-launch-1.0 -e nvarguscamerasrc ! 'video/x-raw(memory:NVMM), format=NV12, framerate=15/1' ! nvv4l2h264enc bitrate=8000000 ! h264parse ! tee name=t \
  t. ! queue ! mp4mux ! filesink location=goske_output.mp4 \
  t. ! queue ! rtspclientsink location="rtsp://redirect-rtsp:8554/main_cam" protocols=tcp

The goal is to use the video source in real time, both for ML processing and for RTSP streaming. However, I’m seeing a significant delay relative to real time on both outputs, and I can’t seem to achieve acceptable latency.

Right now, I’m experimenting and don’t have strict requirements regarding resolution, bitrate, or other parameters, but the delay is a major issue.

Has anyone successfully achieved a similar setup with low latency? Are there any tips for optimizing the GStreamer pipeline to minimize delay in such a dual-output scenario? Any guidance or suggestions would be greatly appreciated.

Thanks in advance!

Hi,

For basic camera functionality, first check the device and driver configuration.
You can refer to the programming guide below for detailed information on the device tree and driver implementation.
https://docs.nvidia.com/jetson/archives/r36.3/DeveloperGuide/SD/CameraDevelopment/SensorSoftwareDriverProgramming.html?highlight=programing#sensor-software-driver-programming

Please refer to “Applications Using V4L2 IOCTL Directly” and use the V4L2 IOCTL interface to verify basic camera functionality.
https://docs.nvidia.com/jetson/archives/r36.3/DeveloperGuide/SD/CameraDevelopment/SensorSoftwareDriverProgramming.html?highlight=programing#to-run-a-v4l2-ctl-test
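
For example, a quick capture check from Python could look like this (a sketch only; /dev/video0 and the stream count are placeholders, and the exact v4l2-ctl options for your sensor mode are in the guide above):

import subprocess

# Hypothetical smoke test: capture 100 frames with V4L2 directly,
# bypassing GStreamer entirely (the device node is an assumption)
subprocess.run(
    ['v4l2-ctl', '-d', '/dev/video0', '--stream-mmap', '--stream-count=100'],
    check=True,
)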

If the configuration is confirmed but the camera still fails, the link below explains how to collect logs and gives some debugging tips.
https://elinux.org/Jetson/l4t/Camera_BringUp#Steps_to_enable_more_debug_messages

Thanks!

Hi,
If you run a GStreamer pipeline with rtspsrc to receive and decode the stream, please adjust this property:

  latency             : Amount of ms to buffer
                        flags: readable, writable
                        Unsigned Integer. Range: 0 - 4294967295 Default: 2000

It is 2000 ms by default; please try a smaller value such as 500 ms.
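
For example, a minimal receive-side sketch in Python (the stream URL is taken from the post above; avdec_h264 is the portable software decoder, while nvv4l2decoder would be the hardware decoder on Jetson):

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

Gst.init(None)

# Receive the RTSP stream with a reduced jitterbuffer (latency is in ms)
pipeline = Gst.parse_launch(
    'rtspsrc location=rtsp://redirect-rtsp:8554/main_cam protocols=tcp latency=500 '
    '! rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! autovideosink sync=false'
)
pipeline.set_state(Gst.State.PLAYING)
try:
    GLib.MainLoop().run()
except KeyboardInterrupt:
    pipeline.set_state(Gst.State.NULL)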

For AGX Orin, you can do the following:

  1. Run the script to enable maximum performance:

     VPI - Vision Programming Interface: Performance Benchmark

  2. Boost the clocks of the VI, ISP, and NVCSI blocks:

     Jetson/l4t/Camera BringUp - eLinux.org

  3. Enable this property on the hardware encoder (see the Python sketch after this list):

       maxperf-enable      : Enable or Disable Max Performance mode
                             flags: readable, writable, changeable only in NULL or READY state
                             Boolean. Default: false
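
As a sketch, step 3 can be applied from Python like this, reusing the encoder settings from the pipeline in the original post (steps 1 and 2 are done from the shell as described in the linked pages):

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

Gst.init(None)

# Hardware H.264 encoder, as in the original pipeline
encoder = Gst.ElementFactory.make('nvv4l2h264enc', 'encoder')
encoder.set_property('maxperf-enable', True)  # Boolean, default false (see gst-inspect output above)
encoder.set_property('bitrate', 8000000)      # bitrate value from the original pipeline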

With these settings the delay is still 2 seconds. Here is my loader class:

import sys
import numpy as np
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

Gst.init(None)

# create_element, link_elements, and letterbox are helpers defined elsewhere
# in dataloader.txt (letterbox is the YOLOv5 resize utility).

class JetsonCameraStreamLoader:
    def __init__(self, img_size=640, stride=32, auto=True, transforms=None):
        self.img_size = img_size
        self.stride = stride
        self.auto = auto
        self.transforms = transforms

        self.sources = ['Jetson Camera']
        self.mode = 'stream'

        self.pipeline = None
        self.stopped = False

        # Build the GStreamer pipeline
        self.build_pipeline()

        # Start the pipeline
        self.pipeline.set_state(Gst.State.PLAYING)

        # Create the bus and set up message handling
        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect("message", self.on_bus_message)

    def build_pipeline(self):
        # Create a new GStreamer pipeline
        pipeline = Gst.Pipeline()

        # Create elements
        source = create_element('nvarguscamerasrc', 'source')
        capsfilter = create_element('capsfilter', 'capsfilter')
        capsfilter.set_property('caps', Gst.Caps.from_string(
            'video/x-raw(memory:NVMM),width=960,height=600,format=NV12,framerate=10/1'))

        tee = create_element('tee', 'tee')
        queue1 = create_element('queue', 'queue1')
        nvvidconv = create_element('nvvidconv', 'nvvidconv')
        capsfilter2 = create_element('capsfilter', 'capsfilter2')
        capsfilter2.set_property('caps', Gst.Caps.from_string('video/x-raw, format=BGRx'))

        videoconvert = create_element('videoconvert', 'videoconvert')
        capsfilter3 = create_element('capsfilter', 'capsfilter3')
        capsfilter3.set_property('caps', Gst.Caps.from_string('video/x-raw, format=RGB'))

        appsink = create_element('appsink', 'appsink')
        appsink.set_property('emit-signals', False)
        appsink.set_property('sync', False)
        # Note: drop=False with max-buffers=1 means the appsink blocks when the
        # consumer falls behind, backpressuring the tee and stalling both branches.
        appsink.set_property('drop', False)
        appsink.set_property('max-buffers', 1)
        appsink.set_property('wait-on-eos', False)
        appsink.set_property('enable-last-sample', False)

        queue2 = create_element('queue', 'queue2')
        encoder = create_element('nvv4l2h264enc', 'encoder')
        encoder.set_property('bitrate', 8000000)

        h264parse = create_element('h264parse', 'h264parse')
        rtsp_sink = create_element('rtspclientsink', 'rtsp_sink')
        rtsp_sink.set_property('location', 'rtsp://redirect-rtsp:8554/main_cam')
        rtsp_sink.set_property('protocols', 'tcp')

        elements = [
            source, capsfilter, tee,
            queue1, nvvidconv, capsfilter2, videoconvert, capsfilter3, appsink,
            queue2, encoder, h264parse, rtsp_sink
        ]

        for elem in elements:
            if not elem:
                # A None element has no name to query; report and abort
                print("Failed to create a pipeline element")
                sys.exit(1)
            pipeline.add(elem)

        # Link elements in the main pipeline
        if not link_elements([source, capsfilter, tee]):
            print("Failed to link main pipeline elements")
            sys.exit(1)

        # Link tee to queue1 (appsink branch)
        tee_pad1 = tee.get_request_pad('src_%u')
        queue1_pad = queue1.get_static_pad('sink')
        if tee_pad1.link(queue1_pad) != Gst.PadLinkReturn.OK:
            print('Failed to link tee to queue1')
            sys.exit(1)
        if not link_elements([queue1, nvvidconv, capsfilter2, videoconvert, capsfilter3, appsink]):
            print("Failed to link appsink branch")
            sys.exit(1)

        # Link tee to queue2 (RTSP branch)
        tee_pad2 = tee.get_request_pad('src_%u')
        queue2_pad = queue2.get_static_pad('sink')
        if tee_pad2.link(queue2_pad) != Gst.PadLinkReturn.OK:
            print('Failed to link tee to queue2')
            sys.exit(1)
        if not link_elements([queue2, encoder, h264parse, rtsp_sink]):
            print("Failed to link RTSP branch")
            sys.exit(1)

        self.pipeline = pipeline

    def on_bus_message(self, bus, message):
        t = message.type
        if t == Gst.MessageType.EOS:
            print("End-Of-Stream reached")
            self.stopped = True
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(f"Error received from element {message.src.get_name()}: {err.message}")
            print(f"Debugging information: {debug if debug else 'None'}")
            self.stopped = True

    def __iter__(self):
        return self

    def __next__(self):
        if self.stopped:
            raise StopIteration

        # Poll the bus for messages
        while True:
            message = self.bus.timed_pop_filtered(0, Gst.MessageType.ANY)
            if not message:
                break
            self.on_bus_message(self.bus, message)

        # Pull a sample from the appsink
        appsink = self.pipeline.get_by_name('appsink')
        sample = appsink.emit('pull-sample')

        if not sample:
            # No sample, possibly EOS
            self.stopped = True
            raise StopIteration

        caps_format = sample.get_caps().get_structure(0)
        width = caps_format.get_value('width')
        height = caps_format.get_value('height')
        format_str = caps_format.get_value('format')
        print(f"Received sample with format: {format_str}, width: {width}, height: {height}")

        # Process the sample
        buffer = sample.get_buffer()
        success, map_info = buffer.map(Gst.MapFlags.READ)
        if success:
            frame_data = bytes(map_info.data)
            buffer.unmap(map_info)
            # 3-channel RGB, per capsfilter3 in build_pipeline()
            frame = np.frombuffer(frame_data, np.uint8).reshape((height, width, 3))
            im0 = frame
            if self.transforms:
                im = self.transforms(im0)
            else:
                # Apply letterbox resize
                im = letterbox(im0, self.img_size, stride=self.stride, auto=self.auto)[0]
                print(f"---___--__- {im.shape}, {im.dtype}")
              #  cv2.imwrite(f'/factorymonitor/after/processed_{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}.png', im)
                im = im[..., ::-1].transpose((2, 0, 1))  # reverse channel order (frame is RGB per capsfilter3), HWC to CHW
                #cv2.imwrite(f'/factorymonitor/after/processed_{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}.png', im[..., ::-1].transpose((2,0,1)))
                # im = im.astype(np.float32) / 255.0  # Normalize to [0, 1]
            im = np.ascontiguousarray(im)
            #im = np.expand_dims(im, axis=0)  # Add batch dimension
            im0 = [im0]  # Make im0 a list to match expected input format
            return self.sources, im, im0, None, ''
        else:
            # Failed to map buffer
            self.stopped = True
            raise StopIteration

    def __len__(self):
        return len(self.sources)

    def stop(self):
        self.stopped = True
        self.pipeline.set_state(Gst.State.NULL)
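
For reference, the loader is consumed through the iterator protocol like the stock YOLOv5 stream loaders; a hypothetical usage sketch (the inference step is a placeholder, not part of this post):

loader = JetsonCameraStreamLoader(img_size=640, stride=32)
try:
    for sources, im, im0s, vid_cap, s in loader:
        # im: letterboxed CHW uint8 array ready for model preprocessing
        # im0s: list holding the original frame for drawing/tracking
        pass  # placeholder for the YOLOv5 + StrongSORT step
finally:
    loader.stop()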

GStreamer version:

$ gst-launch-1.0 --version
gst-launch-1.0 version 1.20.3
GStreamer 1.20.3
https://launchpad.net/distros/ubuntu/+source/gstreamer1.0

The camera is connected via a ZED Link Duo capture card over GMSL2.

I’m operating in 15W mode to meet the power requirements of my setup.

I’m getting the delay in the appsink too, not just on the RTSP stream.
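
One thing I suspect (an assumption, not verified yet): with drop=False and max-buffers=1, the appsink blocks the tee whenever my Python loop falls behind, which stalls both branches. A sketch of the non-blocking variant of the relevant settings in build_pipeline():

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

Gst.init(None)

# Keep only the newest frame instead of blocking the pipeline
appsink = Gst.ElementFactory.make('appsink', 'appsink')
appsink.set_property('sync', False)
appsink.set_property('max-buffers', 1)
appsink.set_property('drop', True)   # was False in the code above

# A leaky queue so a slow tee branch cannot stall the other branch
queue1 = Gst.ElementFactory.make('queue', 'queue1')
queue1.set_property('leaky', 2)      # 2 = leaky downstream (drop old buffers)
queue1.set_property('max-size-buffers', 1)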