Custom RTSP Reconnect Logic Failures

Please provide complete information as applicable to your setup.

• Hardware Platform (GPU)
• DeepStream Version 7.0
• TensorRT Version 10.6.0
• NVIDIA GPU Driver Version - 560.35.03
• Issue Type( questions, new requirements, bugs)
• How to reproduce the issue:
Hi, currently I’m working on a video analytics project, and in our application, we use uridecodebin. However, after some time, we encounter an issue where the uridecodebin RTSP connection fails, causing the application to crash. To address this, I switched to using nvurisrcbin, but now I am facing an image dilation issue.

To resolve this, I plan to implement a custom reconnect logic. For example, if the stream doesn’t process a frame within a certain time period, I will remove the affected uridecodebin and re-add it. While this reconnection works fine, after some time, my downstream elements (such as multisink, etc.) stop receiving data, even though the application appears to continue functioning normally. I’m unsure why this happens.

import configparser
import sys
from urllib.parse import urlparse

import gi
# BUGFIX: gi.require_version must be called BEFORE importing Gst from
# gi.repository; the original imported first, which triggers a PyGIWarning
# and may bind an unintended GStreamer version.
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

import time
import uuid
import logging
import os
import pytz
from datetime import datetime, timedelta
from threading import Lock
import random
import json

import pyds

from utils import (
    send_message_to_iothub, read_yaml, ntp_to_sydney_time, extract_rtsp_details,
    convert_pts_to_sydtimestamp, create_dynamic_path, get_current_epoch_in_syd,
    convert_syd_epoch_to_datetime_str, get_current_datetime_str,
)

# --- module-level state ----------------------------------------------------
start_time = time.time()   # reference epoch for the FPS counters
pipeline = None            # Gst.Pipeline, created in main()
fps_mutex = Lock()         # guards GETFPS frame counters

syd_tz = pytz.timezone('Australia/Sydney')

ntp_epoch = datetime(1970, 1, 1, tzinfo=pytz.UTC)

# BUGFIX: renamed from `format` so the builtin is not shadowed.
LOG_FORMAT = "[    %(asctime)s     %(levelname)s      %(message)s   ]\n"

# --- configuration ---------------------------------------------------------
main_yaml_file_path = "main_config_reconnect_roi.yaml"
yaml_con = read_yaml(main_yaml_file_path)

streams = yaml_con["streams"]
log_path = yaml_con["log_path"]
no_of_sec_to_send_data = int(yaml_con["no_of_sec_send_msg"])
img_dir_name = yaml_con["img_dir_name"]

osd_process_mode = int(yaml_con["osd_process_mode"])
osd_display_text = int(yaml_con["osd_display_text"])

# nvstreammux tuning knobs (see DeepStream nvstreammux documentation).
sync_inputs = int(yaml_con["streammux_sync_inputs"])
attach_sys_ts = int(yaml_con["streammux_attach_sys_ts"])
max_latency = int(yaml_con["streammux_max_latency"])
frame_duration = int(yaml_con["streammux_frame_duration"])
streammux_muxer_batch_timeout_usec = int(yaml_con["streammux_muxer_batch_timeout_usec"])

analytics_config_path = yaml_con["analytics_config_path"]
infer_config_path = yaml_con["infer_config_path"]
tracker_config_path = yaml_con["tracker_config_path"]

max_size_buffers = int(yaml_con["queue_max_size_buffers"])

im_quality = int(yaml_con["jpeg_encoder_quality"])

os.makedirs(img_dir_name, exist_ok=True)
logging.basicConfig(filename=log_path, level=logging.INFO, format=LOG_FORMAT)
number_sources = len(streams)

# Per-stream bookkeeping for the streammux probe: first-frame wall-clock
# epoch, last-forwarded-frame epoch, and a first-frame-seen latch.
start_time_streammux = {i: None for i in range(number_sources)}
start_time_streammux_ = {i: None for i in range(number_sources)}
start_time_streammux_first_frame = {i: False for i in range(number_sources)}

# stream name -> {'id': uridecodebin uuid, 'src': rtsp url, 'streamid': int}
src_ids = {}

start_time_ids = {i: get_current_epoch_in_syd() for i in range(number_sources)}
streammux = None


def on_pad_added(src, pad, sink_element):
    """Dynamic pad handler: link a freshly exposed decodebin pad to
    sink_element's static sink pad, skipping pads that are not ready
    or targets that are already linked."""
    print(f"New pad {pad.get_name()} added from {src.get_name()}")

    target = sink_element.get_static_pad("sink")
    if not target:
        print(f"Error: Could not get sink pad for {sink_element.get_name()}")
        return

    if target.is_linked():
        print(f"Warning: Sink pad of {sink_element.get_name()} is already linked")
        return

    # ANY caps means the pad has not negotiated yet; linking now would fail.
    if pad.query_caps(None).is_any():
        print(f"Warning: Pad {pad.get_name()} does not have valid caps yet!")
        return

    result = pad.link(target)
    if result == Gst.PadLinkReturn.OK:
        print(f"Successfully linked {pad.get_name()} {sink_element.get_name()}")
    else:
        print(f"Pad link failed: {result}")

 
class GETFPS:
    """Per-stream frame counter used to compute a rolling FPS figure.

    update_fps() is called once per buffer; get_fps() returns the FPS over
    the window since the previous get_fps() call and resets the window.
    Relies on the module-level `start_time` and `fps_mutex` globals.
    """

    def __init__(self, stream_id):
        global start_time
        # Seeded with the module start time; replaced by the first frame's
        # arrival time in update_fps().
        self.start_time = start_time
        self.is_first = True
        self.frame_count = 0
        self.stream_id = stream_id

    def update_fps(self):
        """Count one frame; the very first call only stamps the window start."""
        end_time = time.time()
        if self.is_first:
            self.start_time = end_time
            self.is_first = False
        else:
            global fps_mutex
            with fps_mutex:
                self.frame_count = self.frame_count + 1

    def get_fps(self):
        """Return FPS since the last call (rounded to 2 dp) and reset the window."""
        end_time = time.time()
        with fps_mutex:
            elapsed = end_time - self.start_time
            # BUGFIX: guard against ZeroDivisionError when no time has elapsed
            # since the window start (e.g. called immediately after creation).
            stream_fps = float(self.frame_count / elapsed) if elapsed > 0 else 0.0
            self.frame_count = 0
            # BUGFIX: reset the window start while still holding the lock;
            # the original updated it outside the critical section (racy).
            self.start_time = end_time
        return round(stream_fps, 2)

    def print_data(self):
        """Debug dump of the raw counter state."""
        print('frame_count=', self.frame_count)
        print('start_time=', self.start_time)
 
