Below is a simple example for your reference. If there is no data for 3 seconds, it is considered that a reconnection has occurred.
Or you can wait for the next version to be released
diff --git a/apps/deepstream-test3/deepstream_test_3.py b/apps/deepstream-test3/deepstream_test_3.py
index 75a64d5..16eb8b7 100755
--- a/apps/deepstream-test3/deepstream_test_3.py
+++ b/apps/deepstream-test3/deepstream_test_3.py
@@ -145,17 +145,48 @@ def cb_newpad(decodebin, decoder_src_pad,data):
else:
sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")
+buffer_time = 0
+reconnect_interval = 3
+reconnect_count = 0
+max_reconnect_count = 3
+loop = None
+
+def rtspsrc_monitor_probe_func(pad, info, u_data):
+ if info.type & Gst.PadProbeType.BUFFER:
+ global buffer_time
+ buffer_time = time.time()
+ if info.type & Gst.PadProbeType.EVENT_DOWNSTREAM:
+ pass
+
+ return Gst.PadProbeReturn.OK
+
def decodebin_child_added(child_proxy,Object,name,user_data):
print("Decodebin child added:", name, "\n")
- if(name.find("decodebin") != -1):
+ if (name.find("decodebin") != -1):
Object.connect("child-added",decodebin_child_added,user_data)
+ if "depay" in name:
+ print(f"depay ......")
+ depay_element = child_proxy.get_by_name("depay")
+ depay_element_sinkpad = depay_element.get_static_pad("sink")
+ GLib.timeout_add_seconds(1, watch_source_status, None)
+ depay_element_sinkpad.add_probe(Gst.PadProbeType.BUFFER | Gst.PadProbeType.EVENT_DOWNSTREAM, rtspsrc_monitor_probe_func, 0)
+
if "source" in name:
source_element = child_proxy.get_by_name("source")
if source_element.find_property('drop-on-latency') != None:
Object.set_property("drop-on-latency", True)
-
+def watch_source_status(user_data):
+ global buffer_time
+ current_time = time.time()
+ delta = current_time - buffer_time
+ print(f"delta {delta}")
+ if delta > reconnect_interval:
+ reconnect_count += 1
+ if reconnect_count > max_reconnect_count:
+ print("reconnect too many ")
+ return True
def create_source_bin(index,uri):
print("Creating source bin")
@@ -171,11 +202,12 @@ def create_source_bin(index,uri):
# Source element for reading from the uri.
# We will use decodebin and let it figure out the container format of the
# stream and the codec and plug the appropriate demux and decode plugins.
- if file_loop:
+ if file_loop or True:
# use nvurisrcbin to enable file-loop
uri_decode_bin=Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
- uri_decode_bin.set_property("file-loop", 1)
+ # uri_decode_bin.set_property("file-loop", 1)
uri_decode_bin.set_property("cudadec-memtype", 0)
+ uri_decode_bin.set_property('rtsp-reconnect-interval', reconnect_interval)
else:
uri_decode_bin=Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
if not uri_decode_bin:
@@ -186,6 +218,7 @@ def create_source_bin(index,uri):
# callback once a new pad for raw data has beed created by the decodebin
uri_decode_bin.connect("pad-added",cb_newpad,nbin)
uri_decode_bin.connect("child-added",decodebin_child_added,nbin)
+ # uri_decode_bin.connect("deep-element-added",decodebin_deep_element_added,nbin)
# We need to create a ghost pad for the source bin which will act as a proxy
# for the video decoder src pad. The ghost pad will not have a target right
@@ -386,7 +419,7 @@ def main(args, requested_pgie=None, config=None, disable_probe=False):
print(i, ": ", source)
print("Starting pipeline \n")
- # start play back and listed to events
+ # start play back and listed to events
pipeline.set_state(Gst.State.PLAYING)
try:
loop.run()