Hi @fanzh
Please find the code and configurations to replicate the issue
#!/usr/bin/env python3
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys
sys.path.append('../')
from pathlib import Path
import gi
import configparser
import argparse
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from ctypes import *
import time
import sys
import math
import platform
from common.is_aarch_64 import is_aarch64
from common.FPS import PERF_DATA
from ssd_parser import nvds_infer_parse_custom_tf_ssd, DetectionParam, NmsParam, BoxSizeParam
import os
import pyds
import pickle
import socket
from queue import Queue
import threading
import numpy as np
import logging
import traceback
import json
logging.basicConfig(
filename='app.log', # Specify the file name
filemode='a', # Append mode
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', # Log format
level=logging.INFO # Set the logging level
)
# Get a logger instance
FILE_LOGGER = logging.getLogger('FILE_LOGGER')
no_display = False
silent = False
file_loop = True
perf_data = None
MAX_DISPLAY_LEN=64
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
MUXER_OUTPUT_WIDTH=1920
MUXER_OUTPUT_HEIGHT=1080
MUXER_BATCH_TIMEOUT_USEC = 33000
TILED_OUTPUT_WIDTH=1280
TILED_OUTPUT_HEIGHT=720
GST_CAPS_FEATURES_NVMM="memory:NVMM"
OSD_PROCESS_MODE= 0
OSD_DISPLAY_TEXT= 1
pgie_classes_str= ["Vehicle", "TwoWheeler", "Person","RoadSign"]
MAX_DISPLAY_LEN = 64
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
MUXER_OUTPUT_WIDTH = 1920
MUXER_OUTPUT_HEIGHT = 1080
MUXER_BATCH_TIMEOUT_USEC = 4000000
TILED_OUTPUT_WIDTH = 1280
TILED_OUTPUT_HEIGHT = 720
GST_CAPS_FEATURES_NVMM = "memory:NVMM"
OSD_PROCESS_MODE = 0
OSD_DISPLAY_TEXT = 0
pgie_classes_str = ["Vehicle", "TwoWheeler", "Person", "RoadSign"]
CLASS_NB = 91
ACCURACY_ALL_CLASS = 0.5
UNTRACKED_OBJECT_ID = 0xffffffffffffffff
IMAGE_HEIGHT = 1080
IMAGE_WIDTH = 1920
MIN_BOX_WIDTH = 32
MIN_BOX_HEIGHT = 32
TOP_K = 20
IOU_THRESHOLD = 0.3
OUTPUT_VIDEO_NAME = "./out.mp4"
MUXER_BATCH_TIMEOUT_USEC = 33000
MAX_NUM_SOURCES = 3
class TCPConnectionThread(threading.Thread):
def __init__(self, host, port):
super().__init__()
self.host = host
self.port = port
self.queue = Queue(maxsize=100)
self.stop_event = threading.Event()
self.sock = None
def connect(self):
while not self.stop_event.is_set():
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
return True
except Exception as e:
print("From DS-7 pipeline- Error connecting:", e)
time.sleep(1) # Retry after 1 second
return False
def run(self):
while not self.stop_event.is_set():
if not self.sock:
if not self.connect():
# If unable to connect, wait before retrying
time.sleep(1)
continue
try:
while not self.stop_event.is_set():
if not self.queue.empty():
data = self.queue.get()
# Pickle the data
serialized_data = pickle.dumps(data)
# Calculate the length of the data
data_length = len(serialized_data)
# Convert data length to bytes (4 bytes for an integer)
length_bytes = data_length.to_bytes(4, byteorder='big')
# Send the length of the data
self.sock.sendall(length_bytes)
# Send the data
self.sock.sendall(serialized_data)
else:
time.sleep(0.1) # Sleep briefly to avoid busy waiting
except Exception as e:
print("Error sending data:", e)
self.sock.close()
self.sock = None
def stop(self):
self.stop_event.set()
if self.sock:
self.sock.close()
def send_message(self, obj):
try:
self.queue.put(obj,block=False)
except Exception as e:
pass
def get_label_names_from_file(filepath):
""" Read a label file and convert it to string list """
labels = "connected components"
return labels
def osd_sink_pad_buffer_probe(pad, info, u_data):
frame_number = 0
# Intiallizing object counter with 0.
obj_counter = dict(enumerate([0] * CLASS_NB))
num_rects = 0
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer ")
return
# Retrieve batch metadata from the gst_buffer
# Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
# C address of gst_buffer as input, which is obtained with hash(gst_buffer)
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
try:
# Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
# The casting also keeps ownership of the underlying memory
# in the C code, so the Python garbage collector will leave
# it alone.
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
except StopIteration:
break
frame_number = frame_meta.frame_num
num_rects = frame_meta.num_obj_meta
l_obj = frame_meta.obj_meta_list
while l_obj is not None:
try:
# Casting l_obj.data to pyds.NvDsObjectMeta
obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
except StopIteration:
break
obj_counter[obj_meta.class_id] += 1
try:
l_obj = l_obj.next
except StopIteration:
break
# Acquiring a display meta object. The memory ownership remains in
# the C code so downstream plugins can still access it. Otherwise
# the garbage collector will claim it when this probe function exits.
display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
display_meta.num_labels = 1
py_nvosd_text_params = display_meta.text_params[0]
# Setting display text to be shown on screen
# Note that the pyds module allocates a buffer for the string, and the
# memory will not be claimed by the garbage collector.
# Reading the display_text field here will return the C address of the
# allocated string. Use pyds.get_string() to get the string content.
id_dict = {
val: index
for index, val in enumerate(get_label_names_from_file("labels.txt"))
}
disp_string = "Frame Number={} Number of Objects={} Vehicle_count={} Person_count={}"
py_nvosd_text_params.display_text = disp_string.format(
frame_number,
num_rects,
obj_counter[id_dict["car"]],
obj_counter[id_dict["person"]],
)
# Now set the offsets where the string should appear
py_nvosd_text_params.x_offset = 10
py_nvosd_text_params.y_offset = 12
# Font , font-color and font-size
py_nvosd_text_params.font_params.font_name = "Serif"
py_nvosd_text_params.font_params.font_size = 10
# set(red, green, blue, alpha); set to White
py_nvosd_text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
# Text background color
py_nvosd_text_params.set_bg_clr = 1
# set(red, green, blue, alpha); set to Black
py_nvosd_text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
# Using pyds.get_string() to get display_text as string
print(pyds.get_string(py_nvosd_text_params.display_text))
pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
try:
l_frame = l_frame.next
except StopIteration:
break
return Gst.PadProbeReturn.OK
def add_obj_meta_to_frame(frame_object, batch_meta, frame_meta, label_names):
""" Inserts an object into the metadata """
# this is a good place to insert objects into the metadata.
# Here's an example of inserting a single object.
obj_meta = pyds.nvds_acquire_obj_meta_from_pool(batch_meta)
# Set bbox properties. These are in input resolution.
rect_params = obj_meta.rect_params
rect_params.left = int(frame_object.left)
rect_params.top = int(frame_object.top)
rect_params.width = int(frame_object.width)
rect_params.height = int(frame_object.height)
# print(rect_params.height)
# Semi-transparent yellow backgroud
rect_params.has_bg_color = 0
rect_params.bg_color.set(1, 1, 0, 0.4)
# Red border of width 3
rect_params.border_width = 3
rect_params.border_color.set(1, 0, 0, 1)
# Set object info including class, detection confidence, etc.
obj_meta.confidence = frame_object.detectionConfidence
obj_meta.class_id = frame_object.classId
# There is no tracking ID upon detection. The tracker will
# assign an ID.
obj_meta.object_id = UNTRACKED_OBJECT_ID
lbl_id = frame_object.classId
if lbl_id >= len(label_names):
lbl_id = 0
# Set the object classification label.
obj_meta.obj_label = label_names[lbl_id]
# Set display text for the object.
txt_params = obj_meta.text_params
if txt_params.display_text:
pyds.free_buffer(txt_params.display_text)
txt_params.x_offset = int(rect_params.left)
txt_params.y_offset = max(0, int(rect_params.top) - 10)
txt_params.display_text = (
label_names[lbl_id] + " " + "{:04.3f}".format(frame_object.detectionConfidence)
)
# Font , font-color and font-size
txt_params.font_params.font_name = "Serif"
txt_params.font_params.font_size = 10
# set(red, green, blue, alpha); set to White
txt_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
# Text background color
txt_params.set_bg_clr = 1
# set(red, green, blue, alpha); set to Black
txt_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
# Inser the object into current frame meta
# This object has no parent
pyds.nvds_add_obj_meta_to_frame(frame_meta, obj_meta, None)
def pgie_sink_pad_buffer_probe(pad, info, u_data,q,c,t0):
#print(['Data_Probed',time.ctime()]) # c+=1
# print(['from sink probes ',c,(time.time()-t0)//c])
ordered_dict = OrderedDict()
# print("\n\n")
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer ")
return
# Retrieve batch metadata from the gst_buffer
# Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
# C address of gst_buffer as input, which is obtained with hash(gst_buffer)
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
try:
# Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
# The casting also keeps ownership of the underlying memory
# in the C code, so the Python garbage collector will leave
# it alone.
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
source_id = frame_meta.source_id
frame_number = frame_meta.frame_num
# print(f"frame number sending from sink pad={frame_number}")
if source_id not in ordered_dict:
ordered_dict[source_id]=[time.time(),frame_number]
l_frame = l_frame.next
except StopIteration:
break
#print(ordered_dict)
# Define the host and port of the server
host = 'localhost'
port = 12345
# q.put(ordered_dict, host, port)
return Gst.PadProbeReturn.OK
def pgie_src_pad_buffer_probe(pad, info, u_data,connection_thread):
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer ")
return
# Retrieve batch metadata from the gst_buffer
# Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
# C address of gst_buffer as input, which is obtained with hash(gst_buffer)
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
detection_params = DetectionParam(CLASS_NB, ACCURACY_ALL_CLASS)
box_size_param = BoxSizeParam(IMAGE_HEIGHT, IMAGE_WIDTH,
MIN_BOX_WIDTH, MIN_BOX_HEIGHT)
nms_param = NmsParam(TOP_K, IOU_THRESHOLD)
label_names = get_label_names_from_file("labels.txt")
source_id_list=[]
while l_frame is not None:
try:
# Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
# The casting also keeps ownership of the underlying memory
# in the C code, so the Python garbage collector will leave
# it alone.
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
source_id = frame_meta.source_id
source_id_list.append(source_id)
# print(f"source is is {source_id}")
except StopIteration:
break
l_user = frame_meta.frame_user_meta_list
while l_user is not None:
try:
# Note that l_user.data needs a cast to pyds.NvDsUserMeta
# The casting also keeps ownership of the underlying memory
# in the C code, so the Python garbage collector will leave
# it alone.
user_meta = pyds.NvDsUserMeta.cast(l_user.data)
except StopIteration:
break
if (
user_meta.base_meta.meta_type
!= pyds.NvDsMetaType.NVDSINFER_TENSOR_OUTPUT_META
):
continue
tensor_meta = pyds.NvDsInferTensorMeta.cast(user_meta.user_meta_data)
# Boxes in the tensor meta should be in network resolution which is
# found in tensor_meta.network_info. Use this info to scale boxes to
# the input frame resolution.
layers_info = []
for i in range(tensor_meta.num_output_layers):
layer = pyds.get_nvds_LayerInfo(tensor_meta, i)
layers_info.append(layer)
frame_object_list = nvds_infer_parse_custom_tf_ssd(
layers_info, detection_params, box_size_param, nms_param
)
try:
l_user = l_user.next
except StopIteration:
break
for frame_object in frame_object_list:
add_obj_meta_to_frame(frame_object, batch_meta, frame_meta, label_names)
try:
# indicate inference is performed on the frame
frame_meta.bInferDone = True
l_frame = l_frame.next
except StopIteration:
break
connection_thread.send_message(np.array(source_id_list))
return Gst.PadProbeReturn.OK
def cb_newpad(decodebin, decoder_src_pad,data):
print("In cb_newpad\n")
caps=decoder_src_pad.get_current_caps()
if not caps:
caps = decoder_src_pad.query_caps()
gststruct=caps.get_structure(0)
gstname=gststruct.get_name()
source_bin=data
features=caps.get_features(0)
# Need to check if the pad created by the decodebin is for video and not
# audio.
print("gstname=",gstname)
if(gstname.find("video")!=-1):
# Link the decodebin pad only if decodebin has picked nvidia
# decoder plugin nvdec_*. We do this by checking if the pad caps contain
# NVMM memory features.
print("features=",features)
if features.contains("memory:NVMM"):
# Get the source bin ghost pad
bin_ghost_pad=source_bin.get_static_pad("src")
if not bin_ghost_pad.set_target(decoder_src_pad):
sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
else:
sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")
def decodebin_child_added(child_proxy,Object,name,user_data):
print("Decodebin child added:", name, "\n")
if(name.find("decodebin") != -1):
Object.connect("child-added",decodebin_child_added,user_data)
if "source" in name:
source_element = child_proxy.get_by_name("source")
if source_element.find_property('drop-on-latency') != None:
Object.set_property("drop-on-latency", True)
def create_source_bin(index,uri):
print("Creating source bin")
# Create a source GstBin to abstract this bin's content from the rest of the
# pipeline
bin_name="source-bin-%02d" %index
print(bin_name)
nbin=Gst.Bin.new(bin_name)
if not nbin:
sys.stderr.write(" Unable to create source bin \n")
# Source element for reading from the uri.
# We will use decodebin and let it figure out the container format of the
# stream and the codec and plug the appropriate demux and decode plugins.
if file_loop:
# use nvurisrcbin to enable file-loop
uri_decode_bin=Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
uri_decode_bin.set_property("file-loop", 1)
uri_decode_bin.set_property("rtsp-reconnect-interval", 10)
uri_decode_bin.set_property("rtsp-reconnect-attempts", 2160)
# uri_decode_bin.set_property("cudadec-memtype", 2)
else:
uri_decode_bin=Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
if not uri_decode_bin:
sys.stderr.write(" Unable to create uri decode bin \n")
# We set the input uri to the source element
uri_decode_bin.set_property("uri",uri)
# Connect to the "pad-added" signal of the decodebin which generates a
# callback once a new pad for raw data has beed created by the decodebin
uri_decode_bin.connect("pad-added",cb_newpad,nbin)
uri_decode_bin.connect("child-added",decodebin_child_added,nbin)
# We need to create a ghost pad for the source bin which will act as a proxy
# for the video decoder src pad. The ghost pad will not have a target right
# now. Once the decode bin creates the video decoder and generates the
# cb_newpad callback, we will set the ghost pad target to the video decoder
# src pad.
Gst.Bin.add(nbin,uri_decode_bin)
bin_pad=nbin.add_pad(Gst.GhostPad.new_no_target("src",Gst.PadDirection.SRC))
if not bin_pad:
sys.stderr.write(" Failed to add ghost pad in source bin \n")
return None
return nbin
def kill_the_pipeline(loop,pipeline,connection_thread):
pipeline.set_state(Gst.State.NULL)
connection_thread.stop()
loop.quit()
sys.exit(1)
def bus_call(bus, message, loop,pipeline,connection_thread):
"""
This is call back from gstreamer
This method takes 1 parameter and access the messages that is posted on pipeline's bus.
:param loop: this is added from mainloop so that u can kill the loop to kill entire pipeline from messges
:return: None
"""
t = message.type
if t == Gst.MessageType.EOS:
sys.stdout.write("End-of-stream\n")
FILE_LOGGER.info("I am here at the EOS part")
FILE_LOGGER.error(f"The EOS recieved from the triton Pipeline, Exiting the application")
kill_the_pipeline(loop,pipeline,connection_thread)
elif t==Gst.MessageType.WARNING:
err, debug = message.parse_warning()
FILE_LOGGER.warning(f"Warning: {err}: {debug}")
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
FILE_LOGGER.error(f"Error: {err}: {debug}")
print(f"Error: {err}: {debug}")
src_element = message.src
parent_obj = src_element.get_parent()
if parent_obj:
try:
parent_obj_name = parent_obj.name
FILE_LOGGER.info(f"parent object name is {parent_obj_name}")
print(f"parent object name is {parent_obj.name}")
# kill_the_pipeline(loop,pipeline,connection_thread)
except Exception as e:
FILE_LOGGER.info(traceback.print_exc())
print("Error! exiting the app ")
FILE_LOGGER.error("Error! exiting the app ",exc_info=True)
# kill_the_pipeline(loop,pipeline,connection_thread)
elif t == Gst.MessageType.ELEMENT:
struct = message.get_structure()
#Check for stream-eos message
if struct is not None and struct.has_name("stream-eos"):
parsed, stream_id = struct.get_uint("stream-id")
if parsed:
#Set eos status of stream to True, to be deleted in delete-sources
print("Got EOS from stream %d" % stream_id)
FILE_LOGGER.info("Got EOS from stream %d" % stream_id)
# Uncomment the below code if you need to restart the entire system
# kill_the_pipeline(loop,pipeline,connection_thread)
return True
def main(args, requested_pgie=None, config=None, disable_probe=False):
host = '127.0.0.1'
port = 54321
connection_thread = TCPConnectionThread(host, port)
connection_thread.start()
#connection_thread.send_message(stats)
global perf_data
perf_data = PERF_DATA(len(args))
number_sources=len(args)
# Standard GStreamer initialization
Gst.init(None)
# Create gstreamer elements */
# Create Pipeline element that will form a connection of other elements
print("Creating Pipeline \n ")
pipeline = Gst.Pipeline()
is_live = False
if not pipeline:
sys.stderr.write(" Unable to create Pipeline \n")
print("Creating streamux \n ")
# Create nvstreammux instance to form batches from one or more sources.
streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
if not streammux:
sys.stderr.write(" Unable to create NvStreamMux \n")
pipeline.add(streammux)
for i in range(number_sources):
print("Creating source_bin ",i," \n ")
uri_name=args[i]
if uri_name.find("rtsp://") == 0 :
is_live = True
source_bin=create_source_bin(i, uri_name)
if not source_bin:
sys.stderr.write("Unable to create source bin \n")
pipeline.add(source_bin)
padname="sink_%u" %i
sinkpad= streammux.get_request_pad(padname)
if not sinkpad:
sys.stderr.write("Unable to create sink pad bin \n")
srcpad=source_bin.get_static_pad("src")
if not srcpad:
sys.stderr.write("Unable to create src pad bin \n")
srcpad.link(sinkpad)
queue1=Gst.ElementFactory.make("queue","queue1")
queue2=Gst.ElementFactory.make("queue","queue2")
queue3=Gst.ElementFactory.make("queue","queue3")
queue4=Gst.ElementFactory.make("queue","queue4")
queue5=Gst.ElementFactory.make("queue","queue5")
pipeline.add(queue1)
pipeline.add(queue2)
pipeline.add(queue3)
pipeline.add(queue4)
pipeline.add(queue5)
nvdslogger = None
print("Creating Pgie \n ")
if True :
pgie = Gst.ElementFactory.make("nvinferserver", "primary-inference")
# elif requested_pgie != None and requested_pgie == 'nvinfer':
# pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
# else:
# pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
if not pgie:
sys.stderr.write(" Unable to create pgie : %s\n" % requested_pgie)
if disable_probe:
# Use nvdslogger for perf measurement instead of probe function
print ("Creating nvdslogger \n")
nvdslogger = Gst.ElementFactory.make("nvdslogger", "nvdslogger")
print("Creating tiler \n ")
tiler=Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
if not tiler:
sys.stderr.write(" Unable to create tiler \n")
print("Creating nvvidconv \n ")
nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
if not nvvidconv:
sys.stderr.write(" Unable to create nvvidconv \n")
print("Creating nvosd \n ")
nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
if not nvosd:
sys.stderr.write(" Unable to create nvosd \n")
nvosd.set_property('process-mode',OSD_PROCESS_MODE)
nvosd.set_property('display-text',OSD_DISPLAY_TEXT)
if file_loop:
if is_aarch64():
# Set nvbuf-memory-type=4 for aarch64 for file-loop (nvurisrcbin case)
streammux.set_property('nvbuf-memory-type', 4)
else:
# Set nvbuf-memory-type=2 for x86 for file-loop (nvurisrcbin case)
# streammux.set_property('nvbuf-memory-type', 1)
pass
if no_display:
print("Creating Fakesink \n")
sink = Gst.ElementFactory.make("fakesink", "fakesink")
sink.set_property('enable-last-sample', 0)
sink.set_property('sync', 0)
else:
if is_aarch64():
print("Creating nv3dsink \n")
sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
if not sink:
sys.stderr.write(" Unable to create nv3dsink \n")
else:
print("Creating EGLSink \n")
sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
if not sink:
sys.stderr.write(" Unable to create egl sink \n")
if not sink:
sys.stderr.write(" Unable to create sink element \n")
# if is_live:
# print("At least one of the sources is live")
# streammux.set_property('live-source', 1)
if os.environ["USE_NEW_NVSTREAMMUX"]=="yes":
streammux.set_property("config-file-path", "/opt/nvidia/deepstream/deepstream-7.0/sources/src/streammux.txt")
streammux.set_property("batch-size", 30)
print("###################Using the new SteramMUX########################\n\n\n\n")
else:
streammux.set_property("batched-push-timeout", 25000)
streammux.set_property("width", 640) #640 3840
streammux.set_property("height", 480) #480 2160
streammux.set_property("batch-size", 30)
streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)
if requested_pgie == "nvinferserver" and config != None:
pgie.set_property('config-file-path', config)
elif requested_pgie == "nvinferserver-grpc" and config != None:
pgie.set_property('config-file-path', config)
elif requested_pgie == "nvinfer" and config != None:
pgie.set_property('config-file-path', config)
else:
#pgie.set_property('config-file-path', "/opt/nvidia/deepstream/deepstream-7.0/sources/src/dstest1_pgie_inferserver_config_fake_1080.txt") # dstest1_pgie_inferserver_config_torch_resnet50.txt
# pgie.set_property('config-file-path', "/opt/nvidia/deepstream/deepstream-7.0/sources/src/dstest1_pgie_inferserver_config_fake_640_10.txt") # dstest1_pgie_inferserver_config_torch_resnet50.txt
print('########################################OK')
# pgie.set_property('config-file-path', "/opt/nvidia/deepstream/deepstream-7.0/sources/src/dstest1_pgie_inferserver_config_torch_resnet50.txt")
pgie_batch_size=pgie.get_property("batch-size")
if(pgie_batch_size != number_sources):
print("WARNING: Overriding infer-config batch-size",pgie_batch_size," with number of sources ", number_sources," \n")
pgie.set_property("batch-size",number_sources)
tiler_rows=int(math.sqrt(number_sources))
tiler_columns=int(math.ceil((1.0*number_sources)/tiler_rows))
tiler.set_property("rows",tiler_rows)
tiler.set_property("columns",tiler_columns)
tiler.set_property("width", TILED_OUTPUT_WIDTH)
tiler.set_property("height", TILED_OUTPUT_HEIGHT)
sink.set_property("qos",0)
print("Adding elements to Pipeline \n")
# pipeline.add(pgie)
if nvdslogger:
pipeline.add(nvdslogger)
pipeline.add(tiler)
pipeline.add(nvvidconv)
pipeline.add(nvosd)
pipeline.add(sink)
print("Linking elements in the Pipeline \n")
streammux.link(queue1)
# queue1.link(pgie)
queue1.link(queue2)
if nvdslogger:
queue2.link(nvdslogger)
nvdslogger.link(tiler)
else:
queue2.link(tiler)
tiler.link(queue3)
queue3.link(nvvidconv)
nvvidconv.link(queue4)
queue4.link(nvosd)
nvosd.link(queue5)
queue5.link(sink)
# create an event loop and feed gstreamer bus mesages to it
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect ("message", bus_call, loop,pipeline,connection_thread)
pgie_src_pad=pgie.get_static_pad("src")
if not pgie_src_pad:
sys.stderr.write(" Unable to get src pad \n")
else:
if not disable_probe:
pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, pgie_src_pad_buffer_probe, 0,connection_thread)
# perf callback function to print fps every 5 sec
# GLib.timeout_add(5000, perf_data.perf_print_callback)
# List the sources
print("Now playing...")
for i, source in enumerate(args):
print(i, ": ", source)
print("Starting pipeline \n")
# start play back and listed to events
pipeline.set_state(Gst.State.PLAYING)
try:
loop.run()
except:
pass
# cleanup
connection_thread.stop()
print("Exiting app\n")
pipeline.set_state(Gst.State.NULL)
def parse_args():
parser = argparse.ArgumentParser(prog="deepstream_test_3",
description="deepstream-test3 multi stream, multi model inference reference app")
# parser.add_argument(
# "-i",
# "--input",
# help="Path to input streams",
# nargs="+",
# metavar="URIs",
# default=["a"],
# required=True,
# )
# parser.add_argument(
# "-c",
# "--configfile",
# metavar="config_location.txt",
# default=None,
# help="Choose the config-file to be used with specified pgie",
# )
# parser.add_argument(
# "-g",
# "--pgie",
# default=None,
# help="Choose Primary GPU Inference Engine",
# choices=["nvinfer", "nvinferserver", "nvinferserver-grpc"],
# )
# parser.add_argument(
# "--no-display",
# action="store_true",
# default=False,
# dest='no_display',
# help="Disable display of video output",
# )
# parser.add_argument(
# "--file-loop",
# action="store_true",
# default=False,
# dest='file_loop',
# help="Loop the input file sources after EOS",
# )
# parser.add_argument(
# "--disable-probe",
# action="store_true",
# default=False,
# dest='disable_probe',
# help="Disable the probe function and use nvdslogger for FPS",
# )
# parser.add_argument(
# "-s",
# "--silent",
# action="store_true",
# default=False,
# dest='silent',
# help="Disable verbose output",
# )
# # Check input arguments
# # if len(sys.argv) == 1:
# # parser.print_help(sys.stderr)
# # sys.exit(1)
# args = parser.parse_args()
# # stream_paths = args.input
# pgie = args.pgie
# config = args.configfile
# disable_probe = args.disable_probe
global no_display
global silent
global file_loop
pgie = None
config = None
disable_probe = False
# no_display = args.no_display
# silent = args.silent
# file_loop = args.file_loop
# if config and not pgie or pgie and not config:
# sys.stderr.write ("\nEither pgie or configfile is missing. Please specify both! Exiting...\n\n\n\n")
# parser.print_help()
# sys.exit(1)
# if config:
# config_path = Path(config)
# if not config_path.is_file():
# sys.stderr.write ("Specified config-file: %s doesn't exist. Exiting...\n\n" % config)
# sys.exit(1)
# print(vars(args))
return pgie, config, disable_probe
def write_json(D,filename):
with open(filename, 'w') as f:
json.dump(D, f, indent=4)
def read_json(filename):
with open(filename, 'r') as f:
D = json.load(f)
return D
def save_dict(di_, filename_):
with open(filename_, 'wb') as f:
pickle.dump(di_, f)
def load_dict(filename_):
with open(filename_, 'rb') as f:
ret_di = pickle.load(f)
return ret_di
def extract_addresses(cam_list):
addresses = []
if isinstance(cam_list[0], dict):
for key, cam in cam_list[0].items():
if isinstance(cam, dict) and 'Address' in cam:
addresses.append(cam['Address'])
return addresses
if __name__ == '__main__':
root_directory = '/opt/nvidia/deepstream/deepstream-7.0/sources/src/SP2023'
FILE_LOGGER.info("root dir is read as {}".format(root_directory))
Meta_File = os.path.join(root_directory, "Config/Meta.json")
Config_File = os.path.join(root_directory, "Config/Config.pkl")
meta_params = read_json(Meta_File)
Config=load_dict(Config_File)
list_of_cams = []
for key in Config:
if key != 'Set':
list_of_cams.append(Config[key])
stream_paths = extract_addresses(list_of_cams)
pgie, config, disable_probe = parse_args()
stream_paths = ['rtsp://192.168.1.4:8555/video1',
'file:///opt/nvidia/deepstream/deepstream-7.0/sources/src/2.mp4',
'rtsp://192.168.1.4:8555/video1']
sys.exit(main(stream_paths, pgie, config, disable_probe))
streammux.txt
[property]
algorithm-type=1
batch-size=30
max-fps-control=1
overall-max-fps-n=30
overall-max-fps-d=1
overall-min-fps-n=30
overall-min-fps-d=1
max-same-source-frames=1
This is one of the mp4 file as input
You can place any rtsp camera url instead of given ones.
docker-compose yml
services:
aivs_service_triton_70:
container_name: aivs_deepstream_70_container
build:
context: .
args:
- NODE_ENV=local
dockerfile: Dockerfile
network_mode: "host"
runtime: nvidia
shm_size: 6g
volumes:
- "./src:/opt/nvidia/deepstream/deepstream-7.0/sources/src"
- "/tmp/.X11-unix:/tmp/.X11-unix:rw"
- "//var/run/docker.sock://var/run/docker.sock"
- "/etc/localtime:/etc/localtime:ro"
environment:
- USE_NEW_NVSTREAMMUX=yes
- DISPLAY=$DISPLAY
- QT_X11_NO_MITSHM=1
- ENABLE_ALERTS=TRUE
- PRINT_OUT=FALSE
- ROOT_DIR=/workspace/SP2023/
# - GST_DEBUG=1
# - GST_DEBUG_LEVEL=3
restart: always
# command: ["deepstream_python_apps/apps/deepstream-demux-multi-in-multi-out/deepstream_demux_multi_in_multi_out.py","-i", "file:///opt/nvidia/deepstream/deepstream-5.1/workspace/1.mp4"]
command: ["deepstream_test_3.py"]
entrypoint: ["python3", "-u"]
Dockerfile
FROM nvcr.io/nvidia/deepstream:7.0-triton-multiarch
RUN apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64/3bf863cc.pub
RUN apt-get update
RUN apt-get install -y python3-pip \
cmake
RUN python3 -m pip install scikit-build
RUN python3 -m pip install numpy
RUN apt-get install -y gstreamer-1.0 \
gir1.2-gst-rtsp-server-1.0 \
python3-gi \
iputils-ping \
python3-gst-1.0 \
libgstreamer1.0-dev \
libgstreamer-plugins-base1.0-dev \
cmake \
pkg-config
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
gir1.2-gst-rtsp-server-1.0
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
gstreamer1.0-rtsp
RUN apt install libgl1-mesa-glx -y
RUN apt-get install 'ffmpeg'\
'libsm6'\
'libxext6' -y
RUN apt-get install -y libgirepository1.0-dev \
gobject-introspection gir1.2-gst-rtsp-server-1.0 \
python3-numpy
RUN python3 -m pip install pyds_ext
WORKDIR /opt/nvidia/deepstream/deepstream-7.0/sources/src
issue seen from my end