Please provide complete information as applicable to your setup.
• Hardware Platform (Jetson / GPU)
• Hardware Platform - GPU
• DeepStream Version - 7.0
• TensorRT Version - 10.6.0
• NVIDIA GPU Driver Version - 560.35.03
• Issue Type - questions
• Question -
Hi currently I’m working video analytics project and my requirement is save the imge using cv2 and working fine sample app python app , but when i integarte with my code at the time i got this error
ERROR: nvdsinfer_context_impl.cpp:342 Failed to make stream wait on event, cuda err_no:999, err_str:cudaErrorUnknown
ERROR: nvdsinfer_context_impl.cpp:1750 Preprocessor transform input data failed., nvinfer error:NVDSINFER_CUDA_ERROR
0:00:47.066622407 1119 0x562bcdf83450 WARN nvinfer gstnvinfer.cpp:1420:gst_nvinfer_input_queue_loop: error: Failed to queue input batch for inferencing
[2025-03-31 07:54:05] ERROR: gst-stream-error-quark: Failed to queue input batch for inferencing (1)
Error: gst-stream-error-quark: Failed to queue input batch for inferencing (1): gstnvinfer.cpp(1420): gst_nvinfer_input_queue_loop (): /GstPipeline:pipeline0/GstNvInfer:primary-inference
CUDA Runtime error cudaMemcpyAsync(device_, host_, bytes(), cudaMemcpyHostToDevice, stream) # unknown error, code = cudaErrorUnknown [ 999 ] in file memory.hpp:59
CUDA Runtime error cudaMemcpyAsync(device_, host_, bytes(), cudaMemcpyHostToDevice, stream) # unknown error, code = cudaErrorUnknown [ 999 ] in file memory.hpp:59
[cuOSD Error] at cuda/cuosd_kernel.cu:1072 : Launch kernel (render_elements_kernel) failed, code = 999CUDA Runtime error cudaPeekAtLastError() # unknown error, code = cudaErrorUnknown [ 999 ] in file cuosd.cpp:968
#!/usr/bin/env python3
################################################################################
import configparser
import sys
import gi
import time
import uuid
import logging
import os
import pytz
from urllib.parse import urlparse
from datetime import datetime
from gi.repository import Gst, GLib
from utils_prod import ntp_to_sydney_time,extract_rtsp_details,convert_pts_to_sydtimestamp,create_dynamic_path,get_current_epoch_in_syd,convert_syd_epoch_to_datetime_str,get_current_datetime_str, send_msg
from threading import Lock
gi.require_version('Gst', '1.0')
import pyds
import ctypes
import numpy as np
import cv2
import json
import sys
import platform
from threading import Lock
from cuda import cudart
from cuda import cuda
start_time=time.time()
pipeline = None
fps_mutex = Lock()
syd_tz = pytz.timezone('Australia/Sydney')
ntp_epoch = datetime(1970, 1, 1, tzinfo=pytz.UTC)
try:
yaml_con = json.loads(os.environ["main_config"])
streams = yaml_con["streams"]
log_path = yaml_con["log_path"]
no_of_sec_to_send_data = int(yaml_con["no_of_sec_send_msg"])
img_dir_name = yaml_con["img_dir_name"]
osd_process_mode = int(yaml_con["osd_process_mode"])
osd_display_text = int(yaml_con["osd_display_text"])
sync_inputs = int(yaml_con["streammux_sync_inputs"])
attach_sys_ts = int(yaml_con["streammux_attach_sys_ts"])
max_latency = int(yaml_con["streammux_max_latency"])
frame_duration = int(yaml_con["streammux_frame_duration"])
is_live = int(yaml_con["is_live"])
frame_width = int(yaml_con["frame_width"])
frame_height = int(yaml_con["frame_height"])
drop_eos = int(yaml_con["drop_eos"])
streammux_muxer_batch_timeout_usec = int(yaml_con["streammux_muxer_batch_timeout_usec"])
analytics_config_path = yaml_con["analytics_config_path"]
infer_config_path = yaml_con["infer_config_path"]
tracker_config_path = yaml_con["tracker_config_path"]
redis_container_name = yaml_con["redis_container_name"]
redis_port = int(yaml_con["redis_port"])
redis_key = yaml_con["redis_key"]
redis_retry = 5
PGIE_CLASS_ID_FACE = 2
max_size_buffers = int(yaml_con["queue_max_size_buffers"])
im_quality = int(yaml_con["jpeg_encoder_quality"])
reconnect_interval = int(yaml_con["reconnect_interval"])
is_add_face_blur = int(yaml_con.get("is_add_face_blur",1))
face_blur_opacity = float(yaml_con.get("face_blur_opacity",0.8))
application_quit_ther = float(yaml_con.get("application_quit_ther",0.5))
reconnect_att = float(yaml_con.get("reconnect_att",5))
blob_base_img_path = yaml_con["blob_base_img_path"]
local_uld_dir_name = os.path.join(img_dir_name,yaml_con["local_uld_dir_name"])
blob_base_img_path_uld = yaml_con["blob_base_img_path_uld"]
uld_roi_name = yaml_con["uld_roi_name"]
uld_cls_id = int(yaml_con["uld_cls_id"])
class_dict = {0:"ULD"}
except Exception as e:
print(f"Error reading config file: {e}")
sys.exit(1)
if not all([ os.path.exists(p) for p in [analytics_config_path,infer_config_path,tracker_config_path]]):
raise FileNotFoundError(f"One or more of the required analytics, inference, or tracker config files not found: {analytics_config_path}, {infer_config_path}, {tracker_config_path}")
os.makedirs(img_dir_name,exist_ok=True)
os.makedirs(local_uld_dir_name,exist_ok=True)
class SydneyFormatter(logging.Formatter):
"""Custom logging formatter to use Sydney timezone."""
def formatTime(self, record, datefmt=None):
sydney_time = datetime.fromtimestamp(record.created, syd_tz)
return sydney_time.strftime("%Y-%m-%d %H:%M:%S")
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] %(levelname)s: %(message)s",
handlers=[logging.FileHandler(log_path), logging.StreamHandler()]
)
for handler in logging.getLogger().handlers:
handler.setFormatter(SydneyFormatter("[%(asctime)s] %(levelname)s: %(message)s"))
logger = logging.getLogger(__name__)
number_sources = len(streams)
src_ids = {}
snap_dict = {}
first_frame_processed = False
curr_epoch = get_current_epoch_in_syd()
stream_start_time_dict = {i:None for i in range(number_sources)}
start_time_syd_dict_json = {i:curr_epoch for i in range(number_sources)}
start_time_ntp_dict = {i:0 for i in range(number_sources)}
start_time_one_min_dict = {i:0 for i in range(number_sources)}
lag_dict = {i:{ "data":{"cumulative_line_cross_count":0} ,"ds":0,"is_first":True,"data_num":0} for i in range(number_sources)}
guard_platform_info = Lock()
streammux = None
is_first = True
def get_current_timestamp():
current_time = datetime.now()
timestamp_str = current_time.strftime('%Y-%m-%d %H:%M:%S')
return timestamp_str
def on_pad_added(src, pad, sink_element):
# print(f"New pad {pad.get_name()} added from {src.get_name()}")
sink_pad = sink_element.get_static_pad("sink")
if not sink_pad:
msg = (f"Error: Could not get sink pad for {sink_element.get_name()}")
logger.error(msg)
return
if sink_pad.is_linked():
msg = (f"Warning: Sink pad of {sink_element.get_name()} is already linked")
logger.warning(msg)
return
if pad.query_caps(None).is_any():
msg = (f"Warning: Pad {pad.get_name()} does not have valid caps yet!")
logger.warning(msg)
return
link_status = pad.link(sink_pad)
if link_status != Gst.PadLinkReturn.OK:
msg = (f"Pad link failed: {link_status}")
logger.error(msg)
class GETFPS:
def __init__(self,stream_id):
global start_time
self.start_time=start_time
self.is_first=True
self.frame_count=0
self.stream_id=stream_id
def update_fps(self):
end_time = time.time()
if self.is_first:
self.start_time = end_time
self.is_first = False
else:
global fps_mutex
with fps_mutex:
self.frame_count = self.frame_count + 1
def get_fps(self):
end_time = time.time()
with fps_mutex:
stream_fps = float(self.frame_count/(end_time - self.start_time))
self.frame_count = 0
self.start_time = end_time
return round(stream_fps, 2)
def print_data(self):
print('frame_count=',self.frame_count)
print('start_time=',self.start_time)
def get_linked_element(pad):
if pad and pad.is_linked():
peer_pad = pad.get_peer()
if peer_pad:
peer_element = peer_pad.get_parent_element()
return peer_element.get_name() if peer_element else None
return None
class PERF_DATA:
def __init__(self, active_streams,main_loop):
self.perf_dict = {}
self.all_stream_fps = {}
self.fps_mutex = Lock() # Lock for thread safety
self.main_loop = main_loop
for i in active_streams:
self.all_stream_fps[f"stream{i}"] = GETFPS(i)
def update_fps(self, stream_index):
""" Update FPS for the given stream index """
with self.fps_mutex:
if stream_index in self.all_stream_fps:
self.all_stream_fps[stream_index].update_fps()
else:
print(f"Warning: Stream {stream_index} not registered. Ignoring update.")
def perf_print_callback(self):
with self.fps_mutex:
self.perf_dict = {stream_index: stream.get_fps() for stream_index, stream in self.all_stream_fps.items()}
print("\n**PERF Data : ", self.perf_dict, "\n")
deactive_streams = 0
for k, v in self.perf_dict.items():
src_id = src_ids[k]
last_id = src_id['id']
stream_id = src_id['streamid']
rtsp_url = src_id['src']
if not v:
if src_ids[k]["no_of_att"] > reconnect_att:
logger.info(f"reconnect att exist in {stream_id=} {src_ids[k]['no_of_att']}")
deactive_streams+=1
continue
valve_element_name = f"valve_{stream_id}"
valve = pipeline.get_by_name(valve_element_name)
# sink_valve = valve.get_static_pad("sink")
# new_id = last_id
new_id = str(uuid.uuid1())
del_add_rtspsrc(last_id,new_id,stream_id,rtsp_url)
src_ids[k]['id'] = new_id
valve.set_property("drop", False)
time.sleep(2)
src_ids[k]["no_of_att"]+=1
if (deactive_streams/number_sources) > application_quit_ther:
logger.info(f"XXXXXXXXXXXXXXXXXXXXXXXXXX no of deactivate streams {deactive_streams} and no of streams {number_sources} ther {(deactive_streams/number_sources)} > {application_quit_ther} so quit the application XXXXXXXXXXXXXXXXXXXXXXXXXX")
self.main_loop.quit()
logger.info(f"pipeline alive data : {self.perf_dict}")
return True
def wait_for_null_state(element, timeout=3):
# print(f"Waiting for {element.get_name()} to reach NULL state...")
element.set_state(Gst.State.NULL)
success, state, _ = element.get_state(timeout * Gst.SECOND)
if success == Gst.StateChangeReturn.SUCCESS and state == Gst.State.NULL:
# print(f"{element.get_name()} successfully reached NULL state.")
return True
else:
msg = (f"Warning: {element.get_name()} did not reach NULL state within {timeout} seconds.")
logger.warning(msg)
return False
def flush_ele(stream_id):
streammux = pipeline.get_by_name("Stream-muxer")
if not streammux :
msg = (f"[ERROR] Streammux or Demux not found in pipeline")
logger.error(msg)
return
streammux_sink_pad = streammux.get_static_pad(f"sink_{stream_id}")
if streammux_sink_pad:
msg = (f"Flushing streammux pad for stream {stream_id}...")
logger.info(msg)
streammux_sink_pad.send_event(Gst.Event.new_flush_start())
time.sleep(0.1)
streammux_sink_pad.send_event(Gst.Event.new_flush_stop(True))
return
def del_add_rtspsrc(old_id, new_id, stream_id, rtsp_url):
msg = f"Trying to create a new RTSP source for stream {stream_id=}..."
logger.info(msg)
bin_name = f"source-bin-{old_id}"
old_uridecodebin = pipeline.get_by_name(bin_name)
if not old_uridecodebin:
msg = f"[ERROR] No existing uridecodebin found for stream {stream_id}."
logger.error(msg)
return
valve = pipeline.get_by_name(f"valve_{stream_id}")
valve.set_property("drop", True)
valve_sink = valve.get_static_pad("sink")
src_pad = old_uridecodebin.get_static_pad("src")
if src_pad and src_pad.is_linked():
src_pad.unlink(valve_sink)
old_uridecodebin.send_event(Gst.Event.new_eos())
wait_for_null_state(old_uridecodebin)
flush_ele(stream_id)
pipeline.remove(old_uridecodebin)
old_uridecodebin = None
new_uridecodebin = create_source_bin(stream_id,new_id, rtsp_url)
if not new_uridecodebin:
msg = f"[ERROR] Failed to create new uridecodebin for stream {stream_id}."
logger.error(msg)
return
pipeline.add(new_uridecodebin)
new_uridecodebin_src_pad=new_uridecodebin.get_static_pad("src")
if new_uridecodebin_src_pad.link(valve_sink) != Gst.PadLinkReturn.OK:
msg = f"Error linking {new_uridecodebin.get_name()} to {valve.get_name()}"
logger.error(msg)
return
new_uridecodebin.set_state(Gst.State.READY)
time.sleep(0.1)
new_uridecodebin.sync_state_with_parent()
new_uridecodebin.set_state(Gst.State.PLAYING)
msg = f"Successfully replaced RTSP source for stream {stream_id}"
logger.info(msg)
def cb_newpad(decodebin, decoder_src_pad, data):
print("In cb_newpad\n")
caps = decoder_src_pad.get_current_caps()
gststruct = caps.get_structure(0)
gstname = gststruct.get_name()
source_bin = data
features = caps.get_features(0)
# Need to check if the pad created by the decodebin is for video and not
# audio.
if (gstname.find("video") != -1):
# Link the decodebin pad only if decodebin has picked nvidia
# decoder plugin nvdec_*. We do this by checking if the pad caps contain
# NVMM memory features.
if features.contains("memory:NVMM"):
# Get the source bin ghost pad
bin_ghost_pad = source_bin.get_static_pad("src")
if not bin_ghost_pad.set_target(decoder_src_pad):
sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
else:
sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")
def decodebin_child_added(child_proxy, Object, name, user_data):
print("Decodebin child added:", name, "\n")
if name.find("decodebin") != -1:
Object.connect("child-added", decodebin_child_added, user_data)
if not platform_info.is_integrated_gpu() and name.find("nvv4l2decoder") != -1:
# Use CUDA unified memory in the pipeline so frames can be easily accessed on CPU in Python.
# 0: NVBUF_MEM_CUDA_DEVICE, 1: NVBUF_MEM_CUDA_PINNED, 2: NVBUF_MEM_CUDA_UNIFIED
# Dont use direct macro here like NVBUF_MEM_CUDA_UNIFIED since nvv4l2decoder uses a
# different enum internally
Object.set_property("cudadec-memtype", 2)
if "source" in name:
source_element = child_proxy.get_by_name("source")
if source_element.find_property('drop-on-latency') != None:
Object.set_property("drop-on-latency", True)
def on_pad_added(decodebin, pad, data):
print(f"Pad added: {pad.get_name()} (RTSP connection successful)")
data["is_ready"] = True
def test_rtsp_connection(uri, stream_id ,timeout=10):
"""Checks if RTSP stream is alive before adding to main pipeline"""
# Create a temporary pipeline
pipeline = Gst.Pipeline.new("test-pipeline")
uridecodebin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
if not uridecodebin:
msg = ("Failed to create uridecodebin")
logger.error(msg)
return False
uridecodebin.set_property("uri", uri)
d = {"is_ready": False}
uridecodebin.connect("pad-added", on_pad_added,d )
fakesink = Gst.ElementFactory.make("fakesink","fakesink")
pipeline.add(uridecodebin)
pipeline.add(fakesink)
uridecodebin.link(fakesink)
pipeline.set_state(Gst.State.PLAYING)
start_time = time.time()
while time.time() - start_time < timeout:
# Wait to see if pad-added is triggered
success, state, _ = pipeline.get_state(Gst.CLOCK_TIME_NONE)
if d["is_ready"]:
msg = f"RTSP stream is active {stream_id=}"
logger.info(msg)
pipeline.set_state(Gst.State.NULL)
return True,uridecodebin
time.sleep(0.5)
# print(f"wating for frame .... {time.time() - start_time}")
msg = (f"RTSP connection failed (No `pad-added` event) {stream_id=}")
logger.warning(msg)
pipeline.set_state(Gst.State.NULL)
return False,None
def create_source_bin(stream_id,index,uri):
uri_decode_bin=Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
if is_first:
is_suc,uri_decode_bin = test_rtsp_connection(uri,stream_id)
if not is_suc:
msg = (f" Error creating source {stream_id=} ")
logger.error(msg)
return False
bin_name=f"source-bin-{index}"
nbin=Gst.Bin.new(bin_name)
if not nbin:
msg = f"Unable to create source bin {bin_name}"
logger.error(msg)
sys.stderr.write(msg)
if not uri_decode_bin:
msg = "Unable to create uri decode bin"
logger.error(msg)
sys.stderr.write(msg)
uri_decode_bin.set_property("uri",uri)
uri_decode_bin.connect("pad-added",cb_newpad,nbin)
uri_decode_bin.connect("child-added",decodebin_child_added,nbin)
Gst.Bin.add(nbin,uri_decode_bin)
bin_pad=nbin.add_pad(Gst.GhostPad.new_no_target("src",Gst.PadDirection.SRC))
if not bin_pad:
msg = "Failed to add ghost pad in source bin"
sys.stderr.write(msg)
logger.error(msg)
return None
return nbin
def buffer_to_numpy(gst_buffer, frame_meta):
n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
frame_copy = np.array(n_frame, copy=True, order='C')
frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_RGBA2BGRA)
if platform_info.is_integrated_gpu():
pyds.unmap_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
return frame_copy
os.makedirs("saved_images",exist_ok=True)
def nvanlytics_src_pad_buffer_probe(pad, info, user_data):
local_uld_dir_name = user_data["local_uld_dir_name"]
cam_ids_ip_port = user_data["cam_ids"]
blob_base_img_path = user_data["blob_base_img_path_uld"]
global first_frame_processed
global start_time_syd_dict_json
global stream_start_time_dict
global start_time_ntp_dict
global start_time_one_min_dict
global snap_dict
try:
if not first_frame_processed:
gst_buffer = info.get_buffer()
if gst_buffer:
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
stream_id = frame_meta.pad_index
# ntp_sec = frame_meta.ntp_timestamp / 1e9
pts = frame_meta.buf_pts
curr_time = (pts / 1e9)
start_time_ntp_dict[stream_id] = pts
except Exception as e:
msg = (f"Error processing frame demux prob: {e}")
logging.error(msg)
return Gst.PadProbeReturn.OK
try:
l_frame = l_frame.next
except StopIteration:
break
stream_start_time_ = get_current_epoch_in_syd()
start_time_syd_dict_json = {i: stream_start_time_ for i in range(number_sources)}
stream_start_time_dict = {i: stream_start_time_ for i in range(number_sources)}
start_time_one_min_dict = {i: stream_start_time_ for i in range(number_sources)}
first_frame_processed = True
# no_of_sec_send_msg = user_data["no_of_sec_send_msg"]
except Exception as e:
msg = (f"Error processing buffer demux prob : {e} ")
logger.error(msg)
return Gst.PadProbeReturn.OK
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer ")
return Gst.PadProbeReturn.OK
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
if frame_meta.frame_num%30==0:
frame = buffer_to_numpy(gst_buffer, frame_meta)
img_path = os.path.join("saved_images",f"{uuid.uuid1()}.jpg")
cv2.imwrite(img_path,frame)
print(f"frame saved sucessfully {img_path=}")
stream_id = frame_meta.pad_index
pts = frame_meta.buf_pts
curr_time = stream_start_time_dict[stream_id] + (pts / 1e9)
syd_time_pts_sql = convert_syd_epoch_to_datetime_str(curr_time, "datetime_sql")
syd_time_pts_uld = convert_syd_epoch_to_datetime_str(curr_time, "uld_datetime")
except StopIteration:
break
l_obj=frame_meta.obj_meta_list
while l_obj:
try:
obj_meta=pyds.NvDsObjectMeta.cast(l_obj.data)
class_id = int(obj_meta.class_id)
confidence = obj_meta.confidence
tracker_id = obj_meta.object_id
except StopIteration:
break
try:
l_obj = l_obj.next
except StopIteration:
break
l_user = frame_meta.frame_user_meta_list
while l_user:
try:
user_meta = pyds.NvDsUserMeta.cast(l_user.data)
if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type("NVIDIA.DSANALYTICSFRAME.NVDSANALYTICS_OBJ_META"):
analytics_meta = pyds.NvDsAnalyticsObjInfo.cast(user_meta.user_meta_data)
if analytics_meta.roiStatus == uld_roi_name and class_id == uld_cls_id:
if tracker_id not in snap_dict:
cam_id_ip_port = cam_ids_ip_port[stream_id]
image_path = f"{cam_id_ip_port[:5]}_{syd_time_pts_uld}_{tracker_id}.jpg"
local_uld_path = os.path.join(local_uld_dir_name,image_path)
frame = buffer_to_numpy(gst_buffer, frame_meta)
if frame is not None:
rect_params = obj_meta.rect_params
left = int(rect_params.left)
top = int(rect_params.top)
width = int(rect_params.width)
height = int(rect_params.height)
bbox = (left, top, left + width, top + height)
x1,y1,x2,y2 = [int(i) for i in bbox]
frame = frame[y1:y2,x1:x2]
cv2.imwrite(local_uld_path, frame)
print("######################################### frame saved ######################################### ")
else:
msg = (f"Error getting frame buffer for tracker_id {tracker_id}")
logger.error(msg)
blob_img_path = os.path.join(blob_base_img_path, cam_id_ip_port, image_path)
snap_dict[tracker_id] = {"earliest_seen_time":syd_time_pts_sql,"last_seen_time":syd_time_pts_sql,"zone":uld_roi_name,"blob_img_path":blob_img_path,"class_id":class_id}
else:
snap_dict[tracker_id]["last_seen_time"] = syd_time_pts_sql
except StopIteration:
break
try:
l_user = l_user.next
except StopIteration:
break
try:
l_frame=l_frame.next
except StopIteration:
break
return Gst.PadProbeReturn.OK
def demux_src_pad_buffer_probe(pad, info, user_data):
gst_buffer = info.get_buffer()
if not gst_buffer:
msg = ("Unable to get GstBuffer demux prob")
logger.error(msg)
return Gst.PadProbeReturn.DROP
try:
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
stream_id = frame_meta.pad_index
pts = frame_meta.buf_pts
curr_time = stream_start_time_dict[stream_id] + (pts / 1e9)
# ntp_sec = frame_meta.ntp_timestamp / 1e9
# curr_time = pts
if curr_time - start_time_syd_dict_json[stream_id] < 10:
# print(f"Dropping stream: Source {frame_meta.source_id}, Frame {frame_meta.frame_num}")
stream_index = "stream{0}".format(stream_id)
perf_data.update_fps(stream_index)
return Gst.PadProbeReturn.DROP
start_time_syd_dict_json[stream_id] = curr_time
syd_time_pts_ = convert_pts_to_sydtimestamp(pts, stream_start_time_dict[stream_id])
display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
display_meta.num_labels = 1
text_params = display_meta.text_params[0]
# text_params.display_text = f"buf ts : {syd_time_pts_} ntp ts : {ntp_to_sydney_time(ntp_sec)}"
text_params.display_text = f" PTS TS : {syd_time_pts_}"
text_params.x_offset = 50
text_params.y_offset = 50
text_params.font_params.font_name = "Serif"
text_params.font_params.font_size = 12
text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
text_params.set_bg_clr = 1
text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
l_user = frame_meta.frame_user_meta_list
while l_user:
try:
user_meta = pyds.NvDsUserMeta.cast(l_user.data)
if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type(
"NVIDIA.DSANALYTICSFRAME.USER_META"
):
user_meta_data = pyds.NvDsAnalyticsFrameMeta.cast(user_meta.user_meta_data)
line_cum_count = user_meta_data.objLCCumCnt
obj_in_roi = user_meta_data.objInROIcnt
display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
d = {**line_cum_count,**obj_in_roi}
no_of_labels = len(d)
display_meta.num_labels = no_of_labels
start_x,start_y = 50,80
for i,(label_id,count) in enumerate(d.items()):
text_params = display_meta.text_params[i]
text_params.display_text = f"{label_id} : {count}"
text_params.x_offset = start_x
text_params.y_offset = start_y
text_params.font_params.font_name = "Serif"
text_params.font_params.font_size = 12
text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
text_params.set_bg_clr = 1
text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
start_y+=30
pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
except Exception as e:
msg = f"Error processing user metadata demux prob: {e}"
logger.error(msg)
try:
l_user = l_user.next
except StopIteration:
break
except Exception as e:
msg = f"Error processing frame demux prob: {e}"
logger.error(msg)
try:
l_frame = l_frame.next
except StopIteration:
break
except Exception as e:
msg = f"Error processing buffer demux prob: {e}"
logger.error(msg)
return Gst.PadProbeReturn.OK
def bus_call(bus, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
msg = "======================= End-of-stream ======================="
logger.error(msg)
# loop.quit()
elif t==Gst.MessageType.WARNING:
err, debug = message.parse_warning()
logger.debug(f"{err}")
print("Warning: %s: %s" % (err, debug))
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
logger.error(f"{err}")
print("Error: %s: %s" % (err, debug))
return True
def valve_prob(pad, info, user_data):
stream_id = user_data["stream_id"]
gst_buffer = info.get_buffer()
if not gst_buffer:
msg = "Unable to get GstBuffer valve prob"
logger.error(msg)
return Gst.PadProbeReturn.OK
try:
stream_index = "stream{0}".format(stream_id)
perf_data.update_fps(stream_index)
except Exception as e:
msg = f"Error accessing batch metadata: {e}"
logger.error(msg)
return Gst.PadProbeReturn.OK
def make_element(element_name, i):
element = Gst.ElementFactory.make(element_name, element_name)
if not element:
sys.stderr.write(" Unable to create {0}".format(element_name))
element.set_property("name", "{0}-{1}".format(element_name, str(i)))
return element
def format_location_callback(splitmuxsink, fragment_id, video_dir_path, stream_id):
# Get the current date and time
current_datetime = convert_syd_epoch_to_datetime_str(get_current_epoch_in_syd(),"date_time")
# Construct the filename with the current datetime
return os.path.join(video_dir_path, f"stream_{stream_id}_rtsp_out_{current_datetime}_uri.mp4")
def frame_filter_pad_probe(pad, info, user_data):
global lag_dict
base_img_path = user_data["base_img_path"]
cam_id_ip_port = user_data["cam_id"]
blob_base_img_path = user_data["blob_base_img_path"]
gst_buffer = info.get_buffer()
if not gst_buffer:
msg = "Unable to get GstBuffer framefilter prob"
logger.error(msg)
return Gst.PadProbeReturn.OK
try:
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
stream_id = frame_meta.pad_index
pts = frame_meta.buf_pts
curr_time = stream_start_time_dict.get(stream_id, 0) + (pts / 1e9)
syd_time_pts_hhmmss = convert_syd_epoch_to_datetime_str(curr_time, "hhmmss")
current_yymmdd = get_current_datetime_str("yymmdd")
current_hhmmss = get_current_datetime_str("hhmmss")
frame_number = frame_meta.frame_num
ntp_sec = frame_meta.ntp_timestamp / 1e9
syd_time_ntp_hhmmss = ntp_to_sydney_time(ntp_sec,"hhmmss")
syd_time_ntp_sql = ntp_to_sydney_time(ntp_sec)
image_folder_path,blob_path = create_dynamic_path(curr_time, base_img_path, cam_id_ip_port)
multifilesink = pad.get_parent_element()
image_name = f"CAM-{cam_id_ip_port[:7]}_DATE-{current_yymmdd}_PTS-{syd_time_pts_hhmmss}_NTP-{syd_time_ntp_hhmmss}_SYS-{current_hhmmss}.jpg"
image_save_path = os.path.join(image_folder_path, image_name)
blob_img_path = os.path.join(blob_base_img_path,blob_path,image_name)
# print(f"Saving image: {image_save_path}, stream_id: {stream_id}")
multifilesink.set_property("location", image_save_path)
l_user = frame_meta.frame_user_meta_list
while l_user:
try:
user_meta = pyds.NvDsUserMeta.cast(l_user.data)
if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type(
"NVIDIA.DSANALYTICSFRAME.USER_META"
):
user_meta_data = pyds.NvDsAnalyticsFrameMeta.cast(user_meta.user_meta_data)
objInROIcnt = user_meta_data.objInROIcnt
objLCCumCnt = user_meta_data.objLCCumCnt
objLCCurrCnt = user_meta_data.objLCCurrCnt
num_rects = frame_meta.num_obj_meta
if lag_dict[stream_id]["is_first"]:
lag_dict[stream_id]["data"] = {"cumulative_line_cross_count":objLCCumCnt}
lag_dict[stream_id]["is_first"] = False
syd_time_pts_ = convert_syd_epoch_to_datetime_str(curr_time, "datetime_sql")
data = {
"stream_id": stream_id,
"camera_name": cam_id_ip_port,
"unique_number_uuid": 10,
"batch_id": frame_meta.batch_id,
"frame_num": frame_number,
"buf_pts": pts,
"ntp_timestamp": frame_meta.ntp_timestamp,
"source_id": frame_meta.source_id,
"object_count": {class_dict.get(int(cls_id),cls_id):count for cls_id,count in user_meta_data.objCnt.items()},
"Number_of_objects": num_rects,
"object_in_ROI_count": objInROIcnt ,
"cumulative_line_cross_count": objLCCumCnt,
"current_line_cross_count": objLCCurrCnt,
"overcrowding_status": user_meta_data.ocStatus,
"unique_id": user_meta_data.unique_id,
"PTS_TIME": syd_time_pts_,
"NTP_TS": syd_time_ntp_sql,
"blob_image_path":blob_img_path,
"reconnect_id": src_ids[f'stream{stream_id}']["id"],
"data_num":lag_dict[stream_id]["data_num"],
"uld_data":snap_dict
}
diff_lag = {k: {k_:(v_-v[k_]) for k_,v_ in data[k].items()} for k, v in lag_dict[stream_id]["data"].items()}["cumulative_line_cross_count"]
data["lag_diff_cumulative_line_cross_count"] = diff_lag
lag_dict[stream_id]["data"]["cumulative_line_cross_count"] = data["cumulative_line_cross_count"]
lag_dict[stream_id]["ds"] = curr_time
lag_dict[stream_id]["data_num"]+=1
# send_msg(data,redis_key)
print(f"Data : {data}")
except Exception as e:
msg = f"Error processing user metadata: {e}"
logger.error(msg)
# return Gst.PadProbeReturn.OK
try:
l_user = l_user.next
except StopIteration:
break
except Exception as e:
msg = f"Error processing frame metadata: {e}"
logger.error(msg)
return Gst.PadProbeReturn.OK
try:
l_frame = l_frame.next
except StopIteration:
break
except Exception as e:
msg = f"Error accessing batch metadata: {e}"
logger.error(msg)
return Gst.PadProbeReturn.OK
return Gst.PadProbeReturn.OK
def dummy_prob(pad, info, user_data):
print(f"inside the {user_data} prob")
return Gst.PadProbeReturn.OK
def cb_newpad(decodebin, decoder_src_pad, data):
print("In cb_newpad\n")
caps = decoder_src_pad.get_current_caps()
gststruct = caps.get_structure(0)
gstname = gststruct.get_name()
source_bin = data
features = caps.get_features(0)
# Need to check if the pad created by the decodebin is for video and not
# audio.
if (gstname.find("video") != -1):
# Link the decodebin pad only if decodebin has picked nvidia
# decoder plugin nvdec_*. We do this by checking if the pad caps contain
# NVMM memory features.
if features.contains("memory:NVMM"):
# Get the source bin ghost pad
bin_ghost_pad = source_bin.get_static_pad("src")
if not bin_ghost_pad.set_target(decoder_src_pad):
sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
else:
sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")
class PlatformInfo:
def __init__(self):
self.is_wsl_system = False
self.wsl_verified = False
self.is_integrated_gpu_system = False
self.is_integrated_gpu_verified = False
self.is_aarch64_platform = False
self.is_aarch64_verified = False
def is_wsl(self):
with guard_platform_info:
# Check if its already verified as WSL system or not.
if not self.wsl_verified:
try:
# Open /proc/version file
with open("/proc/version", "r") as version_file:
# Read the content
version_info = version_file.readline()
version_info = version_info.lower()
self.wsl_verified = True
# Check if "microsoft" is present in the version information
if "microsoft" in version_info:
self.is_wsl_system = True
except Exception as e:
print(f"ERROR: Opening /proc/version failed: {e}")
return self.is_wsl_system
def is_integrated_gpu(self):
#Using cuda apis to identify whether integrated/discreet
#This is required to distinguish Tegra and ARM_SBSA devices
with guard_platform_info:
#Cuda initialize
if not self.is_integrated_gpu_verified:
cuda_init_result, = cuda.cuInit(0)
if cuda_init_result == cuda.CUresult.CUDA_SUCCESS:
#Get cuda devices count
device_count_result, num_devices = cuda.cuDeviceGetCount()
if device_count_result == cuda.CUresult.CUDA_SUCCESS:
#If atleast one device is found, we can use the property from
#the first device
if num_devices >= 1:
#Get properties from first device
property_result, properties = cudart.cudaGetDeviceProperties(0)
if property_result == cuda.CUresult.CUDA_SUCCESS:
print("Is it Integrated GPU? :", properties.integrated)
self.is_integrated_gpu_system = properties.integrated
self.is_integrated_gpu_verified = True
else:
print("ERROR: Getting cuda device property failed: {}".format(property_result))
else:
print("ERROR: No cuda devices found to check whether iGPU/dGPU")
else:
print("ERROR: Getting cuda device count failed: {}".format(device_count_result))
else:
print("ERROR: Cuda init failed: {}".format(cuda_init_result))
return self.is_integrated_gpu_system
def is_platform_aarch64(self):
#Check if platform is aarch64 using uname
if not self.is_aarch64_verified:
if platform.uname()[4] == 'aarch64':
self.is_aarch64_platform = True
self.is_aarch64_verified = True
return self.is_aarch64_platform
def main(args):
# Check input arguments
global perf_data, pipeline, src_ids, streammux, number_sources,is_first
Gst.init(None)
global platform_info
platform_info = PlatformInfo()
pipeline = Gst.Pipeline.new(f"rtsp-pipeline-{uuid.uuid1()}")
src_ids = {f'stream{i}': {'id': None, 'src': None, 'streamid': i,"no_of_att":0} for i in range(number_sources)}
cam_ids = {stream_id:extract_rtsp_details(uri_name) for stream_id,uri_name in enumerate(streams)}
# Create gstreamer elements */
# Create Pipeline element that will form a connection of other elements
print("Creating Pipeline \n ")
pipeline = Gst.Pipeline()
is_live = False
if not pipeline:
sys.stderr.write(" Unable to create Pipeline \n")
print("Creating streamux \n ")
# Create nvstreammux instance to form batches from one or more sources.
streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
if not streammux:
sys.stderr.write(" Unable to create NvStreamMux \n")
pipeline.add(streammux)
for src_id, src in enumerate(streams):
last_id = str(uuid.uuid1())
src_ids[f'stream{src_id}']['id'] = last_id
src_ids[f'stream{src_id}']['src'] = src
bin = create_source_bin(src_id,last_id,src)
if not bin:
msg = f"Error: Could not create source bin for {src_id} so remove the src dict"
logger.error(msg)
print(msg)
del src_ids[f'stream{src_id}']
number_sources-=1
continue
msg = (f"uridecodebin_{last_id} created for {src_id}")
logger.info(msg)
valve = Gst.ElementFactory.make("valve", f"valve_{src_id}")
queue = Gst.ElementFactory.make("queue", f"queue_{src_id}")
pipeline.add(bin)
pipeline.add(valve)
pipeline.add(queue)
bin_src = bin.get_static_pad("src")
user_data={"stream_id":src_id}
sink_pad = valve.get_static_pad("sink")
sink_pad.add_probe(Gst.PadProbeType.BUFFER, valve_prob, user_data)
padname=f"sink_{src_id}"
valve_src_pad = valve.get_static_pad("src")
valve_sink_pad = valve.get_static_pad("sink")
if not valve_src_pad or not valve_sink_pad:
msg = (f"Error: Could not get src pad for valve_{src_id} or sink_{src_id}")
logger.error(msg)
return
streammux_sinkpad= streammux.request_pad_simple(padname)
if not streammux_sinkpad:
msg = (f"Error: Could not get sink pad for streammux_{src_id}")
logger.error(msg)
return
user_meta = {"stream_id":src_id,'no_of_sec_to_predict':no_of_sec_to_send_data}
queue_sink_pad = queue.get_static_pad("sink")
queue_src_pad = queue.get_static_pad("src")
bin_src.link(valve_sink_pad)
valve_src_pad.link(queue_sink_pad)
queue_src_pad.link(streammux_sinkpad)
active_streams = [ src["streamid"] for src in src_ids.values()]
is_first = False
print("Creating Pgie \n ")
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
if not pgie:
sys.stderr.write(" Unable to create pgie \n")
print("Creating nvvidconv1 \n ")
nvvidconv1 = Gst.ElementFactory.make("nvvideoconvert", "convertor1")
if not nvvidconv1:
sys.stderr.write(" Unable to create nvvidconv1 \n")
print("Creating filter1 \n ")
caps1 = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
filter1 = Gst.ElementFactory.make("capsfilter", "filter1")
if not filter1:
sys.stderr.write(" Unable to get the caps filter1 \n")
filter1.set_property("caps", caps1)
if is_live:
print("Atleast one of the sources is live")
streammux.set_property('live-source', 1)
streammux.set_property('width', 1920)
streammux.set_property('height', 1080)
streammux.set_property('batch-size', number_sources)
streammux.set_property('batched-push-timeout', 33000)
pgie.set_property('config-file-path', "dsnvanalytics_pgie_config_tripwire.txt")
pgie_batch_size = pgie.get_property("batch-size")
if (pgie_batch_size != number_sources):
print("WARNING: Overriding infer-config batch-size", pgie_batch_size, " with number of sources ",
number_sources, " \n")
pgie.set_property("batch-size", number_sources)
if not platform_info.is_integrated_gpu():
# Use CUDA unified memory in the pipeline so frames
# can be easily accessed on CPU in Python.
vc_mem_type = int(pyds.NVBUF_MEM_CUDA_PINNED)
mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
streammux.set_property("nvbuf-memory-type", vc_mem_type)
if platform_info.is_wsl():
print("using nvbuf_mem_cuda_pinned memory for nvvidconv1\n")
nvvidconv1.set_property("nvbuf-memory-type", vc_mem_type)
else:
nvvidconv1.set_property("nvbuf-memory-type", mem_type)
tracker = Gst.ElementFactory.make("nvtracker", "tracker")
if not tracker:
msg = " Unable to create tracker "
sys.stderr.write(msg)
sys.exit(1)
config = configparser.ConfigParser()
config.read("dsnvanalytics_tracker_config_tripwire.txt")
config.sections()
for key in config['tracker']:
if key == 'tracker-width' :
tracker_width = config.getint('tracker', key)
tracker.set_property('tracker-width', tracker_width)
if key == 'tracker-height' :
tracker_height = config.getint('tracker', key)
tracker.set_property('tracker-height', tracker_height)
if key == 'gpu-id' :
tracker_gpu_id = config.getint('tracker', key)
tracker.set_property('gpu_id', tracker_gpu_id)
if key == 'll-lib-file' :
tracker_ll_lib_file = config.get('tracker', key)
tracker.set_property('ll-lib-file', tracker_ll_lib_file)
if key == 'll-config-file' :
tracker_ll_config_file = config.get('tracker', key)
tracker.set_property('ll-config-file', tracker_ll_config_file)
nvanalytics = Gst.ElementFactory.make("nvdsanalytics", "analytics")
if not nvanalytics:
msg = " Unable to create nvanalytics "
sys.stderr.write(msg)
sys.exit(1)
nvanalytics.set_property("config-file", "config_tripwire.txt")
nvstreamdemux = Gst.ElementFactory.make("nvstreamdemux", "nvstreamdemux")
if not nvstreamdemux:
msg = " Unable to create nvstreamdemux "
sys.stderr.write(msg)
sys.exit(1)
print("Adding elements to Pipeline \n")
pipeline.add(pgie)
pipeline.add(tracker)
pipeline.add(nvanalytics)
pipeline.add(filter1)
pipeline.add(nvvidconv1)
pipeline.add(nvstreamdemux)
print("Linking elements in the Pipeline \n")
streammux.link(pgie)
pgie.link(tracker)
tracker.link(nvvidconv1)
nvvidconv1.link(filter1)
filter1.link(nvanalytics)
nvanalytics.link(nvstreamdemux)
for i in range(number_sources):
queue4_ = Gst.ElementFactory.make("queue", f"queue__{i}")
pipeline.add(queue4_)
queue4_.set_property("max-size-buffers", 100)
queue4_.set_property("leaky", 2)
nvvideoconvert1 = Gst.ElementFactory.make("nvvideoconvert", f"nvvideoconvert_{i}")
nvvidconv1.set_property("nvbuf-memory-type", vc_mem_type)
pipeline.add(nvvideoconvert1)
nvdsosd = Gst.ElementFactory.make("nvdsosd", f"nvdsosd_{i}")
pipeline.add(nvdsosd)
nvdsosd.set_property("process-mode", 1)
nvdsosd.set_property("display-text", 1)
padname = "src_%u" % i
demuxsrcpad = nvstreamdemux.request_pad_simple(padname)
if not demuxsrcpad:
sys.stderr.write("Unable to create demux src pad \n")
sys.exit(1)
queuesinkpad = queue4_.get_static_pad("sink")
if not queuesinkpad:
sys.stderr.write("Unable to create queue sink pad \n")
sys.exit(1)
demuxsrcpad.link(queuesinkpad)
queue5 = Gst.ElementFactory.make("queue", f"queue5_{i}")
pipeline.add(queue5)
queue5.set_property("max-size-buffers", 100)
queue5.set_property("leaky", 2)
nvvidconv2 = Gst.ElementFactory.make("nvvideoconvert", f"nvvideoconvert1_{i}")
nvvidconv1.set_property("nvbuf-memory-type", vc_mem_type)
pipeline.add(nvvidconv2)
jpegenc = Gst.ElementFactory.make("nvjpegenc", f"jpeg-encoder_{i}")
jpegenc.set_property("quality", 30)
if not jpegenc:
sys.stderr.write("Unable to create nvjpegenc \n")
sys.exit(1)
pipeline.add(jpegenc)
multifilesink = Gst.ElementFactory.make("multifilesink", f"multi-file-sink_{i}")
if not multifilesink:
sys.stderr.write(" Unable to create multifilesink \n")
sys.exit(1)
multifilesink.set_property("post-messages", True)
sinkpad = multifilesink.get_static_pad("sink")
pipeline.add(multifilesink)
queue4_.link(nvvideoconvert1)
nvvideoconvert1.link(nvdsosd)
nvdsosd.link(queue5)
queue5.link(nvvidconv2)
nvvidconv2.link(jpegenc)
jpegenc.link(multifilesink)
probe_data = {
"cam_ids": cam_ids,
"local_uld_dir_name":local_uld_dir_name,
"blob_base_img_path_uld":blob_base_img_path_uld
}
# create an event loop and feed gstreamer bus mesages to it
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
perf_data = PERF_DATA(active_streams,loop)
bus.connect("message", bus_call, loop)
nvanalytics_src_pad = nvanalytics.get_static_pad("src")
if not nvanalytics_src_pad:
sys.stderr.write(" Unable to get src pad \n")
else:
nvanalytics_src_pad.add_probe(Gst.PadProbeType.BUFFER, nvanlytics_src_pad_buffer_probe, probe_data)
# perf callback function to print fps every 5 sec
GLib.timeout_add(5000, perf_data.perf_print_callback)
# List the sources
print("Now playing...")
for i, source in enumerate(args[:-1]):
if i != 0:
print(i, ": ", source)
print("Starting pipeline \n")
# start play back and listed to events
pipeline.set_state(Gst.State.PLAYING)
try:
loop.run()
except:
pass
# cleanup
print("Exiting app\n")
pipeline.set_state(Gst.State.NULL)
if __name__ == '__main__':
sys.exit(main(sys.argv))