Implementing Custom Reconnection Logic for RTSP Streams in GStreamer

• Hardware Platform (Jetson / GPU)
• Hardware Platform - GPU
• DeepStream Version - 7.0
• TensorRT Version - 10.6.0
• NVIDIA GPU Driver Version - 560.35.03
• Issue Type - questions
• Question -

Hi, I’m currently working on a video analytics project, and I’ve encountered an issue in my application. I’m using uridecodebin as the source, but after some time, the RTSP connection is lost, causing the application to stop. To address this, I switched to nvurisrcbin, which handles reconnection well, but I’m facing a pixelation issue.

I’m now attempting to implement my own reconnection logic, which works fine. However, for example, if my application has been running for 5 days and the RTSP connection is lost, when I attempt to reconnect (by deleting and adding the new bin), I get the following error:

[2025-04-01 15:07:33] INFO: Trying to create a new RTSP source for stream stream_id=0…
[2025-04-01 15:07:33] INFO:Flushing streammux pad for stream 0…
[2025-04-01 15:07:33] INFO: Successfully replaced RTSP source for stream 0
[2025-04-01 15:07:35] INFO: pipeline alive data : {‘stream0’: 0.0}
[2025-04-01 15:07:35] ERROR: gst-resource-error-quark: Could not write to resource. (10)
[2025-04-01 15:07:35] ERROR: gst-resource-error-quark: Could not write to resource. (10)
[2025-04-01 15:07:39] ERROR: gst-core-error-quark: Your GStreamer installation is missing a plug-in. (12)
[2025-04-01 15:07:39] ERROR: gst-stream-error-quark: Internal data stream error. (1)
[2025-04-01 15:09:13] INFO: reconnect att exist in stream_id=0 6

import configparser
import sys
import gi
import time
import uuid
import logging
import os
import pytz
from urllib.parse import urlparse
from datetime import datetime
from gi.repository import Gst, GLib
from utils_prod import ntp_to_sydney_time,extract_rtsp_details,convert_pts_to_sydtimestamp,create_dynamic_path,get_current_epoch_in_syd,convert_syd_epoch_to_datetime_str,get_current_datetime_str, send_msg
from threading import Lock
gi.require_version('Gst', '1.0')
import pyds
import json



start_time=time.time()
pipeline = None
fps_mutex = Lock()


syd_tz = pytz.timezone('Australia/Sydney')
ntp_epoch = datetime(1970, 1, 1, tzinfo=pytz.UTC)


try:

    yaml_con = json.loads(os.environ["main_config"])

    streams = yaml_con["streams"]
    log_path = yaml_con["log_path"]
    no_of_sec_to_send_data = int(yaml_con["no_of_sec_send_msg"])
    img_dir_name = yaml_con["img_dir_name"]

    osd_process_mode = int(yaml_con["osd_process_mode"])
    osd_display_text = int(yaml_con["osd_display_text"])

    sync_inputs = int(yaml_con["streammux_sync_inputs"])
    attach_sys_ts = int(yaml_con["streammux_attach_sys_ts"])
    max_latency = int(yaml_con["streammux_max_latency"])
    frame_duration = int(yaml_con["streammux_frame_duration"])
    is_live = int(yaml_con["is_live"])
    frame_width = int(yaml_con["frame_width"])
    frame_height = int(yaml_con["frame_height"])
    drop_eos = int(yaml_con["drop_eos"])
    streammux_muxer_batch_timeout_usec = int(yaml_con["streammux_muxer_batch_timeout_usec"])

    analytics_config_path = yaml_con["analytics_config_path"]
    infer_config_path = yaml_con["infer_config_path"]


    redis_container_name = yaml_con["redis_container_name"]
    redis_port = int(yaml_con["redis_port"])
    redis_key = yaml_con["redis_key"]
    redis_retry = 5
    PGIE_CLASS_ID_FACE = 2

    max_size_buffers = int(yaml_con["queue_max_size_buffers"])
    im_quality = int(yaml_con["jpeg_encoder_quality"])

    reconnect_interval = int(yaml_con["reconnect_interval"])

    is_add_face_blur = int(yaml_con.get("is_add_face_blur",1))
    face_blur_opacity = float(yaml_con.get("face_blur_opacity",0.8))

    application_quit_ther = float(yaml_con.get("application_quit_ther",0.5))
    reconnect_att = float(yaml_con.get("reconnect_att",5))

    blob_base_img_path = yaml_con["blob_base_img_path"]

    class_dict = {0:"person",1:"bag",2:"face"}


except Exception as e:
    print(f"Error reading config file: {e}")
    sys.exit(1)


if not all([ os.path.exists(p) for p in [analytics_config_path,infer_config_path]]):
    raise FileNotFoundError(f"One or more of the required analytics, inference, or file not found: {analytics_config_path}, {infer_config_path}")


os.makedirs(img_dir_name,exist_ok=True)


class SydneyFormatter(logging.Formatter):
    """Custom logging formatter to use Sydney timezone."""
    
    def formatTime(self, record, datefmt=None):
        sydney_time = datetime.fromtimestamp(record.created, syd_tz)
        return sydney_time.strftime("%Y-%m-%d %H:%M:%S")

logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] %(levelname)s: %(message)s",
    handlers=[logging.FileHandler(log_path), logging.StreamHandler()]
)

for handler in logging.getLogger().handlers:
    handler.setFormatter(SydneyFormatter("[%(asctime)s] %(levelname)s: %(message)s"))

logger = logging.getLogger(__name__)


number_sources = len(streams)
start_time_streammux = { i:None for i in range(number_sources)}
start_time_streammux_ = { i:None for i in range(number_sources)}
start_time_streammux_first_frame = { i:False for i in range(number_sources)}
start_time_ids = { i:get_current_epoch_in_syd() for i in range(number_sources)}
src_ids = {}
lag_dict = {i:{ "data":{"cumulative_line_cross_count":0} ,"ds":0,"is_first":True,"data_num":0} for i in range(number_sources)} 
streammux = None
is_first = True

def on_pad_added(src, pad, sink_element):
    
    # print(f"New pad {pad.get_name()} added from {src.get_name()}")

    sink_pad = sink_element.get_static_pad("sink")
    if not sink_pad:
        msg = (f"Error: Could not get sink pad for {sink_element.get_name()}")
        logger.error(msg)
        return

    if sink_pad.is_linked():
        msg = (f"Warning: Sink pad of {sink_element.get_name()} is already linked")
        logger.warning(msg)
        return
  
    if pad.query_caps(None).is_any():
        msg = (f"Warning: Pad {pad.get_name()} does not have valid caps yet!")
        logger.warning(msg)
        return

    link_status = pad.link(sink_pad)
    if link_status != Gst.PadLinkReturn.OK:
        msg = f"Pad link failed: {link_status}"
        logger.error(msg)
    else:
        msg = (f"Successfully linked {pad.get_name()} {sink_element.get_name()}")
        logger.info(msg)

 
class GETFPS:
    def __init__(self,stream_id):
        global start_time
       
        self.start_time=start_time
        self.is_first=True
        self.frame_count=0
        self.stream_id=stream_id
 
    def update_fps(self):
        end_time = time.time()
        if self.is_first:
            self.start_time = end_time
            self.is_first = False
        else:
            global fps_mutex
            with fps_mutex:
                self.frame_count = self.frame_count + 1
 
    def get_fps(self):
 
        end_time = time.time()
        with fps_mutex:
 
            stream_fps = float(self.frame_count/(end_time - self.start_time))
 
            self.frame_count = 0
 
        self.start_time = end_time
 
        return round(stream_fps, 2)
 
    def print_data(self):
        print('frame_count=',self.frame_count)
        print('start_time=',self.start_time)
 

 
def get_linked_element(pad):
    
    if pad and pad.is_linked():
        peer_pad = pad.get_peer()
        if peer_pad:
            peer_element = peer_pad.get_parent_element()
            return peer_element.get_name() if peer_element else None
    return None


class PERF_DATA:
    def __init__(self, active_streams,main_loop):
        self.perf_dict = {} 
        self.all_stream_fps = {}  
 
        self.fps_mutex = Lock()   # Lock for thread safety
        self.main_loop = main_loop

        for i in active_streams:
            self.all_stream_fps[f"stream{i}"] = GETFPS(i)
  
 
    def update_fps(self, stream_index):
        """ Update FPS for the given stream index """
        with self.fps_mutex:
            if stream_index in self.all_stream_fps:
                self.all_stream_fps[stream_index].update_fps()
            else:
                print(f"Warning: Stream {stream_index} not registered. Ignoring update.")
 
    def perf_print_callback(self):
        
        with self.fps_mutex:
            self.perf_dict = {stream_index: stream.get_fps() for stream_index, stream in self.all_stream_fps.items()}
        print("\n**PERF Data : ", self.perf_dict, "\n")

 
        deactive_streams = 0
        for k, v in self.perf_dict.items():
            src_id = src_ids[k]
            last_id = src_id['id']
            stream_id = src_id['streamid']
            rtsp_url = src_id['src']
            
            
            if not v:
                
                if src_ids[k]["no_of_att"] > reconnect_att:
                    logger.info(f"reconnect att exist in {stream_id=} {src_ids[k]['no_of_att']}")
                    deactive_streams+=1
                    continue
                valve_element_name = f"valve_{stream_id}"
                valve = pipeline.get_by_name(valve_element_name)    
                # sink_valve = valve.get_static_pad("sink")
                # new_id = last_id
                new_id = str(uuid.uuid1())
                del_add_rtspsrc(last_id,new_id,stream_id,rtsp_url)
                src_ids[k]['id'] = new_id
                valve.set_property("drop", False)
                time.sleep(2)
                src_ids[k]["no_of_att"]+=1
                
        if (deactive_streams/number_sources) > application_quit_ther:
            logger.info(f"XXXXXXXXXXXXXXXXXXXXXXXXXX no of deactivate streams {deactive_streams} and no of streams {number_sources} ther {(deactive_streams/number_sources)} > {application_quit_ther} so quit the application XXXXXXXXXXXXXXXXXXXXXXXXXX")
            self.main_loop.quit()

        logger.info(f"pipeline alive data :  {self.perf_dict}")
        return True
 