def on_pad_added_1(src, pad, sink_element):
    """Alternate pad-added handler: logs the new pad's caps, then links it to
    sink_element's sink pad when that pad is still free."""
    banner = "===============" * 4
    print(banner)

    pad_name = pad.get_name()
    pad_structure = pad.query_caps(None).to_string()
    print(f"New pad {pad_name} added with caps: {pad_structure}")
    print(f"New pad added by: {src.get_name()}")

    target = sink_element.get_static_pad("sink")
    if not target.is_linked():
        print(f"valve Sink pad linked to uridecodebin")
        pad.link(target)

    print(banner)
 
def get_linked_element(pad):
    """Return the name of the element on the far side of `pad`, or None when
    the pad is missing, unlinked, or its peer has no parent element."""
    if not pad or not pad.is_linked():
        return None
    peer = pad.get_peer()
    if not peer:
        return None
    owner = peer.get_parent_element()
    return owner.get_name() if owner else None


class PERF_DATA:
    """Aggregates per-stream FPS counters and acts as the reconnect watchdog.

    perf_print_callback() is installed as a GLib timeout from main(); on each
    tick it snapshots FPS for every stream, dumps queue diagnostics, and when
    a stream's FPS is 0 it swaps that stream's uridecodebin via the
    module-level del_add_rtspsrc(). Depends on the module globals `pipeline`,
    `src_ids`, `uuid` and `logging`.
    """

    def __init__(self, num_streams, q_list):
        self.perf_dict = {}   # FPS for each stream
        self.all_stream_fps = {}  # "stream<i>" -> GETFPS counter
        self.q_list = q_list  # queues to report diagnostics on each tick

        self.fps_mutex = Lock()   # Lock for thread safety

        for i in range(num_streams):
            self.all_stream_fps[f"stream{i}"] = GETFPS(i)

    def add_stream(self, stream_index):
        """ Add a new stream dynamically """
        with self.fps_mutex:
            if stream_index not in self.all_stream_fps:
                print(f"Adding new stream: {stream_index}")
                self.all_stream_fps[stream_index] = GETFPS(stream_index)

    def delete_stream(self, stream_index):
        # Remove a stream's FPS counter; logs when the stream is unknown.
        with self.fps_mutex:
            if stream_index in self.all_stream_fps:
                print(f"Deleting stream: {stream_index}")
                self.all_stream_fps.pop(stream_index)
            else:
                print(f"Stream {stream_index} not found.")


    def update_fps(self, stream_index):
        """ Update FPS for the given stream index """
        with self.fps_mutex:
            if stream_index in self.all_stream_fps:
                self.all_stream_fps[stream_index].update_fps()
            else:
                print(f"Warning: Stream {stream_index} not registered. Ignoring update.")

    def perf_print_callback(self):
        """Periodic GLib timeout: report FPS, dump queue diagnostics, and
        replace any stream whose FPS dropped to 0. Returns True so GLib
        keeps the timeout installed."""
        with self.fps_mutex:
            self.perf_dict = {stream_index: stream.get_fps() for stream_index, stream in self.all_stream_fps.items()}
        print("\n**PERF Data : ", self.perf_dict, "\n")

        # Queue diagnostics: state, downstream link, buffered frame count.
        # NOTE(review): get_state(Gst.CLOCK_TIME_NONE) blocks indefinitely if
        # the element never settles — consider a finite timeout here.
        for q in self.q_list:
                if q:
                    state_return, state, pending = q.get_state(Gst.CLOCK_TIME_NONE)
                    print(f"Element {q.get_name()} is in state: {state.value_nick}")
                else:
                    print("Element not found")
                queue5 = q
                if queue5:
                    src_pad = queue5.get_static_pad("src")
                    peer_pad = src_pad.get_peer() if src_pad else None
                    if peer_pad:
                        print(f"{q.get_name()} is linked to {peer_pad.get_parent_element().get_name()}")
                    else:
                        print(f"{q.get_name()} is NOT linked to any element!")

                num_frames = q.get_property("current-level-buffers")
                print(f"=========================> {q.get_name()} {num_frames=}")

        for k, v in self.perf_dict.items():
            src_id = src_ids[k]
            last_id = src_id['id']
            stream_id = src_id['streamid']
            rtsp_url = src_id['src']
            valve_element_name = f"valve_{stream_id}"
            valve = pipeline.get_by_name(valve_element_name)    
            sink_valve = valve.get_static_pad("sink")
            print(f"=============================> {valve_element_name} {get_linked_element(sink_valve)}" )  

            # FPS of 0 (falsy) means the stream is treated as stalled: tear
            # down its uridecodebin and insert a fresh one under a new uuid.
            if not v:
                
                new_id = str(uuid.uuid1())
                del_add_rtspsrc("perf 0","perf 0",last_id,new_id,stream_id,rtsp_url)
                src_ids[k]['id'] = new_id

                valve_element_name = f"valve_{stream_id}"
                valve = pipeline.get_by_name(valve_element_name)    
                sink_valve = valve.get_static_pad("sink")
                current_value = valve.get_property("drop")
                print(f"==================> valve current value {current_value=}")
                 
        logging.info(f"pipeline alive data :  {self.perf_dict}")
        return True
 

