I see, I tried to make a program that automatically reconnects to a stream when it goes back online but I am not sure it is reconnecting properly due to the probe function on pgie not printing frame info after reconnection. The experiment I set was based on using a stable rtsp stream and a local one on the deepstream_rtsp code. To simulate a lost connection, I stop the local stream and start it again after sometime. Here is the code I am using:
import sys
import time
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import pyds
# --- Global State ---
pipeline = None
streammux = None
source_bins_info = {}
reconnection_success_logged = set()
class SourceInfo:
"""Helper class to store information about a source bin."""
def __init__(self, index, uri, bin_element, muxer_sink_pad):
self.index = index
self.uri = uri
self.bin = bin_element
self.muxer_sink_pad = muxer_sink_pad
self.reconnect_scheduled = False
self.last_frame_num = -1 # Add this for tracking
def pgie_sink_pad_buffer_probe(pad, info, u_data):
"""
Analyzes metadata to verify stream reconnection and processing.
"""
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer")
return Gst.PadProbeReturn.OK
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
if not batch_meta:
return Gst.PadProbeReturn.OK
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
# Cast the data to NvDsFrameMeta
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
source_id = frame_meta.source_id
frame_num = frame_meta.frame_num
source_info = source_bins_info.get(source_id)
if not source_info:
l_frame = l_frame.next
continue
# If a reconnect was scheduled and we haven't logged its success yet...
if source_info.reconnect_scheduled and source_id not in reconnection_success_logged:
print(f"✅ SUCCESS: Source {source_id} reconnected. Processing resumed at frame number {frame_num}.")
# Log that we've confirmed this source, to avoid repeated messages
reconnection_success_logged.add(source_id)
source_info.last_frame_num = frame_num
# Optional: Print regular processing info to show the pipeline is active. Source ID different than 1 because I only want to see if
if source_id != 1:
print(f" -> Processing: Source ID={source_id}, Frame Num={frame_num}")
# Move to the next frame in the batch
l_frame = l_frame.next
return Gst.PadProbeReturn.OK
def decodebin_child_added(child_proxy, obj, name, user_data):
"""
Callback to configure the underlying source element inside uridecodebin.
This is crucial for setting timeouts for live streams.
We check for 'source' as it's more generic than 'rtspsrc'.
"""
if 'source' in name:
print(f"✅ Found source element: '{name}'. Applying live stream properties...")
obj.set_property("tcp-timeout", 5 * 1000 * 1000)
obj.set_property("latency", 2000)
obj.set_property("drop-on-latency", True)
def cb_newpad(decodebin, decoder_src_pad, data):
"""
Dynamic pad callback to set the target of the source bin's ghost pad.
"""
caps = decoder_src_pad.get_current_caps()
if not caps:
caps = decoder_src_pad.query_caps()
gststruct = caps.get_structure(0)
gstname = gststruct.get_name()
source_bin = data
if gstname.startswith("video/x-raw"):
ghost_pad = source_bin.get_static_pad("src")
if ghost_pad.get_target():
print("Ghost pad already has a target. Ignoring new pad.")
return
print(f"✅ Setting target for ghost pad: linking to '{decoder_src_pad.get_name()}'")
if not ghost_pad.set_target(decoder_src_pad):
sys.stderr.write("Failed to set ghost pad target.\n")
def create_source_bin(index, uri):
"""
Creates a Gst.Bin that contains a uridecodebin to handle a single stream.
"""
bin_name = f"source-bin-{index}"
print(f"Creating bin: {bin_name} for URI: {uri}")
nbin = Gst.Bin.new(bin_name)
uri_decode_bin = Gst.ElementFactory.make("uridecodebin", f"uri-decode-bin-{index}")
if not nbin or not uri_decode_bin:
sys.stderr.write("Failed to create source bin elements.\n")
return None
uri_decode_bin.set_property("uri", uri)
uri_decode_bin.connect("pad-added", cb_newpad, nbin)
uri_decode_bin.connect("child-added", decodebin_child_added, nbin)
Gst.Bin.add(nbin, uri_decode_bin)
ghost_pad = Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC)
nbin.add_pad(ghost_pad)
return nbin
def reconnect_source(user_data):
"""
The core reconnection logic, now with robust error handling.
"""
source_info = user_data
bin_name = source_info.bin.get_name()
print(f"\n--- Attempting to reconnect source {source_info.index} ({bin_name}) ---\n")
try:
source_info.bin.set_state(Gst.State.NULL)
streammux.release_request_pad(source_info.muxer_sink_pad)
pipeline.remove(source_info.bin)
# A small delay helps ensure all resources are released before re-adding
time.sleep(0.5)
new_source_bin = create_source_bin(source_info.index, source_info.uri)
if not new_source_bin:
sys.stderr.write(f"❗️ Failed to recreate source bin for index {source_info.index}.\n")
return GLib.SOURCE_REMOVE
pipeline.add(new_source_bin)
sinkpad_template = streammux.get_pad_template("sink_%u")
new_muxer_sink_pad = streammux.request_pad(sinkpad_template, f"sink_{source_info.index}", None)
if not new_muxer_sink_pad:
raise RuntimeError(f"❗️ Failed to get a new sink pad from streammux for source {source_info.index}.")
srcpad = new_source_bin.get_static_pad("src")
if srcpad.link(new_muxer_sink_pad) != Gst.PadLinkReturn.OK:
raise RuntimeError(f"❗️ Failed to link new source bin {source_info.index} to streammux.")
source_info.bin = new_source_bin
source_info.muxer_sink_pad = new_muxer_sink_pad
new_source_bin.sync_state_with_parent()
print(f"\n✅ Reconnection logic complete for source {source_info.index}. Waiting for data... ---\n")
except Exception as e:
print(f"❌ CRITICAL: An error occurred during reconnection for source {source_info.index}: {e}")
finally:
print(f"Resetting reconnection state for source {source_info.index}.")
source_info.reconnect_scheduled = False
= return GLib.SOURCE_REMOVE
def bus_call(bus, message, loop):
"""
Callback to process messages from the GStreamer bus.
"""
t = message.type
msg_src = message.src
def find_source_bin_index_from_element(element):
parent = element
while parent is not None:
for index, info in source_bins_info.items():
if parent == info.bin:
return index
parent = parent.get_parent()
return -1 # Not found in any of our tracked source bins
if t == Gst.MessageType.EOS:
if msg_src == pipeline:
print("End-of-stream received from pipeline. Quitting.")
loop.quit()
else:
print(f"-> Received EOS from '{msg_src.get_name()}'")
source_index = find_source_bin_index_from_element(msg_src)
if source_index != -1:
info = source_bins_info[source_index]
if not info.reconnect_scheduled:
print(f"EOS is from source {source_index}. Scheduling reconnection in 5 seconds...")
info.reconnect_scheduled = True
reconnection_success_logged.discard(source_index)
GLib.timeout_add_seconds(5, reconnect_source, info)
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print(f"❗️ ERROR from element '{msg_src.get_name()}': {err.message}")
print(f" Debug details: {debug}")
source_index = find_source_bin_index_from_element(msg_src)
if source_index != -1:
info = source_bins_info[source_index]
if not info.reconnect_scheduled:
print(f"Error is from source {source_index}. Scheduling reconnection in 5 seconds...")
info.reconnect_scheduled = True
# Reset the success log for this source
reconnection_success_logged.discard(source_index)
GLib.timeout_add_seconds(5, reconnect_source, info)
return True
def main():
global pipeline, streammux, source_bins_info
stream_uris = [
"rtsp://127.0.0.1:8552/ds-test-0",
"rtsp://link_to_a_stable_stream" #Possible to use a video file too
]
Gst.init(None)
print("Creating Pipeline...")
pipeline = Gst.Pipeline.new("reconnect-test-pipeline")
streammux = Gst.ElementFactory.make("nvstreammux", "stream-muxer")
pgie = Gst.ElementFactory.make("nvinfer", "primary-gie")
nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "nvvid-converter")
sink = Gst.ElementFactory.make("fakesink", "sink")
if not pipeline or not streammux or not pgie or not nvvidconv or not sink:
sys.stderr.write("Failed to create core elements.\n")
sys.exit(1)
streammux.set_property('width', 1280)
streammux.set_property('height', 720)
streammux.set_property('batch-size', len(stream_uris))
streammux.set_property('batched-push-timeout', 40000)
streammux.set_property('live-source', 1)
pgie.set_property('config-file-path', "config_pgie_yolo_det.txt")
print("Adding elements to Pipeline...")
pipeline.add(streammux)
pipeline.add(pgie)
pipeline.add(nvvidconv)
pipeline.add(sink)
print("Linking elements in the Pipeline...")
streammux.link(pgie)
pgie.link(nvvidconv)
nvvidconv.link(sink)
pgiesinkpad = pgie.get_static_pad("sink")
if not pgiesinkpad:
sys.stderr.write("Unable to get sink pad of pgie\n")
else:
pgiesinkpad.add_probe(Gst.PadProbeType.BUFFER, pgie_sink_pad_buffer_probe, 0)
print("Added probe to PGIE sink pad.")
print("Creating and adding source bins to pipeline...")
for i, uri in enumerate(stream_uris):
source_bin = create_source_bin(i, uri)
if not source_bin:
continue
pipeline.add(source_bin)
sinkpad_template = streammux.get_pad_template("sink_%u")
muxer_sink_pad = streammux.request_pad(sinkpad_template, f"sink_{i}", None)
srcpad = source_bin.get_static_pad("src")
srcpad.link(muxer_sink_pad)
source_bins_info[i] = SourceInfo(i, uri, source_bin, muxer_sink_pad)
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
print("Starting pipeline...")
pipeline.set_state(Gst.State.PLAYING)
try:
loop.run()
except KeyboardInterrupt:
print("Ctrl+C pressed. Exiting.")
print("Stopping pipeline.")
pipeline.set_state(Gst.State.NULL)
if __name__ == '__main__':
main()
I wonder what else I have to modify to properly resume getting frame info in the pgie probe or if it is possible at all.