Issue with RTSP Camera Reconnection: Pipeline Breaks When Camera Comes Back Instantly

Hello NVIDIA Devs,

System Information:

  • DeepStream Version: 8.0.0
  • CUDA Driver Version: 13.0
  • CUDA Runtime Version: 12.9
  • TensorRT Version: 10.9
  • cuDNN Version: 9.8
  • libNVWarp360 Version: 2.0.1d3
  • Operating System: Ubuntu 24.04.1 LTS
  • Distributor ID: Ubuntu
  • Release: 24.04
  • Codename: noble

I am working on a DeepStream project that involves dynamic runtime source addition and deletion for RTSP cameras. The system works well when a camera goes offline and comes back after 20-30 seconds, automatically reconnecting and resuming without any issues.

Problem:

The issue arises when a camera goes offline briefly (just for a few seconds) and comes back instantly. In this case, my pipeline breaks, and I am unable to reconnect successfully.

How can I resolve this problem and ensure that the pipeline remains stable even when a camera disconnects briefly?

What steps or strategies can be used to detect a brief disconnection and safely reconnect the camera without causing the pipeline to break or fail?

Current Approach:

I am using Python + GStreamer + DeepStream, and I have implemented a singleton class to manage runtime sources. My pipeline dynamically adds or removes sources based on camera availability, and I attempt reconnection on EOS or error messages.

Here is the relevant code snippet:


import gi
gi.require_version("Gst", "1.0")
gi.require_version("GstRtspServer", "1.0")
from gi.repository import GObject, Gst, GstRtspServer, GLib
from surveillanceManager.sm_rule_base import *
import time
import cv2

class SMRunTimeSourceAdditionMain(SMRuleBase):
    __instance = None

    @staticmethod
    def getInstance():
        if SMRunTimeSourceAdditionMain.__instance is None:
            SMRunTimeSourceAdditionMain()
        return SMRunTimeSourceAdditionMain.__instance

    def __init__(self):
        if SMRunTimeSourceAdditionMain.__instance is not None:
            raise Exception("This class is a singleton!")
        SMRunTimeSourceAdditionMain.__instance = self

        self.pipeline = None
        self.streammux = None
        self.g_source_bin_list = []
        self.g_source_uri_list = []

    def uridecode_bus_call(self, bus, message, loop):
        t = message.type
        if t == Gst.MessageType.EOS:
            sys.stdout.write("End-of-stream\n")
            loop.quit()
        elif t == Gst.MessageType.WARNING:
            err, debug = message.parse_warning()
            sys.stderr.write("Warning: %s: %s\n" % (err, debug))
        elif t == Gst.MessageType.ERROR:
            struct = message.get_structure()
            err, debug = message.parse_error()
            sys.stderr.write("Error: %s: %s\n" % (err, debug))
            loop.quit()
        elif t == Gst.MessageType.ELEMENT:
            struct = message.get_structure()
            if struct is not None and struct.has_name("stream-eos"):
                parsed, stream_id = struct.get_uint("stream-id")
                if parsed:
                    print("Got EOS from stream %d" % stream_id)
                    self.stop_release_source(stream_id)
        return True

    def cb_newpad_uridecode(self, decodebin, pad, data):
        caps = pad.get_current_caps()
        gststruct = caps.get_structure(0)
        gstname = gststruct.get_name()
        if "video" in gstname:
            source_id = data
            pad_name = "sink_%u" % source_id
            sinkpad = self.streammux.get_request_pad(pad_name)
            if pad.link(sinkpad) != Gst.PadLinkReturn.OK:
                sys.stderr.write("Failed to link decodebin to pipeline\n")

    def uridecode_decodebin_child_added(self, child_proxy, Object, name, user_data):
        if "decodebin" in name:
            Object.connect("child-added", self.uridecode_decodebin_child_added, user_data)

    def create_uridecode_bin(self, index, filename):
        bin_name = "source-bin-%02d" % index
        bin = Gst.ElementFactory.make("uridecodebin", bin_name)
        if not bin:
            sys.stderr.write("Unable to create uri decode bin\n")
            return None
        bin.set_property("uri", filename)
        bin.connect("pad-added", self.cb_newpad_uridecode, index)
        bin.connect("child-added", self.uridecode_decodebin_child_added, index)
        return bin

    def stop_release_source(self, source_id):
        try:
            state_return = self.g_source_bin_list[source_id].set_state(Gst.State.NULL)
            pad_name = "sink_%u" % source_id
            sinkpad = self.streammux.get_static_pad(pad_name)
            sinkpad.send_event(Gst.Event.new_flush_stop(False))
            self.streammux.release_request_pad(sinkpad)
            self.pipeline.remove(self.g_source_bin_list[source_id])
        except Exception as e:
            print("Exception in stop_release_source:", e)

    def add_sources(self, uri):
        if uri not in self.g_source_uri_list:
            source_id = len(self.g_source_bin_list)
        else:
            source_id = self.g_source_uri_list.index(uri)

        try:
            self.stop_release_source(source_id)
        except Exception as ex1:
            print("Unable to remove source:", str(ex1))

        try:
            source_bin = self.create_uridecode_bin(source_id, uri)
            if not source_bin:
                sys.stderr.write("Failed to create source bin\n")
                exit(1)
            if uri not in self.g_source_uri_list:
                self.g_source_bin_list.append(source_bin)
                self.g_source_uri_list.append(uri)
            else:
                self.g_source_bin_list[source_id] = source_bin

            self.pipeline.add(source_bin)
            state_return = source_bin.set_state(Gst.State.PLAYING)
            return True
        except Exception as ex:
            print("Error in source add:", ex)
            return False