def wait_for_null_state(element, timeout=3):
    """Request the NULL state on `element` and block up to `timeout` seconds
    for the transition to complete. Returns True on success, False otherwise."""
    print(f"Waiting for {element.get_name()} to reach NULL state...")

    element.set_state(Gst.State.NULL)
    result, current, _pending = element.get_state(timeout * Gst.SECOND)

    reached_null = (result == Gst.StateChangeReturn.SUCCESS
                    and current == Gst.State.NULL)
    if reached_null:
        print(f"{element.get_name()} successfully reached NULL state.")
    else:
        print(f"Warning: {element.get_name()} did not reach NULL state within {timeout} seconds.")
    return reached_null



def del_add_rtspsrc_1(return_type, msg, old_id, new_id, stream_id, rtsp_url):
    """Swap a stalled RTSP source: remove uridecodebin_<old_id> and insert a
    fresh uridecodebin_<new_id> for the same URL, guarded by the stream's valve.

    NOTE(review): appears to be superseded by del_add_rtspsrc — confirm which
    variant is actually wired up before relying on this one.
    """
    logging.error(f"{return_type} : {msg}")
    print(f"Trying to create a new RTSP source ... {old_id=}")

    old_uridecodebin_name = f"uridecodebin_{old_id}"
    old_uridecodebin = pipeline.get_by_name(old_uridecodebin_name)

    valve_element_name = f"valve_{stream_id}"
    valve = pipeline.get_by_name(valve_element_name)

    # Drop buffers at the valve so downstream never sees the half-torn-down
    # source during the swap.
    valve.set_property("drop", True)

    if not old_uridecodebin:
        print(f"Error: Could not find uridecodebin_{old_id} in the pipeline.")
        return
    
    # NOTE(review): uridecodebin pads are created dynamically; "src_0" only
    # exists after the first pad-added fired — the guard below covers the
    # case where it never appeared.
    src_pad = old_uridecodebin.get_static_pad("src_0")
    if src_pad and src_pad.is_linked():
        print(f"unlink the valve rtspsrc {old_uridecodebin_name} and valve ")
        src_pad.unlink(valve.get_static_pad("sink"))

    if wait_for_null_state(old_uridecodebin):

        old_uridecodebin.set_state(Gst.State.NULL)
    
        pipeline.remove(old_uridecodebin)

    # Build and insert the replacement source, linked to the valve via the
    # same pad-added handler used at startup.
    new_uridecodebin = Gst.ElementFactory.make("uridecodebin", f"uridecodebin_{new_id}")
    new_uridecodebin.set_property("uri", rtsp_url)

    new_uridecodebin.connect("pad-added", on_pad_added,valve)

    pipeline.add(new_uridecodebin)

    # NOTE(review): READY -> sync_state_with_parent -> sleep -> PLAYING is
    # redundant (sync already matches the parent's state), and the sleep
    # blocks the calling thread — likely the GLib main loop. Verify.
    new_uridecodebin.set_state(Gst.State.READY)
    new_uridecodebin.sync_state_with_parent()
    time.sleep(0.1)
    new_uridecodebin.set_state(Gst.State.PLAYING)
    valve.set_property("drop", False)
    print(f"Successfully replaced RTSP source for stream {stream_id}")




def list_pipeline_elements(pipeline):
    """Debug aid: print every element in `pipeline` with its current state
    and the element each of its pads is linked to."""
    if not pipeline:
        print(" Error: Pipeline is None!")
        return

    print("-" * 50)
    print("Listing all elements in the pipeline:\n")

    for element in pipeline.iterate_elements():
        print(f"Element: {element.get_name()}")

        # CLOCK_TIME_NONE waits until the state change settles.
        _ret, state, _pending = element.get_state(Gst.CLOCK_TIME_NONE)
        print(f"Element {element.get_name()} is in state: {state.value_nick}")

        for pad in element.iterate_pads():
            peer = pad.get_peer()
            linked_to = peer.get_parent_element().get_name() if peer else "None"
            print(f"Pad: {pad.get_name()} Linked to: {linked_to}")

        print("-" * 50)



def del_add_rtspsrc(return_type, msg, old_id, new_id, stream_id, rtsp_url):
    """Replace a stalled RTSP source in the running pipeline.

    Closes the stream's valve (drop=True), flushes its queue, unlinks and
    removes uridecodebin_<old_id>, then adds uridecodebin_<new_id> for the
    same URL and reopens the valve. Called from PERF_DATA's watchdog when a
    stream's FPS drops to 0. Uses the module globals `pipeline`, `logging`
    and the on_pad_added handler.

    return_type / msg: strings describing why the swap was triggered (logged).
    """
    logging.error(f"{return_type} : {msg}")
    print(f"Trying to create a new RTSP source ... {old_id=}")

    old_uridecodebin_name = f"uridecodebin_{old_id}"
    old_uridecodebin = pipeline.get_by_name(old_uridecodebin_name)

    # Close the valve so downstream never sees the half-torn-down source.
    valve = pipeline.get_by_name(f"valve_{stream_id}")
    valve.set_property("drop", True)

    if not old_uridecodebin:
        print(f"Error: Could not find uridecodebin_{old_id} in the pipeline.")
        return

    # NOTE(review): bouncing the queue NULL->PLAYING discards its buffers but
    # also restarts its streaming thread — verify this doesn't starve the
    # downstream elements (the reported symptom).
    queue5 = pipeline.get_by_name(f"queue5_{stream_id}")
    queue5.set_state(Gst.State.NULL)  # Flush stuck frames
    queue5.set_state(Gst.State.PLAYING)

    # "src_0" only exists once the decodebin produced a pad; guard for the
    # case where the source died before ever negotiating.
    src_pad = old_uridecodebin.get_static_pad("src_0")
    if src_pad and src_pad.is_linked():
        print(f"Unlinking old uridecodebin: {old_uridecodebin_name}")
        src_pad.unlink(valve.get_static_pad("sink"))

    old_uridecodebin.set_state(Gst.State.NULL)
    pipeline.remove(old_uridecodebin)

    new_uridecodebin = Gst.ElementFactory.make("uridecodebin", f"uridecodebin_{new_id}")
    new_uridecodebin.set_property("uri", rtsp_url)
    new_uridecodebin.connect("pad-added", on_pad_added, valve)

    # BUGFIX: the element must be added to the pipeline BEFORE its state is
    # changed; the original called set_state(READY) on the unparented element
    # first. sync_state_with_parent() then brings it to the pipeline's state;
    # the explicit PLAYING request is kept as a fallback. The original's
    # time.sleep(0.1) was removed — it blocked the GLib main loop that runs
    # this watchdog callback.
    pipeline.add(new_uridecodebin)
    new_uridecodebin.sync_state_with_parent()
    new_uridecodebin.set_state(Gst.State.PLAYING)

    valve.set_property("drop", False)
    print(f"Successfully replaced RTSP source for stream {stream_id}")