def wait_for_null_state(element, timeout=3):

    # print(f"Waiting for {element.get_name()} to reach NULL state...")
    
    element.set_state(Gst.State.NULL)

    success, state, _ = element.get_state(timeout * Gst.SECOND)

    if success == Gst.StateChangeReturn.SUCCESS and state == Gst.State.NULL:
        # msg = (f"{element.get_name()} successfully reached NULL state.")
        return True
    else:
        msg = (f"Warning: {element.get_name()} did not reach NULL state within {timeout} seconds.")
        logger.warning(msg)
        return False

def flush_ele(stream_id):

    streammux = pipeline.get_by_name("streammuxer")

    
    if not streammux :
        msg = (f"[ERROR] Streammux or Demux not found in pipeline")
        logger.error(msg)
        return
    
    streammux_sink_pad = streammux.get_static_pad(f"sink_{stream_id}")


    if streammux_sink_pad:
        msg = (f"🛑 Flushing streammux pad for stream {stream_id}...")
        logger.info(msg)
        streammux_sink_pad.send_event(Gst.Event.new_flush_start())
        time.sleep(0.1)
        streammux_sink_pad.send_event(Gst.Event.new_flush_stop(True))
    
    return

def del_add_rtspsrc(old_id, new_id, stream_id, rtsp_url):
    msg = f"Trying to create a new RTSP source for stream {stream_id=}"
    logger.info(msg)

    bin_name = f"source-bin-{old_id}"
    old_uridecodebin = pipeline.get_by_name(bin_name)
    

    if not old_uridecodebin:
        msg = f"[ERROR] No existing uridecodebin found for stream {stream_id}."
        logger.error(msg)
        return
    
    valve = pipeline.get_by_name(f"valve_{stream_id}")
    valve.set_property("drop", True)
    valve_sink = valve.get_static_pad("sink")

    src_pad = old_uridecodebin.get_static_pad("src")
    if src_pad and src_pad.is_linked():
        
        src_pad.unlink(valve_sink)

    old_uridecodebin.send_event(Gst.Event.new_eos())

    wait_for_null_state(old_uridecodebin)

    flush_ele(stream_id)

    pipeline.remove(old_uridecodebin)
    
    new_uridecodebin = create_source_bin(new_id, stream_id,rtsp_url)
    if not new_uridecodebin:
        msg = f"[ERROR] Failed to create new uridecodebin for stream {stream_id}."
        logger.error(msg)
        return
    
    pipeline.add(new_uridecodebin)

    new_uridecodebin_src_pad=new_uridecodebin.get_static_pad("src")
    
    if new_uridecodebin_src_pad.link(valve_sink) != Gst.PadLinkReturn.OK:
        msg = f"Error linking {new_uridecodebin.get_name()} to {valve.get_name()}"
        logger.error(msg)
        return

    new_uridecodebin.set_state(Gst.State.READY)
    time.sleep(0.1)
    new_uridecodebin.sync_state_with_parent()
    new_uridecodebin.set_state(Gst.State.PLAYING)
    msg = f"Successfully replaced RTSP source for stream {stream_id}"
    logger.info(msg)



def cb_newpad(decodebin, decoder_src_pad,data):
    print("In cb_newpad\n")
    caps=decoder_src_pad.get_current_caps()
    gststruct=caps.get_structure(0)
    gstname=gststruct.get_name()
    source_bin=data
    features=caps.get_features(0)


    print("gstname=",gstname)
    if(gstname.find("video")!=-1):
        
        print("features=",features)
        if features.contains("memory:NVMM"):
            # Get the source bin ghost pad
            bin_ghost_pad=source_bin.get_static_pad("src")
            if not bin_ghost_pad.set_target(decoder_src_pad):
                msg = "Failed to link decoder src pad to source bin ghost pad"
                logger.error(msg)
                sys.stderr.write(msg)
        else:
            msg = " Error: Decodebin did not pick nvidia decoder plugin"
            logger.error(msg)
            sys.stderr.write(msg)

def decodebin_child_added(child_proxy,Object,name,user_data):
    msg = ("Decodebin child added:", name, "\n")
    logger.info(msg)
    if(name.find("decodebin") != -1):
        Object.connect("child-added",decodebin_child_added,user_data)


def on_pad_added(decodebin, pad, data):
    # print(f"Pad added: {pad.get_name()} (RTSP connection successful)")
    data["is_ready"] = True

