#!/usr/bin/env python3
################################################################################
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
################################################################################
import sys
sys.path.append('../')
import gi
import configparser
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
from gi.repository import GLib
from ctypes import *
import time
import sys
import numpy as np
import cv2
import math
import platform
from common.is_aarch_64 import is_aarch64
from common.utils import long_to_uint64
from common.bus_call import bus_call
from common.FPS import GETFPS
import pyds
from threading import Thread
import datetime
import os
import json
import pyds
import time
from queue import Queue
# On-screen / pipeline geometry constants.
MAX_DISPLAY_LEN = 64
MAX_TIME_STAMP_LEN = 32          # buffer size for RFC3339 timestamps (see generate_event_msg_meta)
MUXER_OUTPUT_WIDTH = 1920        # overridden by "processing_width" from config.json below
MUXER_OUTPUT_HEIGHT = 1080       # overridden by "processing_height" from config.json below
MUXER_BATCH_TIMEOUT_USEC = 4000000
TILED_OUTPUT_WIDTH = 640 * 2
TILED_OUTPUT_HEIGHT = 640
GST_CAPS_FEATURES_NVMM = "memory:NVMM"
OSD_PROCESS_MODE = 0
OSD_DISPLAY_TEXT = 0

# Class ids produced by the primary detector (order matches pgie_classes_str).
PGIE_CLASS_ID_T = 0
PGIE_CLASS_ID_INGOT = 1
PGIE_CLASS_ID_TRUCK = 2

# Message-broker defaults; conn_str is overridden from config.json below.
schema_type = 0
proto_lib = "/opt/nvidia/deepstream/deepstream-6.3/lib/libnvds_kafka_proto.so"
conn_str = "kraz-s-kafka01.sib.rual.ru;9092;testTopicDeep"
cfg_file = None
topic = None
input_file = None
PGIE_CONFIG_FILE = "dstest3_pgie_config.txt"
MSCONV_CONFIG_FILE = "dstest4_msgconv_config.txt"
pgie_classes_str = ["T", "Ingot", "Truck"]

# Start timer used by tiler_src_pad_buffer_probe to rate-limit "negative" snapshots.
init = time.time()

# Image output directories ("positive": object detected in ROI, "negative": empty frames).
# makedirs(exist_ok=True) is race-free, unlike the exists()+mkdir() pattern.
path1 = "positive"
path2 = "negative"
os.makedirs(path1, exist_ok=True)
os.makedirs(path2, exist_ok=True)

# Read and validate config.json; any problem aborts the process with exit code 1.
try:
    with open("config.json", "r") as f:
        config = json.load(f)
    print(config)
    if not config:
        print("No configurations provided in json file")
        sys.exit(1)
    conn_str = config["conn_str"]
    sources = config["source"]
    if not sources:
        print("No source provided in json file")
        sys.exit(1)
    for key, value in sources.items():
        if value == "":
            print("No source provided in json file")
            sys.exit(1)
    display = config["display"]
    if not isinstance(display, bool):
        print("wrong value for 'display' in json file. Valid usage is 'display': true or 'display': false")
        sys.exit(1)
    # `type(x) is not int` (rather than isinstance) deliberately rejects bools,
    # which are a subclass of int in Python.
    MUXER_OUTPUT_WIDTH = config["processing_width"]
    if type(MUXER_OUTPUT_WIDTH) is not int:
        print("wrong value for 'processing_width' in json file. Should be integer. eg. 640")
        sys.exit(1)
    MUXER_OUTPUT_HEIGHT = config["processing_height"]
    if type(MUXER_OUTPUT_HEIGHT) is not int:
        print("wrong value for 'processing_height' in json file. Should be integer. eg. 480")
        sys.exit(1)
    image_timer = config["image_timer"]
    if type(image_timer) is not int:
        print("wrong value for 'image_timer' in json file. Should be integer. eg. 600")
        sys.exit(1)
    queue_size = config["queue_size"]
    if type(queue_size) is not int:
        print("wrong value for 'queue_size' in json file. Should be integer and greater than 0. e.g. 20")
        sys.exit(1)
    if queue_size == 0:
        print("'queue_size' cannot be 0. Switching to default value 20.")
        queue_size = 20
        time.sleep(5)  # give the operator a chance to notice the warning
except Exception as e:
    print(e)
    print("Error in json file")
    sys.exit(1)

number_sources = len(sources)

# Per-stream bookkeeping: a bounded FIFO of recently-seen tracker object ids
# (to avoid re-saving an image of the same object) and an FPS counter.
id_dict = {}
fps_streams = {}
for i in range(number_sources):
    id_dict[i] = Queue(maxsize=queue_size)
    fps_streams["stream{0}".format(i)] = GETFPS(i)
    # Per-stream image sub-directories.
    os.makedirs(os.path.join(path1, "stream_" + str(i)), exist_ok=True)
    os.makedirs(os.path.join(path2, "stream_" + str(i)), exist_ok=True)