def demux_src_pad_buffer_probe(pad, info, user_data):
    """Buffer probe downstream of the demuxer: overlays the buffer timestamp
    and the nvdsanalytics ROI occupancy counts on every frame, and updates
    the per-stream FPS counters.

    Returns Gst.PadProbeReturn.OK normally; DROP when the buffer or its batch
    metadata cannot be read.
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer")
        return Gst.PadProbeReturn.DROP

    try:
        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        l_frame = batch_meta.frame_meta_list
        while l_frame:
            try:
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
                stream_id = frame_meta.pad_index
                pts = frame_meta.buf_pts

                # Overlay the Sydney-local buffer timestamp (top-left).
                syd_time_pts_ = convert_pts_to_sydtimestamp(pts, start_time_streammux[stream_id])
                display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
                display_meta.num_labels = 1
                text_params = display_meta.text_params[0]
                text_params.display_text = f"buf ts : {syd_time_pts_}"
                text_params.x_offset = 50
                text_params.y_offset = 50
                text_params.font_params.font_name = "Serif"
                text_params.font_params.font_size = 12
                text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
                text_params.set_bg_clr = 1
                text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
                pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)

                # Overlay per-ROI object counts from nvdsanalytics user meta.
                l_user = frame_meta.frame_user_meta_list
                while l_user:
                    try:
                        user_meta = pyds.NvDsUserMeta.cast(l_user.data)
                        if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type(
                            "NVIDIA.DSANALYTICSFRAME.USER_META"
                        ):
                            user_meta_data = pyds.NvDsAnalyticsFrameMeta.cast(user_meta.user_meta_data)

                            obj_in_roi = user_meta_data.objInROIcnt
                            display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
                            display_meta.num_labels = len(obj_in_roi)
                            start_x, start_y = 50, 80
                            for i, (label_id, count) in enumerate(obj_in_roi.items()):
                                text_params = display_meta.text_params[i]
                                text_params.display_text = f"{label_id} : {count}"
                                text_params.x_offset = start_x
                                text_params.y_offset = start_y
                                text_params.font_params.font_name = "Serif"
                                text_params.font_params.font_size = 12
                                text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
                                text_params.set_bg_clr = 1
                                text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
                                start_y += 30

                            pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)

                    except Exception as e:
                        print(f"Error processing user metadata: {e}")

                    try:
                        l_user = l_user.next
                    except StopIteration:
                        break

                # BUGFIX: update FPS for EVERY frame inside the loop. The
                # original called update_fps once after the loop with the last
                # frame's stream_id, undercounting all but one stream per
                # batch and raising NameError for an empty batch.
                perf_data.update_fps("stream{0}".format(stream_id))

            except Exception as e:
                print(f"Error processing frame: {e}")

            try:
                l_frame = l_frame.next
            except StopIteration:
                break

    except Exception as e:
        print(f"Error processing buffer: {e}")
        return Gst.PadProbeReturn.DROP

    return Gst.PadProbeReturn.OK

def create_dynamic_path(epoch_sec, base_path, ip_port):
    """Build (and create) a Sydney-local date-sharded directory for images:
    <base_path>/<ip_port>/<YYYY>_year/<MM>_month/<DD>_day/<HH>_hour.

    epoch_sec: Unix epoch seconds (UTC-based).
    Returns the created directory path.

    NOTE(review): this shadows `create_dynamic_path` imported from utils at
    the top of the file — confirm the local override is intentional.
    """
    # BUGFIX: datetime.utcfromtimestamp is deprecated; a tz-aware
    # fromtimestamp produces the identical Sydney-local instant directly.
    now = datetime.fromtimestamp(epoch_sec, tz=syd_tz)

    dynamic_path = os.path.join(
        base_path,
        ip_port,
        f"{now.year}_year",
        f"{now.month:02d}_month",
        f"{now.day:02d}_day",
        f"{now.hour:02d}_hour",
    )

    os.makedirs(dynamic_path, exist_ok=True)
    return dynamic_path

 
 
def bus_call(bus, message, loop):
    """GStreamer bus watch: log EOS/warning/error messages.

    Deliberately does NOT quit the loop on EOS or ERROR so the custom
    reconnect watchdog can replace a failed source instead of stopping the
    application. Always returns True to keep the watch installed.
    """
    t = message.type
    if t == Gst.MessageType.EOS:
        print(f"\n ======================= End-of-stream =======================")
    elif t == Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        # BUGFIX: warnings were logged at DEBUG level and silently dropped by
        # the INFO-level root logger configured at module load.
        logging.warning(f"{err}")
        print("Warning: %s: %s" % (err, debug))
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        logging.error(f"{err}")
        print("Error: %s: %s" % (err, debug))

    return True
 
 
 
def valve_prob(pad, info, user_data):
    """Buffer probe installed on each valve's sink pad: counts one frame for
    the stream's FPS tracker. user_data carries the integer stream_id."""
    stream_id = user_data["stream_id"]

    if info.get_buffer() is None:
        print("Unable to get GstBuffer")
        return Gst.PadProbeReturn.OK

    try:
        perf_data.update_fps("stream{0}".format(stream_id))
    except Exception as e:
        print(f"Error accessing batch metadata: {e}")
        return Gst.PadProbeReturn.OK

    return Gst.PadProbeReturn.OK
 
def make_element(element_name, i):
    """Create a Gst element named "<element_name>-<i>".

    Returns the element, or None when the factory is unavailable.
    """
    # BUGFIX: create with the final unique name directly instead of
    # create-then-rename, and return None on failure — the original wrote an
    # error message but then crashed calling set_property on None.
    element = Gst.ElementFactory.make(element_name, "{0}-{1}".format(element_name, str(i)))
    if not element:
        sys.stderr.write(" Unable to create {0}".format(element_name))
        return None
    return element
 
 