def test_rtsp_connection(uri, stream_id, timeout=10):

    pipeline = Gst.Pipeline.new("test-pipeline")
    uridecodebin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")

    if not uridecodebin:
        msg = f"Failed to create uridecodebin {stream_id=}"
        logger.error(msg)
        return False

    uridecodebin.set_property("uri", uri)
    d = {"is_ready": False}
    uridecodebin.connect("pad-added", on_pad_added,d )

    fakesink = Gst.ElementFactory.make("fakesink","fakesink")
   
    pipeline.add(uridecodebin)
    pipeline.add(fakesink)
    uridecodebin.link(fakesink)

    
    pipeline.set_state(Gst.State.PLAYING)
    
    start_time = time.time()
    while time.time() - start_time < timeout:

        success, state, _ = pipeline.get_state(Gst.CLOCK_TIME_NONE)
        if d["is_ready"]:
            msg = f"RTSP stream is active {stream_id=}"
            logger.info(msg)
            pipeline.set_state(Gst.State.NULL)  
            return True,uridecodebin

        time.sleep(0.5) 
        print(f"wating for frame .... {time.time() - start_time}")
    
    msg = f"RTSP connection failed (No `pad-added` event) {stream_id=}"
    logger.error(msg)
    pipeline.set_state(Gst.State.NULL) 
    return False,None


def create_source_bin(index ,stream_id, uri):
    

    uri_decode_bin=Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
    if is_first:
        is_suc,uri_decode_bin = test_rtsp_connection(uri,stream_id)
        if not is_suc:
            msg = (f"Error creating source {stream_id=}")
            logger.error(msg)
            return False
    
    bin_name=f"source-bin-{index}"
    nbin=Gst.Bin.new(bin_name)
    if not nbin:
        msg = f" Unable to create source bin {bin_name=}"
        logger.error(msg)
        sys.stderr.write(msg)

    
    if not uri_decode_bin:
        msg = " Unable to create uri decode bin "
        logger.error(msg)
        sys.stderr.write(msg)
    
    uri_decode_bin.set_property("uri",uri)
    
    uri_decode_bin.connect("pad-added",cb_newpad,nbin)
    uri_decode_bin.connect("child-added",decodebin_child_added,nbin)

    Gst.Bin.add(nbin,uri_decode_bin)
    bin_pad=nbin.add_pad(Gst.GhostPad.new_no_target("src",Gst.PadDirection.SRC))
    if not bin_pad:
        msg = f" Failed to add ghost pad in source bin "
        logger.error(msg)
        sys.stderr.write(msg)
        return None
    return nbin


def pgie_src_pad_prob(pad, info, user_data):
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        msg = f"Unable to get GstBuffer pgie src pad prob"
        logger.error(msg)
        return Gst.PadProbeReturn.DROP

    try:
        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        l_frame = batch_meta.frame_meta_list
        while l_frame:
            try:
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)

                l_obj=frame_meta.obj_meta_list
                while l_obj:
                    try:
                        obj_meta=pyds.NvDsObjectMeta.cast(l_obj.data)
                    except StopIteration:
                        break

                    if is_add_face_blur:
        
                        if (obj_meta.class_id == PGIE_CLASS_ID_FACE):
            
                            obj_meta.rect_params.border_width = 1
                            obj_meta.rect_params.has_bg_color = 1
            
                            obj_meta.rect_params.bg_color.red = 1.0   
                            obj_meta.rect_params.bg_color.green = 0.8 
                            obj_meta.rect_params.bg_color.blue = 0.6  
            
                            obj_meta.rect_params.bg_color.alpha = face_blur_opacity

                            obj_meta.tracker_confidence = 1.0 
                            obj_meta.object_id = 999999  
        
                    try:
                        l_obj=l_obj.next
                    except StopIteration:
                        break 

                
            except Exception as e:
                msg = (f"Error processing frame pgie src pad prob : {e}")
                logger.error(msg)
            
            try:
                l_frame = l_frame.next
            except StopIteration:
                break

    except Exception as e:
        msg = (f"Error processing buffer pgie src pad prob : {e}")
        logger.error(msg)
        return Gst.PadProbeReturn.DROP

    return Gst.PadProbeReturn.OK


