Please provide complete information as applicable to your setup.
• Hardware Platform - GPU
• DeepStream Version - 7.0
• TensorRT Version - 10.6.0
• NVIDIA GPU Driver Version - 560.35.03
• Issue Type - questions
• Question -
Hi,
I’m currently working on a video analytics project, and my application architecture is structured as follows:
decodebin(n) -> streammux -> nvinfer -> nvtracker -> nvanalytics -> streamdemux -> videoconvert(n) -> nvosd(n) -> videoconvert(n) -> jpegencoder(n) -> multifilesink
The requirement is to track all objects, calculate the object count for line crossing, and save an image every 10 seconds. My approach processes all frames from the RTSP streams through nvanalytics, and then, after nvanalytics, passes only one frame per stream every 10 seconds for further processing. This aims to reduce resource usage by limiting downstream processing.
While the application otherwise works fine, I'm facing an issue where, after removing the frame metadata from the batch meta, the frame and user metadata no longer stay in sync. This results in mismatched data, such as frames from one stream being paired with user metadata from another stream.
I would like to know:
- Why is this mismatch happening?
- Is my approach correct for this scenario?
- What would be the best approach to handle this requirement effectively?
The application functions properly for video files, but this issue occurs only with RTSP streams.
Thank you!
Code :
import sys
sys.path.append('../')
import time
from temp_utils import read_yaml,extract_rtsp_details,convert_pts_to_sydtimestamp,create_dynamic_path,get_current_epoch_in_syd,convert_syd_epoch_to_datetime_str
# from utils import read_yaml,extract_rtsp_details,convert_pts_to_sydtimestamp,create_dynamic_path,send_message_to_iothub,get_current_epoch_in_syd,convert_syd_epoch_to_datetime_str
import gi
import configparser
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from ctypes import *
from urllib.parse import urlparse
from common.platform_info import PlatformInfo
from common.bus_call import bus_call
from common.FPS import PERF_DATA
import os
import json
import pyds
import uuid
# logging.basicConfig(level=logging.INFO,filename="fps.log",format=" [ %(asctime)s %(message)s ]\n ")
# ---------------------------------------------------------------------------
# Module-level state and configuration constants.
# ---------------------------------------------------------------------------
perf_data = None  # PERF_DATA instance; created in main() once the stream count is known
MAX_DISPLAY_LEN=64  # kept for parity with DeepStream samples; appears unused below
PGIE_CLASS_ID_PERSON = 0  # class ids emitted by the primary detector
PGIE_CLASS_ID_BAG = 1
PGIE_CLASS_ID_FACE = 2
MUXER_OUTPUT_WIDTH=1920   # nvstreammux output resolution
MUXER_OUTPUT_HEIGHT=1080
MAX_RECT_CAPACITY = 16  # appears unused below
# NOTE(review): this evaluates to the float 500000.0, but the streammux
# 'batched-push-timeout' property is an integer — confirm PyGObject coerces it.
MUXER_BATCH_TIMEOUT_USEC = 0.5*1e6
TILED_OUTPUT_WIDTH=1280   # tiler dimensions; no tiler in this pipeline (unused)
TILED_OUTPUT_HEIGHT=720
GST_CAPS_FEATURES_NVMM="memory:NVMM"  # NVMM caps feature string (probe uses the literal directly)
# OSD_PROCESS_MODE= 0 ## CPU Mode
OSD_PROCESS_MODE = 1 ## GPU Mode
OSD_DISPLAY_TEXT= 1
pgie_classes_str= ["person","bag","face"]  # label strings matching the class ids above
no_of_streams = len(sys.argv)-1  # one stream URI per CLI argument
config_file_path = "main_config.yaml"
first_frame_processed = False  # flips True when the first buffer reaches the analytics probe
curr_epoch = get_current_epoch_in_syd()  # Sydney wall-clock epoch captured at startup
# Per-stream clocks, keyed by stream index:
#   stream_start_time_dict     - wall time when the first frame was seen (set in the probe)
#   start_time_syd_dict        - last time a JPEG was saved (frame_filter_pad_probe)
#   start_time_syd_dict_json   - last time a frame was kept / JSON dumped (analytics probe)
stream_start_time_dict = {i:None for i in range(no_of_streams)}
start_time_syd_dict = {i:curr_epoch for i in range(no_of_streams)}
start_time_syd_dict_json = {i:curr_epoch for i in range(no_of_streams)}
json_dir_name = "jsons"  # per-frame analytics JSON output directory
os.makedirs(json_dir_name,exist_ok=True)
def generate_unique_number():
    """Return a pseudo-random 12-digit integer derived from a UUID4.

    ``uuid4().int`` is a 128-bit value whose version field guarantees it
    always has well over 12 decimal digits, so taking the first 12
    characters of its decimal representation is safe and always yields a
    number in [10**11, 10**12).

    Returns:
        int: a 12-digit identifier for this process run.
    """
    unique_number = uuid.uuid4().int
    truncated_number = int(str(unique_number)[:12])
    return truncated_number