def format_location_callback(splitmuxsink, fragment_id, video_dir_path, stream_id):
    """splitmuxsink "format-location" handler: name each fragment after the
    current Sydney-local date/time rather than the fragment id."""
    now_str = convert_syd_epoch_to_datetime_str(get_current_epoch_in_syd(), "date_time")
    filename = f"stream_{stream_id}_rtsp_out_{now_str}_uri.mp4"
    return os.path.join(video_dir_path, filename)
 


def frame_filter_pad_probe(pad, info, user_data):
    """Sink-pad probe on the multifilesink: builds a timestamped JPEG path per
    frame, retargets the sink's "location" property to it, and prints a
    per-frame analytics summary (ROI / line-cross counts).

    user_data keys:
        base_img_path: root directory for saved images.
        cam_id: camera identifier (ip/port derived) used in folder/file names.

    Always returns Gst.PadProbeReturn.OK so buffers keep flowing.
    """
    print(f"=================================================== inside the multifilesink =================================================== ")

    base_img_path = user_data["base_img_path"]
    cam_id_ip_port = user_data["cam_id"]

    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer")
        return Gst.PadProbeReturn.OK

    try:
        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        l_frame = batch_meta.frame_meta_list

        while l_frame:
            try:
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
                stream_id = frame_meta.pad_index

                pts = frame_meta.buf_pts
                # Wall-clock epoch of the last frame forwarded by
                # streammux_prob for this stream (not derived from this PTS).
                curr_time = start_time_streammux_.get(stream_id, 0)
                syd_time_pts_hhmmss = convert_syd_epoch_to_datetime_str(curr_time, "hhmmss")
                current_yymmdd = get_current_datetime_str("yymmdd")
                current_hhmmss = get_current_datetime_str("hhmmss")
                frame_number = frame_meta.frame_num
                # ntp_timestamp is in nanoseconds; convert to seconds.
                ntp_sec = frame_meta.ntp_timestamp / 1e9

                syd_time_ntp_hhmmss = ntp_to_sydney_time(ntp_sec,"hhmmss")
                syd_time_ntp_sql = ntp_to_sydney_time(ntp_sec)
                image_folder_path = create_dynamic_path(curr_time, base_img_path, cam_id_ip_port)

                image_name = f"CAM-{cam_id_ip_port[:7]}_DATE-{current_yymmdd}_PTS-{syd_time_pts_hhmmss}_NTP-{syd_time_ntp_hhmmss}_SYS-{current_hhmmss}.jpg"

                image_save_path = os.path.join(image_folder_path, image_name)

                print(f"Saving image: {image_save_path}, stream_id: {stream_id}")
                # NOTE(review): setting "location" retargets the sink for the
                # buffer written after this probe returns — confirm the
                # one-buffer alignment holds for multifilesink.
                multifilesink = pad.get_parent_element()
                multifilesink.set_property("location", image_save_path)

                # Walk nvdsanalytics frame-level user meta and assemble the
                # telemetry payload (IoT Hub publishing currently disabled;
                # the payload is only printed).
                l_user = frame_meta.frame_user_meta_list
                while l_user:
                    try:
                        user_meta = pyds.NvDsUserMeta.cast(l_user.data)
                        if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type(
                            "NVIDIA.DSANALYTICSFRAME.USER_META"
                        ):
                            user_meta_data = pyds.NvDsAnalyticsFrameMeta.cast(user_meta.user_meta_data)
                            num_rects = frame_meta.num_obj_meta
                            syd_time_pts_ = convert_syd_epoch_to_datetime_str(curr_time, "datetime_sql")
                            data = {
                                "stream_id": stream_id,
                                "camera_name": cam_id_ip_port,
                                "unique_number_uuid": 10, 
                                "batch_id": frame_meta.batch_id,
                                "frame_num": frame_number,
                                "buf_pts": pts,
                                "ntp_timestamp": frame_meta.ntp_timestamp,
                                "source_id": frame_meta.source_id,
                                "object_count": user_meta_data.objCnt,
                                "Number_of_objects": num_rects,
                                "object_in_ROI_count": user_meta_data.objInROIcnt,
                                "cumulative_line_cross_count": user_meta_data.objLCCumCnt,
                                "current_line_cross_count": user_meta_data.objLCCurrCnt,
                                "overcrowding_status": user_meta_data.ocStatus,
                                "unique_id": user_meta_data.unique_id,
                                "PTS_TIME": syd_time_pts_,
                                "NTP_TS": syd_time_ntp_sql,
                            }

                            print(data)
                    except Exception as e:
                        print(f"Error processing user metadata: {e}")
                    
                    try:
                        l_user = l_user.next
                    except StopIteration:
                        break

            except Exception as e:
                print(f"Error processing frame metadata: {e}")

            try:
                l_frame = l_frame.next
            except StopIteration:
                break

    except Exception as e:
        print(f"Error accessing batch metadata: {e}")
        return Gst.PadProbeReturn.OK

    return Gst.PadProbeReturn.OK



def dummy_prob(pad, info, user_data):
    """Diagnostic probe: announce that a buffer passed through, then let it
    continue. user_data is a label naming the probe point."""
    message = f"================================================================= inside the {user_data} prob ================================================================="
    print(message)
    return Gst.PadProbeReturn.OK

