How to safely start and stop a pipeline repeatedly in Python?

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU): Jetson
• DeepStream Version: 6.0
• JetPack Version (valid for Jetson only): 4.6
• TensorRT Version: 8.0
• Issue Type (questions, new requirements, bugs): questions
My current code works fine the first time it is started, but it throws an error when stopped, and another error when it is started a second time. How can I resolve this?


Start and stop code:

import sys
import time

import gi
gi.require_version("Gst", "1.0")
from gi.repository import GObject, Gst


class DeepStreamApp:
    def __init__(self, inputs):
        self.inputs = inputs
        self.number_sources = len(inputs)
        self.pipeline = None
        self.loop = None
        self._bus_call_received_stop_signal = False
        self._stop_requested = False

    def _bus_call(self, bus, message, loop):
        t = message.type
        if t == Gst.MessageType.EOS:
            sys.stdout.write("End-of-stream\n")
            self._bus_call_received_stop_signal = True
            loop.quit()
        elif t == Gst.MessageType.WARNING:
            err, debug = message.parse_warning()
            sys.stderr.write("Warning: %s: %s\n" % (err, debug))
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            self._bus_call_received_stop_signal = True
            sys.stderr.write("Error: %s: %s\n" % (err, debug))
            loop.quit()
        return True

    def _pgie_src_pad_buffer_probe(self, pad, info, u_data):
        # This method should be overridden by subclasses
        pass

    def _osd_sink_pad_buffer_probe(self, pad, info, u_data):
        # This method should be overridden by subclasses
        pass

    def _osd_src_pad_buffer_probe(self, pad, info, u_data):
        # This method should be overridden by subclasses
        pass

    def _nvinfer_postprocess(self, output_tensor, **kwargs):
        # custom postprocess
        pass

    def _create_pipeline(self):
        # Create and return the pipeline here
        raise NotImplementedError("The _create_pipeline method must be implemented by subclasses")

    def start(self, retry_interval=5, max_retries=-1):
        GObject.threads_init()
        Gst.init(None)
        retries = 0
        should_retry = True
        self._stop_requested = False
        while should_retry and (max_retries == -1 or retries < max_retries) and not self._stop_requested:
            self._bus_call_received_stop_signal = False
            self.pipeline = self._create_pipeline()
            if not self.pipeline:
                sys.stderr.write("Unable to create pipeline\n")
                return

            self.loop = GObject.MainLoop()
            bus = self.pipeline.get_bus()
            bus.add_signal_watch()
            bus.connect("message", self._bus_call, self.loop)
            self.pipeline.set_state(Gst.State.PLAYING)
            try:
                self.loop.run()
            except BaseException:
                pass
            should_retry = self._bus_call_received_stop_signal
            if should_retry:
                self.pipeline.set_state(Gst.State.NULL)
                print(f"Retrying in {retry_interval} seconds...")
                time.sleep(retry_interval)
                retries += 1
            else:
                break

    def stop(self):
        self._stop_requested = True
        if self.pipeline:
            self.pipeline.set_state(Gst.State.NULL)
        if self.loop:
            self.loop.quit()

Hi @zyann,

I would suggest using our GStreamer Daemon (gstd) to handle the pipeline. It is an open-source project that streamlines pipeline manipulation. We also have a Python client.

I created an example using the Bash client with the elements that I assume you are using. The only component missing is rtspsink, but you can reuse your code to stream RTSP by replacing the fakesink with an appsink.

gstd-client pipeline_create p0 \
uridecodebin3 uri=file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4 ! queue ! \
nvstreammux0.sink_0 nvstreammux name=nvstreammux0 batch-size=1 batched-push-timeout=40000 width=1920 height=1080 live-source=TRUE ! queue ! \
nvvideoconvert ! queue ! \
nvinfer name=nvinfer1 config-file-path="/opt/nvidia/deepstream/deepstream-5.0/sources/apps/sample_apps/deepstream-test1/dstest1_pgie_config.txt" output-tensor-meta=true ! queue ! \
nvtracker tracker-width=240 tracker-height=200 ll-lib-file=libnvds_mot_iou.so ll-config-file=iou_config.txt ! queue ! \
nvdsosd ! \
nvvideoconvert ! \
nvv4l2h264enc name=encoder control-rate=1 maxperf-enable=true iframeinterval=10 ! \
capsfilter caps=video/x-h264 ! rtph264pay ! \
perf ! \
fakesink sync=false

gstd-client pipeline_play p0

gstd-client pipeline_stop p0

I tested ‘stop’ and ‘play’ in that example several times and didn’t encounter any issues. As a disclaimer, some GStreamer elements need to receive an EOS before being stopped, so a safer way of stopping would be:

gstd-client event_eos p0
gstd-client pipeline_stop p0

The Python client has the same commands:

from pygstc.gstc import *
from pygstc.logger import *
gstd_logger = CustomLogger('gstd', loglevel='DEBUG')
gstd_client = GstdClient(logger=gstd_logger)
gstd_client.pipeline_create("p0", "...")
gstd_client.pipeline_play("p0")
gstd_client.pipeline_stop("p0")

Thank you for the reply; I believe this is a valuable project. However, I am not simply launching an application from the command line. I have added a series of extra stream-processing steps, such as custom model post-processing and custom metadata attached through probes, adapted from the official DeepStream Python examples.
The functionality is already developed and working; the only remaining problem is repeatedly starting and stopping. From what I can see, pygstc seems to only support pipelines described as launch strings, so if it cannot simply replace the GObject main-loop handling, it may be difficult to meet my requirements.

Hi @zyann

I understand that migrating an entire application to gstd might require a significant effort, but we already have several DeepStream applications for different clients that use gstd to manage all the pipelines. There are ways to address all the functionalities you mentioned.

  • Custom metadata: We manage metadata using signals and properties. We have an element that serializes all metadata to JSON and sends it to the Python app using a signal. The Python app can then modify the metadata and send it back using a property.

  • Custom models for post-processing: We’ve developed a simple GStreamer element based on GstVideoFilter, which calls custom code in the transform_frame_ip method. We have both a Python and a C version of the element, so we can either pass a compiled .so file or a .py file with the code we want to execute (a rough sketch of this pattern is shown below).

Creating custom GStreamer elements has a significant learning curve, but aside from that, we believe the gstd-based solution is the simplest and most maintainable way to develop DeepStream applications. This approach is what we presented at NVIDIA GTC 2020, and it is also what we use in our DeepStream Reference Designs.
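
For reference, here is a minimal sketch of what such an element can look like in Python. This is my own illustration of the GstVideoFilter / transform_frame_ip pattern, not RidgeRun's actual element, so treat the names and caps as placeholders:

import gi
gi.require_version("Gst", "1.0")
gi.require_version("GstBase", "1.0")
gi.require_version("GstVideo", "1.0")
from gi.repository import Gst, GObject, GstBase, GstVideo

Gst.init(None)

class CustomPostProcess(GstVideo.VideoFilter):
    # Element metadata and fixed RGBA caps, purely for illustration
    __gstmetadata__ = ("custompostprocess", "Filter/Video",
                       "Runs custom post-processing on each frame", "example")
    _caps = Gst.Caps.from_string("video/x-raw,format=RGBA")
    __gsttemplates__ = (
        Gst.PadTemplate.new("sink", Gst.PadDirection.SINK,
                            Gst.PadPresence.ALWAYS, _caps),
        Gst.PadTemplate.new("src", Gst.PadDirection.SRC,
                            Gst.PadPresence.ALWAYS, _caps),
    )

    def do_transform_frame_ip(self, frame):
        # frame is a GstVideo.VideoFrame; custom per-frame code goes here
        return Gst.FlowReturn.OK

GObject.type_register(CustomPostProcess)
Gst.Element.register(None, "custompostprocess", Gst.Rank.NONE, CustomPostProcess)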

Hi, @fanzh

I don’t know why I can’t find your response anymore.

  1. I am using a custom model with rotated bounding boxes, and the post-processing is handled by a custom probe. Within this probe, I pass the aligned bounding box information to the tracker.

  2. I’ve modified the stop method further based on references from the forums. There is no longer an error when stopping for the first time, but an error still occurs on restart.

class DeepStreamApp:
    def __init__(self, inputs):
        self.inputs = inputs
        self.number_sources = len(inputs)
        self.pipeline = None
        self.loop = None
        self._bus_call_received_stop_signal = False
        self._stop_requested = False
        self.streammux = None
        GObject.threads_init()
        Gst.init(None)

    def _bus_call(self, bus, message, loop):
        t = message.type
        if t == Gst.MessageType.EOS:
            sys.stdout.write("End-of-stream\n")
            self._bus_call_received_stop_signal = True
            loop.quit()
        elif t == Gst.MessageType.WARNING:
            err, debug = message.parse_warning()
            sys.stderr.write("Warning: %s: %s\n" % (err, debug))
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            self._bus_call_received_stop_signal = True
            sys.stderr.write("Error: %s: %s\n" % (err, debug))
            loop.quit()
        elif self._stop_requested:  # check whether a stop was requested
            loop.quit()
        return True

    def _pgie_src_pad_buffer_probe(self, pad, info, u_data):
        # This method should be overridden by subclasses
        pass

    def _osd_sink_pad_buffer_probe(self, pad, info, u_data):
        # This method should be overridden by subclasses
        pass

    def _osd_src_pad_buffer_probe(self, pad, info, u_data):
        # This method should be overridden by subclasses
        pass

    def _nvinfer_postprocess(self, output_tensor, **kwargs):
        # custom postprocess
        pass

    def _create_pipeline(self):
        # Create and return the pipeline here
        raise NotImplementedError("The _create_pipeline method must be implemented by subclasses")

    def start(self, retry_interval=5, max_retries=-1):
        retries = 0
        should_retry = True
        self._stop_requested = False
        while should_retry and (max_retries == -1 or retries < max_retries) and not self._stop_requested:
            self._bus_call_received_stop_signal = False
            self.pipeline = self._create_pipeline()
            if not self.pipeline:
                sys.stderr.write("Unable to create pipeline\n")
                return

            self.loop = GObject.MainLoop()
            bus = self.pipeline.get_bus()
            bus.add_signal_watch()
            bus.connect("message", self._bus_call, self.loop)
            self.pipeline.set_state(Gst.State.PLAYING)
            try:
                self.loop.run()
            except BaseException:
                pass
            should_retry = self._bus_call_received_stop_signal
            if should_retry:
                self.pipeline.set_state(Gst.State.NULL)
                print(f"Retrying in {retry_interval} seconds...")
                time.sleep(retry_interval)
                retries += 1
            else:
                break

    def stop(self):
        self._stop_requested = True
        print("Send EoS")
        Gst.Element.send_event(self.pipeline, Gst.Event.new_eos())
        # wait until EOS or error
        bus = self.pipeline.get_bus()
        msg = bus.timed_pop_filtered(
            Gst.CLOCK_TIME_NONE, Gst.MessageType.EOS)
        # free resources
        print("Switch to NULL state")
        self.pipeline.set_state(Gst.State.NULL)
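
As a side note on this stop(): timed_pop_filtered with Gst.CLOCK_TIME_NONE and only the EOS filter can block forever if the EOS never reaches the bus. A possible refinement (an untested sketch) is to also accept ERROR and bound the wait:

    def stop(self, timeout_sec=5):
        self._stop_requested = True
        if not self.pipeline:
            return
        print("Send EoS")
        self.pipeline.send_event(Gst.Event.new_eos())
        # Wait for EOS or ERROR, but give up after timeout_sec seconds
        bus = self.pipeline.get_bus()
        bus.timed_pop_filtered(timeout_sec * Gst.SECOND,
                               Gst.MessageType.EOS | Gst.MessageType.ERROR)
        print("Switch to NULL state")
        self.pipeline.set_state(Gst.State.NULL)
        if self.loop:
            self.loop.quit()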

Here is the related log file:
1.zip (10.2 MB)

  1. This project is somewhat complex, and it’s challenging to minimize the code to reproduce the error effectively. I have referred to some minimal examples that run without errors, such as: Nvarguscamera crashes after frequent restarts - #12 by DaneLLL

  2. My DeepStream version is 6.0, and my project needs to switch inference on and off at runtime. One option is to dynamically modify the inference interval parameter, setting it to a very large value when inference is not needed, but version 6.0 does not seem to support changing it at runtime. Based on my research so far, the better solution is to dynamically add and remove sources. I therefore tried to run deepstream_rt_src_add_del.py from the matching version of deepstream_python_apps to learn this approach, but I cannot run the sample normally. Even after minimizing the program and removing the inference and tracking steps, an error is reported when the third source is added. The modified files and error logs are below, and the sample's source-removal pattern is sketched after them for reference:
    2.zip (13.0 MB)
    deepstream_rt_src_add_del.zip (5.3 KB)
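
For reference, the source-removal pattern used by that sample looks roughly like this (g_source_bin_list, streammux, and pipeline are the sample's own globals; this is a trimmed sketch, not the full function):

def stop_release_source(source_id):
    # Move the source bin to NULL before detaching it
    state_return = g_source_bin_list[source_id].set_state(Gst.State.NULL)
    if state_return == Gst.StateChangeReturn.SUCCESS:
        # Release the corresponding streammux request pad
        pad_name = "sink_%u" % source_id
        sinkpad = streammux.get_static_pad(pad_name)
        sinkpad.send_event(Gst.Event.new_flush_stop(False))
        streammux.release_request_pad(sinkpad)
        # Finally drop the source bin from the pipeline
        pipeline.remove(g_source_bin_list[source_id])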

  1. My issue of failing to restart after stopping seems to be related to nvtracker not being released correctly. The problem went away after I removed the tracker from the pipeline. How should I release nvtracker properly using the Python SDK? I convert the rotated boxes into standard DeepStream bounding boxes for tracking with the following method:
def add_obj_meta_to_frame(frame_object, batch_meta, frame_meta, label_names, camera_width, camera_height):
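    # frame_object: the 8 corner coordinates of the rotated box (x1, y1, ..., x4, y4),
    # followed by confidence ([8]) and class id ([9]); the tracker is given the
    # axis-aligned extent of those corners.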
    left = min(frame_object[0], frame_object[2], frame_object[4], frame_object[6])
    top = min(frame_object[1], frame_object[3], frame_object[5], frame_object[7])
    right = max(frame_object[0], frame_object[2], frame_object[4], frame_object[6])
    bottom = max(frame_object[1], frame_object[3], frame_object[5], frame_object[7])
    obj_meta = pyds.nvds_acquire_obj_meta_from_pool(batch_meta)
    rect_params = obj_meta.rect_params
    if camera_width < camera_height:
        rect_params.left = left + ((camera_height - camera_width) / 2)
        rect_params.top = top
    else:
        rect_params.left = left
        rect_params.top = top + ((camera_width - camera_height) / 2)
    rect_params.width = right - left
    rect_params.height = bottom - top
    # rect_params.has_bg_color = 1
    # rect_params.bg_color.set(1, 1, 0, 0.4)
    rect_params.border_width = 3
    rect_params.border_color.set(0, 1, 0, 1)
    obj_meta.confidence = frame_object[8]
    obj_meta.class_id = frame_object[9]
    obj_meta.object_id = UNTRACKED_OBJECT_ID
    lbl_id = int(frame_object[9].item())
    if lbl_id >= len(label_names):
        lbl_id = 0
    obj_meta.obj_label = label_names[lbl_id]
    txt_params = obj_meta.text_params
    if txt_params.display_text:
        pyds.free_buffer(txt_params.display_text)
    if camera_width < camera_height:
        txt_params.x_offset = min(camera_width, max(0, int(rect_params.left + ((camera_height - camera_width) / 2) + (rect_params.width / 2))))
        txt_params.y_offset = min(camera_height, max(0, int(rect_params.top + (rect_params.height / 2))))
    else:
        txt_params.x_offset = min(camera_width, max(0, int(rect_params.left + (rect_params.width / 2))))
        txt_params.y_offset = min(camera_height, max(0, int(rect_params.top + ((camera_width - camera_height) / 2) + (rect_params.height / 2))))
    txt_params.display_text = (label_names[lbl_id] + " " + "{:04.2f}".format(frame_object[8]))
    txt_params.font_params.font_name = "Serif"
    txt_params.font_params.font_size = 60
    txt_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
    txt_params.set_bg_clr = 1
    txt_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
    pyds.nvds_add_obj_meta_to_frame(frame_meta, obj_meta, None)

Obtaining the nvtracker results in the OSD probe:

l_obj = frame_meta.obj_meta_list
tracker_bbox = []
while l_obj is not None:
    try:
        obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
    except StopIteration:
        break
    tracker_bbox.append([obj_meta.rect_params.left, obj_meta.rect_params.top, obj_meta.rect_params.width, obj_meta.rect_params.height, obj_meta.object_id])
    try:
        l_obj = l_obj.next
    except StopIteration:
        break
  1. Please help solve the issue where deepstream_rt_src_add_del.py does not run. I still want to use this method to control starting and stopping sources dynamically.

Please refer to this topic. nvinfer is open source; you can customize it.


Thank you for your reply. I eventually found out that the cause of the anomaly was the Hikvision camera. This brand of camera seems to have some protocol-compatibility issues, as discussed here. Switching to another brand of camera works well.
Customizing nvinfer can also solve this problem in another way.