# Callback function for deep-copying an NvDsEventMsgMeta struct
def meta_copy_func(data, user_data):
    """Deep-copy an NvDsEventMsgMeta user meta (registered via pyds.user_copyfunc).

    `data` is the NvDsUserMeta wrapping the source NvDsEventMsgMeta. Returns a
    newly allocated copy whose string/buffer fields are independently owned, so
    downstream components can free the copy without touching the original.
    """
    # Cast data to pyds.NvDsUserMeta
    user_meta = pyds.NvDsUserMeta.cast(data)
    src_meta_data = user_meta.user_meta_data
    # Cast src_meta_data to pyds.NvDsEventMsgMeta
    srcmeta = pyds.NvDsEventMsgMeta.cast(src_meta_data)
    # Duplicate the memory contents of srcmeta to dstmeta.
    # First use pyds.get_ptr() to get the C address of srcmeta, then
    # use pyds.memdup() to allocate dstmeta and copy srcmeta into it.
    # pyds.memdup returns the C address of the allocated duplicate.
    dstmeta_ptr = pyds.memdup(pyds.get_ptr(srcmeta),
                              sys.getsizeof(pyds.NvDsEventMsgMeta))
    # Cast the duplicated memory to pyds.NvDsEventMsgMeta
    dstmeta = pyds.NvDsEventMsgMeta.cast(dstmeta_ptr)
    # Duplicate contents of ts field. Note that reading srcmeta.ts
    # returns its C address. This allows memory operations to be
    # performed on it.
    dstmeta.ts = pyds.memdup(srcmeta.ts, MAX_TIME_STAMP_LEN + 1)
    # Copy the sensorStr. This field is a string property. The getter (read)
    # returns its C address. The setter (write) takes string as input,
    # allocates a string buffer and copies the input string into it.
    # pyds.get_string() takes C address of a string and returns the reference
    # to a string object and the assignment inside the binder copies content.
    dstmeta.sensorStr = pyds.get_string(srcmeta.sensorStr)
    if srcmeta.objSignature.size > 0:
        dstmeta.objSignature.signature = pyds.memdup(
            srcmeta.objSignature.signature, srcmeta.objSignature.size)
        dstmeta.objSignature.size = srcmeta.objSignature.size
    if srcmeta.extMsgSize > 0:
        # Deep-copy the extension message according to its object type.
        if srcmeta.objType == pyds.NvDsObjectType.NVDS_OBJECT_TYPE_VEHICLE:
            srcobj = pyds.NvDsVehicleObject.cast(srcmeta.extMsg)
            obj = pyds.alloc_nvds_vehicle_object()
            obj.type = pyds.get_string(srcobj.type)
            obj.make = pyds.get_string(srcobj.make)
            obj.model = pyds.get_string(srcobj.model)
            obj.color = pyds.get_string(srcobj.color)
            obj.license = pyds.get_string(srcobj.license)
            obj.region = pyds.get_string(srcobj.region)
            dstmeta.extMsg = obj
            dstmeta.extMsgSize = sys.getsizeof(pyds.NvDsVehicleObject)
        if srcmeta.objType == pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON:
            srcobj = pyds.NvDsPersonObject.cast(srcmeta.extMsg)
            obj = pyds.alloc_nvds_person_object()
            obj.age = srcobj.age
            obj.gender = pyds.get_string(srcobj.gender)
            obj.cap = pyds.get_string(srcobj.cap)
            obj.hair = pyds.get_string(srcobj.hair)
            obj.apparel = pyds.get_string(srcobj.apparel)
            dstmeta.extMsg = obj
            dstmeta.extMsgSize = sys.getsizeof(pyds.NvDsPersonObject)
    return dstmeta
# Callback function for freeing an NvDsEventMsgMeta instance
def meta_free_func(data, user_data):
    """Free an NvDsEventMsgMeta user meta (registered via pyds.user_releasefunc).

    Releases every buffer/string allocated for the meta, then the extension
    message itself, mirroring the allocations made in meta_copy_func /
    generate_event_msg_meta.
    """
    user_meta = pyds.NvDsUserMeta.cast(data)
    srcmeta = pyds.NvDsEventMsgMeta.cast(user_meta.user_meta_data)
    # pyds.free_buffer takes C address of a buffer and frees the memory
    # It's a NOP if the address is NULL
    pyds.free_buffer(srcmeta.ts)
    pyds.free_buffer(srcmeta.sensorStr)
    if srcmeta.objSignature.size > 0:
        pyds.free_buffer(srcmeta.objSignature.signature)
        srcmeta.objSignature.size = 0
    if srcmeta.extMsgSize > 0:
        # Free the type-specific string fields before releasing the struct itself.
        if srcmeta.objType == pyds.NvDsObjectType.NVDS_OBJECT_TYPE_VEHICLE:
            obj = pyds.NvDsVehicleObject.cast(srcmeta.extMsg)
            pyds.free_buffer(obj.type)
            pyds.free_buffer(obj.color)
            pyds.free_buffer(obj.make)
            pyds.free_buffer(obj.model)
            pyds.free_buffer(obj.license)
            pyds.free_buffer(obj.region)
        if srcmeta.objType == pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON:
            obj = pyds.NvDsPersonObject.cast(srcmeta.extMsg)
            pyds.free_buffer(obj.gender)
            pyds.free_buffer(obj.cap)
            pyds.free_buffer(obj.hair)
            pyds.free_buffer(obj.apparel)
        pyds.free_gbuffer(srcmeta.extMsg)
        srcmeta.extMsgSize = 0