main pipline code

	def execute(self, allCameraObjectListOfList, instancSingle):
      """
      Main execute function for the pipeline. This function is called in a thread.
      It contains logic for pipeline plugin configuration, model loading, linking, 
      and callback function.
  
      :param allCameraObjectListOfList: List containing all camera group objects.
      :param instancSingle: Singleton instance of the communication manager class.
      """
  
      # Initialize camera status dictionary
      cameralist = {}
      for camdata in instancSingle.cameraObjectList:
          cameralist.update({camdata.url: camdata.status})
  
      # Get the list of camera URLs
      cameraUrlList = self.getCameraList(allCameraObjectListOfList)
      self.isfirstPipelineFinish[0] = True
  
      # Initialize communication instance and check for configuration validity
      self.communicateInstance = instancSingle
      self.checkEffective()
  
      # Initialize face models if needed
      if self.isBlacklistedFaceEnable or self.isVipPersonEntry:
          initAllFaceModel()
  
      # Send an initial heartbeat
      self.interval = 2
      self.n.notify("READY=1")
      SMCallWatch.getInstance().send_heartbeat(datetime.now(), self.communicateInstance, 1)
  
      # Check if the camera list is empty
      if len(cameraUrlList) < 1:
          sys.stderr.write("usage: %s <uri1> [uri2] ... [uriN]\n" % cameraUrlList[0])
          sys.exit(1)
  
      # Initialize FPS stream information
      for i in range(0, len(cameraUrlList)):
          self.fps_streams["stream{0}".format(i)] = GETFPS(i)
  
      # Number of sources (cameras)
      number_sources = len(cameraUrlList)
  
      # Initialize GStreamer
      GObject.threads_init()
      Gst.init(None)
  
      # Create a GStreamer pipeline
      pipeline = Gst.Pipeline()
      if not pipeline:
          sys.stderr.write("Unable to create pipeline\n")
  
      # Create a stream-muxer element to form batches from one or more sources
      streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
      if not streammux:
          sys.stderr.write("Unable to create NvStreamMux\n")
      pipeline.add(streammux)
  
      # Initialize source ID to camera ID mapping
      cam_id_to_src_id = {}
  
      # Loop through each camera source to add them to the pipeline
      for i in range(number_sources):
          self.cam_sid_mapping[1][i] = cameraUrlList[i][0]
          cam_id_to_src_id[cameraUrlList[i][0]] = i
          uri_name = cameraUrlList[i][1]
  
          # Skip if the camera is not enabled
          if not cameralist.get(uri_name):
              continue
  
          # Check if the camera stream is live
          is_live = uri_name.startswith("rtsp://")
  
          # Create and add source bin for each camera
          if not constant.IS_OUTPUT_VIDEO_ENABLE:
              source_bin = self.RuntimeSourceAddMain.create_uridecode_bin(i, uri_name)
              if not source_bin:
                  sys.stderr.write("Unable to create source bin\n")
              if uri_name.split(":")[0] != "file":
                  self.RuntimeSourceAddMain.g_source_bin_list.append(source_bin)
                  self.RuntimeSourceAddMain.g_source_uri_list.append(uri_name)
              pipeline.add(source_bin)
          else:
              source_bin = self.create_source_bin(i, uri_name)
              pipeline.add(source_bin)
  
              # Link the source bin to the streammux
              padname = "sink_%u" % i
              sinkpad = streammux.get_request_pad(padname)
              if not sinkpad:
                  sys.stderr.write("Unable to create sink pad bin\n")
              srcpad = source_bin.get_static_pad("src")
              if not srcpad:
                  sys.stderr.write("Unable to create source pad bin\n")
              srcpad.link(sinkpad)
  
      # Create an event loop and connect to the GStreamer bus
      loop = GObject.MainLoop()
      bus = pipeline.get_bus()
      bus.add_signal_watch()
  
      # Connect message handler to the bus
      if constant.IS_OUTPUT_VIDEO_ENABLE:
          bus.connect("message", self.bus_call, loop)
      else:
          bus.connect("message", self.RuntimeSourceAddMain.uridecode_bus_call, loop)
  
      # Set up RTSP output if video output is enabled
      if constant.IS_OUTPUT_VIDEO_ENABLE and not is_aarch64():
          rtsp_port_num = self.people_class_const["RTSP_OUT_PORT"]
          server = GstRtspServer.RTSPServer.new()
          server.props.service = "%d" % rtsp_port_num
          server.attach(None)
          factory = GstRtspServer.RTSPMediaFactory.new()
          factory.set_launch(
              '( udpsrc name=pay0 port=%d buffer-size=52428 caps="application/x-rtp,media=video, clock-rate=90000, encoding-name=(string)%s, payload=96 " )'
              % (updsink_port_num, codec)
          )
          factory.set_shared(True)
          server.get_mount_points().add_factory("/smarg_analytics1", factory)
  
          print(f"\n*** RTSP Streaming at rtsp://localhost:{rtsp_port_num}/smarg_analytics1 ***\n")
  
      # Set up the tiler sink pad for video processing
      tiler_sink_pad = tiler.get_static_pad("sink")
      if not tiler_sink_pad:
          sys.stderr.write("Unable to get source pad\n")
      else:
          tiler_sink_pad.add_probe(Gst.PadProbeType.BUFFER, self.mainPipelineCallBack.tiler_sink_pad_buffer_probe_1, 0)
  
      # Start processing the video feeds
      print("\nNow Running for Given Video")
      for i, source in enumerate(cameraUrlList):
          print(f"{i}: {source}")
  
      print("Start Video Analytics Processing...\n")
      pipeline.set_state(Gst.State.PLAYING)
  
      self.RuntimeSourceAddMain.pipeline = pipeline
  
      # Trigger model loading completion and cleanup
      self.isfirstPipelineFinish[2] = True
      self.triggerDeleteDecryptFile()
  
      # Run the event loop
      try:
          loop.run()
      except Exception as ex:
          print(f"Exception in pipeline run: {ex}")
          logger.error(f"Exception in pipeline run: {ex}")
  
      # Cleanup after loop finishes
      pipeline.set_state(Gst.State.NULL)
      self.isfirstPipelineFinish[1] = True
      self.triggerLogSegregation()
  
      print("Exiting app\n")




