• Hardware Platform (Jetson / GPU): Jetson NX Dev Kit
• DeepStream Version: 6.0.1
• JetPack Version (valid for Jetson only): 4.6 GA
Hi Sir/Madam:
This question follows up on two earlier threads: "Will queue and Appsink help the block problem of Probe Function in Deepstream" and "Will multiple probes in different threads help the block problem of probe Function in main pipeline of Deepstream".
In my project I also struggled with heavy calculation time inside the probe function. As the threads above confirm, probe() callbacks are synchronous: the buffer returned by info.get_buffer() is held back from traversing the pipeline until the callback returns.
So I tried three ways to move the heavy code into a "new" thread. In this question I will only explain the third way: using Python's threading module directly to create a new thread and run the heavy calculation inside it. The test results work as expected. I will first explain what I did and then show all the code at the end, so that you can reproduce the results.
I would appreciate it if anyone could review my code and results to confirm they are correct. I am not the only person confused by this probe issue, so maybe the solution below can help others.
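To illustrate the underlying problem first, here is a minimal DeepStream-free sketch (everything in it is simulated, nothing below is DeepStream API): when the heavy work runs inside the synchronous callback, every "frame" waits for it to finish; when the callback only hands the latest data to a worker thread, it returns immediately.

```python
import threading
import time

def heavy_postprocessing(frame):
    # Simulate ~50 ms of work per frame.
    time.sleep(0.05)

# Case 1: heavy work inside the synchronous callback.
def blocking_probe(frame):
    heavy_postprocessing(frame)

# Case 2: the callback only hands the latest data to a worker thread.
latest = {"frame": None}
lock = threading.Lock()

def worker():
    while True:
        with lock:
            frame = latest["frame"]
        if frame is not None:
            heavy_postprocessing(frame)
        else:
            time.sleep(0.01)

def offloading_probe(frame):
    with lock:
        latest["frame"] = frame

threading.Thread(target=worker, daemon=True).start()

def run(probe, n=20):
    # The "pipeline" delivers one buffer at a time and waits for the probe.
    t0 = time.time()
    for frame in range(n):
        probe(frame)
    return time.time() - t0

slow = run(blocking_probe)    # callback blocks: roughly n * 50 ms
fast = run(offloading_probe)  # callback returns at once
print("blocking: %.2fs, offloading: %.2fs" % (slow, fast))
```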
Logic behind
- First, we create a pipeline exactly the same as in test 1 of the DeepStream Python examples.
- What's different is that we create a new thread from a PostProcessing object and run the heavy calculations inside it.
- The main idea is: in each iteration of the main thread, the probe function sends some data to the PostProcessing object. The PostProcessing object runs a while loop in which each iteration takes 1 second, mainly to simulate a heavy-calculation condition. Inside this loop we take the metadata, do some post-processing, and generate new metadata. Back in the main thread, the probe function reads the data generated by the post-processing and displays it on screen.
- We hope the video can still display in real time, and indeed everything seems to work well.
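The exchange between the probe and the worker described above can be sketched independently of DeepStream. The class below is a simplified stand-in for PostProcessing (the names, the doubling "post-processing", and the 0.1 s workload are made up for this sketch): the probe side pushes the latest raw data without blocking and reads back the last finished result, while the worker loops in its own daemon thread.

```python
import threading
import time

class Exchanger:
    """Simplified sketch of the PostProcessing data exchange: the probe
    pushes raw metadata, a worker thread consumes it, and the probe reads
    back the most recently finished result."""

    def __init__(self, work_seconds=0.1):
        self._lock = threading.Lock()
        self._pending = None            # last data pushed by the probe
        self._result = None             # last result produced by the worker
        self._work_seconds = work_seconds
        self._stop = False
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop = True
        self._thread.join()

    def push(self, data):               # probe side: returns at once
        with self._lock:
            self._pending = data

    def get(self):                      # probe side: last finished result
        with self._lock:
            return self._result

    def _run(self):
        while not self._stop:
            with self._lock:
                data = self._pending
            if data is not None:
                time.sleep(self._work_seconds)   # simulated heavy work
                with self._lock:
                    self._result = data * 2      # toy "post-processing"
            else:
                time.sleep(0.01)

ex = Exchanger()
ex.start()
for frame in range(5):
    ex.push(frame)          # the "probe" never blocks on the heavy work
    time.sleep(0.05)
time.sleep(0.5)             # let the worker finish the last item
print(ex.get())
ex.stop()
```

Note that the result read back always lags one or more worker iterations behind the data pushed in, which is exactly the behaviour seen in the pipeline below: the displayed counts correspond to an earlier frame than the one currently traversing the pipeline.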
How to Reproduce
This code is based on deepstream_test_1.py. After copying the code below, you can reproduce the results by running python3 deepstream_test_1.py /opt/nvidia/deepstream/deepstream-6.0/samples/streams/sample_qHD.h264 in a terminal. I didn't use a container.
Code
Code for deepstream_test_1.py:
#!/usr/bin/env python3
import sys
sys.path.append('../')
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
from common.is_aarch_64 import is_aarch64
from common.bus_call import bus_call
from post_processing import PostProcessing
import pyds
import os

os.environ["GST_DEBUG_DUMP_DOT_DIR"] = "/tmp"

PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3


def osd_sink_pad_buffer_probe(pad, info, u_data):
    frame_number = 0
    num_rects = 0
    objectdetect = {}
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return Gst.PadProbeReturn.OK

    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_number = frame_meta.frame_num
        num_rects = frame_meta.num_obj_meta
        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            BB_info = obj_meta.rect_params
            objectdetect.update({obj_meta.class_id: [BB_info.left, BB_info.top,
                                                     BB_info.left + BB_info.width,
                                                     BB_info.top + BB_info.height]})
            obj_meta.rect_params.border_color.set(0.0, 0.0, 1.0, 0.0)
            try:
                l_obj = l_obj.next
            except StopIteration:
                break

        # Send metadata from the probe function to the post_processing object.
        pp.pushAllMeta(frame_number, num_rects, objectdetect)
        # Get metadata from the post_processing object back into the probe
        # function. Notice: the data sent above is not the same as the data
        # received here. The data received here has already been post-processed
        # by the post_processing object.
        display_frameNum, display_numRects, display_objCounter = pp.getMeta()

        display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
        display_meta.num_labels = 1
        py_nvosd_text_params = display_meta.text_params[0]
        py_nvosd_text_params.display_text = \
            "Frame Number={} Number of Objects={} Vehicle_count={} Person_count={}".format(
                display_frameNum, display_numRects,
                display_objCounter[PGIE_CLASS_ID_VEHICLE],
                display_objCounter[PGIE_CLASS_ID_PERSON])
        py_nvosd_text_params.x_offset = 10
        py_nvosd_text_params.y_offset = 12
        py_nvosd_text_params.font_params.font_name = "Serif"
        py_nvosd_text_params.font_params.font_size = 10
        py_nvosd_text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
        py_nvosd_text_params.set_bg_clr = 1
        py_nvosd_text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
        print(pyds.get_string(py_nvosd_text_params.display_text))
        pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK


def main(args):
    global pp
    if len(args) != 2:
        sys.stderr.write("usage: %s <media file or uri>\n" % args[0])
        sys.exit(1)

    GObject.threads_init()
    Gst.init(None)

    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")

    print("Creating Source \n ")
    source = Gst.ElementFactory.make("filesrc", "file-source")
    if not source:
        sys.stderr.write(" Unable to create Source \n")

    print("Creating H264Parser \n")
    h264parser = Gst.ElementFactory.make("h264parse", "h264-parser")
    if not h264parser:
        sys.stderr.write(" Unable to create h264 parser \n")

    print("Creating Decoder \n")
    decoder = Gst.ElementFactory.make("nvv4l2decoder", "nvv4l2-decoder")
    if not decoder:
        sys.stderr.write(" Unable to create Nvv4l2 Decoder \n")

    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")

    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie \n")

    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    if not nvvidconv:
        sys.stderr.write(" Unable to create nvvidconv \n")

    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")

    if is_aarch64():
        transform = Gst.ElementFactory.make("nvegltransform", "nvegl-transform")

    print("Creating EGLSink \n")
    sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
    if not sink:
        sys.stderr.write(" Unable to create egl sink \n")

    print("Playing file %s " % args[1])
    source.set_property('location', args[1])
    streammux.set_property('width', 1920)
    streammux.set_property('height', 1080)
    streammux.set_property('batch-size', 1)
    streammux.set_property('batched-push-timeout', 4000000)
    pgie.set_property('config-file-path', "dstest1_pgie_config.txt")

    print("Adding elements to Pipeline \n")
    pipeline.add(source)
    pipeline.add(h264parser)
    pipeline.add(decoder)
    pipeline.add(streammux)
    pipeline.add(pgie)
    pipeline.add(nvvidconv)
    pipeline.add(nvosd)
    pipeline.add(sink)
    if is_aarch64():
        pipeline.add(transform)

    # We link the elements together:
    # file-source -> h264-parser -> nvh264-decoder ->
    # nvinfer -> nvvidconv -> nvosd -> video-renderer
    print("Linking elements in the Pipeline \n")
    source.link(h264parser)
    h264parser.link(decoder)
    sinkpad = streammux.get_request_pad("sink_0")
    if not sinkpad:
        sys.stderr.write(" Unable to get the sink pad of streammux \n")
    srcpad = decoder.get_static_pad("src")
    if not srcpad:
        sys.stderr.write(" Unable to get source pad of decoder \n")
    srcpad.link(sinkpad)
    streammux.link(pgie)
    pgie.link(nvvidconv)
    nvvidconv.link(nvosd)
    if is_aarch64():
        nvosd.link(transform)
        transform.link(sink)
    else:
        nvosd.link(sink)

    # Create an event loop and feed GStreamer bus messages to it.
    loop = GObject.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    # Let's add a probe to get informed of the generated metadata. We add the
    # probe to the sink pad of the osd element, since by that time the buffer
    # will have got all the metadata.
    osdsinkpad = nvosd.get_static_pad("sink")
    if not osdsinkpad:
        sys.stderr.write(" Unable to get sink pad of nvosd \n")

    pp = PostProcessing(fps=25)
    pp.start()

    osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)
    Gst.debug_bin_to_dot_file(pipeline, Gst.DebugGraphDetails.ALL, "pipeline")

    # Start playback and listen to events.
    print("Starting pipeline \n")
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except:
        pass

    # Cleanup.
    pipeline.set_state(Gst.State.NULL)


if __name__ == '__main__':
    sys.exit(main(sys.argv))
Code for post_processing.py:
import threading
import time

PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3


class PostProcessing(object):
    def __init__(self, fps=30):
        self.frame_number = 0
        self.frame_number_ = 0
        self.num_rects = 0
        self.num_rects_ = 0
        self.obj_counter = 0
        self.obj_counter_ = 0
        self.__stop = False
        self.__thread = None
        self.__lock = threading.Lock()
        self.fps = fps
        self.objectdetect_ = None
        self.objectdetect = None

    # Create a new thread with daemon=True, which means it will stop when the
    # program stops. (__run contains a while loop, so this thread does not stop
    # until the program ends.)
    def start(self):
        self.__stop = False
        self.__thread = threading.Thread(name='TESTAAA', target=self.__run, daemon=True)
        self.__thread.start()

    # We can also stop the thread explicitly with this function.
    def stop(self):
        self.__stop = True
        if self.__thread is not None:
            self.__thread.join()

    # Metadata is pushed from the probe function into this object.
    def pushAllMeta(self, frame_number, num_rects, objectdetect):
        with self.__lock:
            self.frame_number_ = frame_number
            self.num_rects_ = num_rects
            self.objectdetect_ = objectdetect

    # Send the metadata from this object back to the probe function.
    # This metadata is produced by nextMeta/postProcessingMain inside __run.
    def getMeta(self):
        return self.frame_number, self.num_rects, self.obj_counter

    # The new thread runs this while loop. In each iteration we first take the
    # data last sent by the probe (nextMeta), then do some post-processing
    # (postProcessingMain), and finally sleep to simulate a heavy calculation.
    def __run(self):
        while not self.__stop:
            t1 = time.time()
            # Get the next batch of metadata.
            self.nextMeta()
            # Main post-processing code.
            self.postProcessingMain()
            t2 = time.time()
            # Time to wait [s] to fulfil the requested fps.
            wait_time = 1 / self.fps - (t2 - t1)
            # Sleep 1 second to simulate the heavy-calculation condition; to
            # run at the requested fps instead, use time.sleep(max(0, wait_time)).
            time.sleep(1)

    def nextMeta(self):
        with self.__lock:
            self.frame_number = self.frame_number_
            self.num_rects = self.num_rects_
            self.objectdetect = self.objectdetect_

    def postProcessingMain(self):
        self.obj_counter = {
            PGIE_CLASS_ID_VEHICLE: 0,
            PGIE_CLASS_ID_PERSON: 0,
            PGIE_CLASS_ID_BICYCLE: 0,
            PGIE_CLASS_ID_ROADSIGN: 0
        }
        if self.objectdetect is None:
            return
        for key, value in self.objectdetect.items():
            self.obj_counter[key] += 1
Thanks a lot.
Best
Yichao