def generate_vehicle_meta(data):
    """Populate a freshly allocated NvDsVehicleObject with fixed demo values.

    `data` is the raw pointer returned by pyds.alloc_nvds_vehicle_object();
    the cast object is returned for convenience.
    """
    vehicle = pyds.NvDsVehicleObject.cast(data)
    vehicle.type = "sedan"
    vehicle.make = "Bugatti"
    vehicle.model = "M"
    vehicle.color = "blue"
    vehicle.license = "XX1234"
    vehicle.region = "CA"
    return vehicle
def generate_person_meta(data):
    """Populate a freshly allocated NvDsPersonObject with fixed demo values.

    `data` is the raw pointer returned by pyds.alloc_nvds_person_object();
    the cast object is returned for convenience.
    """
    person = pyds.NvDsPersonObject.cast(data)
    person.age = 45
    person.gender = "male"
    person.hair = "black"
    person.cap = "none"
    person.apparel = "formal"
    return person
def generate_event_msg_meta(data, class_id):
    """Fill an allocated NvDsEventMsgMeta for the given detector class.

    Attaches a vehicle-style payload for PGIE_CLASS_ID_T and a person-style
    payload for PGIE_CLASS_ID_TRUCK (demo mapping inherited from the NVIDIA
    sample); other class ids get only the common header fields.
    """
    meta = pyds.NvDsEventMsgMeta.cast(data)
    meta.sensorId = 0
    meta.placeId = 0
    meta.moduleId = 0
    meta.sensorStr = "sensor-0"
    # RFC3339 timestamp written into a freshly allocated buffer; freed later
    # by meta_free_func.
    meta.ts = pyds.alloc_buffer(MAX_TIME_STAMP_LEN + 1)
    pyds.generate_ts_rfc3339(meta.ts, MAX_TIME_STAMP_LEN)
    # This demonstrates how to attach custom objects.
    # Any custom object as per requirement can be generated and attached
    # like NvDsVehicleObject / NvDsPersonObject. Then that object should
    # be handled in payload generator library (nvmsgconv.cpp) accordingly.
    if class_id == PGIE_CLASS_ID_T:
        meta.type = pyds.NvDsEventType.NVDS_EVENT_MOVING
        meta.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_VEHICLE
        meta.objClassId = PGIE_CLASS_ID_T
        obj = pyds.alloc_nvds_vehicle_object()
        obj = generate_vehicle_meta(obj)
        meta.extMsg = obj
        meta.extMsgSize = sys.getsizeof(pyds.NvDsVehicleObject)
    if class_id == PGIE_CLASS_ID_TRUCK:
        meta.type = pyds.NvDsEventType.NVDS_EVENT_ENTRY
        meta.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON
        meta.objClassId = PGIE_CLASS_ID_TRUCK
        obj = pyds.alloc_nvds_person_object()
        obj = generate_person_meta(obj)
        meta.extMsg = obj
        meta.extMsgSize = sys.getsizeof(pyds.NvDsPersonObject)
    return meta
# tiler_src_pad_buffer_probe extracts metadata received on the tiler sink pad
# and saves frames to disk: "positive" when an object enters an analytics ROI,
# "negative" snapshots of empty frames every `image_timer` seconds.
def tiler_src_pad_buffer_probe(pad,info,u_data):
    """Buffer probe on the tiler sink pad.

    Walks batch -> frame -> object -> user meta. When nvdsanalytics reports a
    non-empty roiStatus for an object whose tracker id is not in the per-stream
    dedup queue (id_dict), the frame is written under positive/stream_<i>/.
    Frames with no objects are written under negative/stream_<i>/ at most once
    per `image_timer` seconds (rate-limited via the module-level `init` timer).
    """
    global init
    frame_number=0
    num_rects=0
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return
    # pyds expects the C address of gst_buffer, obtained with hash().
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break
        frame_number=frame_meta.frame_num
        l_obj=frame_meta.obj_meta_list
        num_rects = frame_meta.num_obj_meta
        while l_obj is not None:
            try:
                # Casting l_obj.data to pyds.NvDsObjectMeta
                obj_meta=pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            l_user_meta = obj_meta.obj_user_meta_list
            while l_user_meta:
                try:
                    user_meta = pyds.NvDsUserMeta.cast(l_user_meta.data)
                    # Only nvdsanalytics per-object meta is of interest here.
                    if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type("NVIDIA.DSANALYTICSOBJ.USER_META"):
                        user_meta_data = pyds.NvDsAnalyticsObjInfo.cast(user_meta.user_meta_data)
                        if user_meta_data.roiStatus:
                            # Dedup by tracker id: save at most once per id while
                            # it remains in the bounded per-stream queue.
                            if obj_meta.object_id not in list(id_dict[frame_meta.pad_index].queue):
                                if id_dict[frame_meta.pad_index].full():
                                    id_dict[frame_meta.pad_index].get()
                                id_dict[frame_meta.pad_index].put(obj_meta.object_id)
                                #write image when object detected in "positive" folder
                                try:
                                    frame = get_frame(gst_buffer, frame_meta.batch_id)
                                    name= "img_"+datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")+".jpg"
                                    cv2.imwrite(os.path.join(os.path.join(path1,"stream_"+str(frame_meta.pad_index)), name), frame)
                                except cv2.error as e:
                                    print(e)
                except StopIteration:
                    break
                try:
                    l_user_meta = l_user_meta.next
                except StopIteration:
                    break
            try:
                l_obj=l_obj.next
            except StopIteration:
                break
        #write image every n secs if object not detected in "negative" folder
        # NOTE(review): `init` is shared across all streams, so the rate limit
        # is global, not per-stream — confirm that is intended.
        if time.time() - init > image_timer:
            if frame_meta.num_obj_meta==0:
                try:
                    frame = get_frame(gst_buffer, frame_meta.batch_id)
                    name= "img_"+datetime.datetime.now().strftime("%Y%m%d_%H%M%S")+".jpg"
                    cv2.imwrite(os.path.join(os.path.join(path2,"stream_"+str(frame_meta.pad_index)), name), frame)
                    init = time.time()
                except cv2.error as e:
                    print(e)
        # Get frame rate through this probe
        fps_streams["stream{0}".format(frame_meta.pad_index)].get_fps()
        # print([list(id_dict[x].queue) for x in list(id_dict)])
        try:
            l_frame=l_frame.next
        except StopIteration:
            break
    return Gst.PadProbeReturn.OK
