I’ve used deepstream_test_3.py as the base pipeline for loading multiple sources, and added the nvvidconv, caps, and filter configuration from deepstream_imagedata-multistream.py to the start of the pipeline so that I can extract the image matrix data into a numpy array.
# Add nvvidconv1 and filter1 to convert the frames to RGBA,
# which is easier to work with in Python.
print("Creating nvvidconv1 \n ")
nvvidconv1 = Gst.ElementFactory.make("nvvideoconvert", "convertor1")
if not nvvidconv1:
    sys.stderr.write(" Unable to create nvvidconv1 \n")
print("Creating filter1 \n ")
caps1 = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
filter1 = Gst.ElementFactory.make("capsfilter", "filter1")
if not filter1:
    sys.stderr.write(" Unable to get the caps filter1 \n")
filter1.set_property("caps", caps1)
print("Creating tiler \n ")
if not platform_info.is_integrated_gpu():
# Use CUDA unified memory in the pipeline so frames
# can be easily accessed on CPU in Python.
mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
streammux.set_property("nvbuf-memory-type", mem_type)
nvvidconv.set_property("nvbuf-memory-type", mem_type)
if platform_info.is_wsl():
#opencv functions like cv2.line and cv2.putText is not able to access NVBUF_MEM_CUDA_UNIFIED memory
#in WSL systems due to some reason and gives SEGFAULT. Use NVBUF_MEM_CUDA_PINNED memory for such
#usecases in WSL. Here, nvvidconv1's buffer is used in tiler sink pad probe and cv2 operations are
#done on that.
print("using nvbuf_mem_cuda_pinned memory for nvvidconv1\n")
vc_mem_type = int(pyds.NVBUF_MEM_CUDA_PINNED)
nvvidconv1.set_property("nvbuf-memory-type", vc_mem_type)
else:
nvvidconv1.set_property("nvbuf-memory-type", mem_type)
tiler.set_property("nvbuf-memory-type", mem_type)
And here’s my pipeline:
streammux.link(nvvidconv1)
nvvidconv1.link(filter1)
filter1.link(pgie)
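(The downstream links are unchanged from deepstream_test_3.py; a minimal sketch, assuming the stock element names from that sample:)

# Downstream of pgie, as in deepstream_test_3.py (assumed unchanged):
pgie.link(nvvidconv)
nvvidconv.link(tiler)
tiler.link(nvosd)
nvosd.link(sink)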
And I added a buffer pad probe on the pgie’s src pad to get the final detections and send them to a shared multiprocessing queue:
pgie_src_pad = pgie.get_static_pad("src")
if pgie_src_pad:
    pgie_src_pad.add_probe(
        Gst.PadProbeType.BUFFER,
        pgie_src_pad_buffer_probe,
        frame_data_queue,
    )
And in the probe function I have:
def pgie_src_pad_buffer_probe(pad, info, frame_data_queue: multiprocessing.Queue):
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        return Gst.PadProbeReturn.OK
    # hash(gst_buffer) gives the C address that pyds expects for meta/surface lookups.
    gst_buffer_hash = hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(gst_buffer_hash)
    if not batch_meta:
        return Gst.PadProbeReturn.OK
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break
        detections = build_dict_from_ds_detections(frame_meta)
        detections['gst_buffer_hash'] = gst_buffer_hash
        detections['batch_id'] = frame_meta.batch_id
        frame_data_queue.put(detections)
        try:
            l_frame = l_frame.next
        except StopIteration:
            break
    return Gst.PadProbeReturn.OK
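(For reference, build_dict_from_ds_detections just walks the frame’s object meta list; a minimal sketch, with the exact fields I collect being illustrative:)

def build_dict_from_ds_detections(frame_meta):
    # Walk the NvDsObjectMeta list and collect per-object info (sketch).
    objects = []
    l_obj = frame_meta.obj_meta_list
    while l_obj is not None:
        try:
            obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
        except StopIteration:
            break
        rect = obj_meta.rect_params
        objects.append({
            'class_id': obj_meta.class_id,
            'confidence': obj_meta.confidence,
            'bbox': (rect.left, rect.top, rect.width, rect.height),
        })
        try:
            l_obj = l_obj.next
        except StopIteration:
            break
    return {
        'frame_num': frame_meta.frame_num,
        'source_id': frame_meta.pad_index,
        'objects': objects,
    }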
And in my main logic process, I read frame_data_queue and extract the frame image matrix when needed, like this:
frame_data = frame_data_queue.get(timeout=1)
frame = pyds.get_nvds_buf_surface(frame_data["gst_buffer_hash"], frame_data["batch_id"])
But this causes my main logic process to halt silently. If I instead call get_nvds_buf_surface() inside the pgie_src_pad_buffer_probe function and put the resulting array on frame_data_queue, DeepStream’s performance on 20 streams drops from 30 FPS to 6 FPS!
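(The slow in-probe variant looks roughly like this; the np.array copy is there because the surface mapping is only valid while the buffer is alive, and it assumes import numpy as np:)

# Inside the while loop of the probe, per frame (the slow variant):
n_frame = pyds.get_nvds_buf_surface(gst_buffer_hash, frame_meta.batch_id)
detections['frame'] = np.array(n_frame, copy=True, order='C')  # full RGBA copy
frame_data_queue.put(detections)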
What I’m essentially going to do with the retrieved frame image is send it to Kafka or save it locally, without any changes.
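(To illustrate the consumer side, a sketch of what I mean; the Kafka producer, topic name, and file path are placeholders:)

import cv2

# Frame comes out of get_nvds_buf_surface as RGBA; convert and encode (sketch).
bgr = cv2.cvtColor(frame, cv2.COLOR_RGBA2BGR)
ok, jpeg = cv2.imencode(".jpg", bgr)
if ok:
    producer.send("frames", jpeg.tobytes())  # hypothetical kafka-python producer
    # or: cv2.imwrite("/some/path/frame.jpg", bgr)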