# BUG FIX: the original used a module-level `global unique_number`
# statement, which is a no-op outside a function; a plain assignment is
# all that is needed to create the module attribute.
unique_number = generate_unique_number()
def cb_newpad(decodebin, decoder_src_pad, data):
    """'pad-added' handler for the uridecodebin inside a source bin.

    When a video pad backed by NVMM (GPU) memory appears, point the source
    bin's ghost "src" pad at it so the bin feeds nvstreammux. Non-NVMM
    pads mean a non-NVIDIA decoder was selected, which this pipeline
    cannot use.
    """
    print("In cb_newpad\n")
    caps = decoder_src_pad.get_current_caps()
    if not caps:
        # Pad not negotiated yet; fall back to the pad's possible caps.
        caps = decoder_src_pad.query_caps()
    media_type = caps.get_structure(0).get_name()
    pad_features = caps.get_features(0)
    source_bin = data
    print("gstname=", media_type)
    if "video" in media_type:
        print("features=", pad_features)
        if pad_features.contains("memory:NVMM"):
            ghost_pad = source_bin.get_static_pad("src")
            if not ghost_pad.set_target(decoder_src_pad):
                sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
        else:
            sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")
def decodebin_child_added(child_proxy, Object, name, user_data):
    """'child-added' handler: recurse into nested decodebins and tune the source.

    Re-attaches itself to any inner decodebin so deeply nested children are
    also observed; when the actual source element appears, enables
    drop-on-latency (if the element supports it) to favour liveness over
    completeness on RTSP.
    """
    print("Decodebin child added:", name, "\n")
    if "decodebin" in name:
        Object.connect("child-added", decodebin_child_added, user_data)
    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        # Only set the property when the element actually exposes it.
        if source_element.find_property('drop-on-latency') is not None:
            Object.set_property("drop-on-latency", True)
def create_source_bin(index, uri):
    """Build a source bin wrapping nvurisrcbin for one stream URI.

    The bin exposes a single ghost "src" pad; cb_newpad() targets it at
    the decoder's src pad once decoding starts.

    Args:
        index: zero-based stream index, used in the bin name.
        uri: media URI (rtsp://..., file://..., ...).

    Returns:
        The populated Gst.Bin, or None on any creation failure (the
        caller checks for None).
    """
    print("Creating source bin")
    bin_name = "source-bin-%02d" % index
    print('bin_name-', bin_name)
    nbin = Gst.Bin.new(bin_name)
    if not nbin:
        # BUG FIX: the original only logged and fell through, crashing
        # later on None; fail fast so the caller's check fires.
        sys.stderr.write(" Unable to create source bin \n")
        return None
    uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
    if not uri_decode_bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
        return None
    # -1 => retry forever, every 30 s, if the RTSP source drops.
    reconnect_attempts = -1
    reconnect_interval = 30
    uri_decode_bin.set_property("rtsp-reconnect-attempts", reconnect_attempts)
    uri_decode_bin.set_property("rtsp-reconnect-interval", reconnect_interval)
    uri_decode_bin.set_property("message-forward", True)
    uri_decode_bin.set_property("uri", uri)
    uri_decode_bin.connect("pad-added", cb_newpad, nbin)
    uri_decode_bin.connect("child-added", decodebin_child_added, nbin)
    Gst.Bin.add(nbin, uri_decode_bin)
    # Ghost pad has no target yet; cb_newpad links it when the decoder pad appears.
    bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write(" Failed to add ghost pad in source bin \n")
        return None
    return nbin