def get_frame(gst_buffer, batch_id):
    """Copy the frame at *batch_id* out of the mapped surface as a BGRA image."""
    # pyds maps the buffer surface as an array (the pipeline caps force RGBA).
    surface = pyds.get_nvds_buf_surface(hash(gst_buffer), batch_id)
    # Deep-copy so the image remains valid after the GstBuffer is released.
    rgba = np.array(surface, copy=True, order='C')
    # Convert to OpenCV's default channel order.
    bgra = cv2.cvtColor(rgba, cv2.COLOR_RGBA2BGRA)
    return bgra
def cb_newpad(decodebin, decoder_src_pad,data):
    """'pad-added' handler for the decoder inside a source bin.

    NOTE(review): in the stock DeepStream sample this callback targets the new
    decoder pad onto the bin's ghost pad. Here the ghost pad is already created
    with a target in create_source_bin, so this handler only inspects caps and
    the fetched bin_ghost_pad is never used — presumably vestigial; verify it
    can be removed.
    """
    caps=decoder_src_pad.get_current_caps()
    if not caps:
        # Negotiation may not be complete yet; fall back to the query.
        caps = decoder_src_pad.query_caps()
    gststruct=caps.get_structure(0)
    gstname=gststruct.get_name()
    source_bin=data
    features=caps.get_features(0)
    if(gstname.find("video")!=-1):
        if features.contains("memory:NVMM"):
            # Fetched but not used — see NOTE in the docstring.
            bin_ghost_pad=source_bin.get_static_pad("src")
