Will queue and appsink help with the blocking problem of the probe function in DeepStream?

• Hardware Platform (Jetson / GPU): Jetson NX Dev Kit
• DeepStream Version: 6.0.1
• JetPack Version (valid for Jetson only): 4.6 GA

Hi Sir/Madam:

Background

This question follows up on this older question: Does the Probe Function in Deepstream have to be blocking?. In my project, I was also struggling with heavy calculation time inside the probe function. As that question mentioned, and as confirmed here, probe() callbacks are synchronous and therefore hold the buffer (info.get_buffer()) back from traversing the pipeline until the user returns.
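To make the blocking concrete, here is a minimal probe sketch (my illustration, not code from the referenced topics): any time spent inside the callback stalls the streaming thread, because the buffer is only released when the callback returns.

import time
from gi.repository import Gst

def blocking_probe(pad, info, u_data):
    time.sleep(1)                  # simulated heavy work: the pipeline stalls ~1 s per buffer
    return Gst.PadProbeReturn.OK   # only now does the buffer continue downstream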

So, I tried three ways to see whether we could build a "new" thread and put all the heavy code inside it. In this question, I will only explain the first way, using tee, queue, and appsink. From my test results, appsink did not work as expected. I will first explain what I did, and then show all the code at the end, so that you can reproduce the results.

I am wondering whether anyone could review my code and results to see whether appsink can help with the blocking problem of the probe function in DeepStream.

The logic behind it

  1. First, we need to create a pipeline like the one below. (The original post showed the full pipeline graph and its core part as images; the topology is reconstructed as text below.)
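Reconstructed topology (a text sketch based on the code at the end of this post, in place of the original images):

filesrc -> h264parse -> nvv4l2decoder -> nvstreammux -> nvinfer -> nvvideoconvert -> tee
tee -> queue1 -> nvdsosd -> (nvegltransform, Jetson only) -> nveglglessink
tee -> queue2 (leaky=2) -> appsink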

So, I inserted a tee after nvinfer and nvvideoconvert, before the osd. One queue is connected to the osd, and the other queue is connected to the appsink. The second queue's leaky property is set to 2 (downstream).

  2. Next, inside the function new_buffer, which is connected to the appsink via appsink.connect("new-sample", new_buffer), I added a delay to simulate heavy calculations inside the appsink.

  3. The probe function is attached to the osd: osdsinkpad = nvosd.get_static_pad("sink"); osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0).

Some details are omitted here, but I paste all the code below. I would expect to see real-time video displayed on the local screen, but it is not, no matter how many times I tried.

How to Reproduce

This code is based on deepstream_test_1.py. After copying and pasting the code below, you can reproduce the results by running python3 deepstream_test_1.py /opt/nvidia/deepstream/deepstream-6.0/samples/streams/sample_qHD.h264 in a terminal. I didn't use a container.

Code

Code for deepstream_test_1.py:

#!/usr/bin/env python3

import sys
sys.path.append('../')
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
from common.is_aarch_64 import is_aarch64
from common.bus_call import bus_call
from post_processing import PostProcessing
import time
import cv2
import numpy

import pyds
import os
os.environ["GST_DEBUG_DUMP_DOT_DIR"] = "/tmp"
os.putenv('GST_DEBUG_DUMP_DOT_DIR', '/tmp')

PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3

# Notice: I deleted the original comments where nothing changed, and added new comments for my changes.
# In this scenario, I set a 1-second delay in the appsink new_buffer function to simulate a heavy-calculation condition.
# If the appsink calculation speed did not influence the main pipeline, you would expect the video stream to display
# on the local screen in real time, and the print messages to appear in the terminal at the stream's FPS.
# However, what I see is a stuck screen, and print messages at a frequency of 1 Hz.
def osd_sink_pad_buffer_probe(pad,info,u_data):

    frame_number=0
    num_rects=0
    objectdetect = {} 
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return Gst.PadProbeReturn.OK

    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_number=frame_meta.frame_num
        num_rects = frame_meta.num_obj_meta
        l_obj=frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                obj_meta=pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            BB_info = obj_meta.rect_params
            objectdetect.update({obj_meta.class_id: [BB_info.left,BB_info.top,BB_info.left+BB_info.width,BB_info.top+BB_info.height]})
            obj_meta.rect_params.border_color.set(0.0, 0.0, 1.0, 0.0)
            try: 
                l_obj=l_obj.next
            except StopIteration:
                break
        # Send data to post_processing object.
        pp.pushAllMeta(frame_number, num_rects, objectdetect)

        # Get data from post_processing object.
        # Notice: although pushAllMeta and getMeta are called on consecutive lines, the metadata
        # returned by getMeta is actually the result of the most recent completed postProcessingMain run.
        display_frameNum, display_numRects, display_objCounter = pp.getMeta()
        if display_objCounter is not None:
            display_meta=pyds.nvds_acquire_display_meta_from_pool(batch_meta)
            display_meta.num_labels = 1
            py_nvosd_text_params = display_meta.text_params[0]
            # Display the parameters after getMeta function.
            py_nvosd_text_params.display_text = "Frame Number={} Number of Objects={} Vehicle_count={} Person_count={}".format(display_frameNum, display_numRects, display_objCounter[PGIE_CLASS_ID_VEHICLE], display_objCounter[PGIE_CLASS_ID_PERSON])
            py_nvosd_text_params.x_offset = 10
            py_nvosd_text_params.y_offset = 12
            py_nvosd_text_params.font_params.font_name = "Serif"
            py_nvosd_text_params.font_params.font_size = 10
            py_nvosd_text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
            py_nvosd_text_params.set_bg_clr = 1
            py_nvosd_text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
            # Display on terminal.
            print(pyds.get_string(py_nvosd_text_params.display_text))
            pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
        try:
            l_frame=l_frame.next
        except StopIteration:
            break
			
    return Gst.PadProbeReturn.OK

# Not used here, to keep the test results clearer.
def gst_to_opencv(sample):
    buf = sample.get_buffer()
    caps = sample.get_caps()
    print (caps.get_structure(0).get_value('format'))
    print (caps.get_structure(0).get_value('height'))
    print (caps.get_structure(0).get_value('width'))
    print (buf.get_size())
    arr = numpy.ndarray(
        (caps.get_structure(0).get_value('height'),
         caps.get_structure(0).get_value('width'),
         3),
        buffer=buf.extract_dup(0, buf.get_size()),
        dtype=numpy.uint8)
    return arr

# This is the appsink "new-sample" callback. postProcessingMain is used here mainly to add a 1-second delay per call.
def new_buffer(sink):
    global image_arr
    sample = sink.emit("pull-sample")
    # buf = sample.get_buffer()
    # print "Timestamp: ", buf.pts
    #arr = gst_to_opencv(sample)
    #image_arr = arr
    pp.postProcessingMain()
    return Gst.FlowReturn.OK

def main(args):

    global pp

    if len(args) != 2:
        sys.stderr.write("usage: %s <media file or uri>\n" % args[0])
        sys.exit(1)
    GObject.threads_init()
    Gst.init(None)
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
    print("Creating Source \n ")
    source = Gst.ElementFactory.make("filesrc", "file-source")
    if not source:
        sys.stderr.write(" Unable to create Source \n")
    print("Creating H264Parser \n")

    h264parser = Gst.ElementFactory.make("h264parse", "h264-parser")
    if not h264parser:
        sys.stderr.write(" Unable to create h264 parser \n")

    print("Creating Decoder \n")
    decoder = Gst.ElementFactory.make("nvv4l2decoder", "nvv4l2-decoder")
    if not decoder:
        sys.stderr.write(" Unable to create Nvv4l2 Decoder \n")

    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")

    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie \n")

    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    if not nvvidconv:
        sys.stderr.write(" Unable to create nvvidconv \n")
    # tee splits the stream to multiple pads, similar to multi-threading.
    tee=Gst.ElementFactory.make("tee", "nvconvertor-tee")                       
    if not tee:
        sys.stderr.write(" Unable to create tee DS pipeline module.")
    # Queue to main pipeline
    queue1=Gst.ElementFactory.make("queue", "nvtee-que1")
    if not queue1:
        sys.stderr.write(" Unable to create queue1 DS pipeline module.")

    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")
    # Queue to Appsink
    queue2=Gst.ElementFactory.make("queue", "nvtee-que2")
    if not queue2:
        sys.stderr.write(" Unable to create queue2 DS pipeline module.")
    # Set leaky to 2 (downstream). I also tried setting it to 0; I didn't see a difference.
    queue2.set_property("leaky", 2)

    if is_aarch64():
        transform = Gst.ElementFactory.make("nvegltransform", "nvegl-transform")

    print("Creating EGLSink \n")
    sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
    if not sink:
        sys.stderr.write(" Unable to create egl sink \n")

    # Appsink. I also tried changing/uncommenting the properties below, but I didn't see a difference.
    appsink = Gst.ElementFactory.make("appsink", "sink")
    appsink.set_property("emit-signals",True)
    appsink.set_property("max-buffers", 2)
    appsink.set_property("drop", True)
    appsink.set_property("sync", False)

    # Left commented out for now; uncommenting them does not influence the results.
    #caps = Gst.caps_from_string("video/x-raw, format=(string){BGR, GRAY8}; video/x-bayer,format=(string){rggb,bggr,grbg,gbrg}")
    #caps = Gst.caps_from_string("video/x-raw, format=(string)BGR")
    #appsink.set_property("caps", caps)
    appsink.connect("new-sample", new_buffer)

    print("Playing file %s " %args[1])
    source.set_property('location', args[1])
    streammux.set_property('width', 1920)
    streammux.set_property('height', 1080)
    streammux.set_property('batch-size', 1)
    streammux.set_property('batched-push-timeout', 4000000)
    pgie.set_property('config-file-path', "dstest1_pgie_config.txt")

    print("Adding elements to Pipeline \n")
    pipeline.add(source)
    pipeline.add(h264parser)
    pipeline.add(decoder)
    pipeline.add(streammux)
    pipeline.add(pgie)
    pipeline.add(nvvidconv)
    pipeline.add(tee)
    pipeline.add(queue1)
    pipeline.add(queue2)
    pipeline.add(nvosd)
    pipeline.add(sink)
    pipeline.add(appsink)
    if is_aarch64():
        pipeline.add(transform)

    # We link the elements together:
    # file-source -> h264-parser -> nvv4l2-decoder -> streammux ->
    # nvinfer -> nvvidconv -> tee -> queue1 -> nvosd -> video-renderer
    #                              -> queue2 -> appsink
    print("Linking elements in the Pipeline \n")
    source.link(h264parser)
    h264parser.link(decoder)

    sinkpad = streammux.get_request_pad("sink_0")
    if not sinkpad:
        sys.stderr.write(" Unable to get the sink pad of streammux \n")
    srcpad = decoder.get_static_pad("src")
    if not srcpad:
        sys.stderr.write(" Unable to get source pad of decoder \n")
    srcpad.link(sinkpad)
    streammux.link(pgie)
    pgie.link(nvvidconv)

    nvvidconv.link(tee)
    src_tee_osd_pad  = tee.get_request_pad("src_%u")
    src_tee_appsink_pad  = tee.get_request_pad("src_%u")
    sink_osd_pad = queue1.get_static_pad("sink")
    sink_appsink_pad = queue2.get_static_pad("sink")
    src_tee_osd_pad.link(sink_osd_pad)
    src_tee_appsink_pad.link(sink_appsink_pad)
    # link tee, queue and appsink
    queue1.link(nvosd)
    queue2.link(appsink)
    if is_aarch64():
        nvosd.link(transform)
        transform.link(sink)
    else:
        nvosd.link(sink)

    # PostProcessing object shared by the probe and the appsink callback.
    pp = PostProcessing()

    loop = GObject.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect ("message", bus_call, loop)
    osdsinkpad = nvosd.get_static_pad("sink")
    if not osdsinkpad:
        sys.stderr.write(" Unable to get sink pad of nvosd \n")

    osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)

    #Gst.debug_bin_to_dot_file(pipeline, Gst.DebugGraphDetails.ALL, "pipeline")
    print("Starting pipeline \n")
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except:
        pass
    pipeline.set_state(Gst.State.NULL)

if __name__ == '__main__':
    sys.exit(main(sys.argv))

Code for post_processing.py:

import threading
import time

PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3

class PostProcessing(object):

    def __init__(self):
        self.objectdetect = None
        self.objectdetect_new = None
        self.frame_number = 0
        self.frame_number_new = 0
        self.num_rects = 0
        self.num_rects_new = 0
        self.obj_counter = None

    def pushAllMeta(self, frame_number, num_rects, objectdetect):
        self.frame_number_new = frame_number
        self.num_rects_new = num_rects
        self.objectdetect_new = objectdetect

    def getMeta(self):
        return self.frame_number, self.num_rects, self.obj_counter

    def postProcessingMain(self):

        self.obj_counter = {
            PGIE_CLASS_ID_VEHICLE:0,
            PGIE_CLASS_ID_PERSON:0,
            PGIE_CLASS_ID_BICYCLE:0,
            PGIE_CLASS_ID_ROADSIGN:0
        }
        if self.objectdetect_new is None:
            return
        else:
            for key, value in self.objectdetect_new.items():
                self.obj_counter[key] += 1
            self.frame_number = self.frame_number_new
            self.num_rects = self.num_rects_new
            self.objectdetect = self.objectdetect_new
            time.sleep(1)
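One note (added for clarity; not part of the code I tested above): PostProcessing is used from two different streaming threads, since the osd probe calls pushAllMeta/getMeta while the appsink callback calls postProcessingMain, yet the threading import above is never used. A minimal sketch of guarding the shared fields with a lock:

import threading

class PostProcessingLocked(PostProcessing):
    """Sketch: same interface, with the shared fields guarded by a lock.
    postProcessingMain is deliberately left unlocked here, since holding the
    lock across its time.sleep(1) would block the probe thread again."""
    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()

    def pushAllMeta(self, frame_number, num_rects, objectdetect):
        with self._lock:
            super().pushAllMeta(frame_number, num_rects, objectdetect)

    def getMeta(self):
        with self._lock:
            return super().getMeta()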

Thanks a lot for your help.
Best
Yichao


Hi @zyctimes, thanks for your debugging method. Since it may take some time to analyze your 3 topics, we will inform you as soon as we have a conclusion.

Hi @zyctimes, about the blocking problem: it is determined by GStreamer's architecture. You can refer to the link below:
https://gstreamer.freedesktop.org/documentation/application-development/advanced/pipeline-manipulation.html?gi-language=c#data-probes
We use the GST_PAD_PROBE_TYPE_BUFFER type of probe function, and the official GStreamer recommendation is as follows:

Data probes run in the pipeline's streaming thread context, so callbacks should try to avoid blocking and generally, avoid doing weird stuff. 
Doing so could have a negative impact on the pipeline's performance or, in case of bugs, lead to deadlocks or crashes. 
More precisely, one should usually avoid calling GUI-related functions from within a probe callback, nor try to change the state of the pipeline. 
An application may post custom messages on the pipeline's bus to communicate with the main application thread and have it do things like stop the pipeline.
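As an illustrative sketch (assumed code, not from the GStreamer docs), that last recommendation could look like this in Python: keep the probe lightweight and post an application message that a bus watch then handles on the main thread.

def light_probe(pad, info, u_data):
    # No heavy work here: just notify the application thread via the bus.
    element = pad.get_parent_element()
    s = Gst.Structure.new_empty("heavy-work-request")
    element.post_message(Gst.Message.new_application(element, s))
    return Gst.PadProbeReturn.OK

def app_bus_call(bus, message, loop):
    # Runs in the GLib main loop, not in the streaming thread.
    if message.type == Gst.MessageType.APPLICATION:
        if message.get_structure().get_name() == "heavy-work-request":
            pass  # hand the heavy work to a worker thread here
    return True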

Hi Yuweiw:

Thanks a lot for your reply, and I understand what you mean: if the logic inside the probe function is too heavy, it will hurt the pipeline's performance. That is why I tried three ways to see whether appsink, multiple probes, or threading could help with the blocking problem of the probe function.

My conclusion after testing is: appsink and multiple probes do not help, but threading works. I am afraid it is difficult to explain why appsink and multiple probes do not work based only on the GStreamer recommendations you referenced. But from your experience with DeepStream, do you think the three conclusions above are correct?
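For reference, here is a minimal sketch of the threading variant I mean (illustrative and simplified, not the exact code I tested): the probe only signals a worker thread, and the worker runs the slow postProcessingMain off the streaming thread.

import queue
import threading

meta_queue = queue.Queue(maxsize=2)

def worker():
    while True:
        meta_queue.get()           # wait until the probe signals a new frame
        pp.postProcessingMain()    # the heavy work now runs off the streaming thread

threading.Thread(target=worker, daemon=True).start()

# Inside osd_sink_pad_buffer_probe, after pp.pushAllMeta(...):
#     try:
#         meta_queue.put_nowait(True)   # wake the worker without ever blocking the probe
#     except queue.Full:
#         pass                          # worker still busy: skip this frame's post-processing

Thanks a lot for your help.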

Hi @zyctimes, from the GStreamer documentation about the tee plugin:
https://gstreamer.freedesktop.org/documentation/coreelements/tee.html?gi-language=c#tee-page
If you use a queue plugin, the two branches after the tee should not affect each other, so I think one branch may be affecting the clock of the pipeline.
Could you do a simple test and set the "async" property to FALSE for the nveglglessink plugin?

sink.set_property('async', False)

Thanks

Hi @yuweiw :

Sorry for my late reply. I tried sink.set_property('sync', False) in both the appsink and the multiple-probes cases ('async' did not work, but 'sync' did). The performance improved, but the problem is still there.

So, I still have a 1-second delay (time.sleep(1)) in the appsink, or in another probe on the other tee branch. Before adding sink.set_property('sync', False), the local video display was completely frozen on a single frame. After adding it, the local video plays at one frame per second.

I am wondering whether there are other parameters that need to be set. I will keep experimenting and post updates in the messages below. Thanks a lot for your help.

Best
Yichao

I also tried changing the value of batched-push-timeout on the streammux and qos on the sink, but did not see any difference.

streammux.set_property('batched-push-timeout', int(1000000/25))
sink.set_property("qos", False)

Hi @zyctimes , the tee plugin just splits data to multiple pads; it doesn't copy the data.
So if your plugin doesn't copy the data after the tee, all the branches share the same data, and processing the shared data in one branch will stall the other branch too. If you don't want the branches to affect each other, you can try testing your demo by copying the GstBuffer data after the queue plugin.
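For illustration (a sketch based on the code in this topic, untested), the copy could happen at the top of new_buffer, so the slow work no longer touches the shared GstBuffer:

def new_buffer(sink):
    sample = sink.emit("pull-sample")
    buf = sample.get_buffer()
    # extract_dup() returns a private copy of the buffer bytes; the heavy work
    # should operate on this copy instead of the shared GstBuffer. Note that with
    # NVMM memory this copies the surface descriptor rather than raw pixels; for
    # pixel access, pyds.get_nvds_buf_surface() is the usual route.
    data = buf.extract_dup(0, buf.get_size())
    pp.postProcessingMain()
    return Gst.FlowReturn.OK

Thanks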

Hi @yuweiw, your comments make sense. However, after testing, my results are still the same. What I did is:

In the multiple-probes case, for one probe I kept the code the same as in the original deepstream_test_1.py file; for the other probe, I only added a time delay inside:

def fakesink_pad_buffer_probe(pad,info,u_data):

    time.sleep(1)  # simulated heavy work (requires `import time` at the top of the file)

    return Gst.PadProbeReturn.OK

The local video display still shows one frame per second.

Also, what do you mean by "splits data to multiple pads"? Splitting the data 50-50?

Best
Yichao

Hi @zyctimes , no, it does not really split the real data. If you know C programming, you know the concept of a pointer: no matter how many branches there are after the tee plugin, there is only one copy of the real data unless you copy it deeply. So operating on the data in one branch may affect the other. We suggest you read up on the "deep copy" and "pointer" concepts.

Hi @yuweiw, I know what you mean by deep copy. I tried both copy and deepcopy; neither works, so I will stop trying for now. If anyone is interested in this topic, I have pasted all the related code at the beginning, so you can have a try. The result after my tests is: appsink and multiple probes do not help, but threading works. Feel free to add comments below if you find something different.

Hi @zyctimes , OK, thanks for your testing. We will add this topic to the QA later, and anyone who wants to verify this can have a try.