def demux_src_pad_buffer_probe(pad, info, user_data):

    gst_buffer = info.get_buffer()
    if not gst_buffer:
        msg = "Unable to get GstBuffer demux src pad prob"
        logger.error(msg)
        return Gst.PadProbeReturn.DROP

    try:
        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        l_frame = batch_meta.frame_meta_list
        while l_frame:
            try:
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
                stream_id = frame_meta.pad_index
                pts = frame_meta.buf_pts
                syd_time_pts_ = convert_pts_to_sydtimestamp(pts, start_time_streammux[stream_id])
                display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
                display_meta.num_labels = 1
                text_params = display_meta.text_params[0]
                text_params.display_text = f"buf ts : {syd_time_pts_}"
                text_params.x_offset = 50
                text_params.y_offset = 50
                text_params.font_params.font_name = "Serif"
                text_params.font_params.font_size = 12
                text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
                text_params.set_bg_clr = 1
                text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
                pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)

                l_user = frame_meta.frame_user_meta_list
                while l_user:
                    try:
                        user_meta = pyds.NvDsUserMeta.cast(l_user.data)
                        if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type(
                            "NVIDIA.DSANALYTICSFRAME.USER_META"
                        ):
                            user_meta_data = pyds.NvDsAnalyticsFrameMeta.cast(user_meta.user_meta_data)

                            obj_in_roi = user_meta_data.objInROIcnt
                            display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
                            no_of_labels = len(obj_in_roi)
                            display_meta.num_labels = no_of_labels
                            start_x,start_y = 50,80
                            for i,(label_id,count) in enumerate(obj_in_roi.items()):
                                
                                text_params = display_meta.text_params[i]
                                text_params.display_text = f"{label_id} : {count}"
                                text_params.x_offset = start_x
                                text_params.y_offset = start_y
                                text_params.font_params.font_name = "Serif"
                                text_params.font_params.font_size = 12
                                text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
                                text_params.set_bg_clr = 1
                                text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
                                start_y+=30

                            pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)

                    except Exception as e:
                        msg = (f"Error processing demux src pad prob user metadata: {e}")
                        logger.error(msg)
                    
                    try:
                        l_user = l_user.next
                    except StopIteration:
                        break

            except Exception as e:

                msg = f"Error processing demux src pad prob  frame: {e}"
                logger.error(msg)
                    
            try:
                l_frame = l_frame.next
            except StopIteration:
                break

    except Exception as e:
        msg = f"Error processing demux src pad prob  buffer: {e}"
        logger.error(msg)
        # return Gst.PadProbeReturn.DROP

    return Gst.PadProbeReturn.OK

 
 
def bus_call(bus, message, loop):
   
    t = message.type
    if t == Gst.MessageType.EOS:
        msg = (f"======================= End-of-stream =======================")
        logger.error(msg)
        # loop.quit()
    elif t==Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        logger.debug(f"{err}")
        print("Warning: %s: %s" % (err, debug))
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        logger.error(f"{err}")
        # print("Error: %s: %s" % (err, debug))

    return True
 
 
def valve_prob(pad, info, user_data):
    stream_id = user_data["stream_id"]
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        msg = f"Unable to get GstBuffer valve prob"
        logger.error(msg)
        return Gst.PadProbeReturn.OK
 
    try:
        stream_index = "stream{0}".format(stream_id)
        perf_data.update_fps(stream_index)
    except Exception as e:
        msg = f"Error accessing batch valve prob metadata: {e} "
        logger.error(msg)
        return Gst.PadProbeReturn.OK
 
    return Gst.PadProbeReturn.OK
 
def make_element(element_name, i):
    element = Gst.ElementFactory.make(element_name, element_name)
    if not element:
        sys.stderr.write(" Unable to create {0}".format(element_name))
    element.set_property("name", "{0}-{1}".format(element_name, str(i)))
    return element
 
 
def format_location_callback(splitmuxsink, fragment_id, video_dir_path, stream_id):
    # Get the current date and time
    current_datetime = convert_syd_epoch_to_datetime_str(get_current_epoch_in_syd(),"date_time")
    # Construct the filename with the current datetime
    return os.path.join(video_dir_path, f"stream_{stream_id}_rtsp_out_{current_datetime}_uri.mp4")
 

