Access `watch_source_status` from python binding

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU): dPU A40.
• DeepStream Version: 6.4-triton-multiarch.
• TensorRT Version: 8.6.1.6.
• NVIDIA GPU Driver Version (valid for GPU only): 535.146.02.
• Issue Type( questions, new requirements, bugs): questions.
• How to reproduce the issue ? (This is for bugs. Including which sample app is using, the configuration files content, the command line used and other details for reproducing)

How i can access watch_source_status from python binding to check if rtsp source is disconnected based on buffer’s time.?

Hi,

rtspsrc posts an error in the GStreamer bus when it gets disconnected. You can add a bus handler with Gst.Bus.add_signal_watch_full.

If you need to reconnect after rtsp is disconnected, it is recommended to use nvurisrcbin.
No python binding required.

I want to reconnect specific number of trying not all the time, Can do that via nvurisrcbin?

Yes, of course. View the definition of rtsp_reconnect_attempts through gst-inspect-1.0 nvurisrcbin

I don’t found rtsp_reconnect_attempts through gst-inspect-1.0 nvurisrcbin from my side, and after apply grep

Sorry, I made a mistake, this is a feature that will be released in the next version.

There is a workaround for the moment.

After reconnecting for the specified number of times, set rtsp-reconnect-interval to 0

How I can do this?

Count the number of reconnections in your app, and then write code such as nvurisrcbin.set_property('rtsp-reconnect-interval', 0)

How I can do this via deepstream_test3 python code?

Below is a simple example for your reference. If there is no data for 3 seconds, it is considered that a reconnection has occurred.
Or you can wait for the next version to be released

diff --git a/apps/deepstream-test3/deepstream_test_3.py b/apps/deepstream-test3/deepstream_test_3.py
index 75a64d5..16eb8b7 100755
--- a/apps/deepstream-test3/deepstream_test_3.py
+++ b/apps/deepstream-test3/deepstream_test_3.py
@@ -145,17 +145,48 @@ def cb_newpad(decodebin, decoder_src_pad,data):
         else:
             sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")

+buffer_time = 0
+reconnect_interval = 3
+reconnect_count = 0
+max_reconnect_count = 3
+loop = None
+
+def rtspsrc_monitor_probe_func(pad, info, u_data):
+    if info.type & Gst.PadProbeType.BUFFER:
+        global buffer_time
+        buffer_time = time.time()
+    if info.type & Gst.PadProbeType.EVENT_DOWNSTREAM:
+        pass
+
+    return Gst.PadProbeReturn.OK
+
 def decodebin_child_added(child_proxy,Object,name,user_data):
     print("Decodebin child added:", name, "\n")
-    if(name.find("decodebin") != -1):
+    if (name.find("decodebin") != -1):
         Object.connect("child-added",decodebin_child_added,user_data)

+    if "depay" in name:
+        print(f"depay ......")
+        depay_element = child_proxy.get_by_name("depay")
+        depay_element_sinkpad = depay_element.get_static_pad("sink")
+        GLib.timeout_add_seconds(1, watch_source_status, None)
+        depay_element_sinkpad.add_probe(Gst.PadProbeType.BUFFER | Gst.PadProbeType.EVENT_DOWNSTREAM, rtspsrc_monitor_probe_func, 0)
+
     if "source" in name:
         source_element = child_proxy.get_by_name("source")
         if source_element.find_property('drop-on-latency') != None:
             Object.set_property("drop-on-latency", True)

-
+def watch_source_status(user_data):
+    global buffer_time
+    current_time = time.time()
+    delta = current_time - buffer_time
+    print(f"delta {delta}")
+    if delta > reconnect_interval:
+        reconnect_count += 1
+        if reconnect_count > max_reconnect_count:
+            print("reconnect too many ")
+    return True

 def create_source_bin(index,uri):
     print("Creating source bin")
@@ -171,11 +202,12 @@ def create_source_bin(index,uri):
     # Source element for reading from the uri.
     # We will use decodebin and let it figure out the container format of the
     # stream and the codec and plug the appropriate demux and decode plugins.
-    if file_loop:
+    if file_loop or True:
         # use nvurisrcbin to enable file-loop
         uri_decode_bin=Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
-        uri_decode_bin.set_property("file-loop", 1)
+        # uri_decode_bin.set_property("file-loop", 1)
         uri_decode_bin.set_property("cudadec-memtype", 0)
+        uri_decode_bin.set_property('rtsp-reconnect-interval', reconnect_interval)
     else:
         uri_decode_bin=Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
     if not uri_decode_bin:
@@ -186,6 +218,7 @@ def create_source_bin(index,uri):
     # callback once a new pad for raw data has beed created by the decodebin
     uri_decode_bin.connect("pad-added",cb_newpad,nbin)
     uri_decode_bin.connect("child-added",decodebin_child_added,nbin)
+    # uri_decode_bin.connect("deep-element-added",decodebin_deep_element_added,nbin)

     # We need to create a ghost pad for the source bin which will act as a proxy
     # for the video decoder src pad. The ghost pad will not have a target right
@@ -386,7 +419,7 @@ def main(args, requested_pgie=None, config=None, disable_probe=False):
         print(i, ": ", source)

     print("Starting pipeline \n")
-    # start play back and listed to events
+    # start play back and listed to events
     pipeline.set_state(Gst.State.PLAYING)
     try:
         loop.run()