DeepStream Pipeline Camera Sync Issue After Prolonged Running

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU) Jetson
• DeepStream Version 6.3
• JetPack Version (valid for Jetson only) 5.1.3
• TensorRT Version
• NVIDIA GPU Driver Version (valid for GPU only)
• Issue Type( questions, new requirements, bugs)

I have a DeepStream pipeline based on the deepstream-test3 sample application, running two object detectors:

  1. A wheel detection model
  2. A nut detection model (which only runs when a wheel is detected)

The pipeline is deployed on a Jetson Xavier and processes video from two RTSP cameras:

  • One camera captures the left wheel of a truck
  • The other captures the right wheel

Performance Observations

  • When no wheels are detected, the pipeline runs at around 15 FPS, since the nut detection model is inactive.
  • When wheels and nuts are detected, the FPS drops to 8–11 (expected, as the second model starts processing).

To ensure real-time processing, I have set the camera FPS to 8, so the pipeline should not drop below 8 FPS under normal conditions. However, if the FPS drops momentarily, the pipeline compensates by processing additional frames over the next few seconds to catch up. The logs below illustrate this behavior:

2025-03-18 03:23:02,225 : INFO: PID 1: PERF: {0: 8.0, 1: 8.0}
2025-03-18 03:23:22,227 : INFO: PID 1: PERF: {0: 8.0, 1: 8.0}
2025-03-18 03:23:42,228 : INFO: PID 1: PERF: {0: 8.0, 1: 8.0}
2025-03-18 03:24:02,229 : INFO: PID 1: PERF: {0: 8.0, 1: 8.0}
2025-03-18 03:24:22,230 : INFO: PID 1: PERF: {0: 6.65, 1: 6.65}
2025-03-18 03:24:42,231 : INFO: PID 1: PERF: {0: 9.35, 1: 9.35}

Issue: Camera Sync Loss Over Time

After running continuously for 3-4 days, the two camera feeds become unsynchronized.

  • My dashboard logs images with timestamps.
  • The left-side wheel is detected ~30 seconds earlier than the right-side wheel, even though both cameras started in sync.

This desynchronization increases over time, even though the FPS fluctuations balance out. Can you guide me on how to fix this issue?
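To make the drift measurable rather than anecdotal, a small helper (hypothetical; the class and method names are mine) can track the latest buffer PTS per stream from the existing tiler sink-pad probe and log the skew between the two cameras:

```python
# Hypothetical helper: record the most recent buffer PTS per stream
# (e.g. frame_meta.buf_pts inside the tiler sink-pad probe) and report
# how far apart the two cameras have drifted.
class StreamSyncMonitor:
    def __init__(self):
        self.latest_pts_ns = {}  # stream_id -> most recent buffer PTS (ns)

    def update(self, stream_id: int, pts_ns: int) -> None:
        self.latest_pts_ns[stream_id] = pts_ns

    def skew_ms(self) -> float:
        """Gap between the most- and least-advanced streams, in milliseconds."""
        if len(self.latest_pts_ns) < 2:
            return 0.0
        values = self.latest_pts_ns.values()
        return (max(values) - min(values)) / 1e6

# Sketch of use inside the buffer probe:
#   monitor.update(frame_meta.pad_index, frame_meta.buf_pts)
#   if monitor.skew_ms() > 1000:
#       logger.warning(f"streams drifted by {monitor.skew_ms():.0f} ms")
```

Logging this value periodically would show whether the 30-second offset accumulates steadily or jumps after reconnects.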

Can you measure the GPU loading when “wheels and nuts” are detected?

03-18-2025 22:07:27 RAM 6769/14885MB (lfb 177x4MB) SWAP 660/40211MB (cached 1MB) CPU [34%@1420,38%@1420,43%@1420,46%@1420,30%@1420,29%@1420] EMC_FREQ 0% GR3D_FREQ 30% AUX@35.5C CPU@38C thermal@36.15C AO@35C GPU@37C PMIC@50C
03-18-2025 22:07:28 RAM 6769/14885MB (lfb 177x4MB) SWAP 660/40211MB (cached 1MB) CPU [29%@1420,31%@1420,66%@1420,36%@1420,31%@1420,32%@1420] EMC_FREQ 0% GR3D_FREQ 17% AUX@36C CPU@38C thermal@37.05C AO@35C GPU@37.5C PMIC@50C
03-18-2025 22:07:29 RAM 6709/14885MB (lfb 182x4MB) SWAP 660/40211MB (cached 1MB) CPU [34%@1420,43%@1420,38%@1420,32%@1420,56%@1420,32%@1420] EMC_FREQ 0% GR3D_FREQ 86% AUX@36C CPU@38C thermal@37.35C AO@35.5C GPU@37.5C PMIC@50C
03-18-2025 22:07:30 RAM 6708/14885MB (lfb 182x4MB) SWAP 660/40211MB (cached 1MB) CPU [43%@1420,33%@1420,27%@1420,34%@1420,43%@1420,33%@1420] EMC_FREQ 0% GR3D_FREQ 94% AUX@36C CPU@38.5C thermal@37.5C AO@35.5C GPU@38C PMIC@50C
03-18-2025 22:07:31 RAM 6708/14885MB (lfb 182x4MB) SWAP 660/40211MB (cached 1MB) CPU [34%@1420,45%@1420,30%@1420,44%@1420,30%@1420,41%@1420] EMC_FREQ 0% GR3D_FREQ 50% AUX@36C CPU@38.5C thermal@37.5C AO@35.5C GPU@38C PMIC@50C
03-18-2025 22:07:32 RAM 6706/14885MB (lfb 182x4MB) SWAP 660/40211MB (cached 1MB) CPU [43%@1420,32%@1420,23%@1420,37%@1420,29%@1420,31%@1420] EMC_FREQ 0% GR3D_FREQ 33% AUX@36C CPU@38.5C thermal@37.5C AO@36C GPU@38.5C PMIC@50C

The GPU loading is a little bit high. Have you used nvtracker between PGIE and SGIE?

Yes, I am using a tracker between the PGIE and SGIE.

Have you set the nvstreammux parameters correctly? DeepStream SDK FAQ - Intelligent Video Analytics / DeepStream SDK - NVIDIA Developer Forums

I am using the default values for streammux
batched-push-timeout → 4000000

Please read and follow DeepStream SDK FAQ - #34 by yuweiw

I have reviewed the settings, and almost all of them match, but I am still facing the same issue.

Have you set the “sync-inputs” property of nvstreammux to True in your case?

Can you show us the complete pipeline and configurations?

Here are the elements

import configparser
import math
import sys

from core import is_aarch64
from gi.repository import Gst
from utils.set_logger import logger


def create_pipeline() -> Gst.Pipeline:
    """Create a Gstreamer Pipeline

    Returns:
        Gst.Pipeline: Gstreamer Pipeline
    """
    logger.info("Creating Gst Pipeline \n")
    pipeline = Gst.Pipeline()
    if not pipeline:
        logger.error(" Unable to create Pipeline \n")
        sys.exit(1)
    return pipeline


def create_source_bin(uri: str, index: int) -> Gst.Bin:
    """Create a Gstreamer Source Bin

    Args:
        uri (str): input uri
        index (int): index of the source bin

    Raises:
        Exception: if unable to create source bin
        Exception: if unable to create uri decode bin

    Returns:
        Gst.Bin: Gstreamer Source Bin
    """
    logger.info("Creating source bin")

    bin_name = f"source-bin-{index:02d}"
    logger.info(bin_name)
    nbin = Gst.Bin.new(bin_name)
    if not nbin:
        logger.error(" Unable to create source bin \n")
        raise Exception("Unable to create source bin")

    uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
    if not uri_decode_bin:
        logger.error(" Unable to create uri decode bin \n")
        raise Exception("Unable to create uri decode bin")

    # Configure the source only after the null check above
    uri_decode_bin.set_property("uri", uri)
    uri_decode_bin.set_property("file-loop", 0)
    uri_decode_bin.set_property("cudadec-memtype", 0)
    uri_decode_bin.set_property("latency", 0)
    uri_decode_bin.set_property("rtsp-reconnect-interval", 10)
    # uri_decode_bin.set_property("rtsp-reconnect-attempts", 50)
    uri_decode_bin.connect("pad-added", cb_newpad, nbin)
    uri_decode_bin.connect("child-added", decodebin_child_added, nbin)

    Gst.Bin.add(nbin, uri_decode_bin)
    bin_pad = nbin.add_pad(
        Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC)
    )
    if not bin_pad:
        logger.error(" Failed to add ghost pad in source bin \n")
        return None
    return nbin


def on_pad_added(element, element_src_pad, data):
    print("In on_pad_added\n")
    caps = element_src_pad.get_current_caps()
    structure = caps.get_structure(0)  # avoid shadowing the builtin `str`
    name = structure.get_name()
    depay_elem = data

    media = structure.get_string("media")
    is_video = media == "video"
    if "x-rtp" in name and is_video:
        print("Start binding RTSP")
        sinkpad = depay_elem.get_static_pad("sink")
        state = element_src_pad.link(sinkpad)
        if state != Gst.PadLinkReturn.OK:
            print("Unable to link depay loader to rtsp src")
        else:
            print("Binding RTSP successfully")
    else:
        print("Cannot be bound, get_name=", name, ", media=", media)


def decodebin_child_added(child_proxy, Object, name, user_data) -> None:
    logger.info(f"Decodebin child added: {name} \n")
    if name.find("decodebin") != -1:
        Object.connect("child-added", decodebin_child_added, user_data)

    if not is_aarch64() and name.find("nvv4l2decoder") != -1:
        Object.set_property("cudadec-memtype", 2)

    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        if source_element.find_property("drop-on-latency") is not None:
            Object.set_property("drop-on-latency", True)


def cb_newpad(decodebin, decoder_src_pad, data) -> None:
    logger.info("In cb_newpad\n")
    caps = decoder_src_pad.get_current_caps()
    if not caps:
        caps = decoder_src_pad.query_caps()
    gststruct = caps.get_structure(0)
    gstname = gststruct.get_name()
    source_bin = data
    features = caps.get_features(0)

    if gstname.find("video") != -1:
        if features.contains("memory:NVMM"):
            bin_ghost_pad = source_bin.get_static_pad("src")
            if not bin_ghost_pad.set_target(decoder_src_pad):
                logger.error(
                    "Failed to link decoder src pad to source bin ghost pad\n"
                )
        else:
            logger.error(
                " Error: Decodebin did not pick nvidia decoder plugin.\n"
            )


def create_streammux(
    batch_size: int, input_height: int, input_width: int
) -> Gst.Element:
    """Create a Gstreamer Streammux Element

    Args:
        batch_size (int): batch size for streammux
        input_height (int): input_height for streammux
        input_width (int): input_width for streammux

    Returns:
        Gst.Element: Gstreamer Streammux Element
    """
    logger.info("Creating streamux \n ")
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        logger.error(" Unable to create NvStreamMux \n")
        sys.exit(1)

    streammux.set_property("width", input_width)
    streammux.set_property("height", input_height)
    streammux.set_property("batch-size", batch_size)
    streammux.set_property("batched-push-timeout", 4000000)

    return streammux


def create_pgie(pgie_file_path: str, number_sources: int) -> Gst.Element:
    """Create a Gstreamer Primary Inference Element

    Args:
        pgie_file_path (str): config file path for pgie
        number_sources (int): number of sources

    Raises:
        Exception: if unable to create pgie

    Returns:
        Gst.Element: Gstreamer Primary Inference Element
    """
    logger.info("Creating Pgie \n ")
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        logger.error(" Unable to create pgie \n")
        raise Exception("Unable to create pgie")

    pgie.set_property("config-file-path", pgie_file_path)

    pgie_batch_size = pgie.get_property("batch-size")
    if pgie_batch_size != number_sources:
        logger.warn(
            f"PGIE WARNING: Overriding infer-config batch-size: "
            f"{pgie_batch_size} with number of sources "
            f"{number_sources} \n",
        )
        pgie.set_property("batch-size", number_sources)

    return pgie


def create_tracker(
    tracker_config_txt_path: str, tracker_config_yml_path: str
) -> Gst.Element:
    """Create a Gstreamer Tracker Element

    Args:
        tracker_config_txt_path (str): tracker txt config
        tracker_config_yml_path (str): tracker yml config

    Raises:
        Exception: if unable to create tracker

    Returns:
        Gst.Element: Gstreamer Tracker Element
    """
    logger.info("Creating nvtracker \n ")
    tracker = Gst.ElementFactory.make("nvtracker", "tracker")
    if not tracker:
        logger.error(" Unable to create tracker \n")
        raise Exception("Unable to create tracker")

    config = configparser.ConfigParser()
    config.read(tracker_config_txt_path)
    config.sections()

    for key in config["tracker"]:
        if key == "tracker-width":
            tracker_width = config.getint("tracker", key)
            tracker.set_property("tracker-width", tracker_width)
        elif key == "tracker-height":
            tracker_height = config.getint("tracker", key)
            tracker.set_property("tracker-height", tracker_height)
        elif key == "gpu-id":
            tracker_gpu_id = config.getint("tracker", key)
            tracker.set_property("gpu-id", tracker_gpu_id)
        elif key == "ll-lib-file":
            tracker_ll_lib_file = config.get("tracker", key)
            tracker.set_property("ll-lib-file", tracker_ll_lib_file)
        # elif key == "ll-config-file":
        #     tracker_ll_config_file = config.get("tracker", key)
        #     tracker.set_property("ll-config-file", tracker_ll_config_file)

    tracker.set_property("ll-config-file", tracker_config_yml_path)

    return tracker


def create_sgie(sgie_file_path: str, number_sources: int) -> Gst.Element:
    """Create a Gstreamer Secondary Inference Element

    Args:
        sgie_file_path (str): config file path for sgie
        number_sources (int): number of sources

    Raises:
        Exception: if unable to create sgie

    Returns:
        Gst.Element: Gstreamer Secondary Inference Element
    """
    logger.info("Creating Sgie \n ")
    sgie = Gst.ElementFactory.make("nvinfer", "secondary-inference")
    if not sgie:
        logger.error(" Unable to create sgie \n")
        raise Exception("Unable to create sgie")

    sgie.set_property("config-file-path", sgie_file_path)

    sgie_batch_size = sgie.get_property("batch-size")
    if sgie_batch_size != number_sources:
        logger.warn(
            f"SGIE WARNING: Overriding infer-config batch-size: "
            f"{sgie_batch_size} with number of sources "
            f"{number_sources} \n",
        )
        sgie.set_property("batch-size", number_sources)

    return sgie


def create_videoconverter(index=0) -> Gst.Element:
    """Create a Gstreamer Video Converter Element

    Args:
        index (int, optional): Defaults to 0.

    Raises:
        Exception: If unable to create nvvidconv1

    Returns:
        Gst.Element: Gstreamer Video Converter Element
    """
    logger.info("Creating nvvidconv1 \n ")

    if index:
        element_name = "convertor1"
    else:
        element_name = "convertor"
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", element_name)
    if not nvvidconv:
        logger.error(" Unable to create nvvidconv1 \n")
        raise Exception("Unable to create nvvidconv1")
    return nvvidconv


def create_caps_and_filter() -> Gst.Element:
    """Create a Gstreamer Caps and Filter Element

    Raises:
        Exception: If unable to get the caps filter1

    Returns:
        Gst.Element: Gstreamer Caps and Filter Element
    """
    logger.info("Creating filter1 \n ")
    caps1 = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
    filter1 = Gst.ElementFactory.make("capsfilter", "filter1")
    if not filter1 or not caps1:
        logger.error(" Unable to get the caps filter1 \n")
        raise Exception("Unable to get the caps filter1")
    filter1.set_property("caps", caps1)
    return filter1


def create_tiler(
    number_sources: int, input_height: int, input_width: int
) -> Gst.Element:
    """Creates a tiler element

    Args:
        number_sources (int): number of input sources
        input_height (int): height for the tiler
        input_width (int): width for the tiler

    Raises:
        Exception: if unable to create tiler

    Returns:
        Gst.Element: Gstreamer Tiler Element
    """
    logger.info("Creating tiler \n ")
    tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
    if not tiler:
        logger.error(" Unable to create tiler \n")
        raise Exception("Unable to create tiler")

    tiler_rows = int(math.sqrt(number_sources))
    tiler_columns = int(math.ceil((1.0 * number_sources) / tiler_rows))
    tiler.set_property("rows", tiler_rows)
    tiler.set_property("columns", tiler_columns)
    tiler.set_property("width", input_width)
    tiler.set_property("height", input_height)

    return tiler


def create_onscreendisplay() -> Gst.Element:
    """Creates a on screen display element

    Raises:
        Exception: if unable to create nvosd

    Returns:
        Gst.Element: Gstreamer On Screen Display Element
    """
    logger.info("Creating nvosd \n ")
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    if not nvosd:
        logger.error(" Unable to create nvosd \n")
        raise Exception("Unable to create nvosd")
    return nvosd


def create_sink(display: bool) -> Gst.Element:
    """Creates a sink element

    Args:
        display (bool): whether to display the output

    Raises:
        Exception: if unable to create nv3dsink
        Exception: if unable to create egl sink
        Exception: if unable to create Fakesink sink

    Returns:
        Gst.Element: Gstreamer Sink Element
    """
    if display:
        if is_aarch64():
            logger.info("Creating nv3dsink \n")
            sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
            if not sink:
                logger.error(" Unable to create nv3dsink \n")
                raise Exception("Unable to create nv3dsink")
        else:
            logger.info("Creating EGLSink \n")
            sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
            if not sink:
                logger.error(" Unable to create egl sink \n")
                raise Exception("Unable to create egl sink")
    else:
        logger.info("Creating Fakesink \n")
        sink = Gst.ElementFactory.make("fakesink", "fakesink")
        if not sink:
            logger.error(" Unable to create Fakesink sink \n")
            raise Exception("Unable to create Fakesink sink")

    sink.set_property("sync", 0)
    sink.set_property("qos", 0)

    return sink

Here is the main pipeline

def run_pipeline(self) -> None:
        """Create and run the pipeline."""

        # GObject.threads_init() has been a no-op since PyGObject 3.11
        Gst.init(None)
        is_live = False

        self.pipeline = create_pipeline()

        streammux = create_streammux(
            batch_size=len(self.cameras_list),
            input_height=self.configs.muxer_output_height,
            input_width=self.configs.muxer_output_width,
        )
        self.pipeline.add(streammux)
        source_bins = []
        for idx, camera in enumerate(self.cameras_list):
            logger.info(f"Creating source_bin {idx} \n")
            uri_name = camera.camera_uri
            if "rtsp://" in uri_name:
                is_live = True
            source_bin = create_source_bin(uri_name, idx)
            if not source_bin:
                logger.error("Unable to create source bin \n")
                raise Exception("Unable to create source bin")
            self.pipeline.add(source_bin)
            padname = "sink_%u" % idx
            sinkpad = streammux.get_request_pad(padname)
            if not sinkpad:
                logger.error("Unable to create sink pad bin \n")
                raise Exception("Unable to create sink pad bin")
            srcpad = source_bin.get_static_pad("src")
            if not srcpad:
                logger.error("Unable to create src pad bin \n")
                raise Exception("Unable to create src pad bin")
            srcpad.link(sinkpad)
            source_bins.append(source_bin)

            # add camera to self.stream_id_to_camera for easy access
            self.stream_id_to_camera[camera.stream_id] = camera

            # add camera to self.stream_id_to_gate for easy access

        pgie = create_pgie(
            self.configs.wheel_detector_config_file, len(self.cameras_list)
        )

        tracker = create_tracker(
            self.configs.tracker_txt_path, self.configs.tracker_yaml_path
        )

        sgie = create_sgie(
            self.configs.nuts_detector_config_file, 8
        )

        nvvidconv1 = create_videoconverter(1)

        filter1 = create_caps_and_filter()

        tiler = create_tiler(
            number_sources=len(self.cameras_list),
            input_height=self.configs.tiler_height,
            input_width=self.configs.tiler_width,
        )

        nvvidconv = create_videoconverter()

        nvosd = create_onscreendisplay()

        sink = create_sink(display=False)

        if is_live:
            logger.info("At least one of the sources is live")
            streammux.set_property("live-source", 1)

        if not is_aarch64():
            mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
            streammux.set_property("nvbuf-memory-type", mem_type)
            nvvidconv.set_property("nvbuf-memory-type", mem_type)
            nvvidconv1.set_property("nvbuf-memory-type", mem_type)
            tiler.set_property("nvbuf-memory-type", mem_type)

        logger.info("Adding elements to Pipeline \n")
        self.pipeline.add(pgie)
        self.pipeline.add(tracker)
        self.pipeline.add(sgie)
        self.pipeline.add(tiler)
        self.pipeline.add(nvvidconv)
        self.pipeline.add(filter1)
        self.pipeline.add(nvvidconv1)
        self.pipeline.add(nvosd)
        self.pipeline.add(sink)

        logger.info("Linking elements in the Pipeline \n")
        streammux.link(pgie)
        pgie.link(tracker)
        tracker.link(sgie)
        sgie.link(nvvidconv1)
        nvvidconv1.link(filter1)
        filter1.link(tiler)
        tiler.link(nvvidconv)
        nvvidconv.link(nvosd)
        nvosd.link(sink)

        loop = GLib.MainLoop()
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", bus_call, loop)

        sink_pad = tiler.get_static_pad("sink")
        if not sink_pad:
            logger.error(" Unable to get tiler sink pad \n")
            sys.exit(1)
        else:
            sink_pad.add_probe(
                Gst.PadProbeType.BUFFER, self.tiler_sink_pad_buffer_probe, 0
            )
            GLib.timeout_add(20000, self.perf_data.perf_print_callback)

        # List the sources
        logger.info("Now playing...")
        for idx, source in enumerate(self.cameras_list):
            logger.info(f"Playing {idx} : {source.camera_uri}")

        
        logger.info("Starting pipeline \n")

        self.pipeline.set_state(Gst.State.PLAYING)
        Gst.debug_bin_to_dot_file(
            self.pipeline, Gst.DebugGraphDetails.ALL, "pipeline"
        )

        try:
            loop.run()
        except Exception as e:
            logger.error(f"Error: {e}")

        # cleanup
        logger.info("Exiting app\n")
        self.pipeline.set_state(Gst.State.NULL)

Why do you set the RTSP latency to zero? How can you guarantee the Ethernet performance?

The “batched-push-timeout” value is in microseconds (µs). Are you sure you want to set the timeout to 4 seconds? Why? Please follow DeepStream SDK FAQ - #34 by yuweiw
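A common starting point is one frame interval, not 4 seconds. A minimal sketch (the helper function is mine; the property itself is standard nvstreammux):

```python
def batched_push_timeout_us(camera_fps: float) -> int:
    """One frame interval in microseconds -- a common starting point for
    nvstreammux's batched-push-timeout, which is specified in us."""
    return int(1_000_000 / camera_fps)

# With 8 FPS cameras this gives 125000 us, versus the 4000000 us (= 4 s)
# currently configured:
# streammux.set_property("batched-push-timeout", batched_push_timeout_us(8))
```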

The nvstreammux property “sync-inputs” can be set to True if you want the two cameras to be synchronized.
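A sketch of the synchronization-related muxer settings (property names as documented for nvstreammux; the helper function and default value are mine):

```python
# Hypothetical helper bundling the nvstreammux properties discussed above.
def synced_mux_props(max_latency_ms: int = 200) -> dict:
    return {
        "live-source": 1,
        "sync-inputs": True,                        # wait for all sources per batch
        "max-latency": max_latency_ms * 1_000_000,  # in ns: bound how long to wait
    }

# for name, value in synced_mux_props().items():
#     streammux.set_property(name, value)
```

With sync-inputs enabled, max-latency keeps a lagging camera from stalling the whole batch indefinitely.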

Is the FPS measured with “fakesink”?

I have tried different latency values, but I was getting pixelated frames.

Yes, the FPS is measured with fakesink.

Pixelated frames mean payload packets are being lost during transfer over Ethernet. Can you use the TCP protocol instead of UDP?
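For reference, forcing TCP transport on nvurisrcbin can be sketched like this (assuming the select-rtp-protocol property, where 4 selects TCP only; the helper name and the 200 ms default are mine, and a non-zero latency gives the jitter buffer room to reorder packets):

```python
RTP_PROTOCOL_TCP = 4  # assumption: value for "TCP only" per nvurisrcbin docs

def rtsp_transport_props(latency_ms: int = 200, force_tcp: bool = True) -> dict:
    """Build a property dict for nvurisrcbin's RTSP transport settings."""
    props = {"latency": latency_ms}
    if force_tcp:
        props["select-rtp-protocol"] = RTP_PROTOCOL_TCP
    return props

# for name, value in rtsp_transport_props().items():
#     uri_decode_bin.set_property(name, value)
```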

When using TCP, the camera kept getting disconnected. That issue is now fixed by setting the latency, and currently the cameras are not getting disconnected.

Is there any other issue to be discussed in this topic?

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.