@junshengy, thank you. I will reach out to TensorRT, as you suggested.
Coming to the Deepstream pipeline , When I run the pipeline, I observe a latency between the OSD display and the processing. Specifically, after processing four frames, I can see the processing logs in the terminal for those four frames, and only then is the output displayed.
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject, GLib
import pyds
from pathlib import Path
from probes import *
import toml
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from bus_call import bus_call
from platform_info import PlatformInfo
from parse_cfg import *
MUXER_BATCH_TIMEOUT_USEC = 33000
def rtsp_pad_added(src, new_pad, data):
"""Handle pad-added signal from rtspsrc"""
rtph264depay = data
caps = new_pad.get_current_caps()
caps_str = caps.to_string() if caps else ""
print(f"RTSP Source pad added: {new_pad.get_name()} with caps: {caps_str}")
if "application/x-rtp" in caps_str:
sink_pad = rtph264depay.get_static_pad("sink")
if sink_pad and not sink_pad.is_linked():
ret = new_pad.link(sink_pad)
if ret == Gst.PadLinkReturn.OK:
print(f"Successfully linked {src.get_name()} -> {rtph264depay.get_name()}")
else:
print(f"Link failed: {ret}")
else:
print(f"Depay sink pad already linked or unavailable")
else:
print(f"Ignoring non-RTP pad: {new_pad.get_name()}")
def decoder_pad_added(src, new_pad, data):
"""Handle pad-added signal from decoder"""
streammux = data[0]
sink_idx = data[1]
print(f"Decoder {src.get_name()} got new pad: {new_pad.get_name()}")
# Request sink pad from streammux
sink_pad_name = f"sink_{sink_idx}"
sink_pad = streammux.get_request_pad(sink_pad_name)
if not sink_pad:
sys.stderr.write(f"Unable to get request pad '{sink_pad_name}' from streammux\n")
return
if sink_pad.is_linked():
print(f"Streammux sink pad '{sink_pad_name}' is already linked.")
else:
ret = new_pad.link(sink_pad)
if ret == Gst.PadLinkReturn.OK:
print(f"Decoder pad linked to streammux sink pad '{sink_pad_name}'.")
else:
sys.stderr.write(f"Failed to link decoder pad to streammux. Return: {ret}\n")
def create_sources(pipeline, camera_config, streammux):
"""Create RTSP sources from config"""
sources = []
for i, (cam_name, cam_url) in enumerate(camera_config.items()):
print(f"Setting up camera {cam_name} with URL: {cam_url}")
# Create elements
source = Gst.ElementFactory.make("rtspsrc", f"src-{i}")
rtph264depay = Gst.ElementFactory.make("rtph265depay", f"depay-{i}")
h264parse = Gst.ElementFactory.make("h265parse", f"parse-{i}")
decoder = Gst.ElementFactory.make("nvv4l2decoder", f"dec-{i}")
# Configure source
source.set_property("location", cam_url)
source.set_property("latency", 200) # ms buffer
# Add elements to pipeline
pipeline.add(source)
pipeline.add(rtph264depay)
pipeline.add(h264parse)
pipeline.add(decoder)
# Link static elements
rtph264depay.link(h264parse)
h264parse.link(decoder)
# Use the existing rtsp_pad_added function instead of defining a new callback
source.connect("pad-added", rtsp_pad_added, rtph264depay)
# Add to sources list with the camera name
sources.append((cam_name, decoder))
return sources
def create_pipeline(camera_config, pgie1_config_path, pgie2_config_path):
"""Create the full DeepStream pipeline"""
Gst.init(None)
pipeline = Gst.Pipeline()
platform_info = PlatformInfo()
if not pipeline:
sys.stderr.write("Unable to create Pipeline\n")
return None
# Create streammux first
streammux = Gst.ElementFactory.make("nvstreammux", "stream-muxer")
if not streammux:
sys.stderr.write("Unable to create nvstreammux\n")
return None
streammux.set_property("batch-size", len(camera_config))
streammux.set_property("width", 1920)
streammux.set_property("height", 1080)
streammux.set_property("batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC)
pipeline.add(streammux)
# Create sources
sources = create_sources(pipeline, camera_config, streammux)
if not sources:
sys.stderr.write("Failed to create sources\n")
return None
# # Create streammux for batch processing
# streammux = Gst.ElementFactory.make("nvstreammux", "stream-muxer")
# if not streammux:
# sys.stderr.write("Unable to create nvstreammux\n")
# return None
# Configure streammux
# streammux.set_property("batch-size", len(sources))
# streammux.set_property("width", 1920)
# streammux.set_property("height", 1080)
# streammux.set_property("batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC)
# Add streammux to pipeline
#pipeline.add(streammux)
# Connect sources to streammux
for i, (cam_name, decoder) in enumerate(sources):
sinkpad = streammux.get_request_pad(f"sink_{i}")
if not sinkpad:
sys.stderr.write(f"Unable to get sink pad from streammux for {cam_name}\n")
return None
srcpad = decoder.get_static_pad("src")
if not srcpad:
sys.stderr.write(f"Unable to get src pad from decoder for {cam_name}\n")
return None
srcpad.link(sinkpad)
# Create first primary inference engine (PGIE)
pgie1 = Gst.ElementFactory.make("nvinfer", "primary-inference-1")
if not pgie1:
sys.stderr.write("Unable to create primary inference engine 1\n")
return None
# Create second primary inference engine (PGIE)
pgie2 = Gst.ElementFactory.make("nvinfer", "primary-inference-2")
if not pgie2:
sys.stderr.write("Unable to create primary inference engine 2\n")
return None
# Configure PGIE1 and PGIE2
pgie1.set_property("config-file-path", pgie1_config_path)
pgie2.set_property("config-file-path", pgie2_config_path)
# Create converter for color space conversion
nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "converter")
if not nvvidconv:
sys.stderr.write("Unable to create nvvideoconvert\n")
return None
# Create OSD for display
nvosd = Gst.ElementFactory.make("nvdsosd", "onscreen-display")
if not nvosd:
sys.stderr.write("Unable to create nvdsosd\n")
return None
# Create appropriate sink based on platform
# Platform-specific sink creation
if platform_info.is_integrated_gpu():
print("Creating nv3dsink \n")
sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
if not sink:
sys.stderr.write(" Unable to create nv3dsink \n")
return None
else:
if platform_info.is_platform_aarch64():
print("Creating nv3dsink \n")
sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
else:
print("Creating EGLSink \n")
sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
if not sink:
sys.stderr.write(" Unable to create egl sink \n")
return None
# Add all elements to pipeline
pipeline.add(pgie1)
pipeline.add(pgie2)
pipeline.add(nvvidconv)
pipeline.add(nvosd)
pipeline.add(sink)
# Link all elements in the pipeline
print("Linking elements in the pipeline")
streammux.link(pgie1)
pgie1.link(pgie2)
pgie2.link(nvvidconv)
nvvidconv.link(nvosd)
nvosd.link(sink)
# Add probes for monitoring and debugging
pgie1_src_pad = pgie1.get_static_pad("src")
if pgie1_src_pad:
pgie1_src_pad.add_probe(Gst.PadProbeType.BUFFER, pgie1_probe, "PGIE1")
pgie2_src_pad = pgie2.get_static_pad("src")
if pgie2_src_pad:
pgie2_src_pad.add_probe(Gst.PadProbeType.BUFFER, pgie2_probe, "PGIE2")
osd_sink_pad = nvosd.get_static_pad("sink")
if osd_sink_pad:
osd_sink_pad.add_probe(Gst.PadProbeType.BUFFER, osd_probe, 0)
return pipeline
def main(cfg):
# Create pipeline
print(cfg)
camera_config = cfg ['source']
pgie1_config_path = cfg["pgie"]["goggles_config"]
pgie2_config_path = cfg["pgie"]["full_ppe_config"]
pipeline = create_pipeline(camera_config,pgie1_config_path,pgie2_config_path)
# Setup bus and loop
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
# Start pipeline
pipeline.set_state(Gst.State.PLAYING)
try:
loop.run()
except:
pass
# Cleanup
pipeline.set_state(Gst.State.NULL)
gate_controller.cleanup()
if __name__ == "__main__":
cfg=parse_args(cfg_path="paths.toml")
main(cfg)
osd probe connected :
def osd_probe(pad, info, u_data):
"""Handles on-screen display and visual alerts"""
buf = info.get_buffer()
if not buf:
return Gst.PadProbeReturn.OK
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(buf))
# Initialize counters for all classes
# PGIE1 classes (0-4)
# PGIE2 classes (5-11)
obj_counter = {class_id: 0 for class_id in range(12)} # Classes 0-11
# Class labels for display
pgie1_labels = {
0: "Safety Goggle",
1: "ToeGuard",
2: "Non Safety Shoes",
3: "Safety Shoes",
4: "Non Safety Goggles"
}
pgie2_labels = {
5: "Safety Mask",
6: "Safety Gloves",
7: "Wheel Chock",
8: "Earthing Clamp",
9: "Safety Helmet",
10: "R Jacket",
11: "People Count"
}
# Use the same GList iteration approach that works in other functions
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
camera_id = frame_meta.source_id
# Reset counters
for class_id in obj_counter:
obj_counter[class_id] = 0
# Count all objects
l_obj = frame_meta.obj_meta_list
while l_obj:
try:
obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
obj_counter[obj_meta.class_id] += 1
# Color coding
if obj_meta.class_id in [0, 1, 3]: # Gate-relevant
obj_meta.rect_params.border_color.set(0.0, 1.0, 0.0, 0.8) # Green
elif obj_meta.class_id in [5, 6, 9, 10]: # Safety-critical PPE
obj_meta.rect_params.border_color.set(1.0, 0.0, 0.0, 0.8) # Red
else:
obj_meta.rect_params.border_color.set(0.0, 0.0, 1.0, 0.8) # Blue
l_obj = l_obj.next
except StopIteration:
break
# Check violations
missing_ppe = check_ppe_violations(obj_counter)
people_count = obj_counter[11]
has_violation = (people_count != 3) or missing_ppe
# Visual warnings
if has_violation:
warning_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
warning_meta.num_labels = 1
warning_text = []
if people_count != 3:
warning_text.append(f"PEOPLE: {people_count} (Expected 3)")
if missing_ppe:
warning_text.append(f"MISSING: {', '.join(missing_ppe)}")
warning_params = warning_meta.text_params[0]
warning_params.display_text = "\n".join(warning_text)
warning_params.x_offset = 10
warning_params.y_offset = frame_meta.source_frame_height - 100
warning_params.font_params.font_color.set(1.0, 0.0, 0.0, 1.0)
warning_params.text_bg_clr.set(0.0, 0.0, 0.0, 0.5)
pyds.nvds_add_display_meta_to_frame(frame_meta, warning_meta)
# Main OSD
display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
display_meta.num_labels = 1
text_params = display_meta.text_params[0]
# Create OSD text with proper labels
osd_text = [f"Camera {camera_id} | Frame: {frame_meta.frame_num}"]
# PGIE1 detections
osd_text.append("--- PGIE1 Detections ---")
for class_id in range(5): # PGIE1 classes (0-4)
if obj_counter[class_id] > 0:
osd_text.append(f"{pgie1_labels[class_id]}: {obj_counter[class_id]}")
# PGIE2 detections
osd_text.append("--- PGIE2 Detections ---")
for class_id in range(5, 12): # PGIE2 classes (5-11)
if obj_counter[class_id] > 0:
osd_text.append(f"{pgie2_labels[class_id]}: {obj_counter[class_id]}")
text_params.display_text = "\n".join(osd_text)
# Text formatting
text_params.x_offset = 10
text_params.y_offset = 10
text_params.font_params.font_name = "Serif"
text_params.font_params.font_size = 10
text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
text_params.set_bg_clr = 1
text_params.text_bg_clr.set(0.0, 0.0, 0.0, 0.5)
# Gate status
gate_open = (gate_controller.goggles_detected and
(gate_controller.safety_shoes_detected or gate_controller.toeguard_detected))
if display_meta.num_labels < 2:
display_meta.num_labels = 2
gate_params = display_meta.text_params[1]
gate_params.display_text = f"GATE: {'OPEN' if gate_open else 'CLOSED'}"
gate_params.x_offset = frame_meta.source_frame_width - 200
gate_params.y_offset = 10
# Fix the color setting
if gate_open:
gate_params.font_params.font_color.set(0.0, 1.0, 0.0, 1.0) # Green when open
else:
gate_params.font_params.font_color.set(1.0, 0.0, 0.0, 1.0) # Red when closed
gate_params.text_bg_clr.set(0.0, 0.0, 0.0, 0.5)
pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
l_frame = l_frame.next
except StopIteration:
break
return Gst.PadProbeReturn.OK
Why is this happening? How can I make the output display immediately after processing?
Because, it is not acceptable to detect the object after a 1-2 seconds delay while it is standing in front of the camera.