# osd_sink_pad_buffer_probe will extract metadata received on OSD sink pad
# and update params for drawing rectangle, object information etc.
# IMPORTANT NOTE:
# a) probe() callbacks are synchronous and thus holds the buffer
#    (info.get_buffer()) from traversing the pipeline until user return.
# b) loops inside probe() callback could be costly in python.
#    So users shall optimize according to their use-case.
def osd_sink_pad_buffer_probe(pad, info, u_data):
    """Buffer probe on the nvdsosd sink pad.

    Styles the per-object display text, counts objects per class, and — for
    the first object of every 30th frame — attaches an NvDsEventMsgMeta so
    nvmsgconv/nvmsgbroker downstream publish it to Kafka. Objects with
    confidence below 0.45 are skipped entirely.
    """
    frame_number = 0
    # Initializing object counter with 0.
    obj_counter = {
        PGIE_CLASS_ID_T: 0,
        PGIE_CLASS_ID_INGOT: 0,
        PGIE_CLASS_ID_TRUCK: 0
        # PGIE_CLASS_ID_ROADSIGN: 0
    }
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return
    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    if not batch_meta:
        return Gst.PadProbeReturn.OK
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
            # The casting is done by pyds.NvDsFrameMeta.cast()
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            # NOTE(review): 'continue' re-enters the loop without advancing
            # l_frame and could spin forever if cast() ever raised here;
            # 'break' (as used in tiler_src_pad_buffer_probe) looks safer.
            continue
        is_first_object = True
        # Short example of attribute access for frame_meta:
        # print("Frame Number is ", frame_meta.frame_num)
        # print("Source id is ", frame_meta.source_id)
        # print("Batch id is ", frame_meta.batch_id)
        # print("Source Frame Width ", frame_meta.source_frame_width)
        # print("Source Frame Height ", frame_meta.source_frame_height)
        # print("Num object meta ", frame_meta.num_obj_meta)
        frame_number = frame_meta.frame_num
        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                # NOTE(review): same non-advancing 'continue' risk as above.
                continue
            # Ignore low-confidence detections.
            if obj_meta.confidence < 0.45:
                try:
                    l_obj = l_obj.next
                except StopIteration:
                    break
                continue
            # Update the object text display
            txt_params = obj_meta.text_params
            # Set display_text. Any existing display_text string will be
            # freed by the bindings module.
            txt_params.display_text = pgie_classes_str[obj_meta.class_id]
            obj_counter[obj_meta.class_id] += 1
            # Font , font-color and font-size
            txt_params.font_params.font_name = "Serif"
            txt_params.font_params.font_size = 10
            # set(red, green, blue, alpha); set to White
            txt_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
            # Text background color
            txt_params.set_bg_clr = 1
            # set(red, green, blue, alpha); set to Black
            txt_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
            # Ideally NVDS_EVENT_MSG_META should be attached to buffer by the
            # component implementing detection / recognition logic.
            # Here it demonstrates how to use / attach that meta data.
            if is_first_object and (frame_number % 30) == 0:
                # Frequency of messages to be send will be based on use case.
                # Here message is being sent for first object every 30 frames.
                # Allocating an NvDsEventMsgMeta instance and getting
                # reference to it. The underlying memory is not manged by
                # Python so that downstream plugins can access it. Otherwise
                # the garbage collector will free it when this probe exits.
                msg_meta = pyds.alloc_nvds_event_msg_meta()
                msg_meta.bbox.top = obj_meta.rect_params.top
                msg_meta.bbox.left = obj_meta.rect_params.left
                msg_meta.bbox.width = obj_meta.rect_params.width
                msg_meta.bbox.height = obj_meta.rect_params.height
                msg_meta.frameId = frame_number
                msg_meta.trackingId = long_to_uint64(obj_meta.object_id)
                msg_meta.confidence = obj_meta.confidence
                msg_meta = generate_event_msg_meta(msg_meta, obj_meta.class_id)
                user_event_meta = pyds.nvds_acquire_user_meta_from_pool(
                    batch_meta)
                if user_event_meta:
                    user_event_meta.user_meta_data = msg_meta
                    user_event_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_EVENT_MSG_META
                    # Setting callbacks in the event msg meta. The bindings
                    # layer will wrap these callables in C functions.
                    # Currently only one set of callbacks is supported.
                    pyds.user_copyfunc(user_event_meta, meta_copy_func)
                    pyds.user_releasefunc(user_event_meta, meta_free_func)
                    pyds.nvds_add_user_meta_to_frame(frame_meta,
                                                     user_event_meta)
                else:
                    print("Error in attaching event meta to buffer\n")
                is_first_object = False
            try:
                l_obj = l_obj.next
            except StopIteration:
                break
        try:
            l_frame = l_frame.next
        except StopIteration:
            break
    print("Frame Number =", frame_number, "T Count =",
          obj_counter[PGIE_CLASS_ID_T], "Truck Count =",
          obj_counter[PGIE_CLASS_ID_TRUCK])
    return Gst.PadProbeReturn.OK
def custom_child_added(child_proxy, Object, name, user_data):
    """GstChildProxy 'child-added' handler for rtspsrc source bins.

    Re-subscribes on nested rtspsrc children and, once the actual "source"
    element appears, enables drop-on-latency when the property exists.
    """
    print("custom_child_added:", name, "\n")
    if "rtspsrc" in name:
        # rtspsrc creates its own children; watch them recursively.
        Object.connect("child-added", custom_child_added, user_data)
    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        if source_element.find_property('drop-on-latency') is not None:
            Object.set_property("drop-on-latency", True)
def on_pad_added(src, pad, depay):
    """rtspsrc 'pad-added' handler: link the new src pad to the depayloader."""
    print("on_pad_added")
    sink = depay.get_static_pad("sink")
    # A second pad-added (e.g. audio) must not steal an already-linked sink.
    if sink.is_linked():
        return
    result = pad.link(sink)
    if result != Gst.PadLinkReturn.OK:
        print("Failed to link pads: %s" % result)