def make_element(element_name, i):
    """Create a GStreamer element with a unique per-stream name.

    BUG FIX: the original created every instance under the same name
    (``element_name``) and renamed it afterwards via set_property("name"),
    which transiently produces duplicate element names; name the element
    "<element_name>-<i>" directly at creation instead (same final name).

    Args:
        element_name: factory name, e.g. "queue" or "nvvideoconvert".
        i: stream index (int) or suffix (str) to make the name unique.

    Returns:
        The element, or None if the factory is unavailable (an error is
        logged to stderr in that case).
    """
    element = Gst.ElementFactory.make(element_name, "{0}-{1}".format(element_name, str(i)))
    if not element:
        sys.stderr.write(" Unable to create {0}".format(element_name))
    return element
def nvanalytics_src_pad_buffer_probe(pad, info, u_data):
    """Buffer probe on the nvdsanalytics src pad.

    Per stream: frames arriving within ``no_of_sec_send_msg`` seconds of
    the previously kept frame have their NvDsFrameMeta removed from the
    batch; the kept frame gets a Sydney-timestamp overlay and its
    nvdsanalytics per-frame counters dumped to a JSON file. If every
    frame meta in the batch was removed, the whole buffer is dropped.

    NOTE(review): removing NvDsFrameMeta drops only the *metadata* — the
    decoded surfaces still travel in the batched buffer, so downstream
    elements (e.g. nvstreamdemux) that pair surfaces with the now-shorter
    frame-meta list can appear to mismatch streams and metadata. To drop
    the actual image data, drop whole buffers per stream after the demux
    instead — confirm against the DeepStream metadata docs.

    Args:
        pad: the probed Gst.Pad (unused).
        info: Gst.PadProbeInfo carrying the buffer.
        u_data: dict with "no_of_sec_send_msg" (interval, seconds) and
            "cam_id" (stream index -> camera id mapping).

    Returns:
        Gst.PadProbeReturn.OK, or .DROP when the buffer is unusable or
        the batch is empty after pruning.
    """
    global start_time_syd_dict_json
    global first_frame_processed
    global stream_start_time_dict
    global perf_data
    no_of_sec_send_msg = u_data["no_of_sec_send_msg"]
    cam_ids = u_data["cam_id"]
    # Anchor every stream's clock to the wall time of the first buffer seen.
    if not first_frame_processed:
        stream_start_time_ = get_current_epoch_in_syd()
        stream_start_time_dict = {i: stream_start_time_ for i in range(no_of_streams)}
        first_frame_processed = True
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer")
        return Gst.PadProbeReturn.DROP
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
            stream_id = frame_meta.pad_index
            pts = frame_meta.buf_pts
            # Wall-clock time of this frame = stream start + PTS (ns -> s).
            curr_time = stream_start_time_dict[stream_id] + (pts / 1e9)
            if curr_time - start_time_syd_dict_json[stream_id] < no_of_sec_send_msg:
                # BUG FIX: the original hand-spliced the list (rewriting
                # batch_meta.frame_meta_list / prev_frame.next) AND called
                # nvds_remove_frame_meta_from_batch, which itself unlinks
                # and recycles the node — a double unlink that corrupted
                # the list and caused frame/user-meta mismatches. It also
                # read l_frame.next AFTER removal, i.e. from a recycled
                # node. Capture the next node first, then let the API do
                # the one and only unlink.
                next_frame_node = l_frame.next
                pyds.nvds_remove_frame_meta_from_batch(batch_meta, frame_meta)
                l_frame = next_frame_node
                stream_index = "stream{0}".format(stream_id)
                perf_data.update_fps(stream_index)
                continue
            # Frame is kept: restart this stream's interval timer.
            start_time_syd_dict_json[stream_id] = curr_time
            syd_time_pts_ = convert_pts_to_sydtimestamp(pts, stream_start_time_dict[stream_id])
            syd_time_pts_file = convert_pts_to_sydtimestamp(pts, stream_start_time_dict[stream_id], "datetime")
            # Overlay the Sydney timestamp on the kept frame.
            display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
            display_meta.num_labels = 1
            text_params = display_meta.text_params[0]
            text_params.display_text = f"{syd_time_pts_}"
            text_params.x_offset = 100  # X-coordinate
            text_params.y_offset = 50   # Y-coordinate
            text_params.font_params.font_name = "Serif"
            text_params.font_params.font_size = 12
            text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)  # RGBA (white)
            text_params.set_bg_clr = 1
            text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)  # RGBA (black)
            pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
            frame_number = frame_meta.frame_num
            num_rects = frame_meta.num_obj_meta
            # Dump the nvdsanalytics per-frame counters for this frame to JSON.
            l_user = frame_meta.frame_user_meta_list
            while l_user:
                try:
                    user_meta = pyds.NvDsUserMeta.cast(l_user.data)
                    if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type("NVIDIA.DSANALYTICSFRAME.USER_META"):
                        user_meta_data = pyds.NvDsAnalyticsFrameMeta.cast(user_meta.user_meta_data)
                        data = {
                            "stream_id": stream_id,
                            "camera_name": cam_ids[stream_id],
                            "unique_number_uuid": unique_number,
                            "batch_id": frame_meta.batch_id,
                            "frame_num": frame_number,
                            "buf_pts": pts,
                            "ntp_timestamp": frame_meta.ntp_timestamp,
                            "source_id": frame_meta.source_id,
                            "object_count": user_meta_data.objCnt,
                            "Number_of_objects": num_rects,
                            "object_in_ROI_count": user_meta_data.objInROIcnt,
                            "cumulative_line_cross_count": user_meta_data.objLCCumCnt,
                            "current_line_cross_count": user_meta_data.objLCCurrCnt,
                            "overcrowding_status": user_meta_data.ocStatus,
                            "unique_id": user_meta_data.unique_id,
                            "PTS_TIME": syd_time_pts_
                        }
                        p = os.path.join(json_dir_name, f"{syd_time_pts_file}_{stream_id}.json")
                        with open(p, "w") as f:
                            json.dump(data, f)
                        print(f"JSON saved: {p}")
                        # send_message_to_iothub(json.dumps(data))
                except StopIteration:
                    break
                l_user = l_user.next
            stream_index = "stream{0}".format(stream_id)
            perf_data.update_fps(stream_index)
            l_frame = l_frame.next
        except StopIteration:
            break
    # All frame metas were pruned from this batch: drop the buffer so the
    # downstream per-stream branches do no work for it.
    if not batch_meta.num_frames_in_batch:
        return Gst.PadProbeReturn.DROP
    return Gst.PadProbeReturn.OK