def streammux_prob(pad, info, user_data):
    """Per-stream probe ahead of the muxer: rate-limits buffers to at most
    one every `no_of_sec_to_predict` seconds of stream time.

    The first frame latches a wall-clock epoch for the stream; subsequent
    buffers are timestamped relative to it via their PTS, and buffers inside
    the throttle window are dropped.

    user_data keys: stream_id (int), no_of_sec_to_predict (number).
    """
    # BUGFIX: removed the `global streammux_first_frame_processed`
    # declaration — that name is never defined or used anywhere.
    global start_time_streammux
    global start_time_streammux_

    stream_id = user_data.get('stream_id')
    no_of_sec_to_predict = user_data.get('no_of_sec_to_predict')

    # Latch the wall-clock time of the first frame seen on this stream.
    if not start_time_streammux_first_frame[stream_id]:
        start_time_streammux[stream_id] = get_current_epoch_in_syd()
        start_time_streammux_[stream_id] = get_current_epoch_in_syd()
        start_time_streammux_first_frame[stream_id] = True

    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer")
        return Gst.PadProbeReturn.OK

    try:
        # Wall-clock estimate for this buffer: first-frame epoch + PTS (ns).
        pts = start_time_streammux[stream_id] + gst_buffer.pts / 1000000000.0
        if (pts - start_time_streammux_[stream_id]) < no_of_sec_to_predict:
            return Gst.PadProbeReturn.DROP
        start_time_streammux_[stream_id] = pts

    except Exception as e:
        print(f"Error accessing batch metadata: {e}")
        return Gst.PadProbeReturn.OK
    print(f"=================================== Gst.PadProbeReturn.OK ===================================")
    return Gst.PadProbeReturn.OK


 
