Please provide complete information as applicable to your setup.
• Hardware Platform (GPU)
• DeepStream Version 7.0
• TensorRT Version 10.6.0
• NVIDIA GPU Driver Version - 560.35.03
• Issue Type( questions, new requirements, bugs)
**• How to reproduce the issue ,
Hi, currently I’m working on a video analytics project, and in our application, we use uridecodebin
. However, after some time, we encounter an issue where the uridecodebin
RTSP connection fails, causing the application to crash. To address this, I switched to using nvurisrcbin
, but now I am facing an image dilation issue.
To resolve this, I plan to implement a custom reconnect logic. For example, if the stream doesn’t process a frame within a certain time period, I will remove the affected uridecodebin
and re-add it. While this reconnection works fine, after some time, my downstream elements (such as multisink
, etc.) stop receiving data, even though the application appears to continue functioning normally. I’m unsure why this happens.
import configparser
import sys
from urllib.parse import urlparse
import gi
from gi.repository import Gst, GLib
import time
import uuid
import logging
import os
import pytz
from datetime import datetime,timedelta
from utils import send_message_to_iothub,read_yaml,ntp_to_sydney_time,extract_rtsp_details,convert_pts_to_sydtimestamp,create_dynamic_path,get_current_epoch_in_syd,convert_syd_epoch_to_datetime_str,get_current_datetime_str
from threading import Lock
gi.require_version('Gst', '1.0')
import random
import json
start_time=time.time()
pipeline = None
fps_mutex = Lock()
import pyds
syd_tz = pytz.timezone('Australia/Sydney')
ntp_epoch = datetime(1970, 1, 1, tzinfo=pytz.UTC)
format = f"[ %(asctime)s %(levelname)s %(message)s ]\n"
main_yaml_file_path = "main_config_reconnect_roi.yaml"
yaml_con = read_yaml(main_yaml_file_path)
streams = yaml_con["streams"]
log_path = yaml_con["log_path"]
no_of_sec_to_send_data = int(yaml_con["no_of_sec_send_msg"])
img_dir_name = yaml_con["img_dir_name"]
osd_process_mode = int(yaml_con["osd_process_mode"])
osd_display_text = int(yaml_con["osd_display_text"])
sync_inputs = int(yaml_con["streammux_sync_inputs"])
attach_sys_ts = int(yaml_con["streammux_attach_sys_ts"])
max_latency = int(yaml_con["streammux_max_latency"])
frame_duration = int(yaml_con["streammux_frame_duration"])
streammux_muxer_batch_timeout_usec = int(yaml_con["streammux_muxer_batch_timeout_usec"])
analytics_config_path = yaml_con["analytics_config_path"]
infer_config_path = yaml_con["infer_config_path"]
tracker_config_path = yaml_con["tracker_config_path"]
max_size_buffers = int(yaml_con["queue_max_size_buffers"])
im_quality = int(yaml_con["jpeg_encoder_quality"])
os.makedirs(img_dir_name,exist_ok=True)
logging.basicConfig(filename=log_path,level=logging.INFO,format=format)
number_sources = len(streams)
start_time_streammux = { i:None for i in range(number_sources)}
start_time_streammux_ = { i:None for i in range(number_sources)}
start_time_streammux_first_frame = { i:False for i in range(number_sources)}
src_ids = {}
start_time_ids = { i:get_current_epoch_in_syd() for i in range(number_sources)}
streammux = None
def on_pad_added(src, pad, sink_element):
print(f"New pad {pad.get_name()} added from {src.get_name()}")
sink_pad = sink_element.get_static_pad("sink")
if not sink_pad:
print(f"Error: Could not get sink pad for {sink_element.get_name()}")
return
if sink_pad.is_linked():
print(f"Warning: Sink pad of {sink_element.get_name()} is already linked")
return
if pad.query_caps(None).is_any():
print(f"Warning: Pad {pad.get_name()} does not have valid caps yet!")
return
link_status = pad.link(sink_pad)
if link_status != Gst.PadLinkReturn.OK:
print(f"Pad link failed: {link_status}")
else:
print(f"Successfully linked {pad.get_name()} {sink_element.get_name()}")
class GETFPS:
def __init__(self,stream_id):
global start_time
self.start_time=start_time
self.is_first=True
self.frame_count=0
self.stream_id=stream_id
def update_fps(self):
end_time = time.time()
if self.is_first:
self.start_time = end_time
self.is_first = False
else:
global fps_mutex
with fps_mutex:
self.frame_count = self.frame_count + 1
def get_fps(self):
end_time = time.time()
with fps_mutex:
stream_fps = float(self.frame_count/(end_time - self.start_time))
self.frame_count = 0
self.start_time = end_time
return round(stream_fps, 2)
def print_data(self):
print('frame_count=',self.frame_count)
print('start_time=',self.start_time)
def on_pad_added_1(src, pad,sink_element):
print("==============="*4)
pad_caps = pad.query_caps(None)
pad_name = pad.get_name()
pad_structure = pad_caps.to_string()
print(f"New pad {pad_name} added with caps: {pad_structure}")
print(f"New pad added by: {src.get_name()}")
sink_pad = sink_element.get_static_pad("sink")
if not sink_pad.is_linked():
print(f"valve Sink pad linked to uridecodebin")
pad.link(sink_pad)
print("==============="*4)
def get_linked_element(pad):
"""Finds the element that a given pad is linked to."""
if pad and pad.is_linked():
# print(f"==================== pad {pad} is linked to uridecodebin ==================== ")
peer_pad = pad.get_peer() # Get the pad it is linked to
if peer_pad:
peer_element = peer_pad.get_parent_element() # Get the element that owns the pad
return peer_element.get_name() if peer_element else None
return None
class PERF_DATA:
def __init__(self, num_streams,q_list):
self.perf_dict = {} # FPS for each stream
self.all_stream_fps = {}
self.q_list = q_list
self.fps_mutex = Lock() # Lock for thread safety
for i in range(num_streams):
self.all_stream_fps[f"stream{i}"] = GETFPS(i)
def add_stream(self, stream_index):
""" Add a new stream dynamically """
with self.fps_mutex:
if stream_index not in self.all_stream_fps:
print(f"Adding new stream: {stream_index}")
self.all_stream_fps[stream_index] = GETFPS(stream_index)
def delete_stream(self, stream_index):
with self.fps_mutex:
if stream_index in self.all_stream_fps:
print(f"Deleting stream: {stream_index}")
self.all_stream_fps.pop(stream_index)
else:
print(f"Stream {stream_index} not found.")
def update_fps(self, stream_index):
""" Update FPS for the given stream index """
with self.fps_mutex:
if stream_index in self.all_stream_fps:
self.all_stream_fps[stream_index].update_fps()
else:
print(f"Warning: Stream {stream_index} not registered. Ignoring update.")
def perf_print_callback(self):
with self.fps_mutex:
self.perf_dict = {stream_index: stream.get_fps() for stream_index, stream in self.all_stream_fps.items()}
print("\n**PERF Data : ", self.perf_dict, "\n")
for q in self.q_list:
# element = pipeline.get_by_name("your_element_name")
if q:
state_return, state, pending = q.get_state(Gst.CLOCK_TIME_NONE)
print(f"Element {q.get_name()} is in state: {state.value_nick}")
else:
print("Element not found")
queue5 = q
if queue5:
src_pad = queue5.get_static_pad("src")
peer_pad = src_pad.get_peer() if src_pad else None
if peer_pad:
print(f"{q.get_name()} is linked to {peer_pad.get_parent_element().get_name()}")
else:
print(f"{q.get_name()} is NOT linked to any element!")
num_frames = q.get_property("current-level-buffers")
print(f"=========================> {q.get_name()} {num_frames=}")
# list_pipeline_elements(pipeline)
for k, v in self.perf_dict.items():
src_id = src_ids[k]
last_id = src_id['id']
stream_id = src_id['streamid']
rtsp_url = src_id['src']
valve_element_name = f"valve_{stream_id}"
valve = pipeline.get_by_name(valve_element_name)
sink_valve = valve.get_static_pad("sink")
print(f"=============================> {valve_element_name} {get_linked_element(sink_valve)}" )
if not v:
new_id = str(uuid.uuid1())
del_add_rtspsrc("perf 0","perf 0",last_id,new_id,stream_id,rtsp_url)
src_ids[k]['id'] = new_id
valve_element_name = f"valve_{stream_id}"
valve = pipeline.get_by_name(valve_element_name)
sink_valve = valve.get_static_pad("sink")
current_value = valve.get_property("drop")
print(f"==================> valve current value {current_value=}")
logging.info(f"pipeline alive data : {self.perf_dict}")
return True
def wait_for_null_state(element, timeout=3):
print(f"Waiting for {element.get_name()} to reach NULL state...")
element.set_state(Gst.State.NULL)
success, state, _ = element.get_state(timeout * Gst.SECOND)
if success == Gst.StateChangeReturn.SUCCESS and state == Gst.State.NULL:
print(f"{element.get_name()} successfully reached NULL state.")
return True
else:
print(f"Warning: {element.get_name()} did not reach NULL state within {timeout} seconds.")
return False
def del_add_rtspsrc_1(return_type, msg, old_id, new_id, stream_id, rtsp_url):
logging.error(f"{return_type} : {msg}")
print(f"Trying to create a new RTSP source ... {old_id=}")
old_uridecodebin_name = f"uridecodebin_{old_id}"
old_uridecodebin = pipeline.get_by_name(old_uridecodebin_name)
valve_element_name = f"valve_{stream_id}"
valve = pipeline.get_by_name(valve_element_name)
valve.set_property("drop", True)
if not old_uridecodebin:
print(f"Error: Could not find uridecodebin_{old_id} in the pipeline.")
return
src_pad = old_uridecodebin.get_static_pad("src_0")
if src_pad and src_pad.is_linked():
print(f"unlink the valve rtspsrc {old_uridecodebin_name} and valve ")
src_pad.unlink(valve.get_static_pad("sink"))
if wait_for_null_state(old_uridecodebin):
old_uridecodebin.set_state(Gst.State.NULL)
pipeline.remove(old_uridecodebin)
# new_uridecodebin = create_rtsp_source(rtsp_url, new_id, stream_id)
new_uridecodebin = Gst.ElementFactory.make("uridecodebin", f"uridecodebin_{new_id}")
new_uridecodebin.set_property("uri", rtsp_url)
new_uridecodebin.connect("pad-added", on_pad_added,valve)
pipeline.add(new_uridecodebin)
new_uridecodebin.set_state(Gst.State.READY)
new_uridecodebin.sync_state_with_parent()
time.sleep(0.1)
new_uridecodebin.set_state(Gst.State.PLAYING)
valve.set_property("drop", False)
print(f"Successfully replaced RTSP source for stream {stream_id}")
def list_pipeline_elements(pipeline):
if not pipeline:
print(" Error: Pipeline is None!")
return
print("-" * 50)
print("Listing all elements in the pipeline:\n")
for element in pipeline.iterate_elements():
element_name = element.get_name()
# element_type = Gst.ElementFactory.get_element_name(element)
print(f"Element: {element_name}")
state_return, state, pending = element.get_state(Gst.CLOCK_TIME_NONE)
print(f"Element {element.get_name()} is in state: {state.value_nick}")
# Check linked pads
for pad in element.iterate_pads():
pad_name = pad.get_name()
peer_pad = pad.get_peer()
peer_element_name = peer_pad.get_parent_element().get_name() if peer_pad else "None"
print(f"Pad: {pad_name} Linked to: {peer_element_name}")
print("-" * 50)
def del_add_rtspsrc(return_type, msg, old_id, new_id, stream_id, rtsp_url):
logging.error(f"{return_type} : {msg}")
print(f"Trying to create a new RTSP source ... {old_id=}")
old_uridecodebin_name = f"uridecodebin_{old_id}"
old_uridecodebin = pipeline.get_by_name(old_uridecodebin_name)
valve = pipeline.get_by_name(f"valve_{stream_id}")
valve.set_property("drop", True)
if not old_uridecodebin:
print(f"Error: Could not find uridecodebin_{old_id} in the pipeline.")
return
queue5 = pipeline.get_by_name(f"queue5_{stream_id}")
queue5.set_state(Gst.State.NULL) # Flush stuck frames
queue5.set_state(Gst.State.PLAYING)
src_pad = old_uridecodebin.get_static_pad("src_0")
if src_pad and src_pad.is_linked():
print(f"Unlinking old uridecodebin: {old_uridecodebin_name}")
src_pad.unlink(valve.get_static_pad("sink"))
old_uridecodebin.set_state(Gst.State.NULL)
pipeline.remove(old_uridecodebin)
new_uridecodebin = Gst.ElementFactory.make("uridecodebin", f"uridecodebin_{new_id}")
new_uridecodebin.set_property("uri", rtsp_url)
new_uridecodebin.connect("pad-added", on_pad_added, valve)
new_uridecodebin.set_state(Gst.State.READY)
pipeline.add(new_uridecodebin)
new_uridecodebin.sync_state_with_parent()
time.sleep(0.1)
new_uridecodebin.set_state(Gst.State.PLAYING)
valve.set_property("drop", False)
print(f"Successfully replaced RTSP source for stream {stream_id}")
def demux_src_pad_buffer_probe(pad, info, user_data):
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer")
return Gst.PadProbeReturn.DROP
try:
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
stream_id = frame_meta.pad_index
pts = frame_meta.buf_pts
# curr_time = start_time_streammux.get(stream_id, 0) + (pts / 1e9)
curr_time = start_time_streammux_.get(stream_id, 0)
# ntp_sec = frame_meta.ntp_timestamp / 1e9
# curr_time = pts
syd_time_pts_ = convert_pts_to_sydtimestamp(pts, start_time_streammux[stream_id])
display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
display_meta.num_labels = 1
text_params = display_meta.text_params[0]
# text_params.display_text = f"buf ts : {syd_time_pts_} ntp ts : {ntp_to_sydney_time(ntp_sec)}"
text_params.display_text = f"buf ts : {syd_time_pts_}"
text_params.x_offset = 50
text_params.y_offset = 50
text_params.font_params.font_name = "Serif"
text_params.font_params.font_size = 12
text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
text_params.set_bg_clr = 1
text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
l_user = frame_meta.frame_user_meta_list
while l_user:
try:
user_meta = pyds.NvDsUserMeta.cast(l_user.data)
if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type(
"NVIDIA.DSANALYTICSFRAME.USER_META"
):
user_meta_data = pyds.NvDsAnalyticsFrameMeta.cast(user_meta.user_meta_data)
obj_in_roi = user_meta_data.objInROIcnt
display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
no_of_labels = len(obj_in_roi)
display_meta.num_labels = no_of_labels
start_x,start_y = 50,80
for i,(label_id,count) in enumerate(obj_in_roi.items()):
text_params = display_meta.text_params[i]
text_params.display_text = f"{label_id} : {count}"
text_params.x_offset = start_x
text_params.y_offset = start_y
text_params.font_params.font_name = "Serif"
text_params.font_params.font_size = 12
text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
text_params.set_bg_clr = 1
text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
start_y+=30
pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
except Exception as e:
print(f"Error processing user metadata: {e}")
try:
l_user = l_user.next
except StopIteration:
break
except Exception as e:
print(f"Error processing frame: {e}")
try:
l_frame = l_frame.next
except StopIteration:
break
except Exception as e:
print(f"Error processing buffer: {e}")
return Gst.PadProbeReturn.DROP
stream_index = "stream{0}".format(stream_id)
perf_data.update_fps(stream_index)
return Gst.PadProbeReturn.OK
def create_dynamic_path(epoch_sec, base_path, ip_port):
# Convert Unix epoch seconds to a datetime object
converted_time = datetime.utcfromtimestamp(epoch_sec).replace(tzinfo=pytz.utc)
now = converted_time.astimezone(syd_tz)
# Construct the dynamic path
dynamic_path = os.path.join(
base_path,
ip_port,
f"{now.year}_year",
f"{now.month:02d}_month",
f"{now.day:02d}_day",
f"{now.hour:02d}_hour",
# f"{now.minute:02d}_minute",
)
os.makedirs(dynamic_path, exist_ok=True)
return dynamic_path
def bus_call(bus, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
print(f"\n ======================= End-of-stream =======================")
# logging.error(f"END OF STREAM : {err}")
# loop.quit()
elif t==Gst.MessageType.WARNING:
err, debug = message.parse_warning()
logging.debug(f"{err}")
print("Warning: %s: %s" % (err, debug))
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
logging.error(f"{err}")
print("Error: %s: %s" % (err, debug))
return True
def valve_prob(pad, info, user_data):
# print("inside the valve probe")
stream_id = user_data["stream_id"]
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer")
return Gst.PadProbeReturn.OK
try:
stream_index = "stream{0}".format(stream_id)
perf_data.update_fps(stream_index)
except Exception as e:
print(f"Error accessing batch metadata: {e}")
return Gst.PadProbeReturn.OK
return Gst.PadProbeReturn.OK
def make_element(element_name, i):
element = Gst.ElementFactory.make(element_name, element_name)
if not element:
sys.stderr.write(" Unable to create {0}".format(element_name))
element.set_property("name", "{0}-{1}".format(element_name, str(i)))
return element
def format_location_callback(splitmuxsink, fragment_id, video_dir_path, stream_id):
# Get the current date and time
current_datetime = convert_syd_epoch_to_datetime_str(get_current_epoch_in_syd(),"date_time")
# Construct the filename with the current datetime
return os.path.join(video_dir_path, f"stream_{stream_id}_rtsp_out_{current_datetime}_uri.mp4")
def frame_filter_pad_probe(pad, info, user_data):
print(f"=================================================== inside the multifilesink =================================================== ")
base_img_path = user_data["base_img_path"]
cam_id_ip_port = user_data["cam_id"]
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer")
return Gst.PadProbeReturn.OK
try:
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
stream_id = frame_meta.pad_index
pts = frame_meta.buf_pts
# curr_time = start_time_streammux.get(stream_id, 0) + (pts / 1e9)
curr_time = start_time_streammux_.get(stream_id, 0)
# syd_time_pts_file = convert_syd_epoch_to_datetime_str(curr_time, "datetime")
syd_time_pts_hhmmss = convert_syd_epoch_to_datetime_str(curr_time, "hhmmss")
current_yymmdd = get_current_datetime_str("yymmdd")
current_hhmmss = get_current_datetime_str("hhmmss")
frame_number = frame_meta.frame_num
ntp_sec = frame_meta.ntp_timestamp / 1e9
# syd_time_ntp_file = ntp_to_sydney_time(ntp_sec,"file_name")
syd_time_ntp_hhmmss = ntp_to_sydney_time(ntp_sec,"hhmmss")
syd_time_ntp_sql = ntp_to_sydney_time(ntp_sec)
# image_folder_path = create_dynamic_path(ntp_sec,base_img_path, cam_id_ip_port)
image_folder_path = create_dynamic_path(curr_time, base_img_path, cam_id_ip_port)
image_name = f"CAM-{cam_id_ip_port[:7]}_DATE-{current_yymmdd}_PTS-{syd_time_pts_hhmmss}_NTP-{syd_time_ntp_hhmmss}_SYS-{current_hhmmss}.jpg"
# image_name = f"{syd_time_ntp_file}_{frame_number}.jpg"
# image_name = f"{syd_time_pts_file}_{frame_number}.jpg"
image_save_path = os.path.join(image_folder_path, image_name)
print(f"Saving image: {image_save_path}, stream_id: {stream_id}")
multifilesink = pad.get_parent_element()
multifilesink.set_property("location", image_save_path)
l_user = frame_meta.frame_user_meta_list
while l_user:
try:
user_meta = pyds.NvDsUserMeta.cast(l_user.data)
if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type(
"NVIDIA.DSANALYTICSFRAME.USER_META"
):
user_meta_data = pyds.NvDsAnalyticsFrameMeta.cast(user_meta.user_meta_data)
num_rects = frame_meta.num_obj_meta
syd_time_pts_ = convert_syd_epoch_to_datetime_str(curr_time, "datetime_sql")
data = {
"stream_id": stream_id,
"camera_name": cam_id_ip_port,
"unique_number_uuid": 10,
"batch_id": frame_meta.batch_id,
"frame_num": frame_number,
"buf_pts": pts,
"ntp_timestamp": frame_meta.ntp_timestamp,
"source_id": frame_meta.source_id,
"object_count": user_meta_data.objCnt,
"Number_of_objects": num_rects,
"object_in_ROI_count": user_meta_data.objInROIcnt,
"cumulative_line_cross_count": user_meta_data.objLCCumCnt,
"current_line_cross_count": user_meta_data.objLCCurrCnt,
"overcrowding_status": user_meta_data.ocStatus,
"unique_id": user_meta_data.unique_id,
"PTS_TIME": syd_time_pts_,
"NTP_TS": syd_time_ntp_sql,
}
# p = os.path.join(base_json_path, f"{syd_time_ntp_file}_{stream_id}.json")
# with open(p, "w") as f:
# json.dump(data, f)
# print(f"JSON saved: {p}")
print(data)
# send_message_to_iothub(json.dumps(data),stream_id,syd_time_pts_)
except Exception as e:
print(f"Error processing user metadata: {e}")
try:
l_user = l_user.next
except StopIteration:
break
except Exception as e:
print(f"Error processing frame metadata: {e}")
try:
l_frame = l_frame.next
except StopIteration:
break
except Exception as e:
print(f"Error accessing batch metadata: {e}")
return Gst.PadProbeReturn.OK
return Gst.PadProbeReturn.OK
def dummy_prob(pad, info, user_data):
print(f"================================================================= inside the {user_data} prob =================================================================")
return Gst.PadProbeReturn.OK
def streammux_prob(pad, info, user_data):
global streammux_first_frame_processed
global start_time_streammux
global start_time_streammux_
stream_id = user_data.get('stream_id')
no_of_sec_to_predict = user_data.get('no_of_sec_to_predict')
if not start_time_streammux_first_frame[stream_id]:
start_time_streammux[stream_id] = get_current_epoch_in_syd()
start_time_streammux_[stream_id] = get_current_epoch_in_syd()
start_time_streammux_first_frame[stream_id] = True
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer")
return Gst.PadProbeReturn.OK
try:
pts = start_time_streammux[stream_id] + gst_buffer.pts/1000000000.0
# print(f"pts time === {gst_buffer.pts/1000000000.0}")
# print(f"start time{start_time_streammux_[stream_id]=}")
if (pts - start_time_streammux_[stream_id] ) < no_of_sec_to_predict:
return Gst.PadProbeReturn.DROP
start_time_streammux_[stream_id] = pts
except Exception as e:
print(f"Error accessing batch metadata: {e}")
return Gst.PadProbeReturn.OK
print(f"=================================== Gst.PadProbeReturn.OK ===================================")
return Gst.PadProbeReturn.OK
def main():
global perf_data, pipeline, src_ids, streammux
q_list = []
perf_data = PERF_DATA(number_sources,q_list)
Gst.init(None)
pipeline = Gst.Pipeline.new(f"rtsp-pipeline-{uuid.uuid1()}")
GLib.timeout_add(10000, perf_data.perf_print_callback)
src_ids = {f'stream{i}': {'id': None, 'src': None, 'streamid': i} for i in range(number_sources)}
cam_ids = {stream_id:extract_rtsp_details(uri_name) for stream_id,uri_name in enumerate(streams)}
streammux = Gst.ElementFactory.make("nvstreammux", "streammuxer")
if not streammux:
print("Error: Could not create streammuxer!")
return
streammux.set_property('batch-size', number_sources)
streammux.set_property('batched-push-timeout', streammux_muxer_batch_timeout_usec)
streammux.set_property("sync-inputs",sync_inputs)
streammux.set_property("attach-sys-ts",attach_sys_ts)
streammux.set_property("max-latency",max_latency)
streammux.set_property("frame-duration",frame_duration)
pipeline.add(streammux)
for src_id, src in enumerate(streams):
last_id = str(uuid.uuid1())
src_ids[f'stream{src_id}']['id'] = last_id
src_ids[f'stream{src_id}']['src'] = src
uridecodebin = Gst.ElementFactory.make("uridecodebin", f"uridecodebin_{last_id}")
uridecodebin.set_property("uri", src)
print(f"uridecodebin_{last_id} created for {src}")
valve = Gst.ElementFactory.make("valve", f"valve_{src_id}")
uridecodebin.connect("pad-added", on_pad_added,valve)
user_data={"stream_id":src_id}
pipeline.add(valve)
queue = Gst.ElementFactory.make("queue", f"queue_{src_id}")
pipeline.add(queue)
q_list.append(queue)
sink_pad = valve.get_static_pad("sink")
print(f"================= valve created ============== ")
sink_pad.add_probe(Gst.PadProbeType.BUFFER, valve_prob, user_data)
print(f"================= valve perob added ============== ")
pipeline.add(uridecodebin)
padname=f"sink_{src_id}"
valve_src_pad = valve.get_static_pad("src")
if not valve_src_pad:
print(f"Error: Could not get src pad for valve_{src_id}")
return
streammux_sinkpad= streammux.request_pad_simple(padname)
if not streammux_sinkpad:
print(f"Error: Could not get sink pad for streammux_{src_id}")
return
print(f"================================> {no_of_sec_to_send_data=}")
user_meta = {"stream_id":src_id,'no_of_sec_to_predict':no_of_sec_to_send_data}
streammux_sinkpad.add_probe(Gst.PadProbeType.BUFFER, streammux_prob, user_meta)
queue_sink_pad = queue.get_static_pad("sink")
queue_src_pad = queue.get_static_pad("src")
valve_src_pad.link(queue_sink_pad)
queue_src_pad.link(streammux_sinkpad)
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
pgie.set_property('config-file-path', infer_config_path)
pgie_batch_size=pgie.get_property("batch-size")
# pgie_sink = pgie.get_static_pad("sink")
# nvanalytics_sink = nvanalytics.get_static_pad("sink")
if(pgie_batch_size != number_sources):
print("WARNING: Overriding infer-config batch-size",pgie_batch_size," with number of sources ", number_sources," \n")
pgie.set_property("batch-size",number_sources)
tracker = Gst.ElementFactory.make("nvtracker", "tracker")
if not tracker:
sys.stderr.write(" Unable to create tracker \n")
config = configparser.ConfigParser()
config.read(tracker_config_path)
config.sections()
for key in config['tracker']:
if key == 'tracker-width' :
tracker_width = config.getint('tracker', key)
tracker.set_property('tracker-width', tracker_width)
if key == 'tracker-height' :
tracker_height = config.getint('tracker', key)
tracker.set_property('tracker-height', tracker_height)
if key == 'gpu-id' :
tracker_gpu_id = config.getint('tracker', key)
tracker.set_property('gpu_id', tracker_gpu_id)
if key == 'll-lib-file' :
tracker_ll_lib_file = config.get('tracker', key)
tracker.set_property('ll-lib-file', tracker_ll_lib_file)
if key == 'll-config-file' :
tracker_ll_config_file = config.get('tracker', key)
tracker.set_property('ll-config-file', tracker_ll_config_file)
nvanalytics = Gst.ElementFactory.make("nvdsanalytics", "analytics")
if not nvanalytics:
sys.stderr.write(" Unable to create nvanalytics \n")
nvanalytics.set_property("config-file", analytics_config_path)
nvstreamdemux = Gst.ElementFactory.make("nvstreamdemux", "nvstreamdemux")
if not nvstreamdemux:
sys.stderr.write(" Unable to create nvstreamdemux \n")
queue1 = Gst.ElementFactory.make("queue", "queue1")
queue2 = Gst.ElementFactory.make("queue", "queue2")
queue3 = Gst.ElementFactory.make("queue", "queue3")
for q in [queue1,queue2,queue3]:
q.set_property("max-size-buffers", max_size_buffers)
q.set_property("leaky", True)
pipeline.add(q)
q_list.append(q)
# queue1_sink = queue1.get_static_pad("sink")
# queue1_sink.add_probe(Gst.PadProbeType.BUFFER, queue1_prob, 0)
# pgie_sink.add_probe(Gst.PadProbeType.BUFFER, pgie_prob, 0)
# nvanalytics_sink.add_probe(Gst.PadProbeType.BUFFER, nvanalytics_prob, 0)
pipeline.add(pgie)
pipeline.add(nvanalytics)
pipeline.add(nvstreamdemux)
streammux.link(queue1)
queue1.link(pgie)
pgie.link(queue2)
queue2.link(nvanalytics)
nvanalytics.link(queue3)
queue3.link(nvstreamdemux)
for i in range(number_sources):
queue4_ = Gst.ElementFactory.make("queue", f"queue__{i}")
pipeline.add(queue4_)
q_list.append(queue4_)
queue4_.set_property("max-size-buffers", max_size_buffers)
queue4_.set_property("leaky", 2)
nvvideoconvert = Gst.ElementFactory.make("nvvideoconvert", f"nvvideoconvert_{i}")
nvvideoconvertsink = nvvideoconvert.get_static_pad("sink")
# nvvideoconvert = make_element("capsfilter", i)
pipeline.add(nvvideoconvert)
nvdsosd = Gst.ElementFactory.make("nvdsosd", f"nvdsosd_{i}")
pipeline.add(nvdsosd)
nvdsosd.set_property("process-mode", osd_process_mode)
nvdsosd.set_property("display-text", osd_display_text)
padname = "src_%u" % i
demuxsrcpad = nvstreamdemux.request_pad_simple(padname)
if not demuxsrcpad:
sys.stderr.write("Unable to create demux src pad \n")
queuesinkpad = queue4_.get_static_pad("sink")
if not queuesinkpad:
sys.stderr.write("Unable to create queue sink pad \n")
demuxsrcpad.link(queuesinkpad)
demuxsrcpad.add_probe(Gst.PadProbeType.BUFFER, dummy_prob, f"demuxsrcpad")
queue5 = Gst.ElementFactory.make("queue", f"queue5_{i}")
pipeline.add(queue5)
q_list.append(queue5)
queue5.set_property("max-size-buffers", max_size_buffers)
queue5.set_property("leaky", 2)
queue5_src = queue5.get_static_pad("src")
queue5_src.add_probe(Gst.PadProbeType.BUFFER, dummy_prob, "queue5 src")
nvvidconv2 = Gst.ElementFactory.make("nvvideoconvert", f"nvvideoconvert1_{i}")
# nvvidconv2 = make_element("capsfilter", f"capsfilter_{i}")
pipeline.add(nvvidconv2)
jpegenc = Gst.ElementFactory.make("nvjpegenc", f"jpeg-encoder_{i}")
jpegenc.set_property("quality", im_quality)
if not jpegenc:
sys.stderr.write("Unable to create nvjpegenc \n")
pipeline.add(jpegenc)
multifilesink = Gst.ElementFactory.make("multifilesink", f"multi-file-sink_{i}")
if not multifilesink:
sys.stderr.write(" Unable to create multifilesink \n")
multifilesink.set_property("post-messages", True)
sinkpad = multifilesink.get_static_pad("sink")
probe_data = {
"cam_id": cam_ids[i],
"base_img_path":img_dir_name,
"no_of_sec_send_msg":no_of_sec_to_send_data
}
pipeline.add(multifilesink)
queue4_.link(nvvideoconvert)
nvvideoconvert.link(nvdsosd)
nvdsosd.link(queue5)
queue5.link(nvvidconv2)
# queue5.link(nvvidconv2)
nvvidconv2.link(jpegenc)
jpegenc.link(multifilesink)
# queue5_sinkpad = queue5.get_static_pad("sink")
# queue5_srcpad = queue5.get_static_pad("src")
# queue5_sinkpad.add_probe(Gst.PadProbeType.BUFFER, queue5_prob, 0)
# queue5_srcpad.add_probe(Gst.PadProbeType.BUFFER, queue5_src_prob, 0)
jpegenc_sinkpad = jpegenc.get_static_pad("sink")
jpegenc_sinkpad.add_probe(Gst.PadProbeType.BUFFER, dummy_prob, "jpegenc_sinkpad")
nvvidconv2_sinkpad = nvvidconv2.get_static_pad("sink")
nvvidconv2_sinkpad.add_probe(Gst.PadProbeType.BUFFER, dummy_prob, "nvvidconv2_sinkpad")
sinkpad = multifilesink.get_static_pad("sink")
if not sinkpad:
sys.stderr.write("Unable to get sink pad of multifilesink \n")
sys.exit(1)
else:
sinkpad.add_probe(Gst.PadProbeType.BUFFER, frame_filter_pad_probe, probe_data)
print("Pipeline setup complete")
# Get the bus
bus = pipeline.get_bus()
bus.add_signal_watch()
loop = GLib.MainLoop()
bus.connect("message", bus_call, loop)
# Start the pipeline
pipeline.set_state(Gst.State.PLAYING)
print("Pipeline is playing...")
try:
loop.run()
except KeyboardInterrupt:
print("Pipeline interrupted by user.")
finally:
# Stop the pipeline
pipeline.set_state(Gst.State.NULL)
print("Pipeline stopped.")
if __name__ == "__main__":
main()