def frame_filter_pad_probe(pad, info, user_data):
    """Buffer probe on a multifilesink sink pad.

    Once per ``no_of_sec_send_msg`` seconds (per stream) it computes a
    timestamped file path and repoints the owning multifilesink's
    "location" property so the next JPEG written lands there.
    """
    global start_time_syd_dict
    out_root = user_data["base_path"]
    cam_details = user_data["cam_id"]
    interval_sec = user_data["no_of_sec_send_msg"]
    buf = info.get_buffer()
    if not buf:
        print("Unable to get GstBuffer ")
        return Gst.PadProbeReturn.OK
    meta_batch = pyds.gst_buffer_get_nvds_batch_meta(hash(buf))
    node = meta_batch.frame_meta_list
    while node:
        try:
            fmeta = pyds.NvDsFrameMeta.cast(node.data)
        except StopIteration:
            break
        sid = fmeta.pad_index
        try:
            pts = fmeta.buf_pts
            # Wall-clock time of this frame = stream start + PTS (ns -> s).
            now = stream_start_time_dict[sid] + (pts / 1e9)
            if (now - start_time_syd_dict[sid]) >= interval_sec:
                stamp = convert_pts_to_sydtimestamp(pts, stream_start_time_dict[sid], "datetime")
                fnum = fmeta.frame_num
                start_time_syd_dict[sid] = now
                folder = create_dynamic_path(out_root, cam_details)
                target = os.path.join(folder, f"{stamp}_{fnum}.jpg")
                print(f"image save sucessfully image path : {target} stream id : {sid}")
                # Repoint the sink that owns this pad at the new file path.
                sink_element = pad.get_parent_element()
                sink_element.set_property("location", target)
        except Exception as e:
            print(f"Error occurred while save the image : {e}")
        try:
            node = node.next
        except StopIteration:
            break
    return Gst.PadProbeReturn.OK
