I'm developing a DeepStream pipeline for a simple face recognition app using multiprocessing + PyCUDA + TensorRT, following the deepstream-test-3 sample. Below is the modified code:
#!/usr/bin/env python3
import os
import cv2
import sys
import gi
from copy import deepcopy
import configparser
gi.require_version('Gst', '1.0')
import multiprocessing as mp
try:
    mp.set_start_method('spawn', force=True)
except RuntimeError:
    pass
from gi.repository import GObject, Gst
from gi.repository import GLib
import random
from ctypes import *
import time
import math
import platform
import json
import datetime
import numpy as np
from common.is_aarch_64 import is_aarch64
from common.FPS import GETFPS
import pyds
from parsers.parser_scrfd import nvds_infer_parse_custom_scrfd
from face_ds import face_recog_mp
from multiprocessing import Queue, Process
MUXER_BATCH_TIMEOUT_USEC=-1
MAX_NUM_SOURCES = 4
def init_pipeline():
    global g_num_sources, g_source_id_list, g_eos_list, g_source_enabled, g_source_bin_list, g_source_base_meta_list, g_source_camera_id_list, fps_streams
    g_num_sources = 0
    g_source_id_list = [0] * MAX_NUM_SOURCES
    g_eos_list = [False] * MAX_NUM_SOURCES
    g_source_enabled = [False] * MAX_NUM_SOURCES
    g_source_bin_list = [None] * MAX_NUM_SOURCES
    g_source_base_meta_list = dict()
    g_source_camera_id_list = [None] * MAX_NUM_SOURCES
    fps_streams = dict()
def pgie_src_pad_buffer_probe(pad, info, u_data):
    global g_source_base_meta_list, main_queue
    frame_number = 0
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return Gst.PadProbeReturn.OK
    # Retrieve batch metadata from the gst_buffer.
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer).
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta.
            # The casting is done by pyds.NvDsFrameMeta.cast(), which keeps
            # ownership of the underlying memory in the C code, so the
            # Python garbage collector will leave it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break
        frame_number = frame_meta.frame_num
        source_id = frame_meta.source_id
        fps_streams[g_source_camera_id_list[source_id]].get_fps()
        n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
        frame_copy = np.array(n_frame, copy=True, order='C')
        frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_RGBA2RGB)
        l_user = frame_meta.frame_user_meta_list
        while l_user is not None:
            try:
                user_meta = pyds.NvDsUserMeta.cast(l_user.data)
            except StopIteration:
                break
            if (
                user_meta.base_meta.meta_type
                == pyds.NvDsMetaType.NVDSINFER_TENSOR_OUTPUT_META
            ):
                tensor_meta = pyds.NvDsInferTensorMeta.cast(user_meta.user_meta_data)
                layers_info = []
                for i in range(tensor_meta.num_output_layers):
                    layer = pyds.get_nvds_LayerInfo(tensor_meta, i)
                    layers_info.append(layer)
                #################
                # deepcopy() runs on the streaming thread and Queue.put()
                # pickles the copy for the worker process; if the consumer
                # falls behind, this is a likely stall point.
                main_queue.put(deepcopy(layers_info))
                #################
            try:
                # Advance unconditionally so a non-tensor meta does not
                # loop forever on the same node.
                l_user = l_user.next
            except StopIteration:
                break
        try:
            # Indicate that inference was performed on this frame.
            frame_meta.bInferDone = True
            l_frame = l_frame.next
        except StopIteration:
            break
    return Gst.PadProbeReturn.OK
def cb_newpad(decodebin, pad, data):
    global streammux
    caps = pad.get_current_caps()
    gststruct = caps.get_structure(0)
    gstname = gststruct.get_name()
    # Need to check if the pad created by the decodebin is for video and not
    # audio.
    if gstname.find("video") != -1:
        source_id = data
        pad_name = "sink_%u" % source_id
        # Get a sink pad from the streammux and link the decodebin pad to it.
        sinkpad = streammux.get_request_pad(pad_name)
        if pad.link(sinkpad) == Gst.PadLinkReturn.OK:
            print(" --> Decodebin linked to pipeline")
        else:
            sys.stderr.write("Failed to link decodebin to pipeline\n")
def decodebin_child_added(child_proxy, Object, name, user_data):
    print("Decodebin child added:", name, "\n")
    if name.find("decodebin") != -1:
        Object.connect("child-added", decodebin_child_added, user_data)
    if name.find("nvv4l2decoder") != -1:
        if is_aarch64():
            Object.set_property("enable-max-performance", True)
            Object.set_property("drop-frame-interval", 0)
            Object.set_property("num-extra-surfaces", 0)
        else:
            Object.set_property("gpu_id", 0)
def create_uridecode_bin(index, filename):
    global g_source_id_list, g_source_enabled
    print(" --- Creating uridecodebin for [%s]" % filename)
    # Create a source GstBin to abstract this bin's content from the rest of
    # the pipeline.
    g_source_id_list[index] = index
    bin_name = "source-bin-%02d" % index
    print(" --- " + bin_name)
    # Source element for reading from the uri.
    # We use uridecodebin and let it figure out the container format of the
    # stream and the codec, and plug the appropriate demux and decode plugins.
    bin = Gst.ElementFactory.make("uridecodebin", bin_name)
    if not bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
    # Set the input uri on the source element.
    bin.set_property("uri", filename)
    # Connect to the "pad-added" signal of the decodebin, which fires
    # once a new pad for raw data has been created by the decodebin.
    bin.connect("pad-added", cb_newpad, g_source_id_list[index])
    bin.connect("child-added", decodebin_child_added, g_source_id_list[index])
    # Set the status of the source to enabled.
    g_source_enabled[index] = True
    return bin
def bus_call(bus, message, loop):
    global g_eos_list
    t = message.type
    if t == Gst.MessageType.EOS:
        print("End-of-stream\n")
        loop.quit()
    elif t == Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        print("Warning: %s: %s\n" % (err, debug))
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        print("Error: %s: %s\n" % (err, debug))
        loop.quit()
    return True
def run(lst_video_info, infer_size=(1920, 1080)):
    global streammux, loop, pipeline, face_processor, main_queue
    # Check input arguments
    g_num_sources = len(lst_video_info)
    assert g_num_sources <= MAX_NUM_SOURCES, "Currently supports only {} parallel streams".format(MAX_NUM_SOURCES)
    # Standard GStreamer initialization
    GObject.threads_init()
    Gst.init(None)
    # Create the Pipeline element that will form a connection of other elements.
    print('=' * 30 + " Creating Pipeline " + '=' * 30)
    pipeline = Gst.Pipeline()
    is_live = False
    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline")
    print("[INFO] Creating streammux")
    # Create an nvstreammux instance to form batches from one or more sources.
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")
    if infer_size[0] is not None and infer_size[1] is not None:
        print(' --> Optimized streammux size: {}'.format(infer_size))
        MUXED_OUTPUT_WIDTH = infer_size[0]
        MUXED_OUTPUT_HEIGHT = infer_size[1]
        streammux.set_property('width', infer_size[0])
        streammux.set_property('height', infer_size[1])
        time.sleep(1)
    else:
        MUXED_OUTPUT_WIDTH = 1280
        MUXED_OUTPUT_HEIGHT = 720
        streammux.set_property('width', MUXED_OUTPUT_WIDTH)
        streammux.set_property('height', MUXED_OUTPUT_HEIGHT)
    streammux.set_property('batch-size', g_num_sources)
    streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)
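    # NOTE: MUXER_BATCH_TIMEOUT_USEC is -1 here, which (as I understand the
    # nvstreammux docs) makes the muxer wait indefinitely for a full batch;
    # for live sources a finite timeout on the order of one frame period
    # (e.g. 33000-40000 us) is the usual recommendation.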
print("[INFO] Creating source_bin")
for i in range(g_num_sources):
print(" --> Creating source_bin ",i)
uri_name=lst_video_info[i]["urlLocal"]
if uri_name.find("rtsp://") == 0:
is_live = True
#Create first source bin and add to pipeline
source_bin=create_uridecode_bin(i, uri_name)
if not source_bin:
sys.stderr.write("Failed to create source bin. Exiting. \n")
sys.exit(1)
g_source_bin_list[i] = source_bin
camera_id = lst_video_info[i]["cameraID"]
g_source_camera_id_list[i] = camera_id
g_source_base_meta_list[camera_id] = lst_video_info[i]
fps_streams[camera_id] = GETFPS(camera_id)
pipeline.add(source_bin)
print("[INFO] Creating Pgie")
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
if not pgie:
sys.stderr.write(" Unable to create pgie")
pgie.set_property('config-file-path', "configs/config_scrfd.txt")
pgie_batch_size=pgie.get_property("batch-size")
if(pgie_batch_size != g_num_sources):
print("WARNING: Overriding infer-config batch-size", pgie_batch_size," with number of sources ", g_num_sources," \n")
pgie.set_property("batch-size", g_num_sources)
print("[INFO] Creating nvvidconv")
nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
if not nvvidconv:
sys.stderr.write(" Unable to create nvvidconv")
print("[INFO] Creating filter")
caps1 = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
filter1 = Gst.ElementFactory.make("capsfilter", "filter1")
if not filter1:
sys.stderr.write(" Unable to get the caps filter1 \n")
filter1.set_property("caps", caps1)
print("[INFO] Creating FakeLSink")
sink = Gst.ElementFactory.make("fakesink", "fakesink")
if not sink:
sys.stderr.write(" Unable to create fakesink \n")
queue1 = Gst.ElementFactory.make("queue","queue1")
queue2 = Gst.ElementFactory.make("queue","queue2")
queue3 = Gst.ElementFactory.make("queue","queue3")
queue4 = Gst.ElementFactory.make("queue","queue4")
if is_live:
print("[WARNING] Atleast one of the sources is live")
streammux.set_property('live-source', 1)
sink.set_property("sync", 0)
sink.set_property("qos", 0)
if not is_aarch64():
# Use CUDA unified memory in the pipeline so frames
# can be easily accessed on CPU in Python.
mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
streammux.set_property("nvbuf-memory-type", mem_type)
nvvidconv.set_property("nvbuf-memory-type", mem_type)
print("Adding elements to Pipeline \n")
pipeline.add(streammux)
pipeline.add(pgie)
pipeline.add(nvvidconv)
pipeline.add(filter1)
pipeline.add(sink)
pipeline.add(queue1)
pipeline.add(queue2)
pipeline.add(queue3)
pipeline.add(queue4)
print("Linking elements in the Pipeline \n")
streammux.link(queue1)
queue1.link(nvvidconv)
nvvidconv.link(queue2)
queue2.link(filter1)
filter1.link(queue3)
queue3.link(pgie)
pgie.link(queue4)
queue4.link(sink)
# create an event loop and feed gstreamer bus mesages to it
loop = GObject.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect ("message", bus_call, loop)
pgie_src_pad = pgie.get_static_pad("src")
if not pgie_src_pad:
sys.stderr.write(" Unable to get pgie src pad \n")
else:
pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, pgie_src_pad_buffer_probe, 0)
    # Init the face recognition model in another process.
    main_queue = Queue()
    face_processor = Process(target=face_recog_mp, args=(main_queue, lst_video_info))
    face_processor.start()
    print("Starting pipeline \n")
    # Start playback and listen to events.
    pipeline.set_state(Gst.State.PLAYING)
    loop.run()
    print("Exiting app\n")
    pipeline.set_state(Gst.State.NULL)
    main_queue.put(None)  # Stop face recognition
    face_processor.join()
if __name__ == '__main__':
    meta_video_1 = {
        'urlLocal': '<path-to-stream-1>',
        'cameraID': '1'
    }
    meta_video_2 = {
        'urlLocal': '<path-to-stream-2>',
        'cameraID': '2'
    }
    init_pipeline()
    run([meta_video_1, meta_video_2], (1920, 1080))
With 2x MP4 inputs this works well at 45+ FPS per stream, but with 2x RTSP inputs it only reaches 13-14 FPS per stream and sometimes lags; occasionally the probe function receives stale frames from minutes earlier (e.g., frames from 11:45 AM arriving at 11:47 AM).
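For reference, below is a minimal sketch of the live-source tuning I plan to try next. It would slot into run() after the queues are created; `live-source`, `batched-push-timeout`, `leaky`, and `max-size-buffers` are standard DeepStream/GStreamer properties, and the values are placeholders:
    # Hypothetical live-source tuning (values are placeholders):
    if is_live:
        # Treat inputs as live and batch on a frame-period timeout instead of
        # waiting indefinitely for a full batch.
        streammux.set_property('live-source', 1)
        streammux.set_property('batched-push-timeout', 40000)  # ~25 FPS
        # Leaky queues drop stale buffers instead of accumulating them, which
        # should avoid receiving frames from minutes earlier.
        for q in (queue1, queue2, queue3, queue4):
            q.set_property('leaky', 2)             # 2 = downstream (drop old)
            q.set_property('max-size-buffers', 4)  # keep the backlog short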
Thanks
• Hardware Platform (Jetson / GPU): A100
• DeepStream Version: 6.1-Triton
• NVIDIA GPU Driver Version (valid for GPU only): 470.57.02
• Issue Type (questions, new requirements, bugs): bugs