def main():
    """Build and run the DeepStream analytics pipeline.

    Per-source front end:
        uridecodebin -> valve -> queue -> nvstreammux (request sink_%u pad)
    Batched middle:
        nvstreammux -> queue1 -> nvinfer -> queue2 -> nvdsanalytics -> queue3 -> nvstreamdemux
    Per-stream back end (one branch per source):
        nvstreamdemux src_%u -> queue -> nvvideoconvert -> nvdsosd -> queue
            -> nvvideoconvert -> nvjpegenc -> multifilesink

    Runs a GLib main loop until interrupted, then sets the pipeline to NULL.
    Relies on module-level config values (number_sources, streams, probe
    callbacks, element property constants) read from the YAML config.
    """
    global perf_data, pipeline, src_ids, streammux

    # Queues are collected here so PERF_DATA (and external debug code) can
    # inspect their fill levels later.
    q_list = []

    perf_data = PERF_DATA(number_sources,q_list)

    Gst.init(None)

    # Unique pipeline name so logs distinguish restarts of the process.
    pipeline = Gst.Pipeline.new(f"rtsp-pipeline-{uuid.uuid1()}")
    # Print performance stats every 10 seconds on the GLib loop.
    GLib.timeout_add(10000, perf_data.perf_print_callback)

    # Bookkeeping used by the custom reconnect logic: maps streamN -> the
    # uridecodebin instance id and its source URI.
    src_ids = {f'stream{i}': {'id': None, 'src': None, 'streamid': i} for i in range(number_sources)}

    cam_ids = {stream_id:extract_rtsp_details(uri_name) for stream_id,uri_name in enumerate(streams)}

    streammux = Gst.ElementFactory.make("nvstreammux", "streammuxer")
    if not streammux:
        print("Error: Could not create streammuxer!")
        return
    streammux.set_property('batch-size', number_sources)
    streammux.set_property('batched-push-timeout', streammux_muxer_batch_timeout_usec)
    streammux.set_property("sync-inputs",sync_inputs)
    streammux.set_property("attach-sys-ts",attach_sys_ts)
    streammux.set_property("max-latency",max_latency)
    streammux.set_property("frame-duration",frame_duration)
    pipeline.add(streammux)


    # --- Per-source front end: uridecodebin -> valve -> queue -> streammux ---
    for src_id, src in enumerate(streams):

        # Each uridecodebin gets a unique id; the reconnect logic replaces
        # the element and records the new id here.
        last_id = str(uuid.uuid1())
        src_ids[f'stream{src_id}']['id'] = last_id
        src_ids[f'stream{src_id}']['src'] = src

        uridecodebin = Gst.ElementFactory.make("uridecodebin", f"uridecodebin_{last_id}")
        uridecodebin.set_property("uri", src)
        
        print(f"uridecodebin_{last_id} created for {src}")
        # The valve sits between the (dynamically replaced) decodebin and the
        # rest of the pipeline so data flow can be shut off during reconnect.
        valve = Gst.ElementFactory.make("valve", f"valve_{src_id}")
        # uridecodebin pads appear dynamically; on_pad_added links the new
        # pad to this valve.
        uridecodebin.connect("pad-added", on_pad_added,valve) 
        user_data={"stream_id":src_id}
        pipeline.add(valve)
        queue = Gst.ElementFactory.make("queue", f"queue_{src_id}")
        pipeline.add(queue)
        q_list.append(queue)
        sink_pad = valve.get_static_pad("sink")
        print(f"================= valve created ============== ")
        # valve_prob (defined elsewhere) watches buffers for the
        # stall-detection / reconnect logic.
        sink_pad.add_probe(Gst.PadProbeType.BUFFER, valve_prob, user_data)
        print(f"================= valve perob added ============== ")    

        pipeline.add(uridecodebin)   

        padname=f"sink_{src_id}"
        valve_src_pad = valve.get_static_pad("src")
        if not valve_src_pad:
            print(f"Error: Could not get src pad for valve_{src_id}")
            return
        
        streammux_sinkpad= streammux.request_pad_simple(padname)
        if not streammux_sinkpad:
            print(f"Error: Could not get sink pad for streammux_{src_id}")
            return
        print(f"================================> {no_of_sec_to_send_data=}")
        # streammux_prob throttles frames entering the muxer to one per
        # no_of_sec_to_send_data seconds.
        user_meta = {"stream_id":src_id,'no_of_sec_to_predict':no_of_sec_to_send_data}
        streammux_sinkpad.add_probe(Gst.PadProbeType.BUFFER, streammux_prob, user_meta)
        
        queue_sink_pad = queue.get_static_pad("sink")
        queue_src_pad = queue.get_static_pad("src")
        valve_src_pad.link(queue_sink_pad)
        queue_src_pad.link(streammux_sinkpad)


    # --- Inference: primary detector ---
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    pgie.set_property('config-file-path', infer_config_path)
    pgie_batch_size=pgie.get_property("batch-size")
    # pgie_sink = pgie.get_static_pad("sink")
    # nvanalytics_sink = nvanalytics.get_static_pad("sink")
    # Batch size must match the number of sources fed by streammux.
    if(pgie_batch_size != number_sources):
        print("WARNING: Overriding infer-config batch-size",pgie_batch_size," with number of sources ", number_sources," \n")
        pgie.set_property("batch-size",number_sources)

    # --- Tracker, configured from the INI file's [tracker] section ---
    tracker = Gst.ElementFactory.make("nvtracker", "tracker")
    if not tracker:
        sys.stderr.write(" Unable to create tracker \n")

    config = configparser.ConfigParser()
    config.read(tracker_config_path)
    config.sections()
 
    for key in config['tracker']:
        if key == 'tracker-width' :
            tracker_width = config.getint('tracker', key)
            tracker.set_property('tracker-width', tracker_width)
        if key == 'tracker-height' :
            tracker_height = config.getint('tracker', key)
            tracker.set_property('tracker-height', tracker_height)
        if key == 'gpu-id' :
            tracker_gpu_id = config.getint('tracker', key)
            tracker.set_property('gpu_id', tracker_gpu_id)
        if key == 'll-lib-file' :
            tracker_ll_lib_file = config.get('tracker', key)
            tracker.set_property('ll-lib-file', tracker_ll_lib_file)
        if key == 'll-config-file' :
            tracker_ll_config_file = config.get('tracker', key)
            tracker.set_property('ll-config-file', tracker_ll_config_file)

    # --- Analytics (ROI / line crossing etc., per analytics config) ---
    nvanalytics = Gst.ElementFactory.make("nvdsanalytics", "analytics")
    if not nvanalytics:
        sys.stderr.write(" Unable to create nvanalytics \n")
    nvanalytics.set_property("config-file", analytics_config_path)


    # Splits the batched stream back into one branch per source.
    nvstreamdemux = Gst.ElementFactory.make("nvstreamdemux", "nvstreamdemux")
    if not nvstreamdemux:
        sys.stderr.write(" Unable to create nvstreamdemux \n")



    queue1 = Gst.ElementFactory.make("queue", "queue1")
    queue2 = Gst.ElementFactory.make("queue", "queue2")
    queue3 = Gst.ElementFactory.make("queue", "queue3")
    

    # Leaky, bounded queues so a stalled downstream drops old buffers
    # instead of blocking the batched path.
    for q in [queue1,queue2,queue3]:
        q.set_property("max-size-buffers", max_size_buffers)
        q.set_property("leaky", True)
        pipeline.add(q)
        q_list.append(q)

    # queue1_sink = queue1.get_static_pad("sink")
    # queue1_sink.add_probe(Gst.PadProbeType.BUFFER, queue1_prob, 0)
    # pgie_sink.add_probe(Gst.PadProbeType.BUFFER, pgie_prob, 0)
    # nvanalytics_sink.add_probe(Gst.PadProbeType.BUFFER, nvanalytics_prob, 0)
    
    pipeline.add(pgie)
    pipeline.add(nvanalytics)
    pipeline.add(nvstreamdemux)

    # Batched path: mux -> infer -> analytics -> demux.
    streammux.link(queue1)
    queue1.link(pgie)
    pgie.link(queue2)
    queue2.link(nvanalytics)
    nvanalytics.link(queue3)
    queue3.link(nvstreamdemux)

    # --- Per-stream back end: demux -> convert -> OSD -> JPEG -> file sink ---
    for i in range(number_sources):

        queue4_ = Gst.ElementFactory.make("queue", f"queue__{i}")
        pipeline.add(queue4_)

        q_list.append(queue4_)

        queue4_.set_property("max-size-buffers", max_size_buffers)
        # leaky=2 -> drop from downstream (oldest buffers) when full.
        queue4_.set_property("leaky", 2)

        nvvideoconvert =  Gst.ElementFactory.make("nvvideoconvert", f"nvvideoconvert_{i}")
        nvvideoconvertsink = nvvideoconvert.get_static_pad("sink")
        # nvvideoconvert = make_element("capsfilter", i)
        pipeline.add(nvvideoconvert)

        nvdsosd = Gst.ElementFactory.make("nvdsosd", f"nvdsosd_{i}")
        pipeline.add(nvdsosd)
        nvdsosd.set_property("process-mode", osd_process_mode)
        nvdsosd.set_property("display-text", osd_display_text)
        
        # Request the demuxer src pad for this stream index.
        padname = "src_%u" % i
        demuxsrcpad = nvstreamdemux.request_pad_simple(padname)
        if not demuxsrcpad:
            sys.stderr.write("Unable to create demux src pad \n")
        
 
        queuesinkpad = queue4_.get_static_pad("sink")
        if not queuesinkpad:
            sys.stderr.write("Unable to create queue sink pad \n")
        demuxsrcpad.link(queuesinkpad)

        # Diagnostic probe to confirm buffers leave the demuxer.
        demuxsrcpad.add_probe(Gst.PadProbeType.BUFFER, dummy_prob, f"demuxsrcpad")

        queue5 = Gst.ElementFactory.make("queue", f"queue5_{i}")
        pipeline.add(queue5)

        q_list.append(queue5)


        queue5.set_property("max-size-buffers", max_size_buffers)
        queue5.set_property("leaky", 2)

        queue5_src = queue5.get_static_pad("src")

        # Diagnostic probe to confirm buffers leave queue5 (the queue the
        # forum thread reports frames getting stuck in).
        queue5_src.add_probe(Gst.PadProbeType.BUFFER, dummy_prob, "queue5 src")

        

        nvvidconv2 = Gst.ElementFactory.make("nvvideoconvert", f"nvvideoconvert1_{i}")

        
        # nvvidconv2 = make_element("capsfilter", f"capsfilter_{i}")
        pipeline.add(nvvidconv2)

        jpegenc = Gst.ElementFactory.make("nvjpegenc", f"jpeg-encoder_{i}")
        jpegenc.set_property("quality", im_quality)

        


        if not jpegenc:
            sys.stderr.write("Unable to create nvjpegenc \n")
        pipeline.add(jpegenc)

        multifilesink = Gst.ElementFactory.make("multifilesink", f"multi-file-sink_{i}")
        if not multifilesink:
            sys.stderr.write(" Unable to create multifilesink \n")

        # Emit a bus message for every file written.
        multifilesink.set_property("post-messages", True)


        sinkpad = multifilesink.get_static_pad("sink")

        # Context handed to the per-frame filter probe on the sink pad.
        probe_data = {
            "cam_id": cam_ids[i],
            "base_img_path":img_dir_name,
            "no_of_sec_send_msg":no_of_sec_to_send_data
            
        }


        pipeline.add(multifilesink)

        queue4_.link(nvvideoconvert)

        nvvideoconvert.link(nvdsosd)
        nvdsosd.link(queue5)
        queue5.link(nvvidconv2)

        # queue5.link(nvvidconv2)
        nvvidconv2.link(jpegenc)
        jpegenc.link(multifilesink)

        # queue5_sinkpad = queue5.get_static_pad("sink")
        # queue5_srcpad = queue5.get_static_pad("src")

        # queue5_sinkpad.add_probe(Gst.PadProbeType.BUFFER, queue5_prob, 0)
        # queue5_srcpad.add_probe(Gst.PadProbeType.BUFFER, queue5_src_prob, 0)

        jpegenc_sinkpad = jpegenc.get_static_pad("sink")
        jpegenc_sinkpad.add_probe(Gst.PadProbeType.BUFFER, dummy_prob, "jpegenc_sinkpad")

        nvvidconv2_sinkpad = nvvidconv2.get_static_pad("sink")
        nvvidconv2_sinkpad.add_probe(Gst.PadProbeType.BUFFER, dummy_prob, "nvvidconv2_sinkpad")

        # Re-fetch the sink pad and attach the real per-frame filter probe.
        sinkpad = multifilesink.get_static_pad("sink")
        if not sinkpad:
            sys.stderr.write("Unable to get sink pad of multifilesink \n")
            sys.exit(1)
        else:
            sinkpad.add_probe(Gst.PadProbeType.BUFFER, frame_filter_pad_probe, probe_data)


    print("Pipeline setup complete")

    # Get the bus
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    loop = GLib.MainLoop()
    bus.connect("message", bus_call, loop)
 
    # Start the pipeline
    pipeline.set_state(Gst.State.PLAYING)
    print("Pipeline is playing...")
 
    try:
        loop.run()
    except KeyboardInterrupt:
        print("Pipeline interrupted by user.")
    finally:
        # Stop the pipeline
        pipeline.set_state(Gst.State.NULL)
        print("Pipeline stopped.")
 
 
