Issues with Dynamic Multi-Camera RTSP Source Add/Remove in DeepStream Python

I am building a DeepStream Python app to handle multiple RTSP cameras dynamically (add/remove at runtime) and stream via RTSP.

Issues observed:

  1. Adding multiple cameras dynamically sometimes fails — nvstreammux pad linking issues.
  2. Sources can get stuck in CONNECTING or OFFLINE indefinitely.
  3. Removing a camera may cause pipeline stalls or EOS events.
  4. Re-adding a camera causes video breakup or freezes.
  5. Object tracking with nvtracker behaves inconsistently when cameras are added/removed repeatedly.

Environment:

  • DeepStream 7.x, Python 3.8+, Jetson/X86, GStreamer 1.18+

Code (dynamic multi-camera add/remove + RTSP streaming):

import sys
import math
import time

import gi

# The namespace versions must be pinned BEFORE anything is imported from
# gi.repository; calling require_version() after the import is too late.
gi.require_version("Gst", "1.0")
gi.require_version("GstRtspServer", "1.0")
from gi.repository import Gst, GLib, GstRtspServer  # noqa: E402

# Initialise GStreamer once, at import time, before any element is created.
Gst.init(None)

# Shared handles; assigned by main() / build_pipeline().
loop = None
pipeline = None
streammux = None
tiler = None

# Upper bound on simultaneously attached cameras (also the muxer batch size).
MAX_NUM_SOURCES = 4
MUXER_OUTPUT_WIDTH = 1280
MUXER_OUTPUT_HEIGHT = 720
TILED_OUTPUT_WIDTH = 1280
TILED_OUTPUT_HEIGHT = 720
MUXER_BATCH_TIMEOUT_USEC = 40000

# Encoded output: RTP over UDP on RTP_PORT, re-served by the RTSP server.
RTP_PORT = 5400
RTSP_PORT = 8554
RTSP_MOUNT = "/ds-test"
RTSP_ENCODING_NAME = "H264"
RTSP_PAYLOAD = 96

# Per-slot bookkeeping, all indexed by source id (0 .. MAX_NUM_SOURCES - 1).
g_source_bin_list = [None] * MAX_NUM_SOURCES
g_source_uri_list = [None] * MAX_NUM_SOURCES
g_source_enabled = [False] * MAX_NUM_SOURCES
g_source_status = ["OFFLINE"] * MAX_NUM_SOURCES
# time.time() of the latest connection attempt; 0.0 means "none recorded".
g_source_connect_start_time = [0.0] * MAX_NUM_SOURCES
g_source_cleanup_in_progress = [False] * MAX_NUM_SOURCES
g_num_sources = 0

# Watchdog tuning (seconds).
CONNECT_TIMEOUT_SEC = 30
RECONNECT_INTERVAL_SEC = 20

def log(msg):
    """Print *msg* to stdout, prefixed with the current time as [HH:MM:SS].

    Output is flushed immediately so lines show up promptly when stdout is
    piped or captured.
    """
    print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True)

def update_tiler_layout():
    """Resize the tiler grid to fit the current number of active sources.

    With N > 0 active sources the grid is floor(sqrt(N)) rows by
    ceil(N / rows) columns. With no active sources the grid falls back to
    1x1. Does nothing when the tiler has not been created yet.
    """
    global tiler, g_num_sources
    if not tiler:
        return

    if g_num_sources > 0:
        rows = int(math.sqrt(g_num_sources))
        cols = int(math.ceil(g_num_sources / rows))
        tiler.set_property("rows", rows)
        tiler.set_property("columns", cols)
        log(f"Tiler updated to {rows}x{cols} for {g_num_sources} active sources")
    else:
        tiler.set_property("rows", 1)
        tiler.set_property("columns", 1)

