• Hardware Platform (Jetson / GPU)
• Hardware Platform - GPU
• DeepStream Version - 7.0
• TensorRT Version - 10.6.0
• NVIDIA GPU Driver Version - 560.35.03
• Issue Type - questions
• Question -
Hi, I’m currently working on a video analytics project, and I’ve encountered an issue in my application. I’m using uridecodebin
as the source, but after some time, the RTSP connection is lost, causing the application to stop. To address this, I switched to nvurisrcbin
, which handles reconnection well, but I’m facing a pixelation issue.
I’m now attempting to implement my own reconnection logic, which works fine. However, for example, if my application has been running for 5 days and the RTSP connection is lost, when I attempt to reconnect (by deleting and adding the new bin), I get the following error:
[2025-04-01 15:07:33] INFO: Trying to create a new RTSP source for stream stream_id=0…
[2025-04-01 15:07:33] INFO:Flushing streammux pad for stream 0…
[2025-04-01 15:07:33] INFO: Successfully replaced RTSP source for stream 0
[2025-04-01 15:07:35] INFO: pipeline alive data : {‘stream0’: 0.0}
[2025-04-01 15:07:35] ERROR: gst-resource-error-quark: Could not write to resource. (10)
[2025-04-01 15:07:35] ERROR: gst-resource-error-quark: Could not write to resource. (10)
[2025-04-01 15:07:39] ERROR: gst-core-error-quark: Your GStreamer installation is missing a plug-in. (12)
[2025-04-01 15:07:39] ERROR: gst-stream-error-quark: Internal data stream error. (1)
[2025-04-01 15:09:13] INFO: reconnect att exist in stream_id=0 6
import configparser
import sys
import gi
import time
import uuid
import logging
import os
import pytz
from urllib.parse import urlparse
from datetime import datetime
from gi.repository import Gst, GLib
from utils_prod import ntp_to_sydney_time,extract_rtsp_details,convert_pts_to_sydtimestamp,create_dynamic_path,get_current_epoch_in_syd,convert_syd_epoch_to_datetime_str,get_current_datetime_str, send_msg
from threading import Lock
gi.require_version('Gst', '1.0')
import pyds
import json
start_time=time.time()
pipeline = None
fps_mutex = Lock()
syd_tz = pytz.timezone('Australia/Sydney')
ntp_epoch = datetime(1970, 1, 1, tzinfo=pytz.UTC)
try:
yaml_con = json.loads(os.environ["main_config"])
streams = yaml_con["streams"]
log_path = yaml_con["log_path"]
no_of_sec_to_send_data = int(yaml_con["no_of_sec_send_msg"])
img_dir_name = yaml_con["img_dir_name"]
osd_process_mode = int(yaml_con["osd_process_mode"])
osd_display_text = int(yaml_con["osd_display_text"])
sync_inputs = int(yaml_con["streammux_sync_inputs"])
attach_sys_ts = int(yaml_con["streammux_attach_sys_ts"])
max_latency = int(yaml_con["streammux_max_latency"])
frame_duration = int(yaml_con["streammux_frame_duration"])
is_live = int(yaml_con["is_live"])
frame_width = int(yaml_con["frame_width"])
frame_height = int(yaml_con["frame_height"])
drop_eos = int(yaml_con["drop_eos"])
streammux_muxer_batch_timeout_usec = int(yaml_con["streammux_muxer_batch_timeout_usec"])
analytics_config_path = yaml_con["analytics_config_path"]
infer_config_path = yaml_con["infer_config_path"]
redis_container_name = yaml_con["redis_container_name"]
redis_port = int(yaml_con["redis_port"])
redis_key = yaml_con["redis_key"]
redis_retry = 5
PGIE_CLASS_ID_FACE = 2
max_size_buffers = int(yaml_con["queue_max_size_buffers"])
im_quality = int(yaml_con["jpeg_encoder_quality"])
reconnect_interval = int(yaml_con["reconnect_interval"])
is_add_face_blur = int(yaml_con.get("is_add_face_blur",1))
face_blur_opacity = float(yaml_con.get("face_blur_opacity",0.8))
application_quit_ther = float(yaml_con.get("application_quit_ther",0.5))
reconnect_att = float(yaml_con.get("reconnect_att",5))
blob_base_img_path = yaml_con["blob_base_img_path"]
class_dict = {0:"person",1:"bag",2:"face"}
except Exception as e:
print(f"Error reading config file: {e}")
sys.exit(1)
if not all([ os.path.exists(p) for p in [analytics_config_path,infer_config_path]]):
raise FileNotFoundError(f"One or more of the required analytics, inference, or file not found: {analytics_config_path}, {infer_config_path}")
os.makedirs(img_dir_name,exist_ok=True)
class SydneyFormatter(logging.Formatter):
"""Custom logging formatter to use Sydney timezone."""
def formatTime(self, record, datefmt=None):
sydney_time = datetime.fromtimestamp(record.created, syd_tz)
return sydney_time.strftime("%Y-%m-%d %H:%M:%S")
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] %(levelname)s: %(message)s",
handlers=[logging.FileHandler(log_path), logging.StreamHandler()]
)
for handler in logging.getLogger().handlers:
handler.setFormatter(SydneyFormatter("[%(asctime)s] %(levelname)s: %(message)s"))
logger = logging.getLogger(__name__)
number_sources = len(streams)
start_time_streammux = { i:None for i in range(number_sources)}
start_time_streammux_ = { i:None for i in range(number_sources)}
start_time_streammux_first_frame = { i:False for i in range(number_sources)}
start_time_ids = { i:get_current_epoch_in_syd() for i in range(number_sources)}
src_ids = {}
lag_dict = {i:{ "data":{"cumulative_line_cross_count":0} ,"ds":0,"is_first":True,"data_num":0} for i in range(number_sources)}
streammux = None
is_first = True
def on_pad_added(src, pad, sink_element):
# print(f"New pad {pad.get_name()} added from {src.get_name()}")
sink_pad = sink_element.get_static_pad("sink")
if not sink_pad:
msg = (f"Error: Could not get sink pad for {sink_element.get_name()}")
logger.error(msg)
return
if sink_pad.is_linked():
msg = (f"Warning: Sink pad of {sink_element.get_name()} is already linked")
logger.warning(msg)
return
if pad.query_caps(None).is_any():
msg = (f"Warning: Pad {pad.get_name()} does not have valid caps yet!")
logger.warning(msg)
return
link_status = pad.link(sink_pad)
if link_status != Gst.PadLinkReturn.OK:
msg = f"Pad link failed: {link_status}"
logger.error(msg)
else:
msg = (f"Successfully linked {pad.get_name()} {sink_element.get_name()}")
logger.info(msg)
class GETFPS:
def __init__(self,stream_id):
global start_time
self.start_time=start_time
self.is_first=True
self.frame_count=0
self.stream_id=stream_id
def update_fps(self):
end_time = time.time()
if self.is_first:
self.start_time = end_time
self.is_first = False
else:
global fps_mutex
with fps_mutex:
self.frame_count = self.frame_count + 1
def get_fps(self):
end_time = time.time()
with fps_mutex:
stream_fps = float(self.frame_count/(end_time - self.start_time))
self.frame_count = 0
self.start_time = end_time
return round(stream_fps, 2)
def print_data(self):
print('frame_count=',self.frame_count)
print('start_time=',self.start_time)
def get_linked_element(pad):
if pad and pad.is_linked():
peer_pad = pad.get_peer()
if peer_pad:
peer_element = peer_pad.get_parent_element()
return peer_element.get_name() if peer_element else None
return None
class PERF_DATA:
def __init__(self, active_streams,main_loop):
self.perf_dict = {}
self.all_stream_fps = {}
self.fps_mutex = Lock() # Lock for thread safety
self.main_loop = main_loop
for i in active_streams:
self.all_stream_fps[f"stream{i}"] = GETFPS(i)
def update_fps(self, stream_index):
""" Update FPS for the given stream index """
with self.fps_mutex:
if stream_index in self.all_stream_fps:
self.all_stream_fps[stream_index].update_fps()
else:
print(f"Warning: Stream {stream_index} not registered. Ignoring update.")
def perf_print_callback(self):
with self.fps_mutex:
self.perf_dict = {stream_index: stream.get_fps() for stream_index, stream in self.all_stream_fps.items()}
print("\n**PERF Data : ", self.perf_dict, "\n")
deactive_streams = 0
for k, v in self.perf_dict.items():
src_id = src_ids[k]
last_id = src_id['id']
stream_id = src_id['streamid']
rtsp_url = src_id['src']
if not v:
if src_ids[k]["no_of_att"] > reconnect_att:
logger.info(f"reconnect att exist in {stream_id=} {src_ids[k]['no_of_att']}")
deactive_streams+=1
continue
valve_element_name = f"valve_{stream_id}"
valve = pipeline.get_by_name(valve_element_name)
# sink_valve = valve.get_static_pad("sink")
# new_id = last_id
new_id = str(uuid.uuid1())
del_add_rtspsrc(last_id,new_id,stream_id,rtsp_url)
src_ids[k]['id'] = new_id
valve.set_property("drop", False)
time.sleep(2)
src_ids[k]["no_of_att"]+=1
if (deactive_streams/number_sources) > application_quit_ther:
logger.info(f"XXXXXXXXXXXXXXXXXXXXXXXXXX no of deactivate streams {deactive_streams} and no of streams {number_sources} ther {(deactive_streams/number_sources)} > {application_quit_ther} so quit the application XXXXXXXXXXXXXXXXXXXXXXXXXX")
self.main_loop.quit()
logger.info(f"pipeline alive data : {self.perf_dict}")
return True
def wait_for_null_state(element, timeout=3):
# print(f"Waiting for {element.get_name()} to reach NULL state...")
element.set_state(Gst.State.NULL)
success, state, _ = element.get_state(timeout * Gst.SECOND)
if success == Gst.StateChangeReturn.SUCCESS and state == Gst.State.NULL:
# msg = (f"{element.get_name()} successfully reached NULL state.")
return True
else:
msg = (f"Warning: {element.get_name()} did not reach NULL state within {timeout} seconds.")
logger.warning(msg)
return False
def flush_ele(stream_id):
streammux = pipeline.get_by_name("streammuxer")
if not streammux :
msg = (f"[ERROR] Streammux or Demux not found in pipeline")
logger.error(msg)
return
streammux_sink_pad = streammux.get_static_pad(f"sink_{stream_id}")
if streammux_sink_pad:
msg = (f"🛑 Flushing streammux pad for stream {stream_id}...")
logger.info(msg)
streammux_sink_pad.send_event(Gst.Event.new_flush_start())
time.sleep(0.1)
streammux_sink_pad.send_event(Gst.Event.new_flush_stop(True))
return
def del_add_rtspsrc(old_id, new_id, stream_id, rtsp_url):
msg = f"Trying to create a new RTSP source for stream {stream_id=}"
logger.info(msg)
bin_name = f"source-bin-{old_id}"
old_uridecodebin = pipeline.get_by_name(bin_name)
if not old_uridecodebin:
msg = f"[ERROR] No existing uridecodebin found for stream {stream_id}."
logger.error(msg)
return
valve = pipeline.get_by_name(f"valve_{stream_id}")
valve.set_property("drop", True)
valve_sink = valve.get_static_pad("sink")
src_pad = old_uridecodebin.get_static_pad("src")
if src_pad and src_pad.is_linked():
src_pad.unlink(valve_sink)
old_uridecodebin.send_event(Gst.Event.new_eos())
wait_for_null_state(old_uridecodebin)
flush_ele(stream_id)
pipeline.remove(old_uridecodebin)
new_uridecodebin = create_source_bin(new_id, stream_id,rtsp_url)
if not new_uridecodebin:
msg = f"[ERROR] Failed to create new uridecodebin for stream {stream_id}."
logger.error(msg)
return
pipeline.add(new_uridecodebin)
new_uridecodebin_src_pad=new_uridecodebin.get_static_pad("src")
if new_uridecodebin_src_pad.link(valve_sink) != Gst.PadLinkReturn.OK:
msg = f"Error linking {new_uridecodebin.get_name()} to {valve.get_name()}"
logger.error(msg)
return
new_uridecodebin.set_state(Gst.State.READY)
time.sleep(0.1)
new_uridecodebin.sync_state_with_parent()
new_uridecodebin.set_state(Gst.State.PLAYING)
msg = f"Successfully replaced RTSP source for stream {stream_id}"
logger.info(msg)
def cb_newpad(decodebin, decoder_src_pad,data):
print("In cb_newpad\n")
caps=decoder_src_pad.get_current_caps()
gststruct=caps.get_structure(0)
gstname=gststruct.get_name()
source_bin=data
features=caps.get_features(0)
print("gstname=",gstname)
if(gstname.find("video")!=-1):
print("features=",features)
if features.contains("memory:NVMM"):
# Get the source bin ghost pad
bin_ghost_pad=source_bin.get_static_pad("src")
if not bin_ghost_pad.set_target(decoder_src_pad):
msg = "Failed to link decoder src pad to source bin ghost pad"
logger.error(msg)
sys.stderr.write(msg)
else:
msg = " Error: Decodebin did not pick nvidia decoder plugin"
logger.error(msg)
sys.stderr.write(msg)
def decodebin_child_added(child_proxy,Object,name,user_data):
msg = ("Decodebin child added:", name, "\n")
logger.info(msg)
if(name.find("decodebin") != -1):
Object.connect("child-added",decodebin_child_added,user_data)
def on_pad_added(decodebin, pad, data):
# print(f"Pad added: {pad.get_name()} (RTSP connection successful)")
data["is_ready"] = True
def test_rtsp_connection(uri, stream_id, timeout=10):
pipeline = Gst.Pipeline.new("test-pipeline")
uridecodebin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
if not uridecodebin:
msg = f"Failed to create uridecodebin {stream_id=}"
logger.error(msg)
return False
uridecodebin.set_property("uri", uri)
d = {"is_ready": False}
uridecodebin.connect("pad-added", on_pad_added,d )
fakesink = Gst.ElementFactory.make("fakesink","fakesink")
pipeline.add(uridecodebin)
pipeline.add(fakesink)
uridecodebin.link(fakesink)
pipeline.set_state(Gst.State.PLAYING)
start_time = time.time()
while time.time() - start_time < timeout:
success, state, _ = pipeline.get_state(Gst.CLOCK_TIME_NONE)
if d["is_ready"]:
msg = f"RTSP stream is active {stream_id=}"
logger.info(msg)
pipeline.set_state(Gst.State.NULL)
return True,uridecodebin
time.sleep(0.5)
print(f"wating for frame .... {time.time() - start_time}")
msg = f"RTSP connection failed (No `pad-added` event) {stream_id=}"
logger.error(msg)
pipeline.set_state(Gst.State.NULL)
return False,None
def create_source_bin(index ,stream_id, uri):
uri_decode_bin=Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
if is_first:
is_suc,uri_decode_bin = test_rtsp_connection(uri,stream_id)
if not is_suc:
msg = (f"Error creating source {stream_id=}")
logger.error(msg)
return False
bin_name=f"source-bin-{index}"
nbin=Gst.Bin.new(bin_name)
if not nbin:
msg = f" Unable to create source bin {bin_name=}"
logger.error(msg)
sys.stderr.write(msg)
if not uri_decode_bin:
msg = " Unable to create uri decode bin "
logger.error(msg)
sys.stderr.write(msg)
uri_decode_bin.set_property("uri",uri)
uri_decode_bin.connect("pad-added",cb_newpad,nbin)
uri_decode_bin.connect("child-added",decodebin_child_added,nbin)
Gst.Bin.add(nbin,uri_decode_bin)
bin_pad=nbin.add_pad(Gst.GhostPad.new_no_target("src",Gst.PadDirection.SRC))
if not bin_pad:
msg = f" Failed to add ghost pad in source bin "
logger.error(msg)
sys.stderr.write(msg)
return None
return nbin
def pgie_src_pad_prob(pad, info, user_data):
gst_buffer = info.get_buffer()
if not gst_buffer:
msg = f"Unable to get GstBuffer pgie src pad prob"
logger.error(msg)
return Gst.PadProbeReturn.DROP
try:
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
l_obj=frame_meta.obj_meta_list
while l_obj:
try:
obj_meta=pyds.NvDsObjectMeta.cast(l_obj.data)
except StopIteration:
break
if is_add_face_blur:
if (obj_meta.class_id == PGIE_CLASS_ID_FACE):
obj_meta.rect_params.border_width = 1
obj_meta.rect_params.has_bg_color = 1
obj_meta.rect_params.bg_color.red = 1.0
obj_meta.rect_params.bg_color.green = 0.8
obj_meta.rect_params.bg_color.blue = 0.6
obj_meta.rect_params.bg_color.alpha = face_blur_opacity
obj_meta.tracker_confidence = 1.0
obj_meta.object_id = 999999
try:
l_obj=l_obj.next
except StopIteration:
break
except Exception as e:
msg = (f"Error processing frame pgie src pad prob : {e}")
logger.error(msg)
try:
l_frame = l_frame.next
except StopIteration:
break
except Exception as e:
msg = (f"Error processing buffer pgie src pad prob : {e}")
logger.error(msg)
return Gst.PadProbeReturn.DROP
return Gst.PadProbeReturn.OK
def demux_src_pad_buffer_probe(pad, info, user_data):
gst_buffer = info.get_buffer()
if not gst_buffer:
msg = "Unable to get GstBuffer demux src pad prob"
logger.error(msg)
return Gst.PadProbeReturn.DROP
try:
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
stream_id = frame_meta.pad_index
pts = frame_meta.buf_pts
syd_time_pts_ = convert_pts_to_sydtimestamp(pts, start_time_streammux[stream_id])
display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
display_meta.num_labels = 1
text_params = display_meta.text_params[0]
text_params.display_text = f"buf ts : {syd_time_pts_}"
text_params.x_offset = 50
text_params.y_offset = 50
text_params.font_params.font_name = "Serif"
text_params.font_params.font_size = 12
text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
text_params.set_bg_clr = 1
text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
l_user = frame_meta.frame_user_meta_list
while l_user:
try:
user_meta = pyds.NvDsUserMeta.cast(l_user.data)
if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type(
"NVIDIA.DSANALYTICSFRAME.USER_META"
):
user_meta_data = pyds.NvDsAnalyticsFrameMeta.cast(user_meta.user_meta_data)
obj_in_roi = user_meta_data.objInROIcnt
display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
no_of_labels = len(obj_in_roi)
display_meta.num_labels = no_of_labels
start_x,start_y = 50,80
for i,(label_id,count) in enumerate(obj_in_roi.items()):
text_params = display_meta.text_params[i]
text_params.display_text = f"{label_id} : {count}"
text_params.x_offset = start_x
text_params.y_offset = start_y
text_params.font_params.font_name = "Serif"
text_params.font_params.font_size = 12
text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
text_params.set_bg_clr = 1
text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
start_y+=30
pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
except Exception as e:
msg = (f"Error processing demux src pad prob user metadata: {e}")
logger.error(msg)
try:
l_user = l_user.next
except StopIteration:
break
except Exception as e:
msg = f"Error processing demux src pad prob frame: {e}"
logger.error(msg)
try:
l_frame = l_frame.next
except StopIteration:
break
except Exception as e:
msg = f"Error processing demux src pad prob buffer: {e}"
logger.error(msg)
# return Gst.PadProbeReturn.DROP
return Gst.PadProbeReturn.OK
def bus_call(bus, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
msg = (f"======================= End-of-stream =======================")
logger.error(msg)
# loop.quit()
elif t==Gst.MessageType.WARNING:
err, debug = message.parse_warning()
logger.debug(f"{err}")
print("Warning: %s: %s" % (err, debug))
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
logger.error(f"{err}")
# print("Error: %s: %s" % (err, debug))
return True
def valve_prob(pad, info, user_data):
stream_id = user_data["stream_id"]
gst_buffer = info.get_buffer()
if not gst_buffer:
msg = f"Unable to get GstBuffer valve prob"
logger.error(msg)
return Gst.PadProbeReturn.OK
try:
stream_index = "stream{0}".format(stream_id)
perf_data.update_fps(stream_index)
except Exception as e:
msg = f"Error accessing batch valve prob metadata: {e} "
logger.error(msg)
return Gst.PadProbeReturn.OK
return Gst.PadProbeReturn.OK
def make_element(element_name, i):
element = Gst.ElementFactory.make(element_name, element_name)
if not element:
sys.stderr.write(" Unable to create {0}".format(element_name))
element.set_property("name", "{0}-{1}".format(element_name, str(i)))
return element
def format_location_callback(splitmuxsink, fragment_id, video_dir_path, stream_id):
# Get the current date and time
current_datetime = convert_syd_epoch_to_datetime_str(get_current_epoch_in_syd(),"date_time")
# Construct the filename with the current datetime
return os.path.join(video_dir_path, f"stream_{stream_id}_rtsp_out_{current_datetime}_uri.mp4")
def frame_filter_pad_probe(pad, info, user_data):
global lag_dict
base_img_path = user_data["base_img_path"]
cam_id_ip_port = user_data["cam_id"]
blob_base_img_path = user_data["blob_base_img_path"]
gst_buffer = info.get_buffer()
if not gst_buffer:
msg = "Unable to get GstBuffer framefilter prob "
logger.error(msg)
return Gst.PadProbeReturn.OK
try:
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
stream_id = frame_meta.pad_index
pts = frame_meta.buf_pts
curr_time = start_time_streammux_.get(stream_id, 0)
syd_time_pts_hhmmss = convert_syd_epoch_to_datetime_str(curr_time, "hhmmss")
current_yymmdd = get_current_datetime_str("yymmdd")
current_hhmmss = get_current_datetime_str("hhmmss")
frame_number = frame_meta.frame_num
ntp_sec = frame_meta.ntp_timestamp / 1e9
syd_time_ntp_hhmmss = ntp_to_sydney_time(ntp_sec,"hhmmss")
syd_time_ntp_sql = ntp_to_sydney_time(ntp_sec)
image_folder_path,blob_path = create_dynamic_path(curr_time, base_img_path, cam_id_ip_port)
image_name = f"CAM-{cam_id_ip_port[:7]}_DATE-{current_yymmdd}_PTS-{syd_time_pts_hhmmss}_NTP-{syd_time_ntp_hhmmss}_SYS-{current_hhmmss}.jpg"
image_save_path = os.path.join(image_folder_path, image_name)
# print(f"Saving image: {image_save_path}, stream_id: {stream_id}")
multifilesink = pad.get_parent_element()
multifilesink.set_property("location", image_save_path)
l_user = frame_meta.frame_user_meta_list
while l_user:
try:
user_meta = pyds.NvDsUserMeta.cast(l_user.data)
if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type(
"NVIDIA.DSANALYTICSFRAME.USER_META"
):
user_meta_data = pyds.NvDsAnalyticsFrameMeta.cast(user_meta.user_meta_data)
objInROIcnt = user_meta_data.objInROIcnt
objLCCumCnt = user_meta_data.objLCCumCnt
objLCCurrCnt = user_meta_data.objLCCurrCnt
num_rects = frame_meta.num_obj_meta
if lag_dict[stream_id]["is_first"]:
lag_dict[stream_id]["data"] = {"cumulative_line_cross_count":objLCCumCnt}
lag_dict[stream_id]["is_first"] = False
syd_time_pts_ = convert_syd_epoch_to_datetime_str(curr_time, "datetime_sql")
data = {
"stream_id": stream_id,
"camera_name": cam_id_ip_port,
"unique_number_uuid": 10,
"batch_id": frame_meta.batch_id,
"frame_num": frame_number,
"buf_pts": pts,
"ntp_timestamp": frame_meta.ntp_timestamp,
"source_id": frame_meta.source_id,
"object_count": {class_dict.get(int(cls_id),cls_id):count for cls_id,count in user_meta_data.objCnt.items()},
"Number_of_objects": num_rects,
"object_in_ROI_count": objInROIcnt,
"cumulative_line_cross_count": user_meta_data.objLCCumCnt,
"current_line_cross_count": objLCCurrCnt,
"overcrowding_status": user_meta_data.ocStatus,
"unique_id": user_meta_data.unique_id,
"PTS_TIME": syd_time_pts_,
"NTP_TS": syd_time_ntp_sql,
"blob_image_path":os.path.join(blob_base_img_path,blob_path,image_name),
"reconnect_id": src_ids[f'stream{stream_id}']["id"],
"data_num":lag_dict[stream_id]["data_num"]
}
diff_lag = {k: {k_:(v_-v[k_]) for k_,v_ in data[k].items()} for k, v in lag_dict[stream_id]["data"].items()}["cumulative_line_cross_count"]
data["lag_diff_cumulative_line_cross_count"] = diff_lag
lag_dict[stream_id]["data"]["cumulative_line_cross_count"] = data["cumulative_line_cross_count"]
lag_dict[stream_id]["ds"] = curr_time
lag_dict[stream_id]["data_num"]+=1
send_msg(data,redis_key)
except Exception as e:
msg = f"Error processing framefilter prob user metadata: {e}"
logger.error(msg)
try:
l_user = l_user.next
except StopIteration:
break
except Exception as e:
msg = f"Error processing framefilter prob frame metadata: {e}"
logger.error(msg)
try:
l_frame = l_frame.next
except StopIteration:
break
except Exception as e:
msg = f"Error accessing framefilter prob batch metadata: {e}"
logger.error(msg)
# return Gst.PadProbeReturn.OK
return Gst.PadProbeReturn.OK
def dummy_prob(pad, info, user_data):
print(f" inside the {user_data} prob ")
return Gst.PadProbeReturn.OK
def streammux_prob(pad, info, user_data):
global streammux_first_frame_processed
global start_time_streammux
global start_time_streammux_
stream_id = user_data.get('stream_id')
no_of_sec_to_predict = user_data.get('no_of_sec_to_predict')
if not start_time_streammux_first_frame[stream_id]:
start_time_streammux[stream_id] = get_current_epoch_in_syd()
start_time_streammux_[stream_id] = get_current_epoch_in_syd()
start_time_streammux_first_frame[stream_id] = True
gst_buffer = info.get_buffer()
if not gst_buffer:
msg = f"Unable to get GstBuffer streammux "
logger.error(msg)
return Gst.PadProbeReturn.OK
try:
pts = start_time_streammux[stream_id] + gst_buffer.pts/1000000000.0
if (pts - start_time_streammux_[stream_id] ) < no_of_sec_to_predict:
return Gst.PadProbeReturn.DROP
start_time_streammux_[stream_id] = pts
except Exception as e:
msg = f"Error accessing streammux prob batch metadata: {e}"
logger.error(msg)
return Gst.PadProbeReturn.OK
msg = f"frame flow from streammux prob to downstream elements"
logger.info(msg)
return Gst.PadProbeReturn.OK
def main():
global perf_data, pipeline, src_ids, streammux, number_sources,is_first
Gst.init(None)
pipeline_name = f"rtsp-pipeline-{uuid.uuid1()}"
msg = f"pipeline create start {pipeline_name=} ..."
logger.info(msg)
pipeline = Gst.Pipeline.new(pipeline_name)
src_ids = {f'stream{i}': {'id': None, 'src': None, 'streamid': i,"no_of_att":0} for i in range(number_sources)}
cam_ids = {stream_id:extract_rtsp_details(uri_name) for stream_id,uri_name in enumerate(streams)}
streammux = Gst.ElementFactory.make("nvstreammux", "streammuxer")
if not streammux:
msg = "Error: Could not create streammuxer!"
logger.error(msg)
return
streammux.set_property('batch-size', number_sources)
streammux.set_property('batched-push-timeout', streammux_muxer_batch_timeout_usec)
streammux.set_property("sync-inputs",sync_inputs)
streammux.set_property("attach-sys-ts",attach_sys_ts)
streammux.set_property("max-latency",max_latency)
streammux.set_property("frame-duration",frame_duration)
streammux.set_property("width",frame_width)
streammux.set_property("height",frame_height)
streammux.set_property("live-source",is_live)
streammux.set_property("drop-pipeline-eos",drop_eos)
pipeline.add(streammux)
for src_id, src in enumerate(streams):
last_id = str(uuid.uuid1())
src_ids[f'stream{src_id}']['id'] = last_id
src_ids[f'stream{src_id}']['src'] = src
bin = create_source_bin(last_id,src_id,src)
if not bin:
msg = f"Error: Could not create source bin for {src_id}"
logger.error(msg)
del src_ids[f'stream{src_id}']
number_sources-=1
continue
msg = (f"uridecodebin_{last_id} created for {src_id=}")
logger.info(msg)
valve = Gst.ElementFactory.make("valve", f"valve_{src_id}")
queue = Gst.ElementFactory.make("queue", f"queue_{src_id}")
pipeline.add(bin)
pipeline.add(valve)
pipeline.add(queue)
bin_src = bin.get_static_pad("src")
user_data={"stream_id":src_id}
sink_pad = valve.get_static_pad("sink")
sink_pad.add_probe(Gst.PadProbeType.BUFFER, valve_prob, user_data)
padname=f"sink_{src_id}"
valve_src_pad = valve.get_static_pad("src")
valve_sink_pad = valve.get_static_pad("sink")
if not valve_src_pad or not valve_sink_pad:
msg = (f"Error: Could not get src pad for valve_{src_id} or sink_{src_id}")
logger.error(msg)
return
streammux_sinkpad= streammux.request_pad_simple(padname)
if not streammux_sinkpad:
msg = (f"Error: Could not get sink pad for streammux_{src_id}")
logger.error(msg)
return
user_meta = {"stream_id":src_id,'no_of_sec_to_predict':no_of_sec_to_send_data}
streammux_sinkpad.add_probe(Gst.PadProbeType.BUFFER, streammux_prob, user_meta)
queue_sink_pad = queue.get_static_pad("sink")
queue_src_pad = queue.get_static_pad("src")
bin_src.link(valve_sink_pad)
valve_src_pad.link(queue_sink_pad)
queue_src_pad.link(streammux_sinkpad)
active_streams = [ src["streamid"] for src in src_ids.values()]
is_first = False
if not number_sources:
msg = f"no activate streams {number_sources=}"
logger.error(msg)
sys.exit(1)
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
pgie.set_property('config-file-path', infer_config_path)
pgie_batch_size=pgie.get_property("batch-size")
if(pgie_batch_size != number_sources):
msg = ("WARNING: Overriding infer-config batch-size",pgie_batch_size," with number of sources ", number_sources)
logger.warning(msg)
pgie.set_property("batch-size",number_sources)
nvanalytics = Gst.ElementFactory.make("nvdsanalytics", "analytics")
if not nvanalytics:
msg = " Unable to create nvanalytics "
logger.error(msg)
sys.stderr.write(msg)
nvanalytics.set_property("config-file", analytics_config_path)
nvstreamdemux = Gst.ElementFactory.make("nvstreamdemux", "nvstreamdemux")
if not nvstreamdemux:
msg = " Unable to create nvstreamdemux "
logger.error(msg)
sys.stderr.write(msg)
queue1 = Gst.ElementFactory.make("queue", "queue1")
queue2 = Gst.ElementFactory.make("queue", "queue2")
queue3 = Gst.ElementFactory.make("queue", "queue3")
for q in [queue1,queue2,queue3]:
q.set_property("max-size-buffers", max_size_buffers)
q.set_property("leaky", True)
pipeline.add(q)
pgie_src = pgie.get_static_pad("src")
pgie_src.add_probe(Gst.PadProbeType.BUFFER, pgie_src_pad_prob, 0)
pipeline.add(pgie)
pipeline.add(nvanalytics)
pipeline.add(nvstreamdemux)
streammux.link(queue1)
queue1.link(pgie)
pgie.link(queue2)
queue2.link(nvanalytics)
nvanalytics.link(queue3)
queue3.link(nvstreamdemux)
for i in active_streams:
queue4_ = Gst.ElementFactory.make("queue", f"queue__{i}")
pipeline.add(queue4_)
queue4_.set_property("max-size-buffers", max_size_buffers)
queue4_.set_property("leaky", 2)
nvvideoconvert = Gst.ElementFactory.make("nvvideoconvert", f"nvvideoconvert_{i}")
# nvvideoconvertsink = nvvideoconvert.get_static_pad("sink")
# nvvideoconvert = make_element("capsfilter", i)
pipeline.add(nvvideoconvert)
nvdsosd = Gst.ElementFactory.make("nvdsosd", f"nvdsosd_{i}")
pipeline.add(nvdsosd)
nvdsosd.set_property("process-mode", osd_process_mode)
nvdsosd.set_property("display-text", osd_display_text)
padname = "src_%u" % i
demuxsrcpad = nvstreamdemux.request_pad_simple(padname)
if not demuxsrcpad:
sys.stderr.write("Unable to create demux src pad \n")
queuesinkpad = queue4_.get_static_pad("sink")
if not queuesinkpad:
sys.stderr.write("Unable to create queue sink pad \n")
demuxsrcpad.link(queuesinkpad)
queue5 = Gst.ElementFactory.make("queue", f"queue5_{i}")
pipeline.add(queue5)
queue5.set_property("max-size-buffers", max_size_buffers)
queue5.set_property("leaky", 2)
queue5_src = queue5.get_static_pad("src")
queue5_src.add_probe(Gst.PadProbeType.BUFFER, dummy_prob, "queue5 src")
nvvidconv2 = Gst.ElementFactory.make("nvvideoconvert", f"nvvideoconvert1_{i}")
pipeline.add(nvvidconv2)
jpegenc = Gst.ElementFactory.make("nvjpegenc", f"jpeg-encoder_{i}")
jpegenc.set_property("quality", im_quality)
if not jpegenc:
msg = "Unable to create nvjpegenc "
logger.error(msg)
sys.stderr.write(msg)
pipeline.add(jpegenc)
multifilesink = Gst.ElementFactory.make("multifilesink", f"multi-file-sink_{i}")
if not multifilesink:
msg = " Unable to create multifilesink "
logger.error(msg)
sys.stderr.write(msg)
multifilesink.set_property("post-messages", True)
sinkpad = multifilesink.get_static_pad("sink")
probe_data = {
"cam_id": cam_ids[i],
"base_img_path":img_dir_name,
"no_of_sec_send_msg":no_of_sec_to_send_data,
"blob_base_img_path":blob_base_img_path
}
pipeline.add(multifilesink)
queue4_.link(nvvideoconvert)
nvvideoconvert.link(nvdsosd)
nvdsosd.link(queue5)
queue5.link(nvvidconv2)
nvvidconv2.link(jpegenc)
jpegenc.link(multifilesink)
jpegenc_sinkpad = jpegenc.get_static_pad("sink")
jpegenc_sinkpad.add_probe(Gst.PadProbeType.BUFFER, dummy_prob, "jpegenc_sinkpad")
nvvidconv2_sinkpad = nvvidconv2.get_static_pad("sink")
nvvidconv2_sinkpad.add_probe(Gst.PadProbeType.BUFFER, dummy_prob, "nvvidconv2_sinkpad")
sinkpad = multifilesink.get_static_pad("sink")
if not sinkpad:
msg = "Unable to get sink pad of multifilesink"
logger.error(msg)
sys.stderr.write(msg)
sys.exit(1)
else:
sinkpad.add_probe(Gst.PadProbeType.BUFFER, frame_filter_pad_probe, probe_data)
demuxsrcpad.add_probe(Gst.PadProbeType.BUFFER, demux_src_pad_buffer_probe, probe_data)
msg = "Pipeline setup complete"
logger.info(msg)
# Get the bus
bus = pipeline.get_bus()
bus.add_signal_watch()
loop = GLib.MainLoop()
perf_data = PERF_DATA(active_streams,loop)
GLib.timeout_add(reconnect_interval, perf_data.perf_print_callback)
bus.connect("message", bus_call, loop)
# Start the pipeline
pipeline.set_state(Gst.State.PLAYING)
msg = "Pipeline is playing..."
logger.info(msg)
try:
loop.run()
except KeyboardInterrupt:
msg = "Pipeline interrupted by user."
logger.info(msg)
finally:
pipeline.set_state(Gst.State.NULL)
msg = "Pipeline stopped."
logger.info(msg)
if __name__ == "__main__":
main()