Will multiple probes in different threads help with the blocking problem of the probe function in the main DeepStream pipeline?

• Hardware Platform (Jetson / GPU): Jetson NX Dev Kit
• DeepStream Version: 6.0.1
• JetPack Version (valid for Jetson only): 4.6 GA

Background

This question follows on from this older question: Does the Probe Function in Deepstream have to be blocking? and the question I raised yesterday: Will queue and Appsink help the block problem of Probe Function in Deepstream.

In my project, I also struggled with heavy computation inside a probe function. As the questions above mention, and as confirmed here, probe() callbacks are synchronous: they hold the buffer (info.get_buffer()) and prevent it from traversing the pipeline until the callback returns.
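The synchronous behaviour can be illustrated without GStreamer at all. The sketch below (my addition, not DeepStream code) uses a plain loop calling a callback to show how a slow synchronous callback throttles the producer, exactly the way a blocking pad probe throttles the streaming thread:

```python
import time

def run_pipeline(n_buffers, probe):
    """Push n_buffers through a toy 'pipeline'; probe is called synchronously."""
    start = time.monotonic()
    for i in range(n_buffers):
        probe(i)          # the producer cannot continue until probe returns
    return time.monotonic() - start

fast = run_pipeline(5, lambda buf: None)                # cheap probe
slow = run_pipeline(5, lambda buf: time.sleep(0.05))    # "heavy" probe

print(f"fast: {fast:.3f}s, slow: {slow:.3f}s")  # slow is at least 5 * 0.05 s
```

No amount of threading elsewhere in the toy pipeline changes this: as long as the callback itself is called synchronously, its running time is added to every buffer's path.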

So I tried three ways of moving the heavy code into a "new" thread. In this question I will only explain the second way: using tee, queue, and fakesink, and putting the heavy code inside a probe function attached to the fakesink.

From my test results, it looks like both probe() callbacks are synchronous, which means slow computation in any probe function also stalls the real-time video display, regardless of whether the related elements run in different threads. I will first explain what I did and then show the full code at the end, so that you can reproduce the results.

The result may well be expected, but I would appreciate it if someone could review my code and confirm that my test result and the principle behind it are correct.

Logic behind

First, let's generate the pipeline, as follows:

The main part of this pipeline, zoomed in, is as follows:

So I inserted a tee after nvinfer and nvvideoconvert, before the OSD. One queue connects the tee to the OSD, and the other queue connects it to the fakesink. The second queue's leaky property is set to 2 (downstream, i.e. drop old buffers when full). (I also tried running the code without setting this property; the result was no different.)
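For reference, leaky=2 (downstream) tells a GStreamer queue to discard its oldest buffers when full rather than block the upstream thread. The drop-oldest behaviour can be mimicked with a bounded deque (an analogy I am adding, not part of the test code):

```python
from collections import deque

# Analogy of a GStreamer queue with leaky=downstream (2): when a new buffer
# arrives and the queue is full, the oldest buffer is dropped instead of
# making the producer wait.
leaky_queue = deque(maxlen=2)

for buf in range(5):          # producer pushes buffers 0..4
    leaky_queue.append(buf)   # deque silently evicts the oldest when full

print(list(leaky_queue))      # only the newest two buffers remain: [3, 4]
```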

Next, I attached a probe to the OSD:

# Add probe in osd module
osdsinkpad = nvosd.get_static_pad("sink")
osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)

And another probe to the fakesink:

# Add probe in fakesink module
fakesinkpad = fakesink.get_static_pad("sink")
fakesinkpad.add_probe(Gst.PadProbeType.BUFFER, fakesink_pad_buffer_probe, 0)

In the fakesink probe, I run heavy code, for example adding time.sleep(1) so that the function takes over one second, and I pass some data computed in the fakesink probe to the OSD probe. I hoped that after this change I would still see real-time video displayed on the local screen, but the result shows that both probe() callbacks are synchronous.
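A commonly suggested alternative, sketched below as my own addition and not part of the test, is to keep the probe itself fast: copy whatever you need out of the metadata (pyds metadata must not be touched after the probe returns), hand it to a background worker over a queue.Queue, and return immediately, so the heavy work never runs on the streaming thread. The names (`light_probe`, `worker`) are hypothetical:

```python
import queue
import threading
import time

work_q = queue.Queue(maxsize=8)   # bounded hand-off between probe and worker
results = []                      # worker output (illustration only)

def worker():
    # The heavy post-processing runs here, off the streaming thread.
    while True:
        item = work_q.get()
        if item is None:          # sentinel: shut the worker down
            break
        time.sleep(0.05)          # stand-in for the expensive computation
        results.append(item * 2)
        work_q.task_done()

threading.Thread(target=worker, daemon=True).start()

def light_probe(frame_number):
    # What a probe would do: copy the needed data, enqueue it, return at once.
    try:
        work_q.put_nowait(frame_number)
    except queue.Full:
        pass                      # drop work rather than stall the pipeline
    return "OK"                   # stands in for Gst.PadProbeReturn.OK

start = time.monotonic()
for n in range(5):
    light_probe(n)
probe_time = time.monotonic() - start
work_q.join()                     # wait for the worker (only for this demo)
print(f"probe loop took {probe_time:.3f}s; results={results}")
```

The probe loop finishes almost instantly even though each work item costs 50 ms, because the sleeping happens entirely on the worker thread.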

How to Reproduce

The code is based on deepstream_test_1.py. After copying and pasting the code below, you can reproduce the result by running python3 deepstream_test_1.py /opt/nvidia/deepstream/deepstream-6.0/samples/streams/sample_qHD.h264 in a terminal. I did not use a container.

Code

Code for deepstream_test_1.py:

#!/usr/bin/env python3

################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import sys
sys.path.append('../')
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
from common.is_aarch_64 import is_aarch64
from common.bus_call import bus_call
from post_processing import PostProcessing
import time

import pyds
import os
os.environ["GST_DEBUG_DUMP_DOT_DIR"] = "/tmp"  # let Gst.debug_bin_to_dot_file write here

PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3


def osd_sink_pad_buffer_probe(pad,info,u_data):
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return Gst.PadProbeReturn.OK
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break
        # Here we read data from the PostProcessing object.
        # These data are produced by fakesink_pad_buffer_probe.
        display_frameNum, display_numRects, display_objCounter = pp.getMeta()

        display_meta=pyds.nvds_acquire_display_meta_from_pool(batch_meta)
        display_meta.num_labels = 1
        py_nvosd_text_params = display_meta.text_params[0]
        py_nvosd_text_params.display_text = "Frame Number={} Number of Objects={} Vehicle_count={} Person_count={}".format(display_frameNum, display_numRects, display_objCounter[PGIE_CLASS_ID_VEHICLE], display_objCounter[PGIE_CLASS_ID_PERSON])
        py_nvosd_text_params.x_offset = 10
        py_nvosd_text_params.y_offset = 12
        py_nvosd_text_params.font_params.font_name = "Serif"
        py_nvosd_text_params.font_params.font_size = 10
        py_nvosd_text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
        py_nvosd_text_params.set_bg_clr = 1
        py_nvosd_text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
        print(pyds.get_string(py_nvosd_text_params.display_text))
        pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
        try:
            l_frame=l_frame.next
        except StopIteration:
            break
			
    return Gst.PadProbeReturn.OK	

def fakesink_pad_buffer_probe(pad,info,u_data):

    obj_counter = {
        PGIE_CLASS_ID_VEHICLE:0,
        PGIE_CLASS_ID_PERSON:0,
        PGIE_CLASS_ID_BICYCLE:0,
        PGIE_CLASS_ID_ROADSIGN:0
    }
    objectdetect = {} 
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return Gst.PadProbeReturn.OK
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_number=frame_meta.frame_num
        num_rects = frame_meta.num_obj_meta
        l_obj=frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                obj_meta=pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            BB_info = obj_meta.rect_params
            # This line is not strictly correct, but that does not matter for our testing purpose.
            objectdetect.update({obj_meta.class_id: [BB_info.left,BB_info.top,BB_info.left+BB_info.width,BB_info.top+BB_info.height]})
            obj_meta.rect_params.border_color.set(0.0, 0.0, 1.0, 0.0)
            obj_counter[obj_meta.class_id] += 1
            try: 
                l_obj=l_obj.next
            except StopIteration:
                break
        
        # Push the metadata generated in this function to the PostProcessing object,
        # then sleep for 1 second to see whether the rest of the pipeline is affected.
        pp.pushAllMeta(frame_number, num_rects, objectdetect, obj_counter)
        time.sleep(1)

        try:
            l_frame=l_frame.next
        except StopIteration:
            break
			
    return Gst.PadProbeReturn.OK	

def main(args):

    global pp

    # Check input arguments
    if len(args) != 2:
        sys.stderr.write("usage: %s <media file or uri>\n" % args[0])
        sys.exit(1)

    GObject.threads_init()
    Gst.init(None)

    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")

    print("Creating Source \n ")
    source = Gst.ElementFactory.make("filesrc", "file-source")
    if not source:
        sys.stderr.write(" Unable to create Source \n")

    print("Creating H264Parser \n")
    h264parser = Gst.ElementFactory.make("h264parse", "h264-parser")
    if not h264parser:
        sys.stderr.write(" Unable to create h264 parser \n")

    print("Creating Decoder \n")
    decoder = Gst.ElementFactory.make("nvv4l2decoder", "nvv4l2-decoder")
    if not decoder:
        sys.stderr.write(" Unable to create Nvv4l2 Decoder \n")

    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")

    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie \n")

    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    if not nvvidconv:
        sys.stderr.write(" Unable to create nvvidconv \n")

    # tee splits the stream into multiple src pads, one per downstream branch.
    tee=Gst.ElementFactory.make("tee", "nvconvertor-tee")
    if not tee:
        sys.stderr.write(" Unable to create tee DS pipeline module.")

    # Queue is created to connect tee and osd
    queue1=Gst.ElementFactory.make("queue", "nvtee-que1")
    if not queue1:
        sys.stderr.write(" Unable to create queue1 DS pipeline module.")

    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")
    
    # Queue is created to connect tee and fakesink
    queue2=Gst.ElementFactory.make("queue", "nvtee-que2")
    if not queue2:
        sys.stderr.write(" Unable to create queue2 DS pipeline module.")
    # Drop old buffers instead of blocking when this queue fills up (leaky=downstream).
    queue2.set_property("leaky", 2)

    # Finally render the osd output
    if is_aarch64():
        transform = Gst.ElementFactory.make("nvegltransform", "nvegl-transform")

    print("Creating EGLSink \n")
    sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
    if not sink:
        sys.stderr.write(" Unable to create egl sink \n")
    
    fakesink = Gst.ElementFactory.make("fakesink", "fakesink")              # fakesink discards every buffer it receives
    fakesink.set_property("sync", False)

    print("Playing file %s " %args[1])
    source.set_property('location', args[1])
    streammux.set_property('width', 1920)
    streammux.set_property('height', 1080)
    streammux.set_property('batch-size', 1)
    streammux.set_property('batched-push-timeout', 4000000)
    pgie.set_property('config-file-path', "dstest1_pgie_config.txt")

    print("Adding elements to Pipeline \n")
    pipeline.add(source)
    pipeline.add(h264parser)
    pipeline.add(decoder)
    pipeline.add(streammux)
    pipeline.add(pgie)
    pipeline.add(nvvidconv)
    pipeline.add(tee)
    pipeline.add(queue1)
    pipeline.add(queue2)
    pipeline.add(nvosd)
    pipeline.add(sink)
    pipeline.add(fakesink)
    if is_aarch64():
        pipeline.add(transform)

    # we link the elements together:
    # file-source -> h264-parser -> nvv4l2decoder -> nvstreammux -> nvinfer ->
    # nvvideoconvert -> tee -> queue1 -> nvosd -> video-renderer
    #                       -> queue2 -> fakesink
    print("Linking elements in the Pipeline \n")
    source.link(h264parser)
    h264parser.link(decoder)

    sinkpad = streammux.get_request_pad("sink_0")
    if not sinkpad:
        sys.stderr.write(" Unable to get the sink pad of streammux \n")
    srcpad = decoder.get_static_pad("src")
    if not srcpad:
        sys.stderr.write(" Unable to get source pad of decoder \n")
    srcpad.link(sinkpad)
    streammux.link(pgie)
    pgie.link(nvvidconv)
    # Link the pipeline, tee, queue, fakesink parts.
    nvvidconv.link(tee)
    src_tee_osd_pad  = tee.get_request_pad("src_%u")
    src_tee_fakesink_pad  = tee.get_request_pad("src_%u")
    sink_osd_pad = queue1.get_static_pad("sink")
    sink_fakesink_pad = queue2.get_static_pad("sink")
    src_tee_osd_pad.link(sink_osd_pad)
    src_tee_fakesink_pad.link(sink_fakesink_pad)
    
    queue1.link(nvosd)
    queue2.link(fakesink)
    if is_aarch64():
        nvosd.link(transform)
        transform.link(sink)
    else:
        nvosd.link(sink)

    # PostProcessing object
    pp = PostProcessing()
    loop = GObject.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect ("message", bus_call, loop)

    # Add probe in osd module
    osdsinkpad = nvosd.get_static_pad("sink")
    osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)
    # Add probe in fakesink module
    fakesinkpad = fakesink.get_static_pad("sink")
    fakesinkpad.add_probe(Gst.PadProbeType.BUFFER, fakesink_pad_buffer_probe, 0)

    Gst.debug_bin_to_dot_file(pipeline, Gst.DebugGraphDetails.ALL, "pipeline")
    print("Starting pipeline \n")
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except:
        pass
    # cleanup
    pipeline.set_state(Gst.State.NULL)

if __name__ == '__main__':
    sys.exit(main(sys.argv))

Code for post_processing.py:

import threading
import time

PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3

class PostProcessing(object):

    def __init__(self):
        self.objectdetect = None
        self.frame_number = 0
        self.num_rects = 0
        self.obj_counter = 0

    def pushAllMeta(self, frame_number, num_rects, objectdetect, obj_counter):
        self.frame_number = frame_number
        self.num_rects = num_rects
        self.objectdetect = objectdetect
        self.obj_counter = obj_counter

    def getMeta(self):
        return self.frame_number, self.num_rects, self.obj_counter
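One caveat worth noting (my addition, not part of the original test): PostProcessing is mutable state shared between the two probes, so if they ever do run on different streaming threads, the push/get pair should be guarded. A hedged sketch, using a hypothetical SafePostProcessing variant with threading.Lock:

```python
import threading

class SafePostProcessing:
    """Variant of PostProcessing (hypothetical name) with a lock so that
    pushAllMeta/getMeta stay consistent if called from different threads."""

    def __init__(self):
        self._lock = threading.Lock()
        self.frame_number = 0
        self.num_rects = 0
        self.objectdetect = None
        self.obj_counter = 0

    def pushAllMeta(self, frame_number, num_rects, objectdetect, obj_counter):
        with self._lock:          # update all fields atomically
            self.frame_number = frame_number
            self.num_rects = num_rects
            self.objectdetect = objectdetect
            self.obj_counter = obj_counter

    def getMeta(self):
        with self._lock:          # read a consistent snapshot
            return self.frame_number, self.num_rects, self.obj_counter

pp = SafePostProcessing()
pp.pushAllMeta(7, 3, {}, {0: 2})
print(pp.getMeta())   # (7, 3, {0: 2})
```

Without the lock, a reader could observe a frame_number from one frame paired with an obj_counter from another.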

Can anyone help explain the principles behind this?
Thanks a lot.

Best
Yichao

We will make a unified analysis of this topic at https://forums.developer.nvidia.com/t/will-queue-and-appsink-help-the-block-problem-of-probe-function-in-deepstream/218928 Thanks