def cb_newpad(decodebin, pad, index):
    """Handle "pad-added" from a source bin: link its video pad to the muxer.

    Args:
        decodebin: The uridecodebin that exposed the pad (unused).
        pad: The newly added source pad.
        index: Source slot id; selects the muxer pad ``sink_<index>``.

    The slot is marked "LIVE" only after the link actually succeeded; every
    failure path logs the reason and leaves the status untouched so the
    connection watchdog can still time the source out.
    """
    global streammux

    # Current caps may not be set yet when the pad appears; fall back to a
    # caps query instead of dereferencing None.
    caps = pad.get_current_caps() or pad.query_caps(None)
    if caps is None or caps.is_any() or caps.is_empty():
        log(f"Source {index}: new pad has no usable caps, ignoring")
        return

    gstname = caps.get_structure(0).get_name()
    if "video" not in gstname:
        # Audio or other streams are not fed into the muxer.
        return

    pad_name = f"sink_{index}"
    # Reuse a pad that is still present on the muxer; otherwise request one.
    sinkpad = streammux.get_static_pad(pad_name)
    if sinkpad is None:
        if hasattr(streammux, "request_pad_simple"):
            # Newer GStreamer name for get_request_pad().
            sinkpad = streammux.request_pad_simple(pad_name)
        else:
            sinkpad = streammux.get_request_pad(pad_name)
    if sinkpad is None:
        log(f"Source {index}: unable to get streammux pad {pad_name}")
        return
    if sinkpad.is_linked():
        log(f"Source {index}: streammux pad {pad_name} is already linked")
        return

    result = pad.link(sinkpad)
    if result != Gst.PadLinkReturn.OK:
        log(f"Source {index}: pad link to {pad_name} failed ({result})")
        return

    log(f"Source {index} pad-linked successfully")
    g_source_status[index] = "LIVE"
    g_source_connect_start_time[index] = 0.0

def create_uridecode_bin(index, uri):
    """Create a uridecodebin for *uri* named ``source-bin-<index>``.

    Args:
        index: Source slot id; used in the element name and passed to the
            "pad-added" handler.
        uri: Stream URI; also recorded in ``g_source_uri_list[index]``.

    Returns:
        The new, not yet added, uridecodebin element.

    Raises:
        RuntimeError: If the uridecodebin element cannot be created.
    """
    bin_name = f"source-bin-{index:02d}"
    uridec = Gst.ElementFactory.make("uridecodebin", bin_name)
    if uridec is None:
        raise RuntimeError(f"Unable to create uridecodebin for source {index}")
    uridec.set_property("uri", uri)

    def source_setup_cb(_bin, source, _user_data):
        """Tune the source element uridecodebin created, if it is rtspsrc."""
        factory = source.get_factory()
        if factory is None or factory.get_name() != "rtspsrc":
            return
        try:
            if source.find_property("latency") is not None:
                source.set_property("latency", 800)  # increased buffer
            if source.find_property("protocols") is not None:
                # Force TCP; set by nick so no flags enum import is needed.
                Gst.util_set_object_arg(source, "protocols", "tcp")
            if source.find_property("drop-on-latency") is not None:
                source.set_property("drop-on-latency", True)
            if source.find_property("do-retransmission") is not None:
                source.set_property("do-retransmission", False)
        except Exception as e:
            log(f"Warning: Could not set rtspsrc props: {e}")

    uridec.connect("pad-added", cb_newpad, index)
    # "source-setup" hands over the source element itself. Matching on the
    # child name in "child-added" does not work here because uridecodebin
    # names that child "source", not "rtspsrc".
    uridec.connect("source-setup", source_setup_cb, index)
    g_source_uri_list[index] = uri
    return uridec

def stop_release_source(source_id):
    """Tear down one source slot and reset its bookkeeping.

    Intended to run as a GLib idle callback on the main loop.

    Steps: set the source bin to NULL, flush and release the matching
    ``sink_<source_id>`` muxer pad, remove the bin from the pipeline, then
    reset the slot's state and refresh the tiler layout.

    Args:
        source_id: Index of the slot to release.

    Returns:
        GLib.SOURCE_REMOVE, so the idle callback runs only once.
    """
    global pipeline, streammux, g_source_bin_list, g_source_enabled, g_num_sources
    was_active = g_source_enabled[source_id]
    src_bin = g_source_bin_list[source_id]

    if src_bin:
        try:
            src_bin.set_state(Gst.State.NULL)
            sinkpad = streammux.get_static_pad(f"sink_{source_id}")
            if sinkpad:
                sinkpad.send_event(Gst.Event.new_flush_stop(False))
                streammux.release_request_pad(sinkpad)
        except Exception as e:
            # Best effort, but never silent: a failed release explains later
            # pad-request failures for this slot.
            log(f"Source {source_id}: error while releasing muxer pad: {e}")
        try:
            # Always try to remove the bin, even if pad handling failed;
            # a leftover bin blocks re-adding one with the same name.
            pipeline.remove(src_bin)
        except Exception as e:
            log(f"Source {source_id}: error while removing source bin: {e}")

    g_source_bin_list[source_id] = None
    g_source_enabled[source_id] = False
    # Stamp the teardown time instead of resetting to 0.0: the status poller
    # treats 0.0 as "retry immediately", which would skip the reconnect
    # back-off and re-add the camera right after it was released.
    g_source_connect_start_time[source_id] = time.time()
    g_source_status[source_id] = "OFFLINE"
    g_source_cleanup_in_progress[source_id] = False

    if was_active and g_num_sources > 0:
        g_num_sources -= 1
    update_tiler_layout()
    return GLib.SOURCE_REMOVE