def frame_filter_pad_probe(pad, info, user_data):

    global lag_dict
    base_img_path = user_data["base_img_path"]
    cam_id_ip_port = user_data["cam_id"]
    blob_base_img_path = user_data["blob_base_img_path"]

    gst_buffer = info.get_buffer()
    if not gst_buffer:
        msg = "Unable to get GstBuffer framefilter prob "
        logger.error(msg)
        return Gst.PadProbeReturn.OK

    try:
        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        l_frame = batch_meta.frame_meta_list

        while l_frame:
            try:
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
                stream_id = frame_meta.pad_index

                pts = frame_meta.buf_pts
                curr_time = start_time_streammux_.get(stream_id, 0)
                syd_time_pts_hhmmss = convert_syd_epoch_to_datetime_str(curr_time, "hhmmss")
                current_yymmdd = get_current_datetime_str("yymmdd")
                current_hhmmss = get_current_datetime_str("hhmmss")
                frame_number = frame_meta.frame_num
                ntp_sec = frame_meta.ntp_timestamp / 1e9
                syd_time_ntp_hhmmss = ntp_to_sydney_time(ntp_sec,"hhmmss")
                syd_time_ntp_sql = ntp_to_sydney_time(ntp_sec)
                image_folder_path,blob_path = create_dynamic_path(curr_time, base_img_path, cam_id_ip_port)

                image_name = f"CAM-{cam_id_ip_port[:7]}_DATE-{current_yymmdd}_PTS-{syd_time_pts_hhmmss}_NTP-{syd_time_ntp_hhmmss}_SYS-{current_hhmmss}.jpg"

                image_save_path = os.path.join(image_folder_path, image_name)

                # print(f"Saving image: {image_save_path}, stream_id: {stream_id}")
                multifilesink = pad.get_parent_element()
                multifilesink.set_property("location", image_save_path)

                l_user = frame_meta.frame_user_meta_list
                while l_user:
                    try:
                        user_meta = pyds.NvDsUserMeta.cast(l_user.data)
                        if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type(
                            "NVIDIA.DSANALYTICSFRAME.USER_META"
                        ):
                            user_meta_data = pyds.NvDsAnalyticsFrameMeta.cast(user_meta.user_meta_data)
                            objInROIcnt = user_meta_data.objInROIcnt
                            objLCCumCnt = user_meta_data.objLCCumCnt
                            objLCCurrCnt = user_meta_data.objLCCurrCnt
                            num_rects = frame_meta.num_obj_meta
                            if  lag_dict[stream_id]["is_first"]:
                                lag_dict[stream_id]["data"] = {"cumulative_line_cross_count":objLCCumCnt}
                                lag_dict[stream_id]["is_first"] = False
                            syd_time_pts_ = convert_syd_epoch_to_datetime_str(curr_time, "datetime_sql")
                            data = {
                                "stream_id": stream_id,
                                "camera_name": cam_id_ip_port,
                                "unique_number_uuid": 10, 
                                "batch_id": frame_meta.batch_id,
                                "frame_num": frame_number,
                                "buf_pts": pts,
                                "ntp_timestamp": frame_meta.ntp_timestamp,
                                "source_id": frame_meta.source_id,
                                "object_count": {class_dict.get(int(cls_id),cls_id):count for cls_id,count in user_meta_data.objCnt.items()},
                                "Number_of_objects": num_rects,
                                "object_in_ROI_count": objInROIcnt,
                                "cumulative_line_cross_count": user_meta_data.objLCCumCnt,
                                "current_line_cross_count": objLCCurrCnt,
                                "overcrowding_status": user_meta_data.ocStatus,
                                "unique_id": user_meta_data.unique_id,
                                "PTS_TIME": syd_time_pts_,
                                "NTP_TS": syd_time_ntp_sql,
                                "blob_image_path":os.path.join(blob_base_img_path,blob_path,image_name),
                                "reconnect_id": src_ids[f'stream{stream_id}']["id"],
                                "data_num":lag_dict[stream_id]["data_num"]
                            }

                            diff_lag = {k: {k_:(v_-v[k_]) for k_,v_ in data[k].items()}  for k, v in lag_dict[stream_id]["data"].items()}["cumulative_line_cross_count"]

                            data["lag_diff_cumulative_line_cross_count"] = diff_lag

                            lag_dict[stream_id]["data"]["cumulative_line_cross_count"] = data["cumulative_line_cross_count"]
                            lag_dict[stream_id]["ds"] = curr_time
                            lag_dict[stream_id]["data_num"]+=1

                            send_msg(data,redis_key)
                           
                    except Exception as e:
                        msg = f"Error processing framefilter prob user metadata: {e}"
                        logger.error(msg)
                    
                    try:
                        l_user = l_user.next
                    except StopIteration:
                        break

            except Exception as e:
                msg = f"Error processing framefilter prob  frame metadata: {e}"
                logger.error(msg)

            try:
                l_frame = l_frame.next
            except StopIteration:
                break

    except Exception as e:
        msg = f"Error accessing framefilter prob  batch metadata: {e}"
        logger.error(msg)
        # return Gst.PadProbeReturn.OK

    return Gst.PadProbeReturn.OK



def dummy_prob(pad, info, user_data):
    print(f" inside the {user_data} prob ")
    return Gst.PadProbeReturn.OK

def streammux_prob(pad, info, user_data):

    global streammux_first_frame_processed
    global start_time_streammux
    global start_time_streammux_

    stream_id = user_data.get('stream_id')
    no_of_sec_to_predict = user_data.get('no_of_sec_to_predict')
    if not start_time_streammux_first_frame[stream_id]:
        start_time_streammux[stream_id] = get_current_epoch_in_syd()
        start_time_streammux_[stream_id] = get_current_epoch_in_syd()
        start_time_streammux_first_frame[stream_id] = True


    gst_buffer = info.get_buffer()
    if not gst_buffer:
        msg = f"Unable to get GstBuffer streammux "
        logger.error(msg)
        return Gst.PadProbeReturn.OK

    try:
        pts = start_time_streammux[stream_id] + gst_buffer.pts/1000000000.0
        if (pts - start_time_streammux_[stream_id] ) < no_of_sec_to_predict:
            return Gst.PadProbeReturn.DROP
        start_time_streammux_[stream_id] =  pts

    except Exception as e:
        msg = f"Error accessing streammux prob batch metadata: {e}"
        logger.error(msg)
        return Gst.PadProbeReturn.OK
    
    msg = f"frame flow from streammux prob to downstream elements"
    logger.info(msg)
    return Gst.PadProbeReturn.OK


 
