• Hardware Platform (Jetson / GPU) - GeForce RTX 2080 Ti
• DeepStream Version - 6.3
• TensorRT Version - 8.6.1
• NVIDIA GPU Driver Version (valid for GPU only) - 530.41.03
If I want to save a frame I get: n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id) RuntimeError: get_nvds_buf_Surface: Currently we only support RGBA color Format
This is my full code:
#!/usr/bin/env python3
import sys
sys.path.append('../')
import gi
import configparser
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
from gi.repository import GLib
from ctypes import *
import time
import sys
import math
import random
import platform
from common.is_aarch_64 import is_aarch64
import argparse
from common.FPS import PERF_DATA
from common.bus_call import bus_call
import pyds
from PIL import Image
import numpy as np
import cv2
import os
perf_data = None
OSD_PROCESS_MODE = 0
OSD_DISPLAY_TEXT = 1
MAX_DISPLAY_LEN=64
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
SGIE_CLASS_ID_FACE = 0
SGIE_CLASS_ID_LP = 1
PRIMARY_DETECTOR_UID = 1
SECONDARY_DETECTOR_UID = 2
SECOND_DETECTOR_IS_SECONDARY = 1
MUXER_OUTPUT_WIDTH=1920
MUXER_OUTPUT_HEIGHT=1080
MUXER_BATCH_TIMEOUT_USEC=4000000
TILED_OUTPUT_WIDTH=1280
TILED_OUTPUT_HEIGHT=720
GPU_ID = 0
MAX_NUM_SOURCES = 1
SINK_ELEMENT = "nveglglessink"
PGIE_CONFIG_FILE = "dstest2_pgie_config.txt"
TRACKER_CONFIG_FILE = "dstest2_tracker_config2.txt"
SGIE1_CONFIG_FILE = "dstest2_sgie1_config_license_face.txt"
SGIE2_CONFIG_FILE = "dstest2_sgie1_config.txt"
SGIE3_CONFIG_FILE = "dstest2_sgie2_config.txt"
SGIE4_CONFIG_FILE = "dstest2_sgie3_config.txt"
CONFIG_GPU_ID = "gpu-id"
CONFIG_GROUP_TRACKER = "tracker"
CONFIG_GROUP_TRACKER_WIDTH = "tracker-width"
CONFIG_GROUP_TRACKER_HEIGHT = "tracker-height"
CONFIG_GROUP_TRACKER_LL_CONFIG_FILE = "ll-config-file"
CONFIG_GROUP_TRACKER_LL_LIB_FILE = "ll-lib-file"
CONFIG_GROUP_TRACKER_ENABLE_BATCH_PROCESS = "enable-batch-process"
g_num_sources = 0
g_source_id_list = [0] * MAX_NUM_SOURCES
g_eos_list = [False] * MAX_NUM_SOURCES
g_source_enabled = [False] * MAX_NUM_SOURCES
g_source_bin_list = [None] * MAX_NUM_SOURCES
# pgie_classes_str= ["Vehicle", "TwoWheeler", "Person","RoadSign"]
# sgie_classes_str = ["Face", "License Plate"]
# frame_number = 0
uri = ""
loop = None
pipeline = None
streammux = None
sink = None
pgie = None
sgie1 = None
sgie2 = None
sgie3 = None
sgie4 = None
nvvideoconvert = None
nvosd = None
tiler = None
tracker = None
trackersec = None
output_dir = 'output'
if not os.path.exists(output_dir):
os.makedirs(output_dir)
def osd_sink_pad_buffer_probe(pad, info, u_data):
# This time we maintain two separate counters for primary and secondary objects
primary_obj_counter = {
PGIE_CLASS_ID_PERSON: 0,
PGIE_CLASS_ID_VEHICLE: 0,
}
secondary_obj_counter = {
SGIE_CLASS_ID_FACE: 0,
SGIE_CLASS_ID_LP: 0,
}
frame_number = 0
# global frame_number
global SECOND_DETECTOR_IS_SECONDARY
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer ")
return Gst.PadProbeReturn.OK
# Retrieve batch metadata from the gst_buffer
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
l_obj = frame_meta.obj_meta_list
source_id = frame_meta.source_id
frame_number = frame_meta.frame_num
if frame_number % 30 == 0:
n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
frame_image_np = np.array(n_frame, copy=True, order='C')
frame_image_np = cv2.cvtColor(frame_image_np, cv2.COLOR_RGBA2BGR)
filename = os.path.join(output_dir, "frame_{}.png".format(frame_number))
cv2.imwrite(filename, frame_image_np)
while l_obj is not None:
obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
l_class = obj_meta.classifier_meta_list
if obj_meta.unique_component_id == PRIMARY_DETECTOR_UID:
if obj_meta.class_id in [PGIE_CLASS_ID_PERSON, PGIE_CLASS_ID_VEHICLE]:
primary_obj_counter[obj_meta.class_id] += 1
elif obj_meta.unique_component_id == SECONDARY_DETECTOR_UID:
if obj_meta.class_id in [SGIE_CLASS_ID_FACE, SGIE_CLASS_ID_LP]:
secondary_obj_counter[obj_meta.class_id] += 1
if obj_meta.class_id == 0 and obj_meta.parent.class_id == 2:
print('Face detected at Tracking ID:', obj_meta.parent.object_id)
if obj_meta.class_id == 1 and obj_meta.parent.class_id == 0:
l_class_parent = obj_meta.parent.classifier_meta_list
color = 'undetected'
model = 'undetected'
vehicle_type = 'undetected'
if l_class_parent is not None:
while l_class_parent is not None:
class_meta_parent = pyds.NvDsClassifierMeta.cast(l_class_parent.data)
# top = int(obj_meta.rect_params.top)
# left = int(obj_meta.rect_params.left)
# width = int(obj_meta.rect_params.width)
# height = int(obj_meta.rect_params.height)
if class_meta_parent.unique_component_id == 3:
l_label_parent = class_meta_parent.label_info_list
label_color = pyds.NvDsLabelInfo.cast(l_label_parent.data)
color = label_color.result_label if label_color.result_label else 'undetected'
if class_meta_parent.unique_component_id == 4:
l_label_parent = class_meta_parent.label_info_list
label_model = pyds.NvDsLabelInfo.cast(l_label_parent.data)
model = label_model.result_label if label_model.result_label else 'undetected'
if class_meta_parent.unique_component_id == 5:
l_label_parent = class_meta_parent.label_info_list
label_vehicle_type = pyds.NvDsLabelInfo.cast(l_label_parent.data)
vehicle_type = label_vehicle_type.result_label if label_vehicle_type.result_label else 'undetected'
l_class_parent = l_class_parent.next
print('License Plate detected for vehicle with Tracking ID: {}, Color: {}, Model: {}, Type: {}'
.format(obj_meta.parent.object_id, color, model, vehicle_type))
l_obj = l_obj.next
print("Source ID:", source_id,
"Frame Number:", frame_number,
"Person Count:", primary_obj_counter[PGIE_CLASS_ID_PERSON],
"Vehicle Count:", primary_obj_counter[PGIE_CLASS_ID_VEHICLE],
"Face Count:", secondary_obj_counter[SGIE_CLASS_ID_FACE],
"License Plate Count:", secondary_obj_counter[SGIE_CLASS_ID_LP])
try:
l_frame = l_frame.next
except StopIteration:
break
return Gst.PadProbeReturn.OK
def cb_newpad(decodebin, decoder_src_pad, data):
print("In cb_newpad\n")
caps = decoder_src_pad.get_current_caps()
gststruct = caps.get_structure(0)
gstname = gststruct.get_name()
source_bin = data
features = caps.get_features(0)
# Need to check if the pad created by the decodebin is for video and not
# audio.
print("gstname=", gstname)
if gstname.find("video") != -1:
# Link the decodebin pad only if decodebin has picked nvidia
# decoder plugin nvdec_*. We do this by checking if the pad caps contain
# NVMM memory features.
print("features=", features)
if features.contains("memory:NVMM"):
# Get the source bin ghost pad
bin_ghost_pad = source_bin.get_static_pad("src")
if not bin_ghost_pad.set_target(decoder_src_pad):
sys.stderr.write(
"Failed to link decoder src pad to source bin ghost pad\n"
)
else:
sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")
def decodebin_child_added(child_proxy, Object, name, user_data):
print("Decodebin child added:", name, "\n")
if name.find("decodebin") != -1:
Object.connect("child-added", decodebin_child_added, user_data)
if "source" in name:
source_element = child_proxy.get_by_name("source")
if source_element.find_property("drop-on-latency") != None:
Object.set_property("drop-on-latency", True)
def create_source_bin(index, uri):
print("Creating source bin")
# Create a source GstBin to abstract this bin's content from the rest of the
# pipeline
bin_name = "source-bin-%02d" % index
print(bin_name)
nbin = Gst.Bin.new(bin_name)
if not nbin:
sys.stderr.write(" Unable to create source bin \n")
# Source element for reading from the uri.
# We will use decodebin and let it figure out the container format of the
# stream and the codec and plug the appropriate demux and decode plugins.
uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
if not uri_decode_bin:
sys.stderr.write(" Unable to create uri decode bin \n")
# We set the input uri to the source element
uri_decode_bin.set_property("uri", uri)
# Connect to the "pad-added" signal of the decodebin which generates a
# callback once a new pad for raw data has beed created by the decodebin
uri_decode_bin.connect("pad-added", cb_newpad, nbin)
uri_decode_bin.connect("child-added", decodebin_child_added, nbin)
# We need to create a ghost pad for the source bin which will act as a proxy
# for the video decoder src pad. The ghost pad will not have a target right
# now. Once the decode bin creates the video decoder and generates the
# cb_newpad callback, we will set the ghost pad target to the video decoder
# src pad.
Gst.Bin.add(nbin, uri_decode_bin)
bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
if not bin_pad:
sys.stderr.write(" Failed to add ghost pad in source bin \n")
return None
return nbin
def make_element(element_name, i):
element = Gst.ElementFactory.make(element_name, element_name)
if not element:
sys.stderr.write(" Unable to create {0}".format(element_name))
element.set_property("name", "{0}-{1}".format(element_name, str(i)))
return element
def main(args, requested_pgie=None, config=None, disable_probe=False):
global g_num_sources
global g_source_bin_list
global uri
global loop
global pipeline
global streammux
global sink
global pgie
global sgie1
global sgie2
global sgie3
global nvvideoconvert
global nvosd
global tiler
global tracker
global trackersec
input_sources = args
number_sources = len(input_sources)
global perf_data
perf_data = PERF_DATA(number_sources)
# Standard GStreamer initialization
Gst.init(None)
# Create gstreamer elements */
# Create Pipeline element that will form a connection of other elements
print("Creating Pipeline \n ")
pipeline = Gst.Pipeline()
is_live = False
if not pipeline:
sys.stderr.write(" Unable to create Pipeline \n")
print("Creating streamux \n ")
# Create nvstreammux instance to form batches from one or more sources.
streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
if not streammux:
sys.stderr.write(" Unable to create NvStreamMux \n")
pipeline.add(streammux)
for i in range(number_sources):
print("Creating source_bin ", i, " \n ")
uri_name = input_sources[i]
if uri_name.find("rtsp://") == 0:
is_live = True
source_bin = create_source_bin(i, uri_name)
if not source_bin:
sys.stderr.write("Unable to create source bin \n")
pipeline.add(source_bin)
padname = "sink_%u" % i
sinkpad = streammux.get_request_pad(padname)
if not sinkpad:
sys.stderr.write("Unable to create sink pad bin \n")
srcpad = source_bin.get_static_pad("src")
if not srcpad:
sys.stderr.write("Unable to create src pad bin \n")
srcpad.link(sinkpad)
# queue1 = Gst.ElementFactory.make("queue", "queue1")
# pipeline.add(queue1)
print("Creating Pgie \n ")
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
if not pgie:
sys.stderr.write(" Unable to create pgie \n")
print("Creating nvtracker \n ")
tracker = Gst.ElementFactory.make("nvtracker", "tracker")
if not tracker:
sys.stderr.write(" Unable to create tracker \n")
print("Creating nvtracker2 \n ")
trackersec = Gst.ElementFactory.make("nvtracker", "trackersec")
if not trackersec:
sys.stderr.write(" Unable to create tracker \n")
print("Creating sgie1 \n ")
sgie1 = Gst.ElementFactory.make("nvinfer", "secondary1-nvinference-engine")
if not sgie1:
sys.stderr.write(" Unable to make sgie1 \n")
print("Creating sgie2 \n ")
sgie2 = Gst.ElementFactory.make("nvinfer", "secondary2-nvinference-engine")
if not sgie2:
sys.stderr.write(" Unable to make sgie2 \n")
print("Creating sgie3 \n ")
sgie3 = Gst.ElementFactory.make("nvinfer", "secondary3-nvinference-engine")
if not sgie3:
sys.stderr.write(" Unable to make sgie3 \n")
print("Creating sgie4 \n ")
sgie4 = Gst.ElementFactory.make("nvinfer", "secondary4-nvinference-engine")
if not sgie4:
sys.stderr.write(" Unable to make sgie4 \n")
print("Creating tiler \n ")
tiler=Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
if not tiler:
sys.stderr.write(" Unable to create tiler \n")
print("Creating nvvidconv \n ")
nvvideoconvert = Gst.ElementFactory.make("nvvideoconvert", "convertor")
if not nvvideoconvert:
sys.stderr.write(" Unable to create nvvidconv \n")
print("Creating nvvidconv1 \n ")
nvvideoconvert1 = Gst.ElementFactory.make("nvvideoconvert", "convertor1")
if not nvvideoconvert1:
sys.stderr.write(" Unable to create nvvidconv1 \n")
print("Creating filter1 \n ")
caps1 = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
filter1 = Gst.ElementFactory.make("capsfilter", "filter1")
if not filter1:
sys.stderr.write(" Unable to get the caps filter1 \n")
filter1.set_property("caps", caps1)
print("Creating nvosd \n ")
nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
if not nvosd:
sys.stderr.write(" Unable to create nvosd \n")
if is_aarch64():
print("Creating nv3dsink \n")
sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
if not sink:
sys.stderr.write(" Unable to create nv3dsink \n")
else:
print("Creating EGLSink \n")
sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
if not sink:
sys.stderr.write(" Unable to create egl sink \n")
if is_live:
print("Atleast one of the sources is live")
streammux.set_property('live-source', 1)
print("Creating nvstreamdemux \n ")
nvstreamdemux = Gst.ElementFactory.make("nvstreamdemux", "nvstreamdemux")
if not nvstreamdemux:
sys.stderr.write(" Unable to create nvstreamdemux \n")
if is_live:
print("Atleast one of the sources is live")
streammux.set_property("live-source", 1)
streammux.set_property("width", 960)
streammux.set_property("height", 540)
streammux.set_property("batch-size", number_sources)
streammux.set_property("batched-push-timeout", 4000000)
pgie.set_property("config-file-path", PGIE_CONFIG_FILE)
pgie.set_property("unique-id", PRIMARY_DETECTOR_UID)
pgie_batch_size = pgie.get_property("batch-size")
if pgie_batch_size != number_sources:
print(
"WARNING: Overriding infer-config batch-size",
pgie_batch_size,
" with number of sources ",
number_sources,
" \n",
)
pgie.set_property("batch-size", number_sources)
sgie1.set_property('config-file-path', SGIE1_CONFIG_FILE)
sgie1.set_property("unique-id", SECONDARY_DETECTOR_UID)
sgie1.set_property("process-mode", 2 if SECOND_DETECTOR_IS_SECONDARY else 1)
sgie2.set_property('config-file-path', SGIE2_CONFIG_FILE)
sgie2.set_property("unique-id", 3)
sgie2.set_property("process-mode", 2)
sgie3.set_property('config-file-path', SGIE3_CONFIG_FILE)
sgie3.set_property("unique-id", 4)
sgie3.set_property("process-mode", 2)
sgie4.set_property('config-file-path', SGIE4_CONFIG_FILE)
sgie4.set_property("unique-id", 5)
sgie4.set_property("process-mode", 2)
config = configparser.ConfigParser()
config.read(TRACKER_CONFIG_FILE)
config.sections()
for key in config['tracker']:
if key == 'tracker-width' :
tracker_width = config.getint('tracker', key)
tracker.set_property('tracker-width', tracker_width)
trackersec.set_property('tracker-width', tracker_width)
if key == 'tracker-height' :
tracker_height = config.getint('tracker', key)
tracker.set_property('tracker-height', tracker_height)
trackersec.set_property('tracker-height', tracker_height)
if key == 'gpu-id' :
tracker_gpu_id = config.getint('tracker', key)
tracker.set_property('gpu_id', tracker_gpu_id)
trackersec.set_property('gpu_id', tracker_gpu_id)
if key == 'll-lib-file' :
tracker_ll_lib_file = config.get('tracker', key)
tracker.set_property('ll-lib-file', tracker_ll_lib_file)
trackersec.set_property('ll-lib-file', tracker_ll_lib_file)
if key == 'll-config-file' :
tracker_ll_config_file = config.get('tracker', key)
tracker.set_property('ll-config-file', tracker_ll_config_file)
trackersec.set_property('ll-config-file', tracker_ll_config_file)
if key == 'enable-batch-process' :
tracker_enable_batch_process = config.getint('tracker', key)
# tracker.set_property('enable_batch_process', tracker_enable_batch_process)
# trackersec.set_property('enable_batch_process', tracker_enable_batch_process)
tiler_rows=int(math.sqrt(number_sources))
tiler_columns=int(math.ceil((1.0*number_sources)/tiler_rows))
tiler.set_property("rows",tiler_rows)
tiler.set_property("columns",tiler_columns)
tiler.set_property("width", TILED_OUTPUT_WIDTH)
tiler.set_property("height", TILED_OUTPUT_HEIGHT)
#Set gpu IDs of tiler, nvvideoconvert, and nvosd
tiler.set_property("gpu_id", GPU_ID)
nvvideoconvert.set_property("gpu_id", GPU_ID)
nvosd.set_property("gpu_id", GPU_ID)
#Set gpu ID of sink if not aarch64
if(not is_aarch64()):
sink.set_property("gpu_id", GPU_ID)
print("Adding elements to Pipeline \n")
pipeline.add(pgie)
pipeline.add(tracker)
pipeline.add(sgie1)
# pipeline.add(trackersec)
pipeline.add(sgie2)
pipeline.add(sgie3)
pipeline.add(sgie4)
pipeline.add(tiler)
pipeline.add(nvvideoconvert)
pipeline.add(nvosd)
pipeline.add(sink)
print("Linking elements in the Pipeline \n")
streammux.link(pgie)
pgie.link(tracker)
tracker.link(sgie1)
sgie1.link(sgie2)
sgie2.link(sgie3)
sgie3.link(sgie4)
sgie4.link(tiler)
tiler.link(nvvideoconvert)
nvvideoconvert.link(nvosd)
nvosd.link(sink)
print("Linking elements in the Pipeline \n")
sink.set_property("sync", 0)
sink.set_property("qos",0)
# create an event loop and feed gstreamer bus mesages to it
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
pgie_src_pad = tiler.get_static_pad("sink")
if not pgie_src_pad:
sys.stderr.write(" Unable to get src pad \n")
else:
pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)
# perf callback function to print fps every 5 sec
GLib.timeout_add(5000, perf_data.perf_print_callback)
# List the sources
print("Now playing...")
for i, source in enumerate(input_sources):
print(i, ": ", source)
print("Starting pipeline \n")
# start play back and listed to events
pipeline.set_state(Gst.State.PLAYING)
try:
loop.run()
except:
pass
# cleanup
print("Exiting app\n")
pipeline.set_state(Gst.State.NULL)
def parse_args():
parser = argparse.ArgumentParser(prog="deepstream_demux_multi_in_multi_out.py",
description="deepstream-demux-multi-in-multi-out takes multiple URI streams as input" \
"and uses `nvstreamdemux` to split batches and output separate buffer/streams")
parser.add_argument(
"-i",
"--input",
help="Path to input streams",
nargs="+",
metavar="URIs",
default=["a"],
required=True,
)
args = parser.parse_args()
stream_paths = args.input
return stream_paths
if __name__ == "__main__":
stream_paths = parse_args()
sys.exit(main(stream_paths))