I am building a DeepStream Python app to handle multiple RTSP cameras dynamically (add/remove at runtime) and stream via RTSP.
Issues observed:
- Adding multiple cameras dynamically sometimes fails —
nvstreammuxpad linking issues. - Sources can get stuck in
CONNECTINGorOFFLINEindefinitely. - Removing a camera may cause pipeline stalls or EOS events.
- Re-adding a camera causes video breakup or freezes.
- Object tracking with
nvtrackerbehaves inconsistently when cameras are added/removed repeatedly.
Environment:
- DeepStream 7.x, Python 3.8+, Jetson/X86, GStreamer 1.18+
Code (dynamic multi-camera add/remove + RTSP streaming):
import sys
import gi
import math
import time
from gi.repository import Gst, GLib, GstRtspServergi.require_version(‘Gst’, ‘1.0’)
gi.require_version(‘GstRtspServer’, ‘1.0’)
Gst.init(None)loop = None
pipeline = None
streammux = None
tiler = NoneMAX_NUM_SOURCES = 4
MUXER_OUTPUT_WIDTH = 1280
MUXER_OUTPUT_HEIGHT = 720
TILED_OUTPUT_WIDTH = 1280
TILED_OUTPUT_HEIGHT = 720
MUXER_BATCH_TIMEOUT_USEC = 40000RTP_PORT = 5400
RTSP_PORT = 8554
RTSP_MOUNT = “/ds-test”
RTSP_ENCODING_NAME = “H264”
RTSP_PAYLOAD = 96g_source_bin_list = [None]*MAX_NUM_SOURCES
g_source_uri_list = [None]*MAX_NUM_SOURCES
g_source_enabled = [False]*MAX_NUM_SOURCES
g_source_status = [“OFFLINE”]*MAX_NUM_SOURCES
g_source_connect_start_time = [0.0]*MAX_NUM_SOURCES
g_source_cleanup_in_progress = [False]*MAX_NUM_SOURCES
g_num_sources = 0CONNECT_TIMEOUT_SEC = 30
RECONNECT_INTERVAL_SEC = 20def log(msg):
print(f"[{time.strftime(‘%H:%M:%S’)}] {msg}", flush=True)def update_tiler_layout():
global tiler, g_num_sources
if tiler and g_num_sources > 0:
rows = int(math.sqrt(g_num_sources))
cols = int(math.ceil(g_num_sources / rows))
tiler.set_property(“rows”, rows)
tiler.set_property(“columns”, cols)
log(f"Tiler updated to {rows}x{cols} for {g_num_sources} active sources")
elif tiler:
tiler.set_property(“rows”, 1)
tiler.set_property(“columns”, 1)def cb_newpad(decodebin, pad, index):
global streammux
caps = pad.get_current_caps()
gststruct = caps.get_structure(0)
gstname = gststruct.get_name()
if “video” in gstname:
pad_name = f"sink_{index}"
sinkpad = streammux.get_request_pad(pad_name)
if sinkpad:
pad.link(sinkpad)
log(f"Source {index} pad-linked successfully")
g_source_status[index] = “LIVE”
g_source_connect_start_time[index] = 0.0def create_uridecode_bin(index, uri):
bin_name = f"source-bin-{index:02d}"
uridec = Gst.ElementFactory.make(“uridecodebin”, bin_name)
uridec.set_property(“uri”, uri)def child_added_cb(child_proxy, obj, name, user_data): if "rtspsrc" in name: try: if obj.find_property("latency") is not None: obj.set_property("latency", 800) # increased buffer if obj.find_property("protocols") is not None: obj.set_property("protocols", 4) # force TCP if obj.find_property("drop-on-latency") is not None: obj.set_property("drop-on-latency", True) if obj.find_property("do-retransmission") is not None: obj.set_property("do-retransmission", False) except Exception as e: log(f"Warning: Could not set rtspsrc props: {e}") uridec.connect("pad-added", cb_newpad, index) uridec.connect("child-added", child_added_cb, index) g_source_uri_list[index] = uri return uridecdef stop_release_source(source_id):
global pipeline, streammux, g_source_bin_list, g_source_enabled, g_num_sources
was_active = g_source_enabled[source_id]
src_bin = g_source_bin_list[source_id]
if src_bin:
src_bin.set_state(Gst.State.NULL)
try:
pad_name = f"sink_{source_id}"
sinkpad = streammux.get_static_pad(pad_name)
if sinkpad:
sinkpad.send_event(Gst.Event.new_flush_stop(False))
streammux.release_request_pad(sinkpad)
pipeline.remove(src_bin)
except Exception: pass
g_source_bin_list[source_id] = None
g_source_enabled[source_id] = False
g_source_connect_start_time[source_id] = 0.0
g_source_status[source_id] = “OFFLINE”
g_source_cleanup_in_progress[source_id] = False
if was_active and g_num_sources > 0:
g_num_sources -= 1
update_tiler_layout()
return GLib.SOURCE_REMOVEdef handle_source_status(user_data=None):
global pipeline, g_source_uri_list, g_source_bin_list, g_source_enabled, g_num_sources
now = time.time()
for sid in range(MAX_NUM_SOURCES):
uri = g_source_uri_list[sid]
if not uri:
continue
# Cleanup if stuck
if g_source_enabled[sid] and g_source_status[sid].startswith(“CONNECTING”):
if g_source_connect_start_time[sid] > 0.0 and (now - g_source_connect_start_time[sid]) > CONNECT_TIMEOUT_SEC:
log(f"*** WATCHDOG ALERT: Source {sid} stuck. Cleaning up.“)
g_source_cleanup_in_progress[sid] = True
GLib.idle_add(stop_release_source, sid)
continue
# Re-add offline sources
if not g_source_enabled[sid] and g_source_bin_list[sid] is None:
last_attempt = g_source_connect_start_time[sid]
if last_attempt == 0.0 or (now - last_attempt) >= RECONNECT_INTERVAL_SEC:
log(f"Re-adding source {sid} → {uri}”)
g_source_status[sid] = “RETRYING”
g_source_connect_start_time[sid] = now
try:
src_bin = create_uridecode_bin(sid, uri)
g_source_bin_list[sid] = src_bin
pipeline.add(src_bin)
r = src_bin.set_state(Gst.State.PLAYING)
if r in [Gst.StateChangeReturn.SUCCESS, Gst.StateChangeReturn.ASYNC, Gst.StateChangeReturn.NO_PREROLL]:
g_source_enabled[sid] = True
g_num_sources += 1
g_source_status[sid] = “CONNECTING”
update_tiler_layout()
else:
log(f"Failed to start source {sid} immediately, will retry.“)
src_bin.set_state(Gst.State.NULL)
pipeline.remove(src_bin)
g_source_bin_list[sid] = None
g_source_status[sid] = “OFFLINE”
except Exception as e:
log(f"Exception re-adding source {sid}: {e}”)
g_source_status[sid] = “OFFLINE”
g_source_bin_list[sid] = None
return Truedef bus_call(bus, message, loop):
t = message.type
src_name = message.src.get_name() if message.src else “unknown”
if src_name.startswith(“source-bin-”):
sid = int(src_name.split(“-”)[-1])
if t == Gst.MessageType.EOS or t == Gst.MessageType.ERROR:
g_source_cleanup_in_progress[sid] = True
GLib.idle_add(stop_release_source, sid)
return True
return Truedef print_all_statuses(user_data=None):
log(“-”*30)
log(f"CAMERA STATUS REPORT (Active Sources: {g_num_sources})“)
now = time.time()
for i in range(MAX_NUM_SOURCES):
status = g_source_status[i]
if g_source_cleanup_in_progress[i]:
status = “CLEANING UP”
elapsed = “”
if status.startswith((“CONNECTING”, “RETRYING”)) and g_source_connect_start_time[i] > 0.0:
elapsed = f”({int(now - g_source_connect_start_time[i])}s)"
uri_short = g_source_uri_list[i].split(‘@’)[-1] if g_source_uri_list[i] else “N/A”
print(f" Source {i:02d}: {status} {elapsed:<6} | URI: {uri_short}“)
log(”-"*30)
return Truedef build_pipeline(initial_uris):
global pipeline, streammux, tiler, g_num_sources
pipeline = Gst.Pipeline.new(“dst-pipeline”)
streammux = Gst.ElementFactory.make(“nvstreammux”, “streammux”)
streammux.set_property(“width”, MUXER_OUTPUT_WIDTH)
streammux.set_property(“height”, MUXER_OUTPUT_HEIGHT)
streammux.set_property(“batch-size”, MAX_NUM_SOURCES)
streammux.set_property(“batched-push-timeout”, MUXER_BATCH_TIMEOUT_USEC)
streammux.set_property(“live-source”, True)
pipeline.add(streammux)tiler = Gst.ElementFactory.make("nvmultistreamtiler", "tiler") tiler.set_property("width", TILED_OUTPUT_WIDTH) tiler.set_property("height", TILED_OUTPUT_HEIGHT) pipeline.add(tiler) nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "nvvidconv") nvosd = Gst.ElementFactory.make("nvdsosd", "nvosd") pipeline.add(nvvidconv) pipeline.add(nvosd) conv_post = Gst.ElementFactory.make("nvvideoconvert", "conv_postosd") encoder = Gst.ElementFactory.make("nvv4l2h264enc", "h264-enc") encoder.set_property("bitrate", 2000000) rtppay = Gst.ElementFactory.make("rtph264pay", "rtppay") rtppay.set_property("pt", RTSP_PAYLOAD) udpsink = Gst.ElementFactory.make("udpsink", "udpsink") udpsink.set_property("host", "0.0.0.0") # serve to LAN udpsink.set_property("port", RTP_PORT) udpsink.set_property("sync", False) udpsink.set_property("async", False) pipeline.add(conv_post) pipeline.add(encoder) pipeline.add(rtppay) pipeline.add(udpsink) streammux.link(tiler) tiler.link(nvvidconv) nvvidconv.link(nvosd) nvosd.link(conv_post) conv_post.link(encoder) encoder.link(rtppay) rtppay.link(udpsink) for i, uri in enumerate(initial_uris[:MAX_NUM_SOURCES]): src_bin = create_uridecode_bin(i, uri) g_source_bin_list[i] = src_bin pipeline.add(src_bin) r = src_bin.set_state(Gst.State.PLAYING) now = time.time() if r in [Gst.StateChangeReturn.SUCCESS, Gst.StateChangeReturn.ASYNC, Gst.StateChangeReturn.NO_PREROLL]: g_source_enabled[i] = True g_num_sources += 1 g_source_status[i] = "CONNECTING" g_source_connect_start_time[i] = now else: g_source_status[i] = "OFFLINE" pipeline.remove(src_bin) g_source_bin_list[i] = None update_tiler_layout() return udpsinkdef start_rtsp_server():
server = GstRtspServer.RTSPServer.new()
server.props.service = str(RTSP_PORT)
server.attach(None)
factory = GstRtspServer.RTSPMediaFactory.new()
caps = f"application/x-rtp, media=video, clock-rate=90000, encoding-name=(string){RTSP_ENCODING_NAME}, payload={RTSP_PAYLOAD}"
launch = f’( udpsrc name=pay0 port={RTP_PORT} buffer-size=524288 caps=“{caps}” )’
factory.set_launch(launch)
factory.set_shared(True)
server.get_mount_points().add_factory(RTSP_MOUNT, factory)
log(f"RTSP server ready at rtsp://:{RTSP_PORT}{RTSP_MOUNT}")
return serverdef main(argv):
global loop
if len(argv)<2:
print(“Usage: python3 app.py [uri2 …]”)
sys.exit(1)initial_uris = argv[1:] loop = GLib.MainLoop() build_pipeline(initial_uris) start_rtsp_server() bus = pipeline.get_bus() bus.add_signal_watch() bus.connect("message", bus_call, loop) GLib.timeout_add_seconds(2, handle_source_status) GLib.timeout_add_seconds(5, print_all_statuses) pipeline.set_state(Gst.State.PLAYING) log("Pipeline started. Waiting for streams...") try: loop.run() except KeyboardInterrupt: log("Interrupted by user. Exiting...") finally: pipeline.set_state(Gst.State.NULL) log("Pipeline stopped.")if name==“main”:
main(sys.argv)
Request:
Looking for guidance on best practices to dynamically add/remove multiple RTSP cameras reliably and avoid freezes/stuck sources in DeepStream Python.