Will typical threading help the block problem of Probe Function in Deepstream

• Hardware Platform (Jetson / GPU): Jetson NX Dev Kit
• DeepStream Version: 6.0.1
• JetPack Version (valid for Jetson only): 4.6 GA

Hi Sir/Madam:

This question follows two earlier ones: "Will queue and Appsink help the block problem of Probe Function in Deepstream" and "Will multiple probes in different threads help the block problem of probe Function in main pipeline of Deepstream".

In my project, I also struggled with heavy calculation time inside the probe function. As mentioned in the questions above and in a linked post, it is confirmed that probe() callbacks are synchronous and therefore hold the buffer (info.get_buffer()) back from traversing the pipeline until the user code returns.
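To make the blocking behaviour concrete, here is a minimal pure-Python sketch. It does not use GStreamer at all: run_pipeline, fast_probe and slow_probe are hypothetical names, and the loop is only a toy stand-in for a streaming thread that calls the probe once per buffer and waits for it to return.

```python
import time

def run_pipeline(num_buffers, probe):
    """Toy stand-in for a streaming thread: each buffer waits for the probe."""
    start = time.monotonic()
    for frame_number in range(num_buffers):
        probe(frame_number)  # the buffer cannot move on until this returns
    return time.monotonic() - start

def fast_probe(frame_number):
    pass  # only reads metadata, returns immediately

def slow_probe(frame_number):
    time.sleep(0.02)  # simulated heavy calculation inside the probe

fast_elapsed = run_pipeline(10, fast_probe)
slow_elapsed = run_pipeline(10, slow_probe)
print("fast probe: {:.3f}s, slow probe: {:.3f}s".format(fast_elapsed, slow_elapsed))
```

In this toy model the total time grows with the time spent inside the callback, which is the effect I want to avoid in the real pipeline.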

So, I tried three ways to see whether we could create a new thread and move all the heavy code into it. In this question, I will only explain the third way: using Python threading directly to create a new thread and putting the heavy calculation inside it. The test results are as expected. I will first explain what I did and then show all the code at the end, so that you can reproduce the results.

Could anyone review my code and results to check that they are correct? I see I am not the only person confused by this probe issue, so the solution below may be of some help to others.

Logic behind

  1. First, we create the pipeline: filesrc -> h264parse -> nvv4l2decoder -> nvstreammux -> nvinfer -> nvvideoconvert -> nvdsosd -> nvegltransform (Jetson only) -> nveglglessink. This is exactly the same pipeline as in test 1 of the DeepStream Python examples.

  2. What's different is that we create a new thread from a PostProcessing object and run the heavy calculations inside it.

  3. The main idea: each time the probe function is called in the pipeline, it sends some data to the PostProcessing object. The PostProcessing object runs a while loop in its own thread. Each iteration takes 1 second, to simulate heavy calculation. Inside the loop, we get the metadata, do some post-processing, and generate new metadata. The probe function then reads the metadata generated by post-processing and displays it on screen.

We want the video to still display in real time, and everything seems to work well.
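The hand-off described above can be sketched without DeepStream. This is only an illustration under my own assumptions: LatestValueWorker, heavy, push and latest are hypothetical names, the for loop stands in for the probe being called once per frame, and the sleep times are made up. The worker keeps only the newest input, so the caller never waits for the slow work.

```python
import threading
import time

class LatestValueWorker:
    """Hypothetical helper: keeps only the newest input and the newest result."""

    def __init__(self, work, idle_sleep=0.005):
        self._work = work
        self._idle_sleep = idle_sleep
        self._lock = threading.Lock()
        self._pending = None   # newest input not yet processed
        self._result = None    # newest finished result
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()

    def push(self, value):
        # Called from the probe: overwrite the pending input, never wait.
        with self._lock:
            self._pending = value

    def latest(self):
        # Called from the probe: return whatever result is ready (may be old).
        with self._lock:
            return self._result

    def _run(self):
        while not self._stop.is_set():
            with self._lock:
                value, self._pending = self._pending, None
            if value is None:
                time.sleep(self._idle_sleep)
                continue
            result = self._work(value)  # the slow part runs outside the lock
            with self._lock:
                self._result = result

def heavy(frame_number):
    time.sleep(0.05)  # simulated heavy calculation
    return frame_number * 2

worker = LatestValueWorker(heavy)
worker.start()

start = time.monotonic()
seen = []
for frame_number in range(1, 21):  # stand-in for 20 probe calls
    worker.push(frame_number)
    seen.append(worker.latest())
    time.sleep(0.01)
loop_elapsed = time.monotonic() - start

time.sleep(0.3)  # give the worker time to finish the last input
final_result = worker.latest()
worker.stop()
print("loop took {:.2f}s, final result {}".format(loop_elapsed, final_result))
```

The trade-off is the same as in my test: the displayed result lags behind the current frame, and frames that arrive while the worker is busy are skipped rather than queued.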

How to Reproduce

This code is based on deepstream_test_1.py. After copying the code below (with post_processing.py saved next to deepstream_test_1.py), you can reproduce the results by running python3 deepstream_test_1.py /opt/nvidia/deepstream/deepstream-6.0/samples/streams/sample_qHD.h264 in a terminal. I didn't use a container.

Code

Code for deepstream_test_1.py:

#!/usr/bin/env python3

import sys
sys.path.append('../')
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
from common.is_aarch_64 import is_aarch64
from common.bus_call import bus_call
from post_processing import PostProcessing

import pyds
import os
# Directory where Gst.debug_bin_to_dot_file() writes the pipeline graph.
os.environ["GST_DEBUG_DUMP_DOT_DIR"] = "/tmp"

PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3


def osd_sink_pad_buffer_probe(pad,info,u_data):

    frame_number=0
    num_rects=0
    objectdetect = {} 

    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return Gst.PadProbeReturn.OK

    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_number=frame_meta.frame_num
        num_rects = frame_meta.num_obj_meta
        l_obj=frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                obj_meta=pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            BB_info = obj_meta.rect_params
            # Note: the dict is keyed by class_id, so only the last box of each class is kept.
            objectdetect.update({obj_meta.class_id: [BB_info.left,BB_info.top,BB_info.left+BB_info.width,BB_info.top+BB_info.height]})
            obj_meta.rect_params.border_color.set(0.0, 0.0, 1.0, 0.0)
            try: 
                l_obj=l_obj.next
            except StopIteration:
                break
        # Send metadata from the probe function to the post_processing object.
        pp.pushAllMeta(frame_number, num_rects, objectdetect)


        # Get metadata from the post_processing object back into the probe function.
        # Note: the data received here is not the same as the data sent above.
        # It is the result of post-processing by the post_processing object.
        display_frameNum, display_numRects, display_objCounter = pp.getMeta()
        display_meta=pyds.nvds_acquire_display_meta_from_pool(batch_meta)
        display_meta.num_labels = 1
        py_nvosd_text_params = display_meta.text_params[0]
        py_nvosd_text_params.display_text = "Frame Number={} Number of Objects={} Vehicle_count={} Person_count={}".format(display_frameNum, display_numRects, display_objCounter[PGIE_CLASS_ID_VEHICLE], display_objCounter[PGIE_CLASS_ID_PERSON])
        py_nvosd_text_params.x_offset = 10
        py_nvosd_text_params.y_offset = 12
        py_nvosd_text_params.font_params.font_name = "Serif"
        py_nvosd_text_params.font_params.font_size = 10
        py_nvosd_text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
        py_nvosd_text_params.set_bg_clr = 1
        py_nvosd_text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
        print(pyds.get_string(py_nvosd_text_params.display_text))
        pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
        try:
            l_frame=l_frame.next
        except StopIteration:
            break
			
    return Gst.PadProbeReturn.OK

def main(args):

    global pp

    if len(args) != 2:
        sys.stderr.write("usage: %s <media file or uri>\n" % args[0])
        sys.exit(1)
    GObject.threads_init()
    Gst.init(None)

    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")

    print("Creating Source \n ")
    source = Gst.ElementFactory.make("filesrc", "file-source")
    if not source:
        sys.stderr.write(" Unable to create Source \n")

    print("Creating H264Parser \n")
    h264parser = Gst.ElementFactory.make("h264parse", "h264-parser")
    if not h264parser:
        sys.stderr.write(" Unable to create h264 parser \n")

    print("Creating Decoder \n")
    decoder = Gst.ElementFactory.make("nvv4l2decoder", "nvv4l2-decoder")
    if not decoder:
        sys.stderr.write(" Unable to create Nvv4l2 Decoder \n")

    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")

    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie \n")

    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    if not nvvidconv:
        sys.stderr.write(" Unable to create nvvidconv \n")

    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")

    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")

    if is_aarch64():
        transform = Gst.ElementFactory.make("nvegltransform", "nvegl-transform")

    print("Creating EGLSink \n")
    sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
    if not sink:
        sys.stderr.write(" Unable to create egl sink \n")

    print("Playing file %s " %args[1])
    source.set_property('location', args[1])
    streammux.set_property('width', 1920)
    streammux.set_property('height', 1080)
    streammux.set_property('batch-size', 1)
    streammux.set_property('batched-push-timeout', 4000000)
    pgie.set_property('config-file-path', "dstest1_pgie_config.txt")

    print("Adding elements to Pipeline \n")
    pipeline.add(source)
    pipeline.add(h264parser)
    pipeline.add(decoder)
    pipeline.add(streammux)
    pipeline.add(pgie)
    pipeline.add(nvvidconv)
    pipeline.add(nvosd)
    pipeline.add(sink)
    if is_aarch64():
        pipeline.add(transform)

    # we link the elements together
    # file-source -> h264-parser -> nvh264-decoder ->
    # nvinfer -> nvvidconv -> nvosd -> video-renderer
    print("Linking elements in the Pipeline \n")
    source.link(h264parser)
    h264parser.link(decoder)

    sinkpad = streammux.get_request_pad("sink_0")
    if not sinkpad:
        sys.stderr.write(" Unable to get the sink pad of streammux \n")
    srcpad = decoder.get_static_pad("src")
    if not srcpad:
        sys.stderr.write(" Unable to get source pad of decoder \n")
    srcpad.link(sinkpad)
    streammux.link(pgie)
    pgie.link(nvvidconv)
    nvvidconv.link(nvosd)
    if is_aarch64():
        nvosd.link(transform)
        transform.link(sink)
    else:
        nvosd.link(sink)

    # create an event loop and feed gstreamer bus messages to it
    loop = GObject.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect ("message", bus_call, loop)

    # Let's add a probe to get informed of the metadata generated. We add the
    # probe to the sink pad of the osd element, since by that time the buffer
    # will have received all the metadata.
    osdsinkpad = nvosd.get_static_pad("sink")
    if not osdsinkpad:
        sys.stderr.write(" Unable to get sink pad of nvosd \n")

    pp = PostProcessing(fps=25)
    pp.start()

    osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)

    Gst.debug_bin_to_dot_file(pipeline, Gst.DebugGraphDetails.ALL, "pipeline")
    
    # start play back and listen to events
    print("Starting pipeline \n")
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except:
        pass
    # cleanup
    pipeline.set_state(Gst.State.NULL)

if __name__ == '__main__':
    sys.exit(main(sys.argv))

Code for post_processing.py:

import threading
import time

PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3

class PostProcessing(object):

    def __init__(self, fps=30):
        self.frame_number = 0
        self.frame_number_ = 0
        self.num_rects = 0
        self.num_rects_ = 0
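        # Start with a dict so the result of getMeta() can be indexed by class id
        # even before the worker thread has finished its first pass.
        self.obj_counter = {PGIE_CLASS_ID_VEHICLE: 0, PGIE_CLASS_ID_BICYCLE: 0,
                            PGIE_CLASS_ID_PERSON: 0, PGIE_CLASS_ID_ROADSIGN: 0}
        self.obj_counter_ = 0
        self.__stop = False
        self.__thread = None  # set in start(); lets stop() be called safely before start()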
        self.__lock = threading.Lock()
        self.fps = fps
        self.objectdetect_ = None
        self.objectdetect = None
    
    # Create a new thread with daemon=True, which means it stops when the program stops.
    # (The __run function contains a while loop, so this thread does not stop on its own
    # until stop() is called or the program ends.)
    def start(self):
        self.__stop = False
        self.__thread = threading.Thread(name='TESTAAA', target=self.__run, daemon=True)
        self.__thread.start()
    
    # We could also stop this new thread by using stop function.
    def stop(self):
        self.__stop = True
        if self.__thread is not None:
            self.__thread.join()

    # Meta data are pushed from probe function to this object.
    def pushAllMeta(self, frame_number, num_rects, objectdetect):
        with self.__lock:
            self.frame_number_ = frame_number
            self.num_rects_ = num_rects
            self.objectdetect_ = objectdetect

    # Send the metadata from this object back to probe function.
    # These metadata are generated by nextMeta function when going through __run.
    def getMeta(self):
        return self.frame_number, self.num_rects, self.obj_counter

    # Body of the new thread: a while loop.
    # In the current test, the time for each iteration depends on the sleep value, i.e. 1 sec.
    # Inside each iteration, we first take the data sent from the probe (in nextMeta),
    # then do some post-processing (in postProcessingMain),
    # and finally call time.sleep to simulate heavy calculation.
    def __run(self):
        prev_time = 0

        while not self.__stop:
            t1 = time.time()
            # Get next batch meta
            self.nextMeta()
            # main codes
            self.postProcessingMain()

            t2 = time.time()
            # time to wait [s] to match the input fps (unused while the fixed sleep below is active)
            wait_time = 1 / self.fps - (t2 - t1)
            #print("wait_time: ",wait_time)
            # fixed 1 s sleep to simulate heavy calculation
            time.sleep(1)
            #print(time.time())
            #time.sleep(max(0, wait_time))

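    def nextMeta(self):
        # Take the same lock as pushAllMeta so the three values come from the same frame.
        with self.__lock:
            self.frame_number = self.frame_number_
            self.num_rects = self.num_rects_
            self.objectdetect = self.objectdetect_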
    
    def postProcessingMain(self):

        self.obj_counter = {
            PGIE_CLASS_ID_VEHICLE:0,
            PGIE_CLASS_ID_PERSON:0,
            PGIE_CLASS_ID_BICYCLE:0,
            PGIE_CLASS_ID_ROADSIGN:0
        }
        if self.objectdetect is None:
            return
        for key in self.objectdetect:
            self.obj_counter[key] += 1
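One thing to note about the counting: because the probe stores boxes in a dict keyed by class_id, each class holds at most one box, so the counts produced by postProcessingMain cannot exceed 1 per class. If per-class counts are what is wanted, one possible alternative is to pass a list of (class_id, box) pairs instead. The sketch below is only an assumption about the intent: count_per_class and the box values are made up for illustration.

```python
from collections import Counter

PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3

ALL_CLASS_IDS = (PGIE_CLASS_ID_VEHICLE, PGIE_CLASS_ID_BICYCLE,
                 PGIE_CLASS_ID_PERSON, PGIE_CLASS_ID_ROADSIGN)

def count_per_class(detections):
    """detections: list of (class_id, [left, top, right, bottom]) pairs."""
    counts = Counter({class_id: 0 for class_id in ALL_CLASS_IDS})
    for class_id, _box in detections:
        counts[class_id] += 1
    return dict(counts)

# Made-up boxes: two vehicles and one person in the same frame.
detections = [
    (PGIE_CLASS_ID_VEHICLE, [10, 20, 110, 80]),
    (PGIE_CLASS_ID_VEHICLE, [200, 40, 320, 120]),
    (PGIE_CLASS_ID_PERSON, [50, 60, 90, 180]),
]
counts = count_per_class(detections)
print(counts)

# For comparison: a dict keyed by class_id keeps only one box per class.
as_dict = {}
for class_id, box in detections:
    as_dict.update({class_id: box})
print(len(as_dict))
```

With the list, both vehicles are counted; with the dict, the second vehicle overwrites the first.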

Thanks a lot.

Best
Yichao


We will make a unified analysis of this topic in https://forums.developer.nvidia.com/t/will-queue-and-appsink-help-the-block-problem-of-probe-function-in-deepstream/218928. Thanks.