def create_source_bin(index, uri):
    """Build one RTSP source bin: rtspsrc -> queue -> rtph264depay -> h264parse
    -> tee -> nvv4l2decoder -> nvvideoconvert, exposed via ghost pad
    "stream_<index>".

    NOTE(review): the commented-out splitmuxsink branch on the tee is reported
    to stall the whole pipeline when enabled. Presumably the cause is preroll:
    splitmuxsink's internal file sink waits to preroll on a live RTSP stream
    (and an unmatched tee branch blocks all branches). Try setting async=false
    / sync=false on splitmuxsink and keeping a queue directly after the tee
    request pad on that branch — TODO confirm with GST_DEBUG.
    """
    bin_name = "source-bin-%02d" % index
    nbin = Gst.Bin.new(bin_name)
    source = Gst.ElementFactory.make("rtspsrc", "source")
    source.set_property('location', uri)
    # source.set_property('latency', 300)
    queue1 = Gst.ElementFactory.make("queue", "queue1")
    depay = Gst.ElementFactory.make("rtph264depay", "depay")
    parse = Gst.ElementFactory.make("h264parse", "parse")
    tee = Gst.ElementFactory.make("tee", "tee")
    # splitmuxsink = Gst.ElementFactory.make("splitmuxsink", "splitmuxsink")
    # splitmuxsink.set_property("location", "/opt/nvidia/deepstream/deepstream-6.3/sources/deepstream_python_apps/apps/deepstream-test3/video/output%05d.mp4")
    # splitmuxsink.set_property("max-size-time", 10 * Gst.SECOND)
    dec = Gst.ElementFactory.make("nvv4l2decoder", "dec")
    # dec = Gst.ElementFactory.make("avdec_h264", "dec")
    convert = Gst.ElementFactory.make("nvvideoconvert", "convert")
    nbin.add(source)
    nbin.add(depay)
    nbin.add(parse)
    nbin.add(tee)
    nbin.add(dec)
    nbin.add(convert)
    nbin.add(queue1)
    # rtspsrc pads appear dynamically; link them to queue1 in on_pad_added.
    source.connect("pad-added", on_pad_added, queue1)
    queue1.link(depay)
    depay.link(parse)  # link depay to parse
    parse.link(tee)    # link parse to tee
    # Recording branch (disabled) — see NOTE in the docstring:
    # queue2 = Gst.ElementFactory.make("queue", "queue2")
    # nbin.add(queue2)
    # nbin.add(splitmuxsink)
    # tee_msg_pad = tee.get_request_pad('src_%u')
    # sink_pad = queue2.get_static_pad("sink")
    # tee_msg_pad.link(sink_pad)
    # queue2.link(splitmuxsink)
    # Decode branch: tee request pad -> decoder -> converter.
    tee_render_pad = tee.get_request_pad("src_%u")
    sink_pad = dec.get_static_pad("sink")
    tee_render_pad.link(sink_pad)
    dec.link(convert)
    # Expose the converter's src pad on the bin; main() links it to streammux.
    convert_pad = convert.get_static_pad("src")
    bin_pad = nbin.add_pad(Gst.GhostPad.new("stream_%u" % index, convert_pad))
    dec.connect("pad-added", cb_newpad, nbin)
    return nbin
