I am trying to build a DeepStream 7.0 Python application with multiple RTSP cameras using nvstreammux and a tiler. I want the application to automatically reconnect cameras if they go offline and then come back online.
I have implemented a try_readd_offline_sources() function to periodically check for offline sources and re-add them. However, it is not reliably reconnecting cameras when they are stopped and restarted.
Here’s a simplified version of my code for the pipeline and reconnect logic:
import sys
import gi
import math
from gi.repository import Gst, GLib, GstRtspServerGst.init(None)
loop = None
pipeline = None
streammux = NoneMAX_NUM_SOURCES = 4
MUXER_OUTPUT_WIDTH = 1280
MUXER_OUTPUT_HEIGHT = 720
TILED_OUTPUT_WIDTH = 1280
TILED_OUTPUT_HEIGHT = 720
MUXER_BATCH_TIMEOUT_USEC = 40000RTP_PORT = 5400
RTSP_PORT = 8554
RTSP_MOUNT = “/ds-test”
RTSP_ENCODING_NAME = “H264”
RTSP_PAYLOAD = 96g_source_bin_list = [None] * MAX_NUM_SOURCES
g_source_uri_list = [None] * MAX_NUM_SOURCES
g_source_name_list = [None] * MAX_NUM_SOURCES
g_source_enabled = [False] * MAX_NUM_SOURCES
g_num_sources = 0def log(msg):
print(msg, flush=True)def cb_newpad(decodebin, pad, data):
global streammux
source_id = int(data)
caps = pad.get_current_caps()
gststruct = caps.get_structure(0)
gstname = gststruct.get_name()
if “video” in gstname:
pad_name = f"sink_{source_id}"
sinkpad = streammux.get_request_pad(pad_name)
if sinkpad:
pad.link(sinkpad)
uri = g_source_uri_list[source_id] if g_source_uri_list[source_id] else “unknown”
log(f"Decodebin linked: source_id={source_id} uri={uri} → {pad_name}")def decodebin_child_added(child_proxy, obj, name, user_data):
try:
if “rtspsrc” in name:
if obj.find_property(‘latency’) is not None:
obj.set_property(“latency”, 200)
if “source” in name:
if obj.find_property(‘drop-on-latency’) is not None:
obj.set_property(“drop-on-latency”, True)
except Exception:
pass
def create_uridecode_bin(index, uri):
bin_name = f"source-bin-{index:02d}"
uridec = Gst.ElementFactory.make(“uridecodebin”, bin_name)
uridec.set_property(“uri”, uri)# Safe child-added callback def child_added_cb(child_proxy, obj, name, user_data): if "rtspsrc" in name: try: if obj.find_property("latency") is not None: obj.set_property("latency", 400) # increase buffer if obj.find_property("protocols") is not None: obj.set_property("protocols", 4) # TCP transport except Exception as e: log(f"Warning: Could not set rtspsrc props: {e}") uridec.connect("pad-added", cb_newpad, index) uridec.connect("child-added", child_added_cb, index) g_source_bin_list[index] = uridec g_source_uri_list[index] = uri g_source_enabled[index] = True return uridecdef stop_release_source(source_id):
global pipeline, streammux, g_source_bin_list, g_source_enabled, g_num_sources
src_bin = g_source_bin_list[source_id]
if src_bin:
src_bin.set_state(Gst.State.NULL)
try:
pipeline.remove(src_bin)
except Exception:
pass
pad_name = f"sink_{source_id}"
try:
sinkpad = streammux.get_static_pad(pad_name)
if sinkpad:
sinkpad.send_event(Gst.Event.new_flush_stop(False))
streammux.release_request_pad(sinkpad)
except Exception:
pass
g_source_bin_list[source_id] = None
g_source_enabled[source_id] = False
if g_num_sources > 0:
g_num_sources -= 1def try_readd_offline_sources(user_data=None):
global pipeline, g_source_uri_list, g_source_bin_list, g_source_enabled, g_num_sources, tiler
for sid in range(MAX_NUM_SOURCES):
uri = g_source_uri_list[sid]
if uri and not g_source_enabled[sid]:
log(f"Attempting to re-add source {sid} → {uri}“)
stop_release_source(sid)
src_bin = create_uridecode_bin(sid, uri)
pipeline.add(src_bin)
r = src_bin.set_state(Gst.State.PLAYING)
if r in (Gst.StateChangeReturn.SUCCESS, Gst.StateChangeReturn.ASYNC, Gst.StateChangeReturn.NO_PREROLL):
g_source_enabled[sid] = True
g_num_sources += 1
log(f"Source {sid} re-added successfully”)
# update tiler layout
rows = int(math.sqrt(g_num_sources))
cols = int(math.ceil(g_num_sources / rows))
tiler.set_property(“rows”, rows)
tiler.set_property(“columns”, cols)
log(f"Tiler updated {rows}x{cols}“)
else:
pipeline.remove(src_bin)
log(f"Failed to re-add source {sid}”)
return Truedef bus_call(bus, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
log(“End-of-stream”)
loop.quit()
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
log(f"ERROR from {message.src.get_name()}: {err.message} debug={debug}")
return Truedef build_pipeline(initial_uris):
global pipeline, streammux, g_num_sourcespipeline = Gst.Pipeline.new("dst-pipeline") # nvstreammux streammux = Gst.ElementFactory.make("nvstreammux", "streammux") streammux.set_property("width", MUXER_OUTPUT_WIDTH) streammux.set_property("height", MUXER_OUTPUT_HEIGHT) streammux.set_property("batch-size", MAX_NUM_SOURCES) streammux.set_property("batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC) streammux.set_property("live-source", True) pipeline.add(streammux) # tiler tiler = Gst.ElementFactory.make("nvmultistreamtiler", "tiler") pipeline.add(tiler) # nvvidconv + nvosd nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "nvvidconv") nvosd = Gst.ElementFactory.make("nvdsosd", "nvosd") pipeline.add(nvvidconv) pipeline.add(nvosd) # encoder -> rtppay -> udpsink conv_post = Gst.ElementFactory.make("nvvideoconvert", "conv_postosd") encoder = Gst.ElementFactory.make("nvv4l2h264enc", "h264-enc") encoder.set_property("bitrate", 2000000) rtppay = Gst.ElementFactory.make("rtph264pay", "rtppay") rtppay.set_property("pt", RTSP_PAYLOAD) udpsink = Gst.ElementFactory.make("udpsink", "udpsink") udpsink.set_property("host", "127.0.0.1") udpsink.set_property("port", RTP_PORT) udpsink.set_property("sync", False) pipeline.add(conv_post) pipeline.add(encoder) pipeline.add(rtppay) pipeline.add(udpsink) # link elements streammux.link(tiler) tiler.link(nvvidconv) nvvidconv.link(nvosd) nvosd.link(conv_post) conv_post.link(encoder) encoder.link(rtppay) rtppay.link(udpsink) # add sources for i, uri in enumerate(initial_uris[:MAX_NUM_SOURCES]): src_bin = create_uridecode_bin(i, uri) g_source_bin_list[i] = src_bin pipeline.add(src_bin) r = src_bin.set_state(Gst.State.PLAYING) if r in (Gst.StateChangeReturn.SUCCESS, Gst.StateChangeReturn.ASYNC): g_source_enabled[i] = True g_num_sources += 1 log(f"Initial source {i} added -> {uri}") if g_num_sources > 0: rows = int(math.sqrt(g_num_sources)) cols = int(math.ceil(g_num_sources / rows)) tiler.set_property("rows", rows) tiler.set_property("columns", cols) log(f"Tiler rows={rows} columns={cols}") return udpsinkdef start_rtsp_server():
server = GstRtspServer.RTSPServer.new()
server.props.service = str(RTSP_PORT)
server.attach(None)
factory = GstRtspServer.RTSPMediaFactory.new()
caps = f"application/x-rtp, media=video, clock-rate=90000, encoding-name=(string){RTSP_ENCODING_NAME}, payload={RTSP_PAYLOAD}"
launch = f’( udpsrc name=pay0 port={RTP_PORT} buffer-size=524288 caps=“{caps}” )’
factory.set_launch(launch)
factory.set_shared(True)
server.get_mount_points().add_factory(RTSP_MOUNT, factory)
log(f"RTSP server ready at rtsp://:{RTSP_PORT}{RTSP_MOUNT}")
return serverdef main(argv):
global loop
if len(argv) < 2:
print(“Usage: python3 deepstream_rtsp_multisrc_runtime.py [uri2 …]”)
sys.exit(1)initial_uris = argv[1:] loop = GLib.MainLoop() build_pipeline(initial_uris) start_rtsp_server() bus = pipeline.get_bus() bus.add_signal_watch() bus.connect("message", bus_call, loop) pipeline.set_state(Gst.State.PLAYING) GLib.timeout_add_seconds(1, try_readd_offline_sources) try: loop.run() except KeyboardInterrupt: log("Interrupted by user") finally: pipeline.set_state(Gst.State.NULL) log("Exiting")if name == “main”:
main(sys.argv)