logs

Gst.MessageType.ELEMENT======> struct===========> stream-eos, stream-id=(uint)1; t=========> <flags GST_MESSAGE_ELEMENT of type Gst.MessageType>
Got EOS from stream 1
stop_release_source ---- 1
stop status  <enum GST_STATE_CHANGE_SUCCESS of type Gst.StateChangeReturn>
STATE CHANGE SUCCESS

gstnvtracker: Successfully removed stream from low level tracker with Id 100000000.
gstnvtracker: Successfully removed stream 1.
STATE CHANGE SUCCESS

Gst.MessageType.ERROR======> struct===========> GstMessageError, gerror=(GError)NULL, debug=(string)"../gst/rtsp/gstrtspsrc.c\(6790\):\ gst_rtsp_src_receive_response\ \(\):\ /GstPipeline:pipeline0/GstURIDecodeBin:source-bin-01/GstRTSPSrc:source:\012Could\ not\ receive\ message.\ \(System\ error\)"; t=========> <flags GST_MESSAGE_ERROR of type Gst.MessageType>
Error: gst-resource-error-quark: Could not read from resource. (9): ../gst/rtsp/gstrtspsrc.c(6790): gst_rtsp_src_receive_response (): /GstPipeline:pipeline0/GstURIDecodeBin:source-bin-01/GstRTSPSrc:source:
Could not receive message. (System error)
Exiting app

root@smarg:~/data/Ajeet_Gateway_Working/smarg/smarg_surveillance_prod/surveillance_gateway#

Hi @smarg.marcom,
Please post this question in the DeepStream SDK forum.
This issue is specific to DeepStream pipeline management (GStreamer/RTSP), not TensorRT, and the DeepStream team can assist you best there.

Thank You.

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.