# NOTE: a second, byte-identical definition of generate_event_msg_meta used to
# live here; it silently shadowed the definition earlier in this file. The
# duplicate has been removed — the single earlier definition is the one in use.
def main(args):
    """Build and run the DeepStream pipeline.

    Topology: N RTSP source bins -> nvstreammux -> nvinfer -> nvtracker ->
    nvdsanalytics -> nvvideoconvert -> capsfilter(RGBA) -> nvmultistreamtiler
    -> nvvideoconvert -> nvdsosd -> tee -> { nvmsgconv -> nvmsgbroker (Kafka),
    render sink (nveglglessink or fakesink) }.

    Relies on module-level globals populated while parsing config.json:
    sources, display, number_sources, MUXER_OUTPUT_WIDTH/HEIGHT, conn_str.
    """
    global conn_str
    # Standard GStreamer initialization
    GObject.threads_init()
    Gst.init(None)
    # Register copy/free callbacks used for NvDsEventMsgMeta user meta.
    pyds.register_user_copyfunc(meta_copy_func)
    pyds.register_user_releasefunc(meta_free_func)
    # Create gstreamer elements */
    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    is_live = False
    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
    print("Creating streamux \n ")
    # Create nvstreammux instance to form batches from one or more sources.
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")
    pipeline.add(streammux)
    # One source bin per stream declared in config.json ("stream_0", "stream_1", ...).
    for i in range(number_sources):
        print("Creating source_bin ",i," \n ")
        uri_name=sources["stream_"+str(i)]
        if uri_name.find("rtsp://") == 0 :
            is_live = True
        source_bin=create_source_bin(i, uri_name)
        for pad in source_bin.iterate_pads():
            print(f"PAD!!! {pad.get_name()}")
        pipeline.add(source_bin)
        padname="sink_%u" %i
        sinkpad= streammux.get_request_pad(padname)
        srcpad=source_bin.get_static_pad("stream_%u" % i)
        srcpad.link(sinkpad)
    print("Creating Pgie \n ")
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie \n")
    tracker = Gst.ElementFactory.make("nvtracker", "tracker")
    if not tracker:
        sys.stderr.write(" Unable to create tracker \n")
    print("Creating nvdsanalytics \n ")
    nvanalytics = Gst.ElementFactory.make("nvdsanalytics", "analytics")
    if not nvanalytics:
        sys.stderr.write(" Unable to create nvanalytics \n")
    nvanalytics.set_property("config-file", "config_nvdsanalytics.txt")
    print("Creating tiler \n ")
    tiler=Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
    if not tiler:
        sys.stderr.write(" Unable to create tiler \n")
    print("Creating nvvidconv1 \n ")
    nvvidconv1 = Gst.ElementFactory.make("nvvideoconvert", "convertor1")
    if not nvvidconv1:
        sys.stderr.write(" Unable to create nvvidconv1 \n")
    print("Creating filter1 \n ")
    # Force RGBA so probe code can map frames into numpy arrays (get_frame).
    caps1 = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
    filter1 = Gst.ElementFactory.make("capsfilter", "filter1")
    if not filter1:
        sys.stderr.write(" Unable to get the caps filter1 \n")
    filter1.set_property("caps", caps1)
    print("Creating nvvidconv \n ")
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    if not nvvidconv:
        sys.stderr.write(" Unable to create nvvidconv \n")
    print("Creating nvosd \n ")
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")
    # nvosd.set_property('process-mode',OSD_PROCESS_MODE)
    # nvosd.set_property('display-text',OSD_DISPLAY_TEXT)
    if(is_aarch64()):
        print("Creating transform \n ")
        transform=Gst.ElementFactory.make("nvegltransform", "nvegl-transform")
        if not transform:
            sys.stderr.write(" Unable to create transform \n")
    # 'display' comes from config.json (module-level global).
    if display:
        print("Creating EGLSink \n")
        sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
        if not sink:
            sys.stderr.write(" Unable to create egl sink \n")
    else:
        print("Creating FakeSink \n")
        sink = Gst.ElementFactory.make("fakesink", "fakesink")
        if not sink:
            sys.stderr.write(" Unable to create fake sink \n")
    if is_live:
        print("Atleast one of the sources is live")
        streammux.set_property('live-source', 1)
    # Set properties of tracker
    # NOTE(review): this local 'config' shadows the module-level config dict
    # read from config.json; rename locally if the dict is ever needed below.
    config = configparser.ConfigParser()
    config.read('/opt/nvidia/deepstream/deepstream-6.3/sources/DeepStream-Yolo/tr.txt')
    config.sections()
    for key in config['tracker']:
        if key == 'tracker-width':
            tracker_width = config.getint('tracker', key)
            tracker.set_property('tracker-width', tracker_width)
        if key == 'tracker-height':
            tracker_height = config.getint('tracker', key)
            tracker.set_property('tracker-height', tracker_height)
        if key == 'gpu-id':
            tracker_gpu_id = config.getint('tracker', key)
            tracker.set_property('gpu_id', tracker_gpu_id)
        if key == 'll-lib-file':
            tracker_ll_lib_file = config.get('tracker', key)
            tracker.set_property('ll-lib-file', tracker_ll_lib_file)
        if key == 'll-config-file':
            tracker_ll_config_file = config.get('tracker', key)
            tracker.set_property('ll-config-file', tracker_ll_config_file)
        if key == 'enable-batch-process':
            tracker_enable_batch_process = config.getint('tracker', key)
            tracker.set_property('enable_batch_process',
                                 tracker_enable_batch_process)
    streammux.set_property('width', MUXER_OUTPUT_WIDTH)
    streammux.set_property('height', MUXER_OUTPUT_HEIGHT)
    streammux.set_property('batch-size', number_sources)
    streammux.set_property('batched-push-timeout', 4000000)
    pgie.set_property('config-file-path', "primary_config.txt")
    # The inference batch size must match the number of muxed sources.
    pgie_batch_size=pgie.get_property("batch-size")
    if(pgie_batch_size != number_sources):
        print("WARNING: Overriding infer-config batch-size",pgie_batch_size," with number of sources ", number_sources," \n")
        pgie.set_property("batch-size",number_sources)
    # Arrange tiles in a near-square grid.
    tiler_rows=int(math.sqrt(number_sources))
    tiler_columns=int(math.ceil((1.0*number_sources)/tiler_rows))
    tiler.set_property("rows",tiler_rows)
    tiler.set_property("columns",tiler_columns)
    tiler.set_property("width", TILED_OUTPUT_WIDTH)
    tiler.set_property("height", TILED_OUTPUT_HEIGHT)
    sink.set_property("qos",0)
    sink.set_property("sync",0)
    # Message branch: tee -> nvmsgconv -> nvmsgbroker (Kafka via proto_lib).
    tee = Gst.ElementFactory.make("tee", "nvsink-tee")
    if not tee:
        sys.stderr.write(" Unable to create tee \n")
    msgconv = Gst.ElementFactory.make("nvmsgconv", "nvmsg-converter")
    if not msgconv:
        sys.stderr.write(" Unable to create msgconv \n")
    msgbroker = Gst.ElementFactory.make("nvmsgbroker", "nvmsg-broker")
    if not msgbroker:
        sys.stderr.write(" Unable to create msgbroker \n")
    msgconv.set_property('config', MSCONV_CONFIG_FILE)
    msgconv.set_property('payload-type', schema_type)
    msgbroker.set_property('proto-lib', proto_lib)
    msgbroker.set_property('conn-str', conn_str)
    if cfg_file is not None:
        msgbroker.set_property('config', cfg_file)
    if topic is not None:
        msgbroker.set_property('topic', topic)
    msgbroker.set_property('sync', False)
    if not is_aarch64():
        # Use CUDA unified memory in the pipeline so frames
        # can be easily accessed on CPU in Python.
        mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
        streammux.set_property("nvbuf-memory-type", mem_type)
        nvvidconv.set_property("nvbuf-memory-type", mem_type)
        nvvidconv1.set_property("nvbuf-memory-type", mem_type)
        tiler.set_property("nvbuf-memory-type", mem_type)
    # Decoupling queues placed between every pair of linked elements.
    queue1=Gst.ElementFactory.make("queue","queue1")
    queue2=Gst.ElementFactory.make("queue","queue2")
    queue3=Gst.ElementFactory.make("queue","queue3")
    queue4=Gst.ElementFactory.make("queue","queue4")
    queue5=Gst.ElementFactory.make("queue","queue5")
    queue6=Gst.ElementFactory.make("queue","queue6")
    queue7=Gst.ElementFactory.make("queue","queue7")
    queue8=Gst.ElementFactory.make("queue","queue8")
    queue9=Gst.ElementFactory.make("queue","queue9")
    queue20=Gst.ElementFactory.make("queue","queue20")
    # NOTE(review): queue21 is added but never linked; queue22 is created but
    # never added — both look vestigial.
    queue21=Gst.ElementFactory.make("queue","queue21")
    queue22 = Gst.ElementFactory.make("queue", "queue22")
    pipeline.add(queue1)
    pipeline.add(queue2)
    pipeline.add(queue3)
    pipeline.add(queue4)
    pipeline.add(queue5)
    pipeline.add(queue6)
    pipeline.add(queue7)
    pipeline.add(queue8)
    pipeline.add(queue9)
    pipeline.add(queue20)
    pipeline.add(queue21)
    # pipeline.add(queue22)
    print("Adding elements to Pipeline \n")
    pipeline.add(pgie)
    pipeline.add(tracker)
    pipeline.add(nvanalytics)
    pipeline.add(tiler)
    pipeline.add(nvvidconv)
    pipeline.add(filter1)
    pipeline.add(nvvidconv1)
    pipeline.add(nvosd)
    pipeline.add(tee)
    pipeline.add(msgconv)
    pipeline.add(msgbroker)
    if is_aarch64():
        pipeline.add(transform)
    pipeline.add(sink)
    print("Linking elements in the Pipeline \n")
    streammux.link(queue1)
    queue1.link(pgie)
    pgie.link(queue2)
    queue2.link(tracker)
    tracker.link(queue3)
    queue3.link(nvanalytics)
    nvanalytics.link(queue4)
    queue4.link(nvvidconv1)
    nvvidconv1.link(queue5)
    queue5.link(filter1)
    filter1.link(queue6)
    queue6.link(tiler)
    tiler.link(queue7)
    queue7.link(nvvidconv)
    nvvidconv.link(queue8)
    queue8.link(nvosd)
    nvosd.link(tee)
    queue20.link(msgconv)
    msgconv.link(msgbroker)
    # queue21.link(sink)
    if is_aarch64() and display:
        # nvosd.link(queue9)
        queue9.link(transform)
        transform.link(sink)
    else:
        # nvosd.link(queue9)
        queue9.link(sink)
    # Request tee pads: one branch feeds the message converter, one the renderer.
    sink_pad = queue20.get_static_pad("sink")
    tee_msg_pad = tee.get_request_pad('src_%u')
    tee_msg_pad.link(sink_pad)
    sink_pad = queue9.get_static_pad("sink")
    tee_render_pad = tee.get_request_pad("src_%u")
    tee_render_pad.link(sink_pad)
    osdsinkpad = nvosd.get_static_pad("sink")
    osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)
    # create an event loop and feed gstreamer bus messages to it
    loop = GObject.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect ("message", bus_call, loop)
    # NOTE(review): despite the variable name, this is the tiler's *sink* pad.
    tiler_src_pad=tiler.get_static_pad("sink")
    if not tiler_src_pad:
        sys.stderr.write(" Unable to get src pad \n")
    tiler_src_pad.add_probe(Gst.PadProbeType.BUFFER, tiler_src_pad_buffer_probe, 0)
    # List the sources
    print("Now playing...")
    for i, src in sources.items():
        print(i, ": ", src)
    print("Starting pipeline \n")
    # start play back and listed to events
    pipeline.set_state(Gst.State.PLAYING)
    # Dump the pipeline graph for debugging (render with graphviz dot).
    os.environ["GST_DEBUG_DUMP_DOT_DIR"] = "/workspace"
    Gst.debug_bin_to_dot_file(pipeline, Gst.DebugGraphDetails.ALL, "pipeline")
    try:
        loop.run()
    except:
        pass
    # cleanup
    pyds.unset_callback_funcs()
    print("Exiting app\n")
    pipeline.set_state(Gst.State.NULL)
if __name__ == '__main__':
    # Script entry point; main's return value (None -> 0) becomes the exit code.
    sys.exit(main(sys.argv))
# NOTE: About create_source_bin — the splitmuxsink branch on the tee is
# commented out there. Running with 2 RTSP streams and without splitmuxsink,
# everything works: video is rendered, messages reach Kafka, and the detector
# and tracker run. With the splitmuxsink branch enabled, all elements
# initialize but the pipeline stalls and produces no output (observed with
# GST_DEBUG=3).