if __name__ == "__main__":
    main()
1 Like

Could you describe the problem in detail?

Is it possible that the upstream rtsp source didn’t send data at this time?

Hi, for now the code is working fine. With the custom RTSP reconnection logic it ran for almost a day, but after that I got this error message:

8:21:01.251492041 10720 0x56402100c930 WARN nvinfer gstnvinfer.cpp:2423:gst_nvinfer_output_loop: error: Internal data stream error.
8:21:01.251525568 10720 0x56402100c930 WARN nvinfer gstnvinfer.cpp:2423:gst_nvinfer_output_loop: error: streaming stopped, reason error (-5)
Error: gst-stream-error-quark: Internal data stream error. (1): gstnvinfer.cpp(2423): gst_nvinfer_output_loop (): /GstPipeline:rtsp-pipeline-6c37c452-eef6-11ef-b63a-0242ac140003/GstNvInfer:primary-inference:
streaming stopped, reason error (-5)

ERROR default gstnvstreammux_pads.cpp:338:push: push failed [-5]

Could you add GST_DEBUG=3 in front of your command to enable more detailed GStreamer logging?

We still recommend using the reconnection mechanism that comes with nvurisrcbin. If there are some bugs, we can analyze and fix it.

Hi, sometimes my code works, and sometimes it doesn’t. For example, my downstream element doesn’t receive the data. After debugging, I can see that the data flow is correct, but after demuxing, the data gets stuck in the queue.

Here are my logs:

this is my queue status

Element queue_0 is in state: playing
=========================> queue_0 num_frames=0
Element queue_1 is in state: playing
=========================> queue_1 num_frames=0
Element queue_2 is in state: playing
=========================> queue_2 num_frames=0
Element queue_3 is in state: playing
=========================> queue_3 num_frames=0
Element queue_4 is in state: playing
=========================> queue_4 num_frames=0
Element queue_5 is in state: playing
=========================> queue_5 num_frames=0
Element queue1 is in state: playing
=========================> queue1 num_frames=0
Element queue2 is in state: playing
=========================> queue2 num_frames=0
Element queue3 is in state: playing
=========================> queue3 num_frames=0
Element queue__0 is in state: playing
=========================> queue__0 num_frames=0
Element queue5_0 is in state: playing
=========================> queue5_0 num_frames=1
Element queue__1 is in state: playing
=========================> queue__1 num_frames=0
Element queue5_1 is in state: playing
=========================> queue5_1 num_frames=1
Element queue__2 is in state: playing
=========================> queue__2 num_frames=0
Element queue5_2 is in state: playing
=========================> queue5_2 num_frames=1
Element queue__3 is in state: playing
=========================> queue__3 num_frames=0
Element queue5_3 is in state: playing
=========================> queue5_3 num_frames=1
Element queue__4 is in state: playing
=========================> queue__4 num_frames=0
Element queue5_4 is in state: playing
=========================> queue5_4 num_frames=1
Element queue__5 is in state: playing
=========================> queue__5 num_frames=0
Element queue5_5 is in state: playing
=========================> queue5_5 num_frames=1
frames stuck in queue5_x but status is playing

After this queue, no data flow occurs to my downstream elements like multifilesink, etc. Why is that? Also, what is the correct approach to safely remove and add the uridecodebin?

As mentioned before, it is possible that one of the plugins gets stuck internally because of the way it is used. You can modify your code to narrow the problem down — for example, by replacing the multifilesink with a fakesink, or by reducing the number of plugins in your pipeline.

You can refer to our sample runtime_source_add_delete.