Issue with RTSP Camera Reconnection: Pipeline Breaks When Camera Comes Back Instantly

Hello NVIDIA Devs,

System Information:

  • DeepStream Version: 8.0.0
  • CUDA Driver Version: 13.0
  • CUDA Runtime Version: 12.9
  • TensorRT Version: 10.9
  • cuDNN Version: 9.8
  • libNVWarp360 Version: 2.0.1d3
  • Operating System: Ubuntu 24.04.1 LTS
  • Distributor ID: Ubuntu
  • Release: 24.04
  • Codename: noble

I am working on a DeepStream project that involves dynamic runtime source addition and deletion for RTSP cameras. The system works well when a camera goes offline and comes back after 20-30 seconds, automatically reconnecting and resuming without any issues.

Problem:

The issue arises when a camera goes offline briefly (just for a few seconds) and comes back instantly. In this case, my pipeline breaks, and I am unable to reconnect successfully.

How can I resolve this problem and ensure that the pipeline remains stable even when a camera disconnects briefly?

What steps or strategies can be used to detect a brief disconnection and safely reconnect the camera without causing the pipeline to break or fail?
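To illustrate the kind of detection I have in mind: a watchdog that records the last time each source produced a buffer (via a pad probe) and flags sources that have gone silent. This is only a rough sketch of the idea, not working DeepStream code; `on_buffer`, `stalled_sources`, and the 10-second threshold are placeholder names and values I made up:

```python
import time

# Last time each source produced a buffer, updated from a pad probe.
last_buffer_time = {}

STALL_THRESHOLD_S = 10.0  # arbitrary threshold, would need tuning per camera


def on_buffer(source_id):
    # Called from a GStreamer buffer pad probe whenever a buffer passes.
    last_buffer_time[source_id] = time.monotonic()


def stalled_sources(now=None):
    # Return the ids of sources that have been silent longer than the
    # threshold; a periodic GLib timeout could poll this and trigger
    # a reconnect for each stalled id.
    now = time.monotonic() if now is None else now
    return [sid for sid, last in last_buffer_time.items()
            if now - last > STALL_THRESHOLD_S]
```

In the pipeline this would be wired up with something like `srcpad.add_probe(Gst.PadProbeType.BUFFER, ...)` for each source and `GLib.timeout_add_seconds(...)` for the periodic check.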

Current Approach:

I am using Python + GStreamer + DeepStream, and I have implemented a singleton class to manage runtime sources. My pipeline dynamically adds or removes sources based on camera availability, and I attempt reconnection on EOS or error messages.

Here is the relevant code snippet:

import gi
gi.require_version("Gst", "1.0")
gi.require_version("GstRtspServer", "1.0")
from gi.repository import GObject, Gst, GstRtspServer, GLib
from surveillanceManager.sm_rule_base import *
import sys
import time
import cv2

class SMRunTimeSourceAdditionMain(SMRuleBase):
    __instance = None

    @staticmethod
    def getInstance():
        if SMRunTimeSourceAdditionMain.__instance is None:
            SMRunTimeSourceAdditionMain()
        return SMRunTimeSourceAdditionMain.__instance

    def __init__(self):
        if SMRunTimeSourceAdditionMain.__instance is not None:
            raise Exception("This class is a singleton!")
        SMRunTimeSourceAdditionMain.__instance = self

        self.pipeline = None
        self.streammux = None
        self.g_source_bin_list = []
        self.g_source_uri_list = []

    def uridecode_bus_call(self, bus, message, loop):
        t = message.type
        if t == Gst.MessageType.EOS:
            sys.stdout.write("End-of-stream\n")
            loop.quit()
        elif t == Gst.MessageType.WARNING:
            err, debug = message.parse_warning()
            sys.stderr.write("Warning: %s: %s\n" % (err, debug))
        elif t == Gst.MessageType.ERROR:
            struct = message.get_structure()
            err, debug = message.parse_error()
            sys.stderr.write("Error: %s: %s\n" % (err, debug))
            loop.quit()
        elif t == Gst.MessageType.ELEMENT:
            struct = message.get_structure()
            if struct is not None and struct.has_name("stream-eos"):
                parsed, stream_id = struct.get_uint("stream-id")
                if parsed:
                    print("Got EOS from stream %d" % stream_id)
                    self.stop_release_source(stream_id)
        return True

    def cb_newpad_uridecode(self, decodebin, pad, data):
        caps = pad.get_current_caps()
        gststruct = caps.get_structure(0)
        gstname = gststruct.get_name()
        if "video" in gstname:
            source_id = data
            pad_name = "sink_%u" % source_id
            sinkpad = self.streammux.get_request_pad(pad_name)
            if pad.link(sinkpad) != Gst.PadLinkReturn.OK:
                sys.stderr.write("Failed to link decodebin to pipeline\n")

    def uridecode_decodebin_child_added(self, child_proxy, Object, name, user_data):
        if "decodebin" in name:
            Object.connect("child-added", self.uridecode_decodebin_child_added, user_data)

    def create_uridecode_bin(self, index, filename):
        bin_name = "source-bin-%02d" % index
        bin = Gst.ElementFactory.make("uridecodebin", bin_name)
        if not bin:
            sys.stderr.write("Unable to create uri decode bin\n")
            return None
        bin.set_property("uri", filename)
        bin.connect("pad-added", self.cb_newpad_uridecode, index)
        bin.connect("child-added", self.uridecode_decodebin_child_added, index)
        return bin

    def stop_release_source(self, source_id):
        try:
            state_return = self.g_source_bin_list[source_id].set_state(Gst.State.NULL)
            pad_name = "sink_%u" % source_id
            sinkpad = self.streammux.get_static_pad(pad_name)
            sinkpad.send_event(Gst.Event.new_flush_stop(False))
            self.streammux.release_request_pad(sinkpad)
            self.pipeline.remove(self.g_source_bin_list[source_id])
        except Exception as e:
            print("Exception in stop_release_source:", e)

    def add_sources(self, uri):
        if uri not in self.g_source_uri_list:
            source_id = len(self.g_source_bin_list)
        else:
            source_id = self.g_source_uri_list.index(uri)

        try:
            self.stop_release_source(source_id)
        except Exception as ex1:
            print("Unable to remove source:", str(ex1))

        try:
            source_bin = self.create_uridecode_bin(source_id, uri)
            if not source_bin:
                sys.stderr.write("Failed to create source bin\n")
                sys.exit(1)
            if uri not in self.g_source_uri_list:
                self.g_source_bin_list.append(source_bin)
                self.g_source_uri_list.append(uri)
            else:
                self.g_source_bin_list[source_id] = source_bin

            self.pipeline.add(source_bin)
            state_return = source_bin.set_state(Gst.State.PLAYING)
            return True
        except Exception as ex:
            print("Error in source add:", ex)
            return False
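One mitigation I am experimenting with is to delay the re-add instead of recreating the source immediately on EOS/error, so the old rtspsrc has time to tear down before a new one connects. This is a sketch only; `schedule_reconnect` and the 5-second back-off are my own invention, not DeepStream API. The `schedule(delay, fn)` argument stands in for any one-shot timer:

```python
def schedule_reconnect(manager, source_id, uri, schedule, delay_s=5):
    # `manager` is the runtime-source singleton above; `schedule(delay, fn)`
    # is any one-shot timer (e.g. a thin wrapper over
    # GLib.timeout_add_seconds). delay_s=5 is an arbitrary back-off.
    manager.stop_release_source(source_id)

    def _retry():
        # Re-add the source; if it still fails (camera not yet back),
        # arm the timer again instead of giving up.
        if not manager.add_sources(uri):
            schedule(delay_s, _retry)

    schedule(delay_s, _retry)
```

In the real pipeline, `schedule` would be a small wrapper around `GLib.timeout_add_seconds` whose callback returns False so the timer fires only once per arm.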

Main pipeline code:

def execute(self, allCameraObjectListOfList, instancSingle):
      """
      Main execute function for the pipeline. This function is called in a thread.
      It contains logic for pipeline plugin configuration, model loading, linking, 
      and callback function.
  
      :param allCameraObjectListOfList: List containing all camera group objects.
      :param instancSingle: Singleton instance of the communication manager class.
      """
  
      # Initialize camera status dictionary
      cameralist = {}
      for camdata in instancSingle.cameraObjectList:
          cameralist.update({camdata.url: camdata.status})
  
      # Get the list of camera URLs
      cameraUrlList = self.getCameraList(allCameraObjectListOfList)
      self.isfirstPipelineFinish[0] = True
  
      # Initialize communication instance and check for configuration validity
      self.communicateInstance = instancSingle
      self.checkEffective()
  
      # Initialize face models if needed
      if self.isBlacklistedFaceEnable or self.isVipPersonEntry:
          initAllFaceModel()
  
      # Send an initial heartbeat
      self.interval = 2
      self.n.notify("READY=1")
      SMCallWatch.getInstance().send_heartbeat(datetime.now(), self.communicateInstance, 1)
  
      # Check if the camera list is empty
      if len(cameraUrlList) < 1:
          sys.stderr.write("No camera sources found in configuration\n")
          sys.exit(1)
  
      # Initialize FPS stream information
      for i in range(0, len(cameraUrlList)):
          self.fps_streams["stream{0}".format(i)] = GETFPS(i)
  
      # Number of sources (cameras)
      number_sources = len(cameraUrlList)
  
      # Initialize GStreamer
      GObject.threads_init()
      Gst.init(None)
  
      # Create a GStreamer pipeline
      pipeline = Gst.Pipeline()
      if not pipeline:
          sys.stderr.write("Unable to create pipeline\n")
  
      # Create a stream-muxer element to form batches from one or more sources
      streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
      if not streammux:
          sys.stderr.write("Unable to create NvStreamMux\n")
      pipeline.add(streammux)
  
      # Initialize source ID to camera ID mapping
      cam_id_to_src_id = {}
  
      # Loop through each camera source to add them to the pipeline
      for i in range(number_sources):
          self.cam_sid_mapping[1][i] = cameraUrlList[i][0]
          cam_id_to_src_id[cameraUrlList[i][0]] = i
          uri_name = cameraUrlList[i][1]
  
          # Skip if the camera is not enabled
          if not cameralist.get(uri_name):
              continue
  
          # Check if the camera stream is live
          is_live = uri_name.startswith("rtsp://")
  
          # Create and add source bin for each camera
          if not constant.IS_OUTPUT_VIDEO_ENABLE:
              source_bin = self.RuntimeSourceAddMain.create_uridecode_bin(i, uri_name)
              if not source_bin:
                  sys.stderr.write("Unable to create source bin\n")
              if uri_name.split(":")[0] != "file":
                  self.RuntimeSourceAddMain.g_source_bin_list.append(source_bin)
                  self.RuntimeSourceAddMain.g_source_uri_list.append(uri_name)
              pipeline.add(source_bin)
          else:
              source_bin = self.create_source_bin(i, uri_name)
              pipeline.add(source_bin)
  
              # Link the source bin to the streammux
              padname = "sink_%u" % i
              sinkpad = streammux.get_request_pad(padname)
              if not sinkpad:
                  sys.stderr.write("Unable to create sink pad bin\n")
              srcpad = source_bin.get_static_pad("src")
              if not srcpad:
                  sys.stderr.write("Unable to create source pad bin\n")
              srcpad.link(sinkpad)
  
      # Create an event loop and connect to the GStreamer bus
      loop = GObject.MainLoop()
      bus = pipeline.get_bus()
      bus.add_signal_watch()
  
      # Connect message handler to the bus
      if constant.IS_OUTPUT_VIDEO_ENABLE:
          bus.connect("message", self.bus_call, loop)
      else:
          bus.connect("message", self.RuntimeSourceAddMain.uridecode_bus_call, loop)
  
      # Set up RTSP output if video output is enabled
      if constant.IS_OUTPUT_VIDEO_ENABLE and not is_aarch64():
          rtsp_port_num = self.people_class_const["RTSP_OUT_PORT"]
          server = GstRtspServer.RTSPServer.new()
          server.props.service = "%d" % rtsp_port_num
          server.attach(None)
          factory = GstRtspServer.RTSPMediaFactory.new()
          factory.set_launch(
              '( udpsrc name=pay0 port=%d buffer-size=52428 caps="application/x-rtp,media=video, clock-rate=90000, encoding-name=(string)%s, payload=96 " )'
              % (updsink_port_num, codec)
          )
          factory.set_shared(True)
          server.get_mount_points().add_factory("/smarg_analytics1", factory)
  
          print(f"\n*** RTSP Streaming at rtsp://localhost:{rtsp_port_num}/smarg_analytics1 ***\n")
  
      # Set up the tiler sink pad for video processing
      tiler_sink_pad = tiler.get_static_pad("sink")
      if not tiler_sink_pad:
          sys.stderr.write("Unable to get source pad\n")
      else:
          tiler_sink_pad.add_probe(Gst.PadProbeType.BUFFER, self.mainPipelineCallBack.tiler_sink_pad_buffer_probe_1, 0)
  
      # Start processing the video feeds
      print("\nNow Running for Given Video")
      for i, source in enumerate(cameraUrlList):
          print(f"{i}: {source}")
  
      print("Start Video Analytics Processing...\n")
      pipeline.set_state(Gst.State.PLAYING)
  
      self.RuntimeSourceAddMain.pipeline = pipeline
  
      # Trigger model loading completion and cleanup
      self.isfirstPipelineFinish[2] = True
      self.triggerDeleteDecryptFile()
  
      # Run the event loop
      try:
          loop.run()
      except Exception as ex:
          print(f"Exception in pipeline run: {ex}")
          logger.error(f"Exception in pipeline run: {ex}")
  
      # Cleanup after loop finishes
      pipeline.set_state(Gst.State.NULL)
      self.isfirstPipelineFinish[1] = True
      self.triggerLogSegregation()
  
      print("Exiting app\n")

Logs:


Gst.MessageType.ELEMENT======> struct===========> stream-eos, stream-id=(uint)1; t=========> <flags GST_MESSAGE_ELEMENT of type Gst.MessageType>
Got EOS from stream 1
stop_release_source ---- 1
stop status  <enum GST_STATE_CHANGE_SUCCESS of type Gst.StateChangeReturn>
STATE CHANGE SUCCESS

gstnvtracker: Successfully removed stream from low level tracker with Id 100000000.
gstnvtracker: Successfully removed stream 1.
STATE CHANGE SUCCESS

Gst.MessageType.ERROR======> struct===========> GstMessageError, gerror=(GError)NULL, debug=(string)"../gst/rtsp/gstrtspsrc.c\(6790\):\ gst_rtsp_src_receive_response\ \(\):\ /GstPipeline:pipeline0/GstURIDecodeBin:source-bin-01/GstRTSPSrc:source:\012Could\ not\ receive\ message.\ \(System\ error\)"; t=========> <flags GST_MESSAGE_ERROR of type Gst.MessageType>
Error: gst-resource-error-quark: Could not read from resource. (9): ../gst/rtsp/gstrtspsrc.c(6790): gst_rtsp_src_receive_response (): /GstPipeline:pipeline0/GstURIDecodeBin:source-bin-01/GstRTSPSrc:source:
Could not receive message. (System error)
Exiting app


Please refer to /opt/nvidia/deepstream/deepstream/service-maker/sources/apps/python/pipeline_api/deepstream_test5_app for the dynamic add/remove sources use case.

Can you reproduce the same issue with /opt/nvidia/deepstream/deepstream/service-maker/sources/apps/python/pipeline_api/deepstream_test5_app?

1. First Command:

  • Command:
    python3 deepstream_test5.py -c test5_b16_dynamic_source_with_enable_property.yaml -s source_list_dynamic.yaml

  • Problem:
    The pipeline does not reconnect automatically when a camera goes offline.


2. Second Command:

  • Command:
    python3 deepstream_test5.py -s source_list_dynamic.yaml source_list_static.yaml -c test5_b16_dynamic_source.yaml test5_b2.yaml

  • Error: the following message appears

      ```
      Plugin smart_recording_action initialized
      value: /opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so
      value: localhost;9092
      value: /opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-test5/configs/dstest5_msgconv_sample_config.txt
      value: /opt/nvidia/deepstream/deepstream/sources/libs/kafka_protocol_adaptor/cfg_kafka.txt
      Failed to subscribe to topics
      Plugin smart_recording_signal initialized
      Text disabled. Use keyboard/mouse commands to toggle source expand and toggle text display.
      NOTE: To expand a source in the 2D tiled display and view object details, left-click on the source.
      To go back to the tiled display, right-click anywhere on the window.
      value: /opt/nvidia/deepstream/deepstream/samples/models/Primary_Detector/resnet18_trafficcamnet_pruned.onnx_b2_gpu0_int8.engine
      Unable to set the pipeline to the paused state for initialization.
      Unable to set the pipeline to the playing state.
      Unable to set the pipeline to the playing state.
      Pipeline deepstream-test5-0 stopped
      Pipeline deepstream-test5-1 stopped
      ```


I would like guidance on how to resolve these issues so that the system can detect when a camera goes offline and automatically reconnect when it comes back online.

  1. Have you set up your Kafka server correctly before running this command?
  2. Do you have the complete log?

There is no update from you for a period, assuming this is not an issue anymore. Hence we are closing this topic. If need further support, please open a new one. Thanks.