Delay in RTSP increases over time

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU) Jetson
• DeepStream Version 6.3
• JetPack Version (valid for Jetson only) 5.1.3
• TensorRT Version
• NVIDIA GPU Driver Version (valid for GPU only)
• Issue Type( questions, new requirements, bugs)

I am using these elements for RTSP:

for idx, camera in enumerate(self.cameras_list):
            logger.info(f"Creating source_bin {idx} \n")
            uri_name = camera.camera_uri
            #uri_name = "rtsp://127.0.0.1:8554/stream1"
            if "rtsp://" in uri_name:
                is_live = True
            source = Gst.ElementFactory.make("rtspsrc", f"rtspsrc{idx}")
            source.set_property("latency", 0)
            source.set_property("protocols", "tcp")
            source.set_property("location", uri_name)
            source.set_property("buffer-mode", "none")
            
            depay = Gst.ElementFactory.make("rtph264depay", f"depay{idx}")
            source.connect('pad-added', on_pad_added, depay)

            h264parser = Gst.ElementFactory.make("h264parse", f"h264parse{idx}")
            decoder = Gst.ElementFactory.make("nvv4l2decoder", f"decoder{idx}")
            self.pipeline.add(source)
            #self.pipeline.add(queue)
            self.pipeline.add(depay)
            self.pipeline.add(h264parser)
            self.pipeline.add(decoder)

            source.link(depay)
            depay.link(h264parser)
            h264parser.link(decoder)

            padname = "sink_%u" % idx
            sinkpad = streammux.get_request_pad(padname)
            if not sinkpad:
                logger.error("Unable to create sink pad bin \n")
                raise Exception("Unable to create sink pad bin")
            srcpad = decoder.get_static_pad("src")
            if not srcpad:
                logger.error("Unable to create src pad bin \n")
                raise Exception("Unable to create src pad bin")
            srcpad.link(sinkpad)

            # add camera to self.stream_id_to_camera for easy access
            self.stream_id_to_camera[camera.stream_id] = camera

The rest of the elements are the same as in the default DeepStream pipeline.

With the latency set to 0, everything works fine. However, when I increase the latency, the frames become pixelated, so I can't modify this parameter.
The issue is that after running the pipeline for 10 to 12 hours, there is a delay, and the pipeline is processing frames that are 1 to 2 minutes behind the real-time RTSP feed. Is there a way to resolve this issue?
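Not something stated in this thread, but one knob that may be relevant: rtspsrc has a `drop-on-latency` property, so instead of `latency=0` (no jitter buffer at all) you could try a small latency window and let packets that arrive later than the window be discarded rather than queued. A minimal sketch; the helper function and the chosen values are illustrative assumptions, not a verified fix:

```python
def rtspsrc_live_props(latency_ms=200):
    """Hypothetical property set for keeping rtspsrc close to live:
    a small jitter buffer plus drop-on-latency, so late packets are
    dropped instead of accumulating as delay."""
    return {
        "latency": latency_ms,    # jitter buffer in ms
        "drop-on-latency": True,  # discard packets exceeding the window
        "protocols": "tcp",
    }

# Applying it (sketch; requires GStreamer/gi at runtime):
# for prop, value in rtspsrc_live_props().items():
#     source.set_property(prop, value)
```

Whether 200 ms is enough to avoid the pixelation depends on the network jitter of the cameras, so it would need to be tuned per deployment.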

How many RTSP streams are you working with? Are they different RTSP sources or the same RTSP source?

I am using 2 different rtsp sources.

Have you set the nvstreammux parameters correctly?
DeepStream SDK FAQ - Intelligent Video Analytics / DeepStream SDK - NVIDIA Developer Forums

Can you show us the complete pipeline and parameters?

These are the elements I am using

import configparser
import math
import sys

from core import is_aarch64
from gi.repository import Gst
from utils.set_logger import logger


def create_pipeline() -> Gst.Pipeline:
    """Create a Gstreamer Pipeline

    Returns:
        Gst.Pipeline: Gstreamer Pipeline
    """
    logger.info("Creating Gst Pipeline \n")
    pipeline = Gst.Pipeline()
    if not pipeline:
        logger.error(" Unable to create Pipeline \n")
        sys.exit(1)
    else:
        return pipeline


def create_source_bin(uri: str, index: int) -> Gst.Bin:
    """Create a Gstreamer Source Bin

    Args:
        uri (str): input uri
        index (int): index of the source bin

    Raises:
        Exception: if unable to create source bin
        Exception: if unable to create uri decode bin

    Returns:
        Gst.Bin: Gstreamer Source Bin
    """
    logger.info("Creating source bin")

    bin_name = f"source-bin-{index:02d}"
    logger.info(bin_name)
    nbin = Gst.Bin.new(bin_name)
    if not nbin:
        logger.error(" Unable to create source bin \n")
        raise Exception("Unable to create source bin")

    uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
    # check for creation failure before setting any properties
    if not uri_decode_bin:
        logger.error(" Unable to create uri decode bin \n")
        raise Exception("Unable to create uri decode bin")

    uri_decode_bin.set_property("file-loop", 0)
    uri_decode_bin.set_property("cudadec-memtype", 0)
    uri_decode_bin.set_property("latency", 0)
    uri_decode_bin.set_property("rtsp-reconnect-interval", 30)
    #uri_decode_bin.set_property("rtsp-reconnect-attempts", 50)

    uri_decode_bin.set_property("uri", uri)
    uri_decode_bin.connect("pad-added", cb_newpad, nbin)
    uri_decode_bin.connect("child-added", decodebin_child_added, nbin)

    Gst.Bin.add(nbin, uri_decode_bin)
    bin_pad = nbin.add_pad(
        Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC)
    )
    if not bin_pad:
        logger.error(" Failed to add ghost pad in source bin \n")
        return None
    return nbin


def on_pad_added(element, element_src_pad, data):
    print("In on_pad_added\n")
    caps = element_src_pad.get_current_caps()
    # avoid shadowing the builtin `str`
    structure = caps.get_structure(0)
    name = structure.get_name()
    depay_elem = data

    media = structure.get_string("media")
    is_video = media == "video"
    if "x-rtp" in name and is_video:
        print("Start binding RTSP")
        sinkpad = depay_elem.get_static_pad("sink")
        state = element_src_pad.link(sinkpad)
        if state != Gst.PadLinkReturn.OK:
            print("Unable to link depay loader to rtsp src")
        else:
            print("Binding RTSP successfully")
    else:
        print("Cannot be bound, get_name=", name, ", media=", media)


def decodebin_child_added(child_proxy, Object, name, user_data) -> None:
    logger.info(f"Decodebin child added: {name} \n")
    if name.find("decodebin") != -1:
        Object.connect("child-added", decodebin_child_added, user_data)

    if not is_aarch64() and name.find("nvv4l2decoder") != -1:
        Object.set_property("cudadec-memtype", 2)

    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        if source_element.find_property("drop-on-latency") is not None:
            Object.set_property("drop-on-latency", True)


def cb_newpad(decodebin, decoder_src_pad, data) -> None:
    logger.info("In cb_newpad\n")
    caps = decoder_src_pad.get_current_caps()
    if not caps:
        caps = decoder_src_pad.query_caps()
    gststruct = caps.get_structure(0)
    gstname = gststruct.get_name()
    source_bin = data
    features = caps.get_features(0)

    if gstname.find("video") != -1:
        if features.contains("memory:NVMM"):
            bin_ghost_pad = source_bin.get_static_pad("src")
            if not bin_ghost_pad.set_target(decoder_src_pad):
                logger.error(
                    "Failed to link decoder src pad to source bin ghost pad\n"
                )
        else:
            logger.error(
                " Error: Decodebin did not pick nvidia decoder plugin.\n"
            )


def create_streammux(
    batch_size: int, input_height: int, input_width: int
) -> Gst.Element:
    """Create a Gstreamer Streammux Element

    Args:
        batch_size (int): batch size for streammux
        input_height (int): input_height for streammux
        input_width (int): input_width for streammux

    Returns:
        Gst.Element: Gstreamer Streammux Element
    """
    logger.info("Creating streamux \n ")
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        logger.error(" Unable to create NvStreamMux \n")
        sys.exit(1)

    streammux.set_property("width", input_width)
    streammux.set_property("height", input_height)
    streammux.set_property("batch-size", batch_size)
    streammux.set_property("batched-push-timeout", 4000000)

    return streammux


def create_pgie(pgie_file_path: str, number_sources: int) -> Gst.Element:
    """Create a Gstreamer Primary Inference Element

    Args:
        pgie_file_path (str): config file path for pgie
        number_sources (int): number of sources

    Raises:
        Exception: if unable to create pgie

    Returns:
        Gst.Element: Gstreamer Primary Inference Element
    """
    logger.info("Creating Pgie \n ")
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        logger.error(" Unable to create pgie \n")
        raise Exception("Unable to create pgie")

    pgie.set_property("config-file-path", pgie_file_path)

    pgie_batch_size = pgie.get_property("batch-size")
    if pgie_batch_size != number_sources:
        logger.warn(
            f"PGIE WARNING: Overriding infer-config batch-size: "
            f"{pgie_batch_size} with number of sources "
            f"{number_sources} \n",
        )
        pgie.set_property("batch-size", number_sources)

    return pgie


def create_tracker(
    tracker_config_txt_path: str, tracker_config_yml_path: str
) -> Gst.Element:
    """Create a Gstreamer Tracker Element

    Args:
        tracker_config_txt_path (str): tracker txt config
        tracker_config_yml_path (str): tracker yml config

    Raises:
        Exception: if unable to create tracker

    Returns:
        Gst.Element: Gstreamer Tracker Element
    """
    logger.info("Creating nvtracker \n ")
    tracker = Gst.ElementFactory.make("nvtracker", "tracker")
    if not tracker:
        logger.error(" Unable to create tracker \n")
        raise Exception("Unable to create tracker")

    config = configparser.ConfigParser()
    config.read(tracker_config_txt_path)
    config.sections()

    for key in config["tracker"]:
        if key == "tracker-width":
            tracker_width = config.getint("tracker", key)
            tracker.set_property("tracker-width", tracker_width)
        elif key == "tracker-height":
            tracker_height = config.getint("tracker", key)
            tracker.set_property("tracker-height", tracker_height)
        elif key == "gpu-id":
            tracker_gpu_id = config.getint("tracker", key)
            tracker.set_property("gpu_id", tracker_gpu_id)
        elif key == "ll-lib-file":
            tracker_ll_lib_file = config.get("tracker", key)
            tracker.set_property("ll-lib-file", tracker_ll_lib_file)
        # elif key == "ll-config-file":
        #     tracker_ll_config_file = config.get("tracker", key)
        #     tracker.set_property("ll-config-file", tracker_ll_config_file)

    tracker.set_property("ll-config-file", tracker_config_yml_path)

    return tracker


def create_sgie(sgie_file_path: str, number_sources: int) -> Gst.Element:
    """Create a Gstreamer Secondary Inference Element

    Args:
        sgie_file_path (str): config file path for pgie
        number_sources (int): number of sources

    Raises:
        Exception: if unable to create sgie

    Returns:
        Gst.Element: Gstreamer Secondary Inference Element
    """
    logger.info("Creating Sgie \n ")
    sgie = Gst.ElementFactory.make("nvinfer", "secondary-inference")
    if not sgie:
        logger.error(" Unable to create sgie \n")
        raise Exception("Unable to create sgie")

    sgie.set_property("config-file-path", sgie_file_path)

    sgie_batch_size = sgie.get_property("batch-size")
    if sgie_batch_size != number_sources:
        logger.warn(
            f"SGIE WARNING: Overriding infer-config batch-size: "
            f"{sgie_batch_size} with number of sources "
            f"{number_sources} \n",
        )
        sgie.set_property("batch-size", number_sources)

    return sgie


def create_videoconverter(index=0) -> Gst.Element:
    """Create a Gstreamer Video Converter Element

    Args:
        index (int, optional): Defaults to 0.

    Raises:
        Exception: If unable to create nvvidconv1

    Returns:
        Gst.Element: Gstreamer Video Converter Element
    """
    logger.info("Creating nvvidconv1 \n ")

    if index:
        element_name = "convertor1"
    else:
        element_name = "convertor"
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", element_name)
    if not nvvidconv:
        logger.error(" Unable to create nvvidconv1 \n")
        raise Exception("Unable to create nvvidconv1")
    return nvvidconv


def create_caps_and_filter() -> Gst.Element:
    """Create a Gstreamer Caps and Filter Element

    Raises:
        Exception: If unable to get the caps filter1

    Returns:
        Gst.Element: Gstreamer Caps and Filter Element
    """
    logger.info("Creating filter1 \n ")
    caps1 = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
    filter1 = Gst.ElementFactory.make("capsfilter", "filter1")
    if not filter1 or not caps1:
        logger.error(" Unable to get the caps filter1 \n")
        raise Exception("Unable to get the caps filter1")
    filter1.set_property("caps", caps1)
    return filter1


def create_tiler(
    number_sources: int, input_height: int, input_width: int
) -> Gst.Element:
    """Creates a tiler element

    Args:
        number_sources (int): number of input sources
        input_height (int): height for the tiler
        input_width (int): width for the tiler

    Raises:
        Exception: if unable to create tiler

    Returns:
        Gst.Element: Gstreamer Tiler Element
    """
    logger.info("Creating tiler \n ")
    tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
    if not tiler:
        logger.error(" Unable to create tiler \n")
        raise Exception("Unable to create tiler")

    tiler_rows = int(math.sqrt(number_sources))
    tiler_columns = int(math.ceil((1.0 * number_sources) / tiler_rows))
    tiler.set_property("rows", tiler_rows)
    tiler.set_property("columns", tiler_columns)
    tiler.set_property("width", input_width)
    tiler.set_property("height", input_height)

    return tiler


def create_onscreendisplay() -> Gst.Element:
    """Creates a on screen display element

    Raises:
        Exception: if unable to create nvosd

    Returns:
        Gst.Element: Gstreamer On Screen Display Element
    """
    logger.info("Creating nvosd \n ")
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    if not nvosd:
        logger.error(" Unable to create nvosd \n")
        raise Exception("Unable to create nvosd")
    return nvosd


def create_sink(display: bool) -> Gst.Element:
    """Creates a sink element

    Args:
        display (bool): whether to display the output

    Raises:
        Exception: if unable to create nv3dsink
        Exception: if unable to create egl sink
        Exception: if unable to create Fakesink sink

    Returns:
        Gst.Element: Gstreamer Sink Element
    """
    if display:
        if is_aarch64():
            logger.info("Creating nv3dsink \n")
            sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
            if not sink:
                logger.error(" Unable to create nv3dsink \n")
                raise Exception("Unable to create nv3dsink")
        else:
            logger.info("Creating EGLSink \n")
            sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
            if not sink:
                logger.error(" Unable to create egl sink \n")
                raise Exception("Unable to create egl sink")
    else:
        logger.info("Creating Fakesink \n")
        sink = Gst.ElementFactory.make("fakesink", "fakesink")
        if not sink:
            logger.error(" Unable to create Fakesink sink \n")
            raise Exception("Unable to create Fakesink sink")

    sink.set_property("sync", 0)
    sink.set_property("qos", 0)

    return sink

Here is the pipeline

def run_pipeline(self) -> None:
        """Create and run the pipeline."""

        GObject.threads_init()
        Gst.init(None)
        is_live = False

        self.pipeline = create_pipeline()

        streammux = create_streammux(
            batch_size=len(self.cameras_list),
            input_height=self.configs.muxer_output_height,
            input_width=self.configs.muxer_output_width,
        )
        self.pipeline.add(streammux)
        #source_bins = []
        for idx, camera in enumerate(self.cameras_list):
            logger.info(f"Creating source_bin {idx} \n")
            uri_name = camera.camera_uri
            #uri_name = "rtsp://127.0.0.1:8554/stream1"
            if "rtsp://" in uri_name:
                is_live = True
            source = Gst.ElementFactory.make("rtspsrc", f"rtspsrc{idx}")
            source.set_property("latency", 0)
            source.set_property("protocols", "tcp")
            source.set_property("location", uri_name)
            source.set_property("buffer-mode", "none")
            
            depay = Gst.ElementFactory.make("rtph264depay", f"depay{idx}")
            source.connect('pad-added', on_pad_added, depay)

            h264parser = Gst.ElementFactory.make("h264parse", f"h264parse{idx}")
            decoder = Gst.ElementFactory.make("nvv4l2decoder", f"decoder{idx}")
            self.pipeline.add(source)
            #self.pipeline.add(queue)
            self.pipeline.add(depay)
            self.pipeline.add(h264parser)
            self.pipeline.add(decoder)

            source.link(depay)
            depay.link(h264parser)
            h264parser.link(decoder)

            padname = "sink_%u" % idx
            sinkpad = streammux.get_request_pad(padname)
            if not sinkpad:
                logger.error("Unable to create sink pad bin \n")
                raise Exception("Unable to create sink pad bin")
            srcpad = decoder.get_static_pad("src")
            if not srcpad:
                logger.error("Unable to create src pad bin \n")
                raise Exception("Unable to create src pad bin")
            srcpad.link(sinkpad)

            # add camera to self.stream_id_to_camera for easy access
            self.stream_id_to_camera[camera.stream_id] = camera

            # add camera to self.stream_id_to_gate for easy access

        pgie = create_pgie(
            self.configs.wheel_detector_config_file, len(self.cameras_list)
        )

        tracker = create_tracker(
            self.configs.tracker_txt_path, self.configs.tracker_yaml_path
        )

        sgie = create_sgie(
            self.configs.nuts_detector_config_file, len(self.cameras_list)
        )

        nvvidconv1 = create_videoconverter(1)

        filter1 = create_caps_and_filter()

        tiler = create_tiler(
            number_sources=len(self.cameras_list),
            input_height=self.configs.tiler_height,
            input_width=self.configs.tiler_width,
        )

        nvvidconv = create_videoconverter()

        nvosd = create_onscreendisplay()

        sink = create_sink(display=False)

        if is_live:
            logger.info("At least one of the sources is live")
            streammux.set_property("live-source", 1)

        if not is_aarch64():
            mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
            streammux.set_property("nvbuf-memory-type", mem_type)
            nvvidconv.set_property("nvbuf-memory-type", mem_type)
            nvvidconv1.set_property("nvbuf-memory-type", mem_type)
            tiler.set_property("nvbuf-memory-type", mem_type)

        logger.info("Adding elements to Pipeline \n")
        self.pipeline.add(pgie)
        self.pipeline.add(tracker)
        self.pipeline.add(sgie)
        self.pipeline.add(tiler)
        self.pipeline.add(nvvidconv)
        self.pipeline.add(filter1)
        self.pipeline.add(nvvidconv1)
        self.pipeline.add(nvosd)
        self.pipeline.add(sink)

        logger.info("Linking elements in the Pipeline \n")
        streammux.link(pgie)
        pgie.link(tracker)
        tracker.link(sgie)
        sgie.link(nvvidconv1)
        nvvidconv1.link(filter1)
        filter1.link(tiler)
        tiler.link(nvvidconv)
        nvvidconv.link(nvosd)
        nvosd.link(sink)

        loop = GLib.MainLoop()
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", bus_call, loop)

        sink_pad = tiler.get_static_pad("sink")
        if not sink_pad:
            logger.error(" Unable to get tiler sink pad \n")
            sys.exit(1)
        else:
            sink_pad.add_probe(
                Gst.PadProbeType.BUFFER, self.tiler_sink_pad_buffer_probe, 0
            )
            GLib.timeout_add(20000, self.perf_data.perf_print_callback)

        # List the sources
        logger.info("Now playing...")
        for idx, source in enumerate(self.cameras_list):
            logger.info(f"Playing {idx} : {source.camera_uri}")

Both cameras are 1920×1080 at 15 fps. The pipeline normally runs at 15 fps, but sometimes, due to post-processing, the fps drops to 10 or 11 for maybe 30 seconds to 1 minute; once the post-processing finishes, it continues at 15 fps again.

After running the pipeline for 10 to 12 hours, one camera is 20 to 30 seconds behind and the second one is 90 to 120 seconds behind the real-time RTSP feed.

What kind of post-processing are you talking about?

Have you set the nvstreammux parameters correctly?
DeepStream SDK FAQ - Intelligent Video Analytics / DeepStream SDK - NVIDIA Developer Forums

I am using a wheel detection model as the pgie. Whenever there is a wheel, it checks whether the wheel is in the center of the frame; then a separate process stores the wheels of a truck, retrieves the license plate from an endpoint, creates a JSON file, and sends that JSON file to an endpoint.
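The "center of the frame" check described above could look like the following self-contained sketch; the frame size matches the cameras in this thread, but the function name and the tolerance fraction are made-up illustrations, not the author's actual code:

```python
def is_centered(rect, frame_w=1920, frame_h=1080, tol=0.15):
    """Check whether a detection's center lies in the middle region of
    the frame. rect is (left, top, width, height), the same layout as
    DeepStream's rect_params; tol is a hypothetical tolerance expressed
    as a fraction of the frame dimensions."""
    cx = rect[0] + rect[2] / 2.0
    cy = rect[1] + rect[3] / 2.0
    return (abs(cx - frame_w / 2.0) <= tol * frame_w
            and abs(cy - frame_h / 2.0) <= tol * frame_h)
```

In a pad probe this would be called once per wheel object, and only centered wheels handed to the storage/license-plate process.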

I am using the default parameters of the DeepStream sample application. Can you guide me on that?

Is the post-processing done in the pad probe function of the pipeline?

Yes.
I am using another model for nut detection as the sgie.
There are two functions in the pad probe:

  1. The first function checks whether any nut detections overlap; if so, it drops the overlapping ones and draws the bounding boxes, since the NMS threshold in the sgie config file wasn't working properly.
  2. The second function checks whether the wheel is in the center of the frame and then stores it along with the wheel's tracking ID.
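For reference, the overlap-dropping in function 1 amounts to a greedy IoU-based NMS, which can be written in a few self-contained lines. A sketch only; the box layout, the 0.5 threshold, and the function names are assumptions rather than the thread's actual code:

```python
def iou(a, b):
    """Intersection over union of two boxes given as (left, top, w, h)."""
    ax2, ay2 = a[0] + a[2], a[1] + a[3]
    bx2, by2 = b[0] + b[2], b[1] + b[3]
    iw = min(ax2, bx2) - max(a[0], b[0])
    ih = min(ay2, by2) - max(a[1], b[1])
    if iw <= 0 or ih <= 0:
        return 0.0
    inter = iw * ih
    return inter / (a[2] * a[3] + b[2] * b[3] - inter)

def drop_overlapping(dets, iou_thresh=0.5):
    """Greedy NMS: dets is a list of (confidence, box); keep the
    highest-confidence box and drop any later box overlapping a kept
    one above the threshold."""
    kept = []
    for conf, box in sorted(dets, key=lambda d: d[0], reverse=True):
        if all(iou(box, kb) < iou_thresh for _, kb in kept):
            kept.append((conf, box))
    return kept
```

If this loop over per-frame detections is cheap (it is quadratic in the number of nuts, which is small), it is unlikely to be the source of the multi-second drift on its own.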

The delay is caused by your extra post-processing. Please optimize your post-processing so that it is guaranteed to finish within 1/15 of a second.

I reviewed all the functions, and these are the findings:
When there are no detections, nothing is passed to the sgie model, so the pipeline runs at 15 fps; but when there are detections, the sgie model starts processing as well, and the fps drops to 8 or 10.
These are the actual fps of the pipeline.
Now the problem is that when there are detections I don't want to lose a single frame, so the pipeline should process at 8 to 10 fps; but once there are no detections, it should re-synchronize the feed with the live RTSP stream. Is there any way to do that?
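One common way to bound this kind of backlog (an assumption on my part, not a reply from the thread) is a leaky queue between the decoder and the streammux: with `leaky=downstream` and a buffer cap, short 8-10 fps bursts that fit inside the cap lose nothing, while a backlog longer than the cap sheds its oldest frames so the stream falls back in step with live. The helper below is purely illustrative:

```python
def leaky_queue_props(max_buffers=30):
    """Hypothetical queue configuration: leaky=2 (downstream) drops the
    oldest queued frames instead of blocking the decoder, so the
    pipeline can catch up after a slow stretch. max_buffers bounds how
    far behind live the stream is allowed to fall (here, 2 s at 15 fps)."""
    return {
        "leaky": 2,                      # GST_QUEUE_LEAK_DOWNSTREAM
        "max-size-buffers": max_buffers,
        "max-size-bytes": 0,             # no byte limit
        "max-size-time": 0,              # no time limit
    }

# Sketch of where it would go (requires GStreamer/gi at runtime):
# queue = Gst.ElementFactory.make("queue", f"queue{idx}")
# for prop, value in leaky_queue_props().items():
#     queue.set_property(prop, value)
# decoder.link(queue)   # decoder -> queue -> streammux sink pad
```

Note the trade-off: this cannot satisfy "never lose a frame" and "stay synchronized with live" at the same time; the cap only decides how much lag is tolerated before frames start being dropped.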

Please check your models’ performance.

  1. What is your SGIE’s batch size?
  2. Please check the GPU loading when there are detections.
  3. Can you use “trtexec” to measure the PGIE and SGIE models’ performance?

From your code, you are using “fakesink” as the output. How did you get the “delay”?
If you display the video locally in your device and use “sync=1” with your sink element, the sink element will drop the late frames. You will always see the frames in time.

What do you mean by “once there is no detection it should synchronize the feed with rtsp.”?

SGIE batch size is 4.
I'll check the performance of both models.

Yes, I am using the fakesink, so I shouldn't be getting the delay.

I am using two Axis cameras. I have verified from the live-stream dashboard that whenever there is a truck, the pipeline detects it 30 to 40 seconds late on one camera, and the second camera is 90 to 150 seconds behind real time.

For example:
There is a truck at 01:00:00 in the live stream.
Source_id 0 will detect it at 01:00:30 or maybe 01:00:40.
Source_id 1 will detect it at 01:02:00 or maybe 01:03:00.

These are the issues I am facing.

I copied these timestamps from the pipeline just after restarting it:
source_id 0 08:28:16 AM
source_id 1 08:28:20 AM (a 4-second difference, which is acceptable)

but after running the pipeline for 10 to 12 hours it looks like this:
source_id 0 07:50:57 AM
source_id 1 07:52:29 AM (around a 92-second difference)

The fps is the same for both cameras all the time; there isn't any difference there. So why does this delay build up over time?

What are these times? Where and how did you get the timestamps?

Do you mean after 10~12 hours, the timestamps of the frames in the same batch have more than two minutes differences?

Yes, I added the timestamps in the pad probe function along with the source_id.
Yes, the timestamps have a difference of more than two minutes.

For example:
Frame number x of source_id 0 is 30 seconds behind the actual feed.
Frame number x of source_id 1 is around 2 minutes behind the actual feed.
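One way to quantify this lag consistently, rather than printing wall-clock times, is to compare elapsed wall time against the buffer's running time (its PTS, assuming the pipeline base time is taken when it goes to PLAYING, which holds for a live pipeline). A sketch; the function name and call site are illustrative:

```python
import time

def frame_lag_seconds(buffer_pts_ns, pipeline_start_monotonic):
    """How far a frame lags behind 'now': wall-clock seconds elapsed
    since the pipeline started, minus the buffer's running time. A
    value that keeps growing over hours points at queuing inside the
    pipeline rather than at the camera."""
    elapsed = time.monotonic() - pipeline_start_monotonic
    return elapsed - buffer_pts_ns / 1e9

# In the pad probe (sketch): record one lag per source_id each batch;
# the difference between the two sources' lags is the inter-stream skew.
# lag = frame_lag_seconds(gst_buffer.pts, self.start_monotonic)
```

Logging this per source every few minutes would show directly which element the 92-second skew accumulates in.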

What do you mean by "add"? Did you replace the original timestamps with your own timestamps?