def handle_source_status(user_data=None):
    """Periodic watchdog: time out stuck sources and re-add offline ones.

    For every slot that has a URI recorded:
      * if it is enabled, still "CONNECTING" and older than
        CONNECT_TIMEOUT_SEC, schedule stop_release_source() for it;
      * if it is disabled with no bin, and either no attempt is recorded or
        RECONNECT_INTERVAL_SEC has passed, create and start a new bin.

    Args:
        user_data: Unused; present for the GLib timeout callback signature.

    Returns:
        True, so the GLib timeout keeps firing.
    """
    global pipeline, g_source_uri_list, g_source_bin_list, g_source_enabled, g_num_sources
    now = time.time()
    started_ok = (
        Gst.StateChangeReturn.SUCCESS,
        Gst.StateChangeReturn.ASYNC,
        Gst.StateChangeReturn.NO_PREROLL,
    )

    for sid in range(MAX_NUM_SOURCES):
        uri = g_source_uri_list[sid]
        if not uri:
            continue
        # A cleanup is already queued for this slot; do not queue another
        # one or race it with a re-add.
        if g_source_cleanup_in_progress[sid]:
            continue

        # Cleanup if stuck
        if g_source_enabled[sid] and g_source_status[sid].startswith("CONNECTING"):
            started = g_source_connect_start_time[sid]
            if started > 0.0 and (now - started) > CONNECT_TIMEOUT_SEC:
                log(f"*** WATCHDOG ALERT: Source {sid} stuck. Cleaning up.")
                g_source_cleanup_in_progress[sid] = True
                GLib.idle_add(stop_release_source, sid)
            continue

        # Re-add offline sources
        if g_source_enabled[sid] or g_source_bin_list[sid] is not None:
            continue
        last_attempt = g_source_connect_start_time[sid]
        if last_attempt != 0.0 and (now - last_attempt) < RECONNECT_INTERVAL_SEC:
            continue

        log(f"Re-adding source {sid} → {uri}")
        g_source_status[sid] = "RETRYING"
        g_source_connect_start_time[sid] = now
        src_bin = None
        try:
            src_bin = create_uridecode_bin(sid, uri)
            g_source_bin_list[sid] = src_bin
            pipeline.add(src_bin)
            r = src_bin.set_state(Gst.State.PLAYING)
            if r in started_ok:
                g_source_enabled[sid] = True
                g_num_sources += 1
                g_source_status[sid] = "CONNECTING"
                update_tiler_layout()
            else:
                log(f"Failed to start source {sid} immediately, will retry.")
                src_bin.set_state(Gst.State.NULL)
                pipeline.remove(src_bin)
                g_source_bin_list[sid] = None
                g_source_status[sid] = "OFFLINE"
        except Exception as e:
            log(f"Exception re-adding source {sid}: {e}")
            if src_bin is not None:
                # Do not leave a half-added bin behind: the next attempt
                # reuses the same element name and could not be added.
                try:
                    src_bin.set_state(Gst.State.NULL)
                    pipeline.remove(src_bin)
                except Exception as cleanup_err:
                    log(f"Source {sid}: cleanup after failed re-add failed: {cleanup_err}")
            if g_source_enabled[sid]:
                # The failure happened after the slot was counted as active.
                g_source_enabled[sid] = False
                if g_num_sources > 0:
                    g_num_sources -= 1
            g_source_status[sid] = "OFFLINE"
            g_source_bin_list[sid] = None
    return True

