Nvstreammux EOS sporadically not propagated to the bus callback

Please provide complete information as applicable to your setup.

• Hardware Platform (GPU) - GPU A100
• DeepStream Version - 7.1
• JetPack Version (valid for Jetson only) - N/A
• TensorRT Version - 10.3.0.26-1+cuda12.5
• NVIDIA GPU Driver Version (valid for GPU only) - 550.90.07
• Issue Type (questions, new requirements, bugs) - bugs
• How to reproduce the issue ? (This is for bugs. Including which sample app is using, the configuration files content, the command line used and other details for reproducing)

We have built an app that uses nvstreammux and nvurisrcbin to process multiple RTSP streams. It also uses the Smart Record feature to capture moments of interest.
Streams are fed into the pipeline at their original FPS, but the inference rate is capped at 5 FPS.
The app may be too complex to provide a runnable snippet here.

Problem: in most cases EOS is propagated to the bus callback, but sometimes it is not.
When that happens, the affected streams are stuck with no activity.
This impacts the rest of the business logic.

So far we’ve implemented a workaround that monitors for stuck streams and removes them from the pipeline. However, we suspect that removing a stream (nvurisrcbin) while it is stuck in an undefined state may lead to crashes.

We also see strange Smart Record behavior: sometimes the recorded video is far longer than we anticipate, minutes instead of the 45 s set in our config.

Memory type used: unified memory. We use it to create a copy of a frame when a moment of interest happens.

Here’s the function that creates the pipeline:

    def _create(self) -> bool:
        if not self._check_config():
            return False

        self._platform_info = PlatformInfo()

        self._pipeline = Gst.Pipeline()
        if self._pipeline is None:
            logger.error("Unable to create Pipeline")
            return False

        batch_size = self._total_capacity if self._total_capacity > 0 else self.DEFAULT_MUX_BATCH_SIZE
        # Create nvstreammux instance to form batches from one or more sources.
        self._muxer = self._create_element("nvstreammux", "Stream-muxer")
        if self._muxer is None:
            return False

        self._muxer.set_property("batched-push-timeout", 33000)
        self._muxer.set_property("batch-size", batch_size)
        self._muxer.set_property("gpu_id", self._gpu_id)
        self._muxer.set_property("live-source", 1)
        # Set streammux width and height
        self._muxer.set_property('width', self.MUXER_OUTPUT_WIDTH)
        self._muxer.set_property('height', self.MUXER_OUTPUT_HEIGHT)

        self._pgie = self._create_element("nvinferserver", "primary-inference")
        if self._pgie is None:
            return False

        tee_infer = self._create_element("tee", "tee_infer")
        if tee_infer is None:
            return False

        queue_infer = self._create_element("queue", "queue_infer")
        if queue_infer is None:
            return False

        self.nv_video_convert_infer = None

        logger.info("Creating nv_video_convert_infer")
        self.nv_video_convert_infer = self._create_element("nvvideoconvert", "nv_video_convert_infer")
        if self.nv_video_convert_infer is None:
            logger.error("Unable to create nv_video_convert_infer")
            return False

        video_src_pad = self.nv_video_convert_infer.get_static_pad("src")
        if video_src_pad is None:
            logger.error("Unable to get source pad for nv_video_convert_infer")
            return False
        else:
            video_src_pad.add_probe(Gst.PadProbeType.BUFFER, self._videoconv_src_pad_buffer_probe, 0)

        logger.info("Creating caps filter")
        video_convert_caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
        if video_convert_caps is None:
            return False

        video_convert_caps_filter = self._create_element("capsfilter", "video_convert_caps_filter")
        if not video_convert_caps_filter:
            logger.error("Unable to create video_convert_caps_filter")
            return False

        video_convert_caps_filter.set_property("caps", video_convert_caps)

        sink = self._create_element("fakesink", "fakesink")
        if sink is None:
            return False

        sink.set_property('enable-last-sample', 0)
        sink.set_property('sync', 0)

        # Set configuration file path
        self._pgie.set_property('config-file-path', self.client_config)

        if not application_environment.DEBUG:
            mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
            self._muxer.set_property("nvbuf-memory-type", mem_type)
            self.nv_video_convert_infer.set_property("nvbuf-memory-type", mem_type)

        logger.info("Adding elements to Pipeline...")
        self._pipeline.add(self._muxer)
        self._pipeline.add(self._pgie)
        self._pipeline.add(tee_infer)
        self._pipeline.add(queue_infer)
        self._pipeline.add(self.nv_video_convert_infer)
        self._pipeline.add(video_convert_caps_filter)
        self._pipeline.add(sink)

        # link elements:
        logger.info("Linking elements in the Pipeline...")
        self._muxer.link(self._pgie)
        self._pgie.link(tee_infer)
        tee_infer.link(queue_infer)
        queue_infer.link(self.nv_video_convert_infer)
        self.nv_video_convert_infer.link(video_convert_caps_filter)
        video_convert_caps_filter.link(sink)

        return True

Bus callback implementation:

    def _bus_callback(self, bus, message, loop):
        type = message.type
        if type == Gst.MessageType.EOS:
            logger.info("End-of-stream reached. Stopping pipeline...")
            self._stop_pipeline()
        elif type == Gst.MessageType.WARNING:
            err, debug = message.parse_warning()
            logger.warning(f"Warning: {err}: {debug}")
        elif type == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            logger.error(f"Error: {err}: {debug}")
            logger.error(f"Message structure: {message.get_structure()}")
            error_message = f"Error: {err}: debug info: {debug}"

            match = self.stream_id_pattern.match(str(message.get_structure()))
            if match is not None:
                try:
                    stream_id = int(match.group(1))
                    source = self._get_source(stream_id)
                    stream = self.get_stream(stream_id)

                    if source is None:
                        historian.detection_point_error(None, stream_id, error_message)
                    else:
                        historian.detection_point_error(stream.detection_point_id, stream_id, error_message)

                    if stream is not None:
                        logger.info(f"Got an error for stream id: {stream_id}. Camera URL is: '{stream.url}' "
                                    f"Detection point ID {stream.detection_point_id}, "
                                    f"name '{stream.detection_point_name}'. "
                                    "Stopping it. Coordinator should restart this stream.")

                        historian.detection_point_stop(stream.detection_point_id,
                                                       stream.detection_point_name,
                                                       stream.url, stream_id,
                                                       f"STOP BY ERROR: {error_message}")
                        self._release_source(stream_id)
                        super().remove_source(stream_id)
                    elif source is not None:
                        historian.detection_point_stop(None, None, None,
                            stream_id, f"OFF-PIPELINE STOP BY ERROR: {error_message}")
                        self._release_source(stream_id)
                except ValueError:
                    logger.warning(f"Got an error, but wasn't able to parse related stream id.")
                    historian.session_error(f"Got an error without stream id. Error: {err}: debug info: {debug}")
            else:
                logger.error(f"Got an error, but wasn't able to extract related stream id.")
                historian.session_error(f"Got an error without stream id. Error: {err}: debug info: {debug}")

        elif type == Gst.MessageType.ELEMENT:
            struct = message.get_structure()
            # Check for stream-eos message
            if struct is not None and struct.has_name("stream-eos"):
                parsed, stream_id = struct.get_uint("stream-id")
                if parsed:
                    logger.info(f"Got End-of-stream for source {stream_id}")

                    stream = self.get_stream(stream_id)
                    source = self._get_source(stream_id)
                    if stream is None:
                        historian.session_error(f"Got EOS for stream {stream_id}, but it's not on the pipeline anymore. "
                                                f"Is source also gone?: {source is None}")

                    else:
                        historian.detection_point_stop(stream.detection_point_id,
                                                       stream.detection_point_name,
                                                       stream.url,
                                                       stream_id,
                                                       "Removed because of EOS")

                    self._release_source(stream_id)
                    super().remove_source(stream_id)

        return True

The following is how we set up the source bins:

    @staticmethod
    def _create_element(element_name, bin_name):
        logger.info(f"Creating element {bin_name}\n")
        element = Gst.ElementFactory.make(element_name, bin_name)
        if element is None:
            logger.error(f"Unable to create {element_name}\n")
            return None

        return element

    @staticmethod
    def _get_source_bin_name(source_id: int):
        return "source-bin-%02d" % source_id

    def _create_source_bin(self, uri, source_id):
        # Create a source GstBin to abstract this bin's content from the rest of the
        # pipeline
        bin_name = self._get_source_bin_name(source_id)
        logger.info(f"Creating {bin_name} for {uri}")

        # Source element for reading from the uri.
        # We use nvurisrcbin, which wraps uridecodebin: it figures out the
        # container format and codec of the stream, plugs the appropriate
        # demux and decode plugins, and also provides Smart Record.
        source_bin = self._create_element("nvurisrcbin", f"{bin_name}")

        stream = self.get_stream(source_id)

        # We set the input uri to the source element
        source_bin.set_property("uri", uri)
        source_bin.set_property("smart-record", 2)
        source_bin.set_property("smart-rec-dir-path", self._recordings_path)
        source_bin.set_property("smart-rec-file-prefix", f"recording_dp_{stream.detection_point_id}")
        source_bin.set_property("smart-rec-cache", self.recording_cache_size + 1)
        #source_bin.set_property("smart-rec-start-time", self.recording_cache_size)
        source_bin.set_property("smart-rec-default-duration", self.recording_default_duration)
        source_bin.set_property("smart-rec-mode", 1) # video only
        source_bin.set_property("select-rtp-protocol", 4) #force tcp
        source_bin.set_property("drop-on-latency", False)

        # Connect to the "pad-added" signal of the decodebin which generates a
        # callback once a new pad for raw data has been created by the decodebin
        source_bin.connect("pad-added", self._on_pad_added, source_id)
        source_bin.connect("child-added", self._on_decodebin_child_added, source_id)
        source_bin.connect("start-sr", self._on_recording_started, source_id)
        source_bin.connect("sr-done", self._on_recording_stopped, source_id)

        return source_bin

    def _on_pad_added(self, _, pad, data):
        caps = pad.get_current_caps()
        if not caps:
            caps = pad.query_caps()
        gst_struct = caps.get_structure(0)
        gst_name = gst_struct.get_name()

        # Need to check if the pad created by the decodebin is for video and not
        # audio.
        logger.info(f"Pad added: {gst_name}")
        if gst_name.find("video") != -1:
            source_id = data
            pad_name = self._get_sink_pad_name(source_id)
            # Get a sink pad from the streammux, link to decodebin
            sink_pad = self._muxer.request_pad_simple(pad_name)
            if not sink_pad:
                logger.error(f"Unable to create sink pad bin {pad_name}")
            if pad.link(sink_pad) == Gst.PadLinkReturn.OK:
                logger.info(f"Decodebin {pad_name} linked to pipeline. User data: {data}")
                self._set_inference_for_source(data, True)
                if self._max_fps is not None:
                    self._set_infer_fps(data, self._max_fps)
            else:
                logger.error(f"Failed to link decodebin {pad_name} to pipeline")

    def _on_decodebin_child_added(self, child_proxy: Gst.ChildProxy, obj: GObject.Object, name: str, user_data):
        logger.info(f"Decodebin child added {name}")

        if name.find("decodebin") != -1:
            obj.connect("child-added", self._on_decodebin_child_added, user_data)

        if name.find("nvv4l2decoder") != -1:
            logger.info(f"Found nvv4l2decoder: {name}")

            obj.set_property("gpu_id", self._gpu_id)
            # Use CUDA unified memory in the pipeline so frames can be easily accessed on CPU in Python.
            # 0: NVBUF_MEM_CUDA_DEVICE, 1: NVBUF_MEM_CUDA_PINNED, 2: NVBUF_MEM_CUDA_UNIFIED
            # Don't use direct macro here like NVBUF_MEM_CUDA_UNIFIED since nvv4l2decoder uses a
            # different enum internally
            if not application_environment.DEBUG:
                obj.set_property("cudadec-memtype", 2)
                logger.info(f"Set cudadec-memtype to NVBUF_MEM_CUDA_UNIFIED for {name}")

Auxiliary struct we use to manage additional stream properties:

    class SourceControlStruct:
        def __init__(self, source_bin=None):
            self.source_bin = source_bin
            self.inference_enabled: bool = False
            self.recording_start_timestamp: float | None = None

        @property
        def is_recording(self) -> bool:
            return self.source_bin.get_property("smart-rec-status") if self.source_bin is not None else False

Recording management:

    def start_recording(self, source_id: int, timestamp: int):
        source = self._get_source(source_id)

        if not super().start_recording(source_id, timestamp) or source is None:
            logger.error(f"Cannot start recording. Source {source_id} does not exist.")
            return

        if source.is_recording:
            logger.error(f"Source {source_id} is already being recorded. Skipping record command.")
            return

        seconds_before = self.recording_cache_size
        if seconds_before < 0:
            logger.warning(f"Negative seconds_before requested: {seconds_before}, using 0 instead.")
            seconds_before = 0

        if self.recording_default_duration <= 0:
            logger.error(
                f"Non-positive recording_duration requested: {self.recording_default_duration}, not recording.")
            return

        session_id = pyds.alloc_uint32_value(0)
        result = source.source_bin.emit('start-sr', session_id, seconds_before, self.recording_default_duration, None)
        logger.success(f"Started recording session {session_id}, result: {result}")
        pyds.free_gbuffer(session_id)

        start_time = time.time()
        while True:
            if source.is_recording:
                source.recording_start_timestamp = time.time()
                logger.info(f"Recording for {source_id} started @ {source.recording_start_timestamp}")
                break
            time.sleep(0.1)
            if time.time() - start_time > 1:
                logger.warning(f"Timeout while waiting for recording to start for source {source_id}.")
                break

    def stop_recording(self, source_id: int):
        source = self._get_source(source_id)

        if source is None:
            logger.error(f"Source {source_id} does not exist, there is nothing to stop.")
            return

        if not source.is_recording:
            logger.info(f"Source {source_id} is not being recorded, skipping the command.")
            return

        logger.info(f"Stopping smart recording on source {source_id}...")
        source.source_bin.emit('stop-sr', 0)

        start_time = time.time()
        while True:
            if not source.is_recording:
                logger.info(f"Recording for {source_id} stopped.")
                break
            time.sleep(0.1)
            if time.time() - start_time > 1:
                logger.warning(f"Timeout while waiting for recording to stop for source {source_id}.")
                break

    def _on_recording_stopped(self, nvurisrcbin, recording_info, user_data, source_id):
        source = self._get_source(source_id)
        if source is None:
            logger.error(f"Got recording stopped callback for non-existent source '{source_id}'")
        elif source.recording_start_timestamp is None:
            logger.warning(f"Got recording stopped callback while no recording start timestamp '{source_id}'.")
        else:
            logger.info(f"Got recording stopped callback for source '{source_id}'. "
                        f"Recording duration: {time.time() - source.recording_start_timestamp}s")
            source.recording_start_timestamp = None

        logger.success(f"Recording stopped: {source_id}, info: {recording_info}, user data: {user_data}")
        info = pyds.NvDsSRRecordingInfo.cast(hash(recording_info))
        filename = pyds.get_string(info.filename)
        folder = pyds.get_string(info.dirpath)
        logger.success(f"Recording info: {filename}, {folder}")

    def _on_recording_started(self, nvurisrcbin, session_id, seconds_before, duration, user_data, source_id):
        source = self._get_source(source_id)
        logger.success(f"Registered 'recording_started' for source {source_id}. Source exists?: {source is not None}")

Annotation of inferred frames (nvvideoconvert probe):

    def _videoconv_src_pad_buffer_probe(self, pad, info, u_data):
        gst_buffer = info.get_buffer()
        if not gst_buffer:
            logger.error("Unable to get source buffer")
            return Gst.PadProbeReturn.DROP

        # Retrieve batch metadata from the gst_buffer
        # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
        # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        if not batch_meta:
            return Gst.PadProbeReturn.OK

        l_frame = batch_meta.frame_meta_list
        streams: dict[int, Stream] = {}

        while l_frame is not None:
            try:
                # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
                # The casting is done by pyds.NvDsFrameMeta.cast()
                # The casting also keeps ownership of the underlying memory
                # in the C code, so the Python garbage collector will leave
                # it alone.
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
            except StopIteration:
                break

            stream = streams.get(frame_meta.source_id, None)
            if stream is None:
                stream = self.get_stream(frame_meta.source_id)
                if stream is None:
                    break
                streams[frame_meta.source_id] = stream

            # Update frame rate through this probe
            if frame_meta.bInferDone:
                self._perf_data.register_frame_for_source_id(frame_meta.source_id)

                if not application_environment.DEBUG:
                    n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
                    if n_frame is None:
                        logger.warning("Unable to get frame surface pointer")
                        try:
                            l_frame = l_frame.next
                        except StopIteration:
                            break
                        continue

                    frame_copy = np.array(n_frame, copy=True, order='C')
                    frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_RGBA2BGRA)
                else:
                    frame_copy = None

                source_frame_width = self.MUXER_OUTPUT_WIDTH #frame_meta.source_frame_width
                source_frame_height = self.MUXER_OUTPUT_HEIGHT #frame_meta.source_frame_height
                classes = []
                scores = []
                xs = []
                ys = []
                widths = []
                heights = []

                l_obj = frame_meta.obj_meta_list
                while l_obj is not None:
                    try:
                        obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
                    except StopIteration:
                        break

                    classes.append(obj_meta.class_id)
                    scores.append(obj_meta.confidence)

                    rect_params = obj_meta.rect_params
                    xs.append(rect_params.left / source_frame_width)
                    ys.append(rect_params.top / source_frame_height)
                    widths.append(rect_params.width / source_frame_width)
                    heights.append(rect_params.height / source_frame_height)

                    left = int(rect_params.left)
                    top = int(rect_params.top)
                    w = int(rect_params.width)
                    h = int(rect_params.height)
                    try:
                        if not application_environment.DEBUG:
                            text = f"{self.get_class_name_by_id(obj_meta.class_id)}: {obj_meta.confidence:.2f}"
                            color = OBJECTS_PALETTE[int(obj_meta.class_id) % len(OBJECTS_PALETTE)]  # Assign predefined color
                            cv2.rectangle(frame_copy, (left, top), (left + w, top + h), color, 2)
                            cv2.putText(frame_copy, text, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX,
                                        0.5, (255, 255, 255), 2)
                    except Exception as e:
                        logger.warning(f"Failed to draw detection bbox: {self.get_class_name_by_id(obj_meta.class_id)}, "
                                       f"({rect_params.left}, {rect_params.top}, "
                                       f"{rect_params.width}, {rect_params.height}): {e})")

                    try:
                        l_obj = l_obj.next
                    except StopIteration:
                        break

                stream.last_frame = frame_copy

            try:
                l_frame = l_frame.next
            except StopIteration:
                break

        return Gst.PadProbeReturn.OK

Unfortunately we don’t have a sample app that can run locally, as this issue happens sporadically in the live environment.

If you want to dynamically add/remove RTSP cameras, we recommend nvmultiurisrcbin, which has been tested internally.

Smart Record records the original video stream of the RTSP camera. This problem may be related to the RTSP timestamps; the actual video length is usually no longer than smart-rec-duration.

If you use the reconnect feature of nvurisrcbin, nvurisrcbin handles the GStreamer EOS/ERROR messages internally, which may be why you occasionally cannot receive EOS in the application's bus callback. nvurisrcbin is open source, so you can debug it. However, I still recommend that you use nvmultiurisrcbin.
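As a debugging fallback, per-stream EOS can also be detected independently of the bus by probing for the EOS event on the pad that feeds nvstreammux. A minimal sketch, assuming the probe is attached in `_on_pad_added` to the newly added video pad; the `Gst` module is passed in as a parameter only to keep the callback unit-testable (in the app it is just `gi.repository.Gst`), and `_handle_stream_eos` is a hypothetical handler:

```python
def make_eos_probe(source_id, on_stream_eos, Gst):
    """Return a pad-probe callback that reports EOS for one stream."""

    def probe(pad, info):
        event = info.get_event()
        if event is not None and event.type == Gst.EventType.EOS:
            # EOS reached this pad even if no bus message was posted.
            on_stream_eos(source_id)
        return Gst.PadProbeReturn.OK

    return probe


# Hypothetical wiring inside _on_pad_added, right after the pad is linked:
#   pad.add_probe(Gst.PadProbeType.EVENT_DOWNSTREAM,
#                 make_eos_probe(source_id, self._handle_stream_eos, Gst))
```

Note that `on_stream_eos` runs on a streaming thread, so it should only schedule work (e.g. via `GLib.idle_add`) rather than modify the pipeline directly.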

Thank you for the reply.

I’ve tried to use nvmultiurisrcbin, but I believe the internals of adding/removing nvurisrcbins are now abstracted away and no longer accessible, as they were with nvstreammux. By exploring the child-added event I can see that nvmultiurisrcbin creates a Stream-muxer_creator bin, which presumably instantiates nvstreammux when needed (just my assumption), but that makes linking dynamically created nvurisrcbins to nvstreammux impossible.

Do you have a Python example that explains how to solve this task?

From the “test5” app I see that all dynamic source management is done via a REST API, but that’s not what we need in our case (we have our own REST API implemented to manage sources).
And by the way: do we really need to resort to nvmultiurisrcbin if we already manage sources dynamically via our own API layer, and all we have to solve is stable EOS handling (the original problem in question)?

Any help will be appreciated.

Just convert the parameters of your REST API into calls to the localhost REST API of nvmultiurisrcbin, which can avoid some problems. In fact, isn't the original problem caused by the dynamic addition and removal of sources?
nvmultiurisrcbin came into being precisely to solve this kind of problem.
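Forwarding your own API to nvmultiurisrcbin could look roughly like the sketch below. The endpoint paths, default port 9000, and payload shape follow the DeepStream REST API documentation for recent releases, but verify them against your DeepStream version before relying on them:

```python
import json
import urllib.request

REST_BASE = "http://localhost:9000/api/v1"  # nvmultiurisrcbin's default port; configurable


def sensor_payload(camera_id: str, camera_url: str, change: str) -> dict:
    # "change" is "camera_add" or "camera_remove" per the DeepStream schema.
    return {
        "key": "sensor",
        "value": {
            "camera_id": camera_id,
            "camera_url": camera_url,
            "change": change,
        },
    }


def _post(path: str, payload: dict) -> int:
    req = urllib.request.Request(
        f"{REST_BASE}{path}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=5) as resp:
        return resp.status


def add_stream(camera_id: str, camera_url: str) -> int:
    return _post("/stream/add", sensor_payload(camera_id, camera_url, "camera_add"))


def remove_stream(camera_id: str, camera_url: str) -> int:
    return _post("/stream/remove", sensor_payload(camera_id, camera_url, "camera_remove"))
```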

Just plug it into the pipeline like a regular DeepStream element.
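Plugging it in might look like the sketch below (untested here; the property names `uri-list`, `max-batch-size`, and `port` come from the nvmultiurisrcbin documentation, so verify them for your release):

```python
def uri_list_property(uris: list[str]) -> str:
    # nvmultiurisrcbin's "uri-list" property takes a comma-separated string of URIs.
    return ",".join(uris)


# Hypothetical wiring, replacing the per-source nvurisrcbin + nvstreammux setup:
#   src_bin = Gst.ElementFactory.make("nvmultiurisrcbin", "multi-src")
#   src_bin.set_property("uri-list", uri_list_property(initial_uris))
#   src_bin.set_property("max-batch-size", batch_size)
#   src_bin.set_property("port", 9000)          # REST server for add/remove
#   src_bin.set_property("width", MUXER_OUTPUT_WIDTH)
#   src_bin.set_property("height", MUXER_OUTPUT_HEIGHT)
#   pipeline.add(src_bin)
#   src_bin.link(pgie)                          # then pgie -> tee -> ... as before
```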

There has been no update from you for a while, so we assume this is no longer an issue and are closing this topic. If you need further support, please open a new one. Thanks.

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.