def main(args,base_path,no_of_sec_send_msg,analytics_config_path,infer_config_path,tracker_config_path):
    """Build and run the analytics pipeline.

    Topology:
        nvurisrcbin(xN) -> nvstreammux -> nvinfer -> nvtracker
            -> nvdsanalytics -> nvstreamdemux
            -> [queue -> nvvideoconvert -> nvdsosd -> queue
                -> nvvideoconvert -> nvjpegenc -> multifilesink](xN)

    Args:
        args: sys.argv; args[1:] are the stream URIs.
        base_path: root directory for saved JPEG images.
        no_of_sec_send_msg: interval (seconds) between kept frames per stream.
        analytics_config_path: nvdsanalytics config file.
        infer_config_path: nvinfer config file.
        tracker_config_path: nvtracker config file (ini format).
    """
    if len(args) < 2:
        sys.stderr.write("usage: %s <uri1> [uri2] ... [uriN]\n" % args[0])
        sys.exit(1)
    global perf_data
    perf_data = PERF_DATA(len(args) - 1)
    number_sources=len(args)-1
    # Map stream index -> camera id/ip/port details parsed from the URI.
    cam_ids = {stream_id:extract_rtsp_details(uri_name) for stream_id,uri_name in enumerate(args[1:])}
    platform_info = PlatformInfo()
    Gst.init(None)
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    is_live = False
    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
    print("Creating streamux \n ")
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")
    pipeline.add(streammux)
    # One source bin per URI, each linked to a requested streammux sink pad.
    for i in range(number_sources):
        print("Creating source_bin ",i," \n ")
        uri_name=args[i+1]
        if uri_name.find("rtsp://") == 0 :
            is_live = True
        source_bin=create_source_bin(i, uri_name)
        if not source_bin:
            sys.stderr.write("Unable to create source bin \n")
        pipeline.add(source_bin)
        padname="sink_%u" %i
        sinkpad= streammux.request_pad_simple(padname)
        if not sinkpad:
            sys.stderr.write("Unable to create sink pad bin \n")
        srcpad=source_bin.get_static_pad("src")
        if not srcpad:
            sys.stderr.write("Unable to create src pad bin \n")
        srcpad.link(sinkpad)
    # Leaky queues decouple main-path stages so a slow stage drops buffers
    # instead of stalling the upstream RTSP reads.
    queue1=Gst.ElementFactory.make("queue","queue1")
    queue2=Gst.ElementFactory.make("queue","queue2")
    queue3=Gst.ElementFactory.make("queue","queue3")
    queue4=Gst.ElementFactory.make("queue","queue4")
    pipeline.add(queue1)
    pipeline.add(queue2)
    pipeline.add(queue3)
    pipeline.add(queue4)
    for q in [queue1,queue2,queue3,queue4]:
        q.set_property("max-size-buffers", 100)
        q.set_property("leaky", True)
    print("Creating Pgie \n ")
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie \n")
    print("Creating nvtracker \n ")
    tracker = Gst.ElementFactory.make("nvtracker", "tracker")
    if not tracker:
        sys.stderr.write(" Unable to create tracker \n")
    print("Creating nvdsanalytics \n ")
    nvanalytics = Gst.ElementFactory.make("nvdsanalytics", "analytics")
    if not nvanalytics:
        sys.stderr.write(" Unable to create nvanalytics \n")
    nvanalytics.set_property("config-file", analytics_config_path)
    print("Creating nvstreamdemux")
    nvstreamdemux = Gst.ElementFactory.make("nvstreamdemux", "nvstreamdemux")
    if not nvstreamdemux:
        sys.stderr.write(" Unable to create nvstreamdemux \n")
    if is_live:
        print("Atleast one of the sources is live")
        streammux.set_property('live-source', 1)
    streammux.set_property('width', 1920)
    streammux.set_property('height', 1080)
    streammux.set_property('batch-size', number_sources)
    # NOTE(review): MUXER_BATCH_TIMEOUT_USEC is the float 500000.0 but the
    # property is an integer — confirm PyGObject coerces it as intended.
    streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)
    pgie.set_property('config-file-path', infer_config_path)
    pgie_batch_size=pgie.get_property("batch-size")
    if(pgie_batch_size != number_sources):
        print("WARNING: Overriding infer-config batch-size",pgie_batch_size," with number of sources ", number_sources," \n")
        pgie.set_property("batch-size",number_sources)
    # Copy selected keys from the tracker ini file onto the element.
    config = configparser.ConfigParser()
    config.read(tracker_config_path)
    config.sections()
    for key in config['tracker']:
        if key == 'tracker-width' :
            tracker_width = config.getint('tracker', key)
            tracker.set_property('tracker-width', tracker_width)
        if key == 'tracker-height' :
            tracker_height = config.getint('tracker', key)
            tracker.set_property('tracker-height', tracker_height)
        if key == 'gpu-id' :
            tracker_gpu_id = config.getint('tracker', key)
            # GObject treats '_' and '-' in property names interchangeably,
            # so 'gpu_id' addresses the 'gpu-id' property.
            tracker.set_property('gpu_id', tracker_gpu_id)
        if key == 'll-lib-file' :
            tracker_ll_lib_file = config.get('tracker', key)
            tracker.set_property('ll-lib-file', tracker_ll_lib_file)
        if key == 'll-config-file' :
            tracker_ll_config_file = config.get('tracker', key)
            tracker.set_property('ll-config-file', tracker_ll_config_file)
    pipeline.add(pgie)
    pipeline.add(tracker)
    pipeline.add(nvanalytics)
    pipeline.add(nvstreamdemux)
    # Batched main path: mux -> infer -> track -> analytics -> demux.
    streammux.link(queue1)
    queue1.link(pgie)
    pgie.link(queue2)
    queue2.link(tracker)
    tracker.link(queue3)
    queue3.link(nvanalytics)
    nvanalytics.link(queue4)
    queue4.link(nvstreamdemux)
    # Per-stream tail: demux src_i -> queue -> convert -> osd -> queue
    # -> convert -> jpeg encode -> multifilesink.
    for i in range(number_sources):
        queue5 = make_element("queue", i)
        pipeline.add(queue5)
        queue5.set_property("max-size-buffers", 100)
        queue5.set_property("leaky", True)
        nvvideoconvert = make_element("nvvideoconvert", i)
        pipeline.add(nvvideoconvert)
        nvdsosd = make_element("nvdsosd", i)
        pipeline.add(nvdsosd)
        nvdsosd.set_property("process-mode", OSD_PROCESS_MODE)
        nvdsosd.set_property("display-text", OSD_DISPLAY_TEXT)
        # Connect nvstreamdemux's requested src pad to this branch's queue.
        padname = "src_%u" % i
        demuxsrcpad = nvstreamdemux.request_pad_simple(padname)
        if not demuxsrcpad:
            sys.stderr.write("Unable to create demux src pad \n")
        queuesinkpad = queue5.get_static_pad("sink")
        if not queuesinkpad:
            sys.stderr.write("Unable to create queue sink pad \n")
        demuxsrcpad.link(queuesinkpad)
        queue6 = make_element("queue", f"queue6_{i}")
        pipeline.add(queue6)
        queue6.set_property("max-size-buffers", 100)
        queue6.set_property("leaky", True) # leak upstream: drop oldest
        nvvidconv2 = make_element("nvvideoconvert", f"nvvideoconvert_{i}")
        pipeline.add(nvvidconv2)
        jpegenc = Gst.ElementFactory.make("nvjpegenc", f"jpeg-encoder_{i}")
        # NOTE(review): property is set before the None check below — if
        # creation ever failed this line would raise before the message.
        jpegenc.set_property("quality", 50)
        if not jpegenc:
            sys.stderr.write("Unable to create nvjpegenc \n")
        pipeline.add(jpegenc)
        multifilesink = Gst.ElementFactory.make("multifilesink", f"multi-file-sink_{i}")
        if not multifilesink:
            sys.stderr.write(" Unable to create multifilesink \n")
        multifilesink.set_property("post-messages", True)
        pipeline.add(multifilesink)
        queue5.link(nvvideoconvert)
        nvvideoconvert.link(nvdsosd)
        nvdsosd.link(queue6)
        queue6.link(nvvidconv2)
        nvvidconv2.link(jpegenc)
        jpegenc.link(multifilesink)
        uri_name = args[i + 1]
        probe_data = {
            "cam_id": cam_ids[i],
            "base_path":base_path,
            "no_of_sec_send_msg":no_of_sec_send_msg
        }
        # The sink-pad probe repoints multifilesink's "location" just
        # before each JPEG is written (one per no_of_sec_send_msg sec).
        sinkpad = multifilesink.get_static_pad("sink")
        if not sinkpad:
            sys.stderr.write("Unable to get sink pad of multifilesink \n")
            sys.exit(1)
        else:
            sinkpad.add_probe(Gst.PadProbeType.BUFFER, frame_filter_pad_probe, probe_data)
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect ("message", bus_call, loop)
    # Probe on the analytics src pad prunes frame metadata and dumps JSON.
    nvanalytics_src_pad=nvanalytics.get_static_pad("src")
    if not nvanalytics_src_pad:
        sys.stderr.write(" Unable to get src pad \n")
    else:
        nvanalytics_src_pad.add_probe(Gst.PadProbeType.BUFFER, nvanalytics_src_pad_buffer_probe, {"no_of_sec_send_msg":no_of_sec_send_msg,"cam_id":cam_ids})
    GLib.timeout_add(5000*2, perf_data.perf_print_callback)  # FPS report every 10 s
    print("Now playing...")
    for i, source in enumerate(args):
        if (i != 0):
            print(i, ": ", source)
    print("Starting pipeline \n")
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except:
        pass
    # Tear down cleanly so RTSP sessions and GPU resources are released.
    print("Exiting app\n")
    pipeline.set_state(Gst.State.NULL)
if __name__ == '__main__':
    # Record wall-clock start for the end-of-run duration report.
    launch_ts = time.time()
    print(f"start time ===================================> {convert_syd_epoch_to_datetime_str(get_current_epoch_in_syd(),'datetime')}")
    # Load runtime settings from the YAML config and ensure the image
    # output directory exists before the pipeline starts writing.
    cfg = read_yaml(config_file_path)
    image_dir_name = cfg["image_dir_name"]
    os.makedirs(image_dir_name, exist_ok=True)
    main(
        sys.argv,
        image_dir_name,
        cfg["no_of_sec_send_msg"],
        cfg["analytics_config_path"],
        cfg["infer_config_path"],
        cfg["tracker_config_path"],
    )
    print(f"end time ===================================> {convert_syd_epoch_to_datetime_str(get_current_epoch_in_syd(),'datetime')}\n No of Seconds =====================> {(time.time() - launch_ts):.2f} seconds")