RTSP sources do not automatically reconnect in DeepStream Python pipeline in multiple camera

I am trying to build a DeepStream 7.0 Python application with multiple RTSP cameras using nvstreammux and a tiler. I want the application to automatically reconnect cameras if they go offline and then come back online.

I have implemented a try_readd_offline_sources() function to periodically check for offline sources and re-add them. However, it is not reliably reconnecting cameras when they are stopped and restarted.

Here’s a simplified version of my code for the pipeline and reconnect logic:

import sys
import gi
import math
from gi.repository import Gst, GLib, GstRtspServer

Gst.init(None)
loop = None
pipeline = None
streammux = None

MAX_NUM_SOURCES = 4
MUXER_OUTPUT_WIDTH = 1280
MUXER_OUTPUT_HEIGHT = 720
TILED_OUTPUT_WIDTH = 1280
TILED_OUTPUT_HEIGHT = 720
MUXER_BATCH_TIMEOUT_USEC = 40000

RTP_PORT = 5400
RTSP_PORT = 8554
RTSP_MOUNT = “/ds-test”
RTSP_ENCODING_NAME = “H264”
RTSP_PAYLOAD = 96

g_source_bin_list = [None] * MAX_NUM_SOURCES
g_source_uri_list = [None] * MAX_NUM_SOURCES
g_source_name_list = [None] * MAX_NUM_SOURCES
g_source_enabled = [False] * MAX_NUM_SOURCES
g_num_sources = 0

def log(msg):
print(msg, flush=True)

def cb_newpad(decodebin, pad, data):
global streammux
source_id = int(data)
caps = pad.get_current_caps()
gststruct = caps.get_structure(0)
gstname = gststruct.get_name()
if “video” in gstname:
pad_name = f"sink_{source_id}"
sinkpad = streammux.get_request_pad(pad_name)
if sinkpad:
pad.link(sinkpad)
uri = g_source_uri_list[source_id] if g_source_uri_list[source_id] else “unknown”
log(f"Decodebin linked: source_id={source_id} uri={uri} → {pad_name}")

def decodebin_child_added(child_proxy, obj, name, user_data):
try:
if “rtspsrc” in name:
if obj.find_property(‘latency’) is not None:
obj.set_property(“latency”, 200)
if “source” in name:
if obj.find_property(‘drop-on-latency’) is not None:
obj.set_property(“drop-on-latency”, True)
except Exception:
pass
def create_uridecode_bin(index, uri):
bin_name = f"source-bin-{index:02d}"
uridec = Gst.ElementFactory.make(“uridecodebin”, bin_name)
uridec.set_property(“uri”, uri)

# Safe child-added callback
def child_added_cb(child_proxy, obj, name, user_data):
    if "rtspsrc" in name:
        try:
            if obj.find_property("latency") is not None:
                obj.set_property("latency", 400)  # increase buffer
            if obj.find_property("protocols") is not None:
                obj.set_property("protocols", 4)  # TCP transport
        except Exception as e:
            log(f"Warning: Could not set rtspsrc props: {e}")

uridec.connect("pad-added", cb_newpad, index)
uridec.connect("child-added", child_added_cb, index)

g_source_bin_list[index] = uridec
g_source_uri_list[index] = uri
g_source_enabled[index] = True
return uridec

def stop_release_source(source_id):
global pipeline, streammux, g_source_bin_list, g_source_enabled, g_num_sources
src_bin = g_source_bin_list[source_id]
if src_bin:
src_bin.set_state(Gst.State.NULL)
try:
pipeline.remove(src_bin)
except Exception:
pass
pad_name = f"sink_{source_id}"
try:
sinkpad = streammux.get_static_pad(pad_name)
if sinkpad:
sinkpad.send_event(Gst.Event.new_flush_stop(False))
streammux.release_request_pad(sinkpad)
except Exception:
pass
g_source_bin_list[source_id] = None
g_source_enabled[source_id] = False
if g_num_sources > 0:
g_num_sources -= 1