def bus_call(bus, message, loop):
    """Bus watch: schedule cleanup of a source when it reports EOS or ERROR.

    Args:
        bus: The pipeline bus (unused).
        message: The Gst.Message being delivered.
        loop: The GLib main loop (unused; kept for the callback signature).

    Returns:
        True, so the watch stays installed.
    """
    t = message.type
    if t != Gst.MessageType.EOS and t != Gst.MessageType.ERROR:
        return True

    src_name = message.src.get_name() if message.src else "unknown"
    if t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        log(f"ERROR from {src_name}: {err.message} ({debug})")

    # The message is usually posted by an element nested inside the source
    # bin rather than by the bin itself, so walk up the parent chain until a
    # "source-bin-NN" ancestor is found.
    sid = None
    obj = message.src
    while obj is not None:
        name = obj.get_name() or ""
        if name.startswith("source-bin-"):
            try:
                sid = int(name.split("-")[-1])
            except ValueError:
                sid = None
            break
        obj = obj.get_parent()

    if sid is None or not 0 <= sid < MAX_NUM_SOURCES:
        return True
    # Skip slots that are already gone or already queued for cleanup.
    if g_source_bin_list[sid] is None or g_source_cleanup_in_progress[sid]:
        return True

    g_source_cleanup_in_progress[sid] = True
    GLib.idle_add(stop_release_source, sid)
    return True

def print_all_statuses(user_data=None):
    """Print a status line for every source slot.

    A slot with a pending cleanup is shown as "CLEANING UP". Slots that are
    connecting or retrying also show the seconds since the attempt started.
    Only the part of the URI after the last '@' is printed, which keeps any
    ``user:password@`` prefix out of the log.

    Args:
        user_data: Unused; present for the GLib timeout callback signature.

    Returns:
        True, so the GLib timeout keeps firing.
    """
    log("-" * 30)
    log(f"CAMERA STATUS REPORT (Active Sources: {g_num_sources})")
    now = time.time()
    for i in range(MAX_NUM_SOURCES):
        status = g_source_status[i]
        if g_source_cleanup_in_progress[i]:
            status = "CLEANING UP"
        elapsed = ""
        if status.startswith(("CONNECTING", "RETRYING")) and g_source_connect_start_time[i] > 0.0:
            elapsed = f"({int(now - g_source_connect_start_time[i])}s)"
        uri_short = g_source_uri_list[i].split("@")[-1] if g_source_uri_list[i] else "N/A"
        print(f" Source {i:02d}: {status} {elapsed:<6} | URI: {uri_short}")
    log("-" * 30)
    return True