def main():

    global perf_data, pipeline, src_ids, streammux, number_sources,is_first
    
    Gst.init(None)

    pipeline_name = f"rtsp-pipeline-{uuid.uuid1()}"
    msg = f"pipeline create start {pipeline_name=} ..."
    logger.info(msg)
    pipeline = Gst.Pipeline.new(pipeline_name)

    src_ids = {f'stream{i}': {'id': None, 'src': None, 'streamid': i,"no_of_att":0} for i in range(number_sources)}

    cam_ids = {stream_id:extract_rtsp_details(uri_name) for stream_id,uri_name in enumerate(streams)}

    streammux = Gst.ElementFactory.make("nvstreammux", "streammuxer")
    if not streammux:
        msg = "Error: Could not create streammuxer!"
        logger.error(msg)
        return
    streammux.set_property('batch-size', number_sources)
    streammux.set_property('batched-push-timeout', streammux_muxer_batch_timeout_usec)
    streammux.set_property("sync-inputs",sync_inputs)
    streammux.set_property("attach-sys-ts",attach_sys_ts)
    streammux.set_property("max-latency",max_latency)
    streammux.set_property("frame-duration",frame_duration)
    streammux.set_property("width",frame_width)
    streammux.set_property("height",frame_height)
    streammux.set_property("live-source",is_live)
    streammux.set_property("drop-pipeline-eos",drop_eos)
    pipeline.add(streammux)


    for src_id, src in enumerate(streams):

        last_id = str(uuid.uuid1())
        src_ids[f'stream{src_id}']['id'] = last_id
        src_ids[f'stream{src_id}']['src'] = src

        bin = create_source_bin(last_id,src_id,src)
        if not bin:
            msg = f"Error: Could not create source bin for {src_id}"
            logger.error(msg)
            del src_ids[f'stream{src_id}']
            number_sources-=1
            continue

        msg = (f"uridecodebin_{last_id} created for {src_id=}")
        logger.info(msg)
        valve = Gst.ElementFactory.make("valve", f"valve_{src_id}")
        queue = Gst.ElementFactory.make("queue", f"queue_{src_id}")

        pipeline.add(bin)
        pipeline.add(valve)
        pipeline.add(queue)

        bin_src = bin.get_static_pad("src")

        user_data={"stream_id":src_id}
        
        
        sink_pad = valve.get_static_pad("sink")
        sink_pad.add_probe(Gst.PadProbeType.BUFFER, valve_prob, user_data)


        padname=f"sink_{src_id}"
        valve_src_pad = valve.get_static_pad("src")
        valve_sink_pad = valve.get_static_pad("sink")

        if not valve_src_pad or not valve_sink_pad:
            msg = (f"Error: Could not get src pad for valve_{src_id} or sink_{src_id}")
            logger.error(msg)
            return
        
        streammux_sinkpad= streammux.request_pad_simple(padname)
        if not streammux_sinkpad:
            msg = (f"Error: Could not get sink pad for streammux_{src_id}")
            logger.error(msg)
            return
        user_meta = {"stream_id":src_id,'no_of_sec_to_predict':no_of_sec_to_send_data}
        streammux_sinkpad.add_probe(Gst.PadProbeType.BUFFER, streammux_prob, user_meta)
        
        queue_sink_pad = queue.get_static_pad("sink")
        queue_src_pad = queue.get_static_pad("src")

        bin_src.link(valve_sink_pad)
        valve_src_pad.link(queue_sink_pad)
        queue_src_pad.link(streammux_sinkpad)

    active_streams = [ src["streamid"] for src in src_ids.values()]
    is_first = False

    if not number_sources:
        msg = f"no activate streams {number_sources=}"
        logger.error(msg)
        sys.exit(1)
    

    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    pgie.set_property('config-file-path', infer_config_path)
    pgie_batch_size=pgie.get_property("batch-size")
    if(pgie_batch_size != number_sources):
        msg = ("WARNING: Overriding infer-config batch-size",pgie_batch_size," with number of sources ", number_sources)
        logger.warning(msg)
        pgie.set_property("batch-size",number_sources)

    nvanalytics = Gst.ElementFactory.make("nvdsanalytics", "analytics")
    if not nvanalytics:
        msg = " Unable to create nvanalytics "
        logger.error(msg)
        sys.stderr.write(msg)
    nvanalytics.set_property("config-file", analytics_config_path)

    nvstreamdemux = Gst.ElementFactory.make("nvstreamdemux", "nvstreamdemux")
    if not nvstreamdemux:
        msg = " Unable to create nvstreamdemux "
        logger.error(msg)
        sys.stderr.write(msg)


    queue1 = Gst.ElementFactory.make("queue", "queue1")
    queue2 = Gst.ElementFactory.make("queue", "queue2")
    queue3 = Gst.ElementFactory.make("queue", "queue3")
    

    for q in [queue1,queue2,queue3]:
        q.set_property("max-size-buffers", max_size_buffers)
        q.set_property("leaky", True)
        pipeline.add(q)

    pgie_src = pgie.get_static_pad("src")
    pgie_src.add_probe(Gst.PadProbeType.BUFFER, pgie_src_pad_prob, 0)
    
    pipeline.add(pgie)
    pipeline.add(nvanalytics)
    pipeline.add(nvstreamdemux)

    streammux.link(queue1)
    queue1.link(pgie)
    pgie.link(queue2)
    queue2.link(nvanalytics)
    nvanalytics.link(queue3)
    queue3.link(nvstreamdemux)

    for i in active_streams:

        queue4_ = Gst.ElementFactory.make("queue", f"queue__{i}")
    
        pipeline.add(queue4_)

        queue4_.set_property("max-size-buffers", max_size_buffers)
        queue4_.set_property("leaky", 2)

        nvvideoconvert =  Gst.ElementFactory.make("nvvideoconvert", f"nvvideoconvert_{i}")
        # nvvideoconvertsink = nvvideoconvert.get_static_pad("sink")
        # nvvideoconvert = make_element("capsfilter", i)
        pipeline.add(nvvideoconvert)

        nvdsosd = Gst.ElementFactory.make("nvdsosd", f"nvdsosd_{i}")
        pipeline.add(nvdsosd)
        nvdsosd.set_property("process-mode", osd_process_mode)
        nvdsosd.set_property("display-text", osd_display_text)
        
        padname = "src_%u" % i
        demuxsrcpad = nvstreamdemux.request_pad_simple(padname)
        if not demuxsrcpad:
            sys.stderr.write("Unable to create demux src pad \n")
        
        queuesinkpad = queue4_.get_static_pad("sink")
        if not queuesinkpad:
            sys.stderr.write("Unable to create queue sink pad \n")
        demuxsrcpad.link(queuesinkpad)


        queue5 = Gst.ElementFactory.make("queue", f"queue5_{i}")
        pipeline.add(queue5)

        queue5.set_property("max-size-buffers", max_size_buffers)
        queue5.set_property("leaky", 2)

        queue5_src = queue5.get_static_pad("src")

        queue5_src.add_probe(Gst.PadProbeType.BUFFER, dummy_prob, "queue5 src")

        nvvidconv2 = Gst.ElementFactory.make("nvvideoconvert", f"nvvideoconvert1_{i}")

        pipeline.add(nvvidconv2)

        jpegenc = Gst.ElementFactory.make("nvjpegenc", f"jpeg-encoder_{i}")
        jpegenc.set_property("quality", im_quality)

        if not jpegenc:
            msg = "Unable to create nvjpegenc "
            logger.error(msg)
            sys.stderr.write(msg)

        pipeline.add(jpegenc)

        multifilesink = Gst.ElementFactory.make("multifilesink", f"multi-file-sink_{i}")
        if not multifilesink:
            msg = " Unable to create multifilesink "
            logger.error(msg)
            sys.stderr.write(msg)

        multifilesink.set_property("post-messages", True)

        sinkpad = multifilesink.get_static_pad("sink")

        probe_data = {
            "cam_id": cam_ids[i],
            "base_img_path":img_dir_name,
            "no_of_sec_send_msg":no_of_sec_to_send_data,
            "blob_base_img_path":blob_base_img_path
            
        }

        pipeline.add(multifilesink)

        queue4_.link(nvvideoconvert)
        nvvideoconvert.link(nvdsosd)
        nvdsosd.link(queue5)
        queue5.link(nvvidconv2)
        nvvidconv2.link(jpegenc)
        jpegenc.link(multifilesink)


        jpegenc_sinkpad = jpegenc.get_static_pad("sink")
        jpegenc_sinkpad.add_probe(Gst.PadProbeType.BUFFER, dummy_prob, "jpegenc_sinkpad")

        nvvidconv2_sinkpad = nvvidconv2.get_static_pad("sink")
        nvvidconv2_sinkpad.add_probe(Gst.PadProbeType.BUFFER, dummy_prob, "nvvidconv2_sinkpad")

        sinkpad = multifilesink.get_static_pad("sink")
        if not sinkpad:
            msg = "Unable to get sink pad of multifilesink"
            logger.error(msg)
            sys.stderr.write(msg)
            sys.exit(1)
        else:
            sinkpad.add_probe(Gst.PadProbeType.BUFFER, frame_filter_pad_probe, probe_data)

        demuxsrcpad.add_probe(Gst.PadProbeType.BUFFER, demux_src_pad_buffer_probe, probe_data)


    msg = "Pipeline setup complete"
    logger.info(msg)

    # Get the bus
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    loop = GLib.MainLoop()

    perf_data = PERF_DATA(active_streams,loop)
    GLib.timeout_add(reconnect_interval, perf_data.perf_print_callback)
    bus.connect("message", bus_call, loop)
 
    # Start the pipeline
    pipeline.set_state(Gst.State.PLAYING)
    msg = "Pipeline is playing..."
    logger.info(msg)
 
    try:
        loop.run()
    except KeyboardInterrupt:
        msg = "Pipeline interrupted by user."
        logger.info(msg)
    finally:
        
        pipeline.set_state(Gst.State.NULL)
        msg = "Pipeline stopped."
        logger.info(msg)
 
if __name__ == "__main__":
    
    main()
  1. if using nvurisrcbin, could you elaborate on " I’m facing a pixelation issue."?
  2. If you want to implement own reconnection logic, please refer to deepstream-app opensource code. It already supports RTSP reconnection functionality. deepstream-app uses rtspsrc_monitor_probe_func to monitor src’s data receiving. it will reconnect rtsp srouce in watch_source_status when not receiving data in a specific time.