def try_readd_offline_sources(user_data=None):
global pipeline, g_source_uri_list, g_source_bin_list, g_source_enabled, g_num_sources, tiler
for sid in range(MAX_NUM_SOURCES):
uri = g_source_uri_list[sid]
if uri and not g_source_enabled[sid]:
log(f"Attempting to re-add source {sid} → {uri}“)
stop_release_source(sid)
src_bin = create_uridecode_bin(sid, uri)
pipeline.add(src_bin)
r = src_bin.set_state(Gst.State.PLAYING)
if r in (Gst.StateChangeReturn.SUCCESS, Gst.StateChangeReturn.ASYNC, Gst.StateChangeReturn.NO_PREROLL):
g_source_enabled[sid] = True
g_num_sources += 1
log(f"Source {sid} re-added successfully”)
# update tiler layout
rows = int(math.sqrt(g_num_sources))
cols = int(math.ceil(g_num_sources / rows))
tiler.set_property(“rows”, rows)
tiler.set_property(“columns”, cols)
log(f"Tiler updated {rows}x{cols}“)
else:
pipeline.remove(src_bin)
log(f"Failed to re-add source {sid}”)
return True

def bus_call(bus, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
log(“End-of-stream”)
loop.quit()
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
log(f"ERROR from {message.src.get_name()}: {err.message} debug={debug}")
return True

def build_pipeline(initial_uris):
global pipeline, streammux, g_num_sources

pipeline = Gst.Pipeline.new("dst-pipeline")

# nvstreammux
streammux = Gst.ElementFactory.make("nvstreammux", "streammux")
streammux.set_property("width", MUXER_OUTPUT_WIDTH)
streammux.set_property("height", MUXER_OUTPUT_HEIGHT)
streammux.set_property("batch-size", MAX_NUM_SOURCES)
streammux.set_property("batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC)
streammux.set_property("live-source", True)
pipeline.add(streammux)

# tiler
tiler = Gst.ElementFactory.make("nvmultistreamtiler", "tiler")
pipeline.add(tiler)
# nvvidconv + nvosd
nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "nvvidconv")
nvosd = Gst.ElementFactory.make("nvdsosd", "nvosd")
pipeline.add(nvvidconv)
pipeline.add(nvosd)

# encoder -> rtppay -> udpsink
conv_post = Gst.ElementFactory.make("nvvideoconvert", "conv_postosd")
encoder = Gst.ElementFactory.make("nvv4l2h264enc", "h264-enc")
encoder.set_property("bitrate", 2000000)
rtppay = Gst.ElementFactory.make("rtph264pay", "rtppay")
rtppay.set_property("pt", RTSP_PAYLOAD)
udpsink = Gst.ElementFactory.make("udpsink", "udpsink")
udpsink.set_property("host", "127.0.0.1")
udpsink.set_property("port", RTP_PORT)
udpsink.set_property("sync", False)
pipeline.add(conv_post)
pipeline.add(encoder)
pipeline.add(rtppay)
pipeline.add(udpsink)

# link elements
streammux.link(tiler)
tiler.link(nvvidconv)
nvvidconv.link(nvosd)
nvosd.link(conv_post)
conv_post.link(encoder)
encoder.link(rtppay)
rtppay.link(udpsink)

# add sources
for i, uri in enumerate(initial_uris[:MAX_NUM_SOURCES]):
    src_bin = create_uridecode_bin(i, uri)
    g_source_bin_list[i] = src_bin
    pipeline.add(src_bin)
    r = src_bin.set_state(Gst.State.PLAYING)
    if r in (Gst.StateChangeReturn.SUCCESS, Gst.StateChangeReturn.ASYNC):
        g_source_enabled[i] = True
        g_num_sources += 1
        log(f"Initial source {i} added -> {uri}")

if g_num_sources > 0:
    rows = int(math.sqrt(g_num_sources))
    cols = int(math.ceil(g_num_sources / rows))
    tiler.set_property("rows", rows)
    tiler.set_property("columns", cols)
    log(f"Tiler rows={rows} columns={cols}")

return udpsink

def start_rtsp_server():
server = GstRtspServer.RTSPServer.new()
server.props.service = str(RTSP_PORT)
server.attach(None)
factory = GstRtspServer.RTSPMediaFactory.new()
caps = f"application/x-rtp, media=video, clock-rate=90000, encoding-name=(string){RTSP_ENCODING_NAME}, payload={RTSP_PAYLOAD}"
launch = f’( udpsrc name=pay0 port={RTP_PORT} buffer-size=524288 caps=“{caps}” )’
factory.set_launch(launch)
factory.set_shared(True)
server.get_mount_points().add_factory(RTSP_MOUNT, factory)
log(f"RTSP server ready at rtsp://:{RTSP_PORT}{RTSP_MOUNT}")
return server

def main(argv):
global loop
if len(argv) < 2:
print(“Usage: python3 deepstream_rtsp_multisrc_runtime.py [uri2 …]”)
sys.exit(1)

initial_uris = argv[1:]
loop = GLib.MainLoop()
build_pipeline(initial_uris)
start_rtsp_server()

bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)

pipeline.set_state(Gst.State.PLAYING)
GLib.timeout_add_seconds(1, try_readd_offline_sources)
try:
    loop.run()
except KeyboardInterrupt:
    log("Interrupted by user")
finally:
    pipeline.set_state(Gst.State.NULL)
    log("Exiting")

if name == “main”:
main(sys.argv)

In fact, I recommend nvurisrcbin/nvmultiurisrcbin.

Configure the following properties to reconnect when the link is lost.

  rtsp-reconnect-attempts: Set rtsp reconnect attempt value
                        flags: readable, writable, changeable only in NULL or READY state
                        Integer. Range: -2147483648 - 2147483647 Default: -1
  rtsp-reconnect-interval: Timeout in seconds to wait since last data was received from an RTSP source before forcing a reconnection. 0=disable timeout
                        flags: readable, writable, changeable only in NULL or READY state

There is no update from you for a period, assuming this is not an issue anymore. Hence we are closing this topic. If need further support, please open a new one. Thanks.

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.