def build_pipeline(initial_uris):
    """Build the pipeline and attach the initial sources.

    Static chain:
        nvstreammux -> nvmultistreamtiler -> nvvideoconvert -> nvdsosd
        -> nvvideoconvert -> nvv4l2h264enc -> rtph264pay -> udpsink

    Args:
        initial_uris: Stream URIs; at most MAX_NUM_SOURCES are used, in
            order, for slots 0..N-1.

    Returns:
        The udpsink element at the end of the chain.

    Raises:
        RuntimeError: If an element cannot be created or two elements of the
            static chain cannot be linked.
    """
    global pipeline, streammux, tiler, g_num_sources

    def make(factory, name):
        """Create an element, add it to the pipeline, fail loudly if missing."""
        element = Gst.ElementFactory.make(factory, name)
        if element is None:
            raise RuntimeError(f"Unable to create element '{factory}' ({name})")
        pipeline.add(element)
        return element

    pipeline = Gst.Pipeline.new("dst-pipeline")

    streammux = make("nvstreammux", "streammux")
    streammux.set_property("width", MUXER_OUTPUT_WIDTH)
    streammux.set_property("height", MUXER_OUTPUT_HEIGHT)
    streammux.set_property("batch-size", MAX_NUM_SOURCES)
    streammux.set_property("batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC)
    streammux.set_property("live-source", True)

    tiler = make("nvmultistreamtiler", "tiler")
    tiler.set_property("width", TILED_OUTPUT_WIDTH)
    tiler.set_property("height", TILED_OUTPUT_HEIGHT)

    nvvidconv = make("nvvideoconvert", "nvvidconv")
    nvosd = make("nvdsosd", "nvosd")
    conv_post = make("nvvideoconvert", "conv_postosd")

    encoder = make("nvv4l2h264enc", "h264-enc")
    encoder.set_property("bitrate", 2000000)
    # NOTE(review): presumably needed so RTSP clients joining mid-stream get
    # SPS/PPS; guarded because the property may not exist on every platform.
    # Confirm on the target.
    if encoder.find_property("insert-sps-pps") is not None:
        encoder.set_property("insert-sps-pps", 1)

    rtppay = make("rtph264pay", "rtppay")
    rtppay.set_property("pt", RTSP_PAYLOAD)

    udpsink = make("udpsink", "udpsink")
    # The RTP packets are consumed by the udpsrc in start_rtsp_server(),
    # which runs in this same process, so send them to the loopback address.
    # 0.0.0.0 is a bind address, not a destination, and does not "serve to
    # LAN"; LAN clients connect to the RTSP server instead.
    udpsink.set_property("host", "127.0.0.1")
    udpsink.set_property("port", RTP_PORT)
    udpsink.set_property("sync", False)
    udpsink.set_property("async", False)

    chain = [streammux, tiler, nvvidconv, nvosd, conv_post, encoder, rtppay, udpsink]
    for upstream, downstream in zip(chain, chain[1:]):
        if not upstream.link(downstream):
            raise RuntimeError(
                f"Failed to link {upstream.get_name()} -> {downstream.get_name()}"
            )

    for i, uri in enumerate(initial_uris[:MAX_NUM_SOURCES]):
        src_bin = create_uridecode_bin(i, uri)
        # Only add the bin here. The pipeline is still in NULL; the initial
        # sources are started together with everything else when the caller
        # sets the whole pipeline to PLAYING. Forcing a source to PLAYING
        # before its downstream elements are running lets it push data into
        # a muxer that is not ready yet.
        if pipeline.add(src_bin):
            g_source_bin_list[i] = src_bin
            g_source_enabled[i] = True
            g_num_sources += 1
            g_source_status[i] = "CONNECTING"
            g_source_connect_start_time[i] = time.time()
        else:
            log(f"Failed to add source {i} to the pipeline, will retry.")
            g_source_status[i] = "OFFLINE"
            g_source_bin_list[i] = None

    update_tiler_layout()
    return udpsink

def start_rtsp_server():
    """Start an RTSP server that re-serves the RTP stream sent to RTP_PORT.

    The server listens on RTSP_PORT and exposes one shared media factory at
    RTSP_MOUNT, whose pipeline is a udpsrc reading RTP packets from RTP_PORT.

    Returns:
        The GstRtspServer.RTSPServer instance. The caller should keep a
        reference to it for as long as the server is needed.
    """
    server = GstRtspServer.RTSPServer.new()
    server.props.service = str(RTSP_PORT)
    if server.attach(None) == 0:
        # attach() returns the GSource id; 0 means it could not be attached
        # (for example because the port is already in use).
        log(f"WARNING: RTSP server could not be attached on port {RTSP_PORT}")

    factory = GstRtspServer.RTSPMediaFactory.new()
    caps = (
        "application/x-rtp, media=video, clock-rate=90000, "
        f"encoding-name=(string){RTSP_ENCODING_NAME}, payload={RTSP_PAYLOAD}"
    )
    launch = f'( udpsrc name=pay0 port={RTP_PORT} buffer-size=524288 caps="{caps}" )'
    factory.set_launch(launch)
    factory.set_shared(True)
    server.get_mount_points().add_factory(RTSP_MOUNT, factory)
    log(f"RTSP server ready at rtsp://localhost:{RTSP_PORT}{RTSP_MOUNT}")
    return server

def main(argv):
    """Entry point: build the pipeline, start the RTSP server, run the loop.

    Args:
        argv: Command-line arguments; argv[1:] are the initial stream URIs.

    Exits with status 1 when no URI is given. The pipeline is always set
    back to NULL when the main loop ends.
    """
    global loop
    if len(argv) < 2:
        print("Usage: python3 app.py <uri1> [uri2 ...]")
        sys.exit(1)

    initial_uris = argv[1:]
    loop = GLib.MainLoop()
    build_pipeline(initial_uris)
    # Keep a reference so the server object stays alive while the loop runs.
    rtsp_server = start_rtsp_server()  # noqa: F841

    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    GLib.timeout_add_seconds(2, handle_source_status)
    GLib.timeout_add_seconds(5, print_all_statuses)

    pipeline.set_state(Gst.State.PLAYING)
    log("Pipeline started. Waiting for streams...")

    try:
        loop.run()
    except KeyboardInterrupt:
        log("Interrupted by user. Exiting...")
    finally:
        bus.remove_signal_watch()
        pipeline.set_state(Gst.State.NULL)
        log("Pipeline stopped.")

# Run main() only when executed as a script, not when imported.
if __name__ == "__main__":
    main(sys.argv)

Request:
Looking for guidance on best practices to dynamically add/remove multiple RTSP cameras reliably and avoid freezes/stuck sources in DeepStream Python.

There are two ways to achieve this target.
1. Refer to this sample for adding and deleting sources at runtime.

2. Use nvmultiurisrcbin and add/remove sources via its REST API. This plugin is the recommended approach.

There has been no update from you for a while, so we are assuming this is no longer an issue and are closing this topic. If you need further support, please open a new one. Thanks.