my utils/probe.py is like this:
import os
import sys
import numpy as np
import traceback
from datetime import datetime
from .attendance import log_attendance
# GStreamer imports
try:
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
except Exception as e:
print(f"ERROR: Failed to import GStreamer: {str(e)}")
sys.exit(1)
# Initialize GStreamer
if not Gst.is_initialized():
Gst.init(None)
import pyds
# Detection area configuration
DETECTION_AREA = {
'x1': 150, # Left boundary
'y1': 100, # Top boundary
'x2': 1200, # Right boundary
'y2': 800, # Bottom boundary
}
AREA_COLOR = (0, 255, 0, 0.3) # RGBA color
LINE_COLOR = (0, 255, 0, 0.8) # RGBA color
TRACKED_OBJECTS = {} # Track objects in detection area
def draw_detection_area(frame_meta, batch_meta):
"""Draw the detection area on the frame"""
try:
display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
if display_meta:
display_meta.num_rects = 1
rect = display_meta.rect_params[0]
rect.left = DETECTION_AREA['x1']
rect.top = DETECTION_AREA['y1']
rect.width = DETECTION_AREA['x2'] - DETECTION_AREA['x1']
rect.height = DETECTION_AREA['y2'] - DETECTION_AREA['y1']
rect.border_width = 2
rect.border_color.set(*LINE_COLOR)
rect.has_bg_color = 1
rect.bg_color.set(*AREA_COLOR)
pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
except Exception as e:
print(f"Error drawing detection area: {str(e)}")
def is_in_detection_area(obj_meta):
"""Check if object is within the detection area"""
try:
center_x = obj_meta.rect_params.left + obj_meta.rect_params.width / 2
center_y = obj_meta.rect_params.top + obj_meta.rect_params.height / 2
return (DETECTION_AREA['x1'] <= center_x <= DETECTION_AREA['x2'] and
DETECTION_AREA['y1'] <= center_y <= DETECTION_AREA['y2'])
except Exception as e:
print(f"Error checking detection area: {str(e)}")
return False
def check_detection_area(obj_meta, frame_meta, name):
"""Check if object has entered the detection area"""
obj_id = obj_meta.object_id
current_in_area = is_in_detection_area(obj_meta)
prev_data = TRACKED_OBJECTS.get(obj_id, {'in_area': False, 'name': None})
if current_in_area and not prev_data['in_area']:
TRACKED_OBJECTS[obj_id] = {'in_area': True, 'name': name}
return True
TRACKED_OBJECTS[obj_id] = {'in_area': current_in_area, 'name': name}
return current_in_area
def display_name_on_frame(obj_meta, frame_meta, batch_meta, name):
"""Display the name on the frame"""
try:
display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
if not display_meta:
return False
display_meta.num_labels = 1
text_params = display_meta.text_params[0]
text_params.display_text = name
text_params.x_offset = int(obj_meta.rect_params.left)
text_params.y_offset = int(max(10, obj_meta.rect_params.top - 10))
text_params.font_params.font_name = "Serif"
text_params.font_params.font_size = 20
text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
text_params.set_bg_clr = 1
text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
return pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
except Exception as e:
return False
def pgie_src_filter_probe(pad, info, u_data):
"""Filter face detections after primary inference"""
try:
if not pad or not info:
print("ERROR: Invalid probe parameters")
return Gst.PadProbeReturn.OK
gst_buffer = info.get_buffer()
if not gst_buffer:
print("ERROR: Unable to get GstBuffer")
return Gst.PadProbeReturn.OK
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
if not batch_meta:
return Gst.PadProbeReturn.OK
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
if not frame_meta:
break
l_obj = frame_meta.obj_meta_list
while l_obj is not None:
try:
obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
if obj_meta and hasattr(obj_meta, 'confidence') and obj_meta.confidence <= 0.6:
pyds.nvds_remove_obj_meta_from_frame(frame_meta, obj_meta)
except Exception as e:
print(f"ERROR processing object meta: {str(e)}")
traceback.print_exc()
finally:
l_obj = l_obj.next if hasattr(l_obj, 'next') else None
except Exception as e:
print(f"ERROR processing frame meta: {str(e)}")
traceback.print_exc()
finally:
l_frame = l_frame.next if hasattr(l_frame, 'next') else None
except Exception as e:
print(f"CRITICAL ERROR in pgie_src_filter_probe: {str(e)}")
traceback.print_exc()
return Gst.PadProbeReturn.OK
def sgie_feature_extract_probe(pad, info, data):
"""Extract face features and match against known faces"""
try:
if not pad or not info or not data:
print("ERROR: Invalid probe parameters")
return Gst.PadProbeReturn.OK
loaded_faces = data[0] if data and len(data) > 0 else None
if not loaded_faces:
print("WARNING: No loaded faces provided")
return Gst.PadProbeReturn.OK
gst_buffer = info.get_buffer()
if not gst_buffer:
print("ERROR: Unable to get GstBuffer")
return Gst.PadProbeReturn.OK
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
if not batch_meta:
return Gst.PadProbeReturn.OK
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
if not frame_meta:
break
draw_detection_area(frame_meta, batch_meta)
l_obj = frame_meta.obj_meta_list
while l_obj is not None:
try:
obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
if not obj_meta:
continue
face_feature = get_face_feature(obj_meta, frame_meta.frame_num, data)
if face_feature is not None and loaded_faces:
best_match_id, best_score = match_faces(
obj_meta, frame_meta, batch_meta, face_feature, loaded_faces
)
# Only process matches with sufficient confidence
if best_match_id and best_score > 0.3:
in_area = check_detection_area(obj_meta, frame_meta, str(best_match_id))
if in_area:
# Get user details from database
from .attendance import get_user_info, log_attendance
first_name, last_name = get_user_info(best_match_id)
if first_name and last_name:
# Log attendance
log_attendance(best_match_id)
display_name = f"{first_name} {last_name}"
display_name_on_frame(obj_meta, frame_meta, batch_meta, display_name)
# Debug output
print(f"Recognized: ID {best_match_id} ({display_name}) with score {best_score:.2f}")
else:
print(f"User ID {best_match_id} not found in database")
except Exception as e:
print(f"ERROR processing object meta: {str(e)}")
traceback.print_exc()
finally:
l_obj = l_obj.next if hasattr(l_obj, 'next') else None
except Exception as e:
print(f"ERROR processing frame meta: {str(e)}")
traceback.print_exc()
finally:
l_frame = l_frame.next if hasattr(l_frame, 'next') else None
except Exception as e:
print(f"CRITICAL ERROR in sgie_feature_extract_probe: {str(e)}")
traceback.print_exc()
return Gst.PadProbeReturn.OK
def match_faces(obj_meta, frame_meta, batch_meta, face_feature, loaded_faces):
"""Match faces against known faces and return the best matching ID"""
best_match_id = None
best_score = 0.0
try:
for user_id, known_feature in loaded_faces.items():
try:
if known_feature is None:
continue
# Calculate similarity score and ensure it's a scalar float
score = float(np.dot(face_feature, known_feature).item())
# Update best match if this score is higher
if score > best_score:
best_score = score
best_match_id = user_id
# Debug output
print(f"Frame {frame_meta.frame_num}, Face {obj_meta.object_id} vs User {user_id}: Score {score:.4f}")
except Exception as e:
print(f"ERROR matching face {user_id}: {str(e)}")
traceback.print_exc()
except Exception as e:
print(f"CRITICAL ERROR in match_faces: {str(e)}")
traceback.print_exc()
return best_match_id, best_score
def get_face_feature(obj_meta, frame_num, data):
"""Extract face features from tensor meta"""
try:
if not obj_meta or not hasattr(obj_meta, 'obj_user_meta_list'):
return None
l_user_meta = obj_meta.obj_user_meta_list
while l_user_meta:
try:
user_meta = pyds.NvDsUserMeta.cast(l_user_meta.data)
if not user_meta:
break
if user_meta.base_meta.meta_type == pyds.NvDsMetaType.NVDSINFER_TENSOR_OUTPUT_META:
tensor_meta = pyds.NvDsInferTensorMeta.cast(user_meta.user_meta_data)
if not tensor_meta:
break
layer = pyds.get_nvds_LayerInfo(tensor_meta, 0)
if not layer or not layer.buffer:
break
output = []
for i in range(512):
try:
val = pyds.get_detections(layer.buffer, i)
if val is None:
break
output.append(val)
except Exception as e:
print(f"ERROR getting detection {i}: {str(e)}")
break
if len(output) == 512:
res = np.reshape(output, (1, -1))
norm = np.linalg.norm(res)
if norm > 0:
normal_array = res / norm
if data and len(data) > 1 and data[1] and len(data) > 2 and data[2]:
try:
save_path = os.path.join(data[2], f"{obj_meta.object_id}-{frame_num}.npy")
np.save(save_path, normal_array)
except Exception as e:
print(f"ERROR saving feature: {str(e)}")
return normal_array
except Exception as e:
print(f"ERROR processing user meta: {str(e)}")
traceback.print_exc()
finally:
l_user_meta = l_user_meta.next if hasattr(l_user_meta, 'next') else None
except Exception as e:
print(f"CRITICAL ERROR in get_face_feature: {str(e)}")
traceback.print_exc()
return None
And my main.py is like this:
import sys
import math
import traceback
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
from utils.probe import *
from utils.parser_cfg import *
from utils.bus_call import bus_call
import os
def log_error(message):
sys.stderr.write(f"ERROR: {message}\n")
sys.stderr.flush()
def log_info(message):
print(f"INFO: {message}")
sys.stdout.flush()
def print_pad_info(element, name):
try:
pads = element.pads
log_info(f"{name} pads:")
for pad in pads:
caps = pad.get_current_caps()
caps_str = caps.to_string() if caps else "None"
log_info(f" {pad.name}: {pad.direction} (caps: {caps_str})")
except Exception as e:
log_error(f"Error getting pad info for {name}: {str(e)}")
def cb_newpad(decodebin, decoder_src_pad, data):
try:
log_info("Entered cb_newpad callback")
caps = decoder_src_pad.get_current_caps()
if not caps:
log_info("No current caps, querying caps...")
caps = decoder_src_pad.query_caps()
log_info(f"Pad caps: {caps.to_string()}")
gststruct = caps.get_structure(0)
gstname = gststruct.get_name()
source_bin = data
features = caps.get_features(0)
log_info(f"Structure name: {gstname}")
if "video" in gstname.lower():
log_info("Found video pad")
if features.contains("memory:NVMM"):
log_info("NVMM memory feature found")
bin_ghost_pad = source_bin.get_static_pad("src")
if not bin_ghost_pad:
log_error("Failed to get source bin ghost pad")
return
if not bin_ghost_pad.set_target(decoder_src_pad):
log_error("Failed to link decoder src pad to source bin ghost pad")
else:
log_info("Successfully linked decoder src pad to source bin ghost pad")
else:
log_error("Decodebin did not pick nvidia decoder plugin - no NVMM memory feature")
else:
log_info(f"Ignoring non-video pad: {gstname}")
except Exception as e:
log_error(f"Exception in cb_newpad: {str(e)}\n{traceback.format_exc()}")
def decodebin_child_added(child_proxy, Object, name, user_data):
try:
log_info(f"Decodebin child added: {name}")
if "decodebin" in name:
log_info("Connecting child-added signal")
Object.connect("child-added", decodebin_child_added, user_data)
if "source" in name:
log_info("Found source element")
source_element = child_proxy.get_by_name("source")
if source_element.find_property('drop-on-latency') is not None:
log_info("Setting drop-on-latency property")
Object.set_property("drop-on-latency", True)
except Exception as e:
log_error(f"Exception in decodebin_child_added: {str(e)}\n{traceback.format_exc()}")
def create_source_bin(index, uri):
try:
log_info(f"Creating source bin {index} for URI: {uri}")
bin_name = f"source-bin-{index:02d}"
nbin = Gst.Bin.new(bin_name)
if not nbin:
log_error(f"Unable to create source bin {index}")
return None
uri_decode_bin = Gst.ElementFactory.make("uridecodebin", f"uri-decode-bin-{index}")
if not uri_decode_bin:
log_error(f"Unable to create uri decode bin {index}")
return None
log_info(f"Setting URI: {uri}")
uri_decode_bin.set_property("uri", uri)
log_info("Connecting signals...")
uri_decode_bin.connect("pad-added", cb_newpad, nbin)
uri_decode_bin.connect("child-added", decodebin_child_added, nbin)
log_info("Adding elements to bin...")
Gst.Bin.add(nbin, uri_decode_bin)
log_info("Creating ghost pad...")
bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
if not bin_pad:
log_error("Failed to add ghost pad in source bin")
return None
log_info(f"Successfully created source bin {index}")
return nbin
except Exception as e:
log_error(f"Exception in create_source_bin: {str(e)}\n{traceback.format_exc()}")
return None
def link_elements_with_checks(pipeline, element1, element2, name1, name2):
try:
log_info(f"Attempting to link {name1} to {name2}")
if not element1 or not element2:
log_error(f"Cannot link - {'element1' if not element1 else 'element2'} is None")
return False
src_pad = element1.get_static_pad("src")
if not src_pad:
log_error(f"No src pad found on {name1}")
print_pad_info(element1, name1)
return False
sink_pad = element2.get_static_pad("sink")
if not sink_pad:
log_error(f"No sink pad found on {name2}")
print_pad_info(element2, name2)
return False
log_info(f"Linking {name1} src pad to {name2} sink pad")
link_result = src_pad.link(sink_pad)
if link_result != Gst.PadLinkReturn.OK:
log_error(f"Failed to link {name1} to {name2}: {link_result}")
log_info(f"{name1} src pad info:")
log_info(f" Caps: {src_pad.get_current_caps()}")
log_info(f" Direction: {src_pad.direction}")
log_info(f"{name2} sink pad info:")
log_info(f" Caps: {sink_pad.get_current_caps()}")
log_info(f" Direction: {sink_pad.direction}")
return False
else:
log_info(f"Successfully linked {name1} to {name2}")
return True
except Exception as e:
log_error(f"Exception linking {name1} to {name2}: {str(e)}\n{traceback.format_exc()}")
return False
def main(cfg):
try:
log_info("Starting pipeline initialization")
log_info(f"Configuration: {cfg}")
faces_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "recognized_faces")
os.makedirs(faces_dir, exist_ok=True)
log_info(f"Face images will be saved to: {faces_dir}")
log_info("Loading known faces features...")
try:
known_face_features = load_faces(cfg['pipeline']['known_face_dir'])
log_info(f"Loaded {len(known_face_features)} known face features")
except Exception as e:
log_error(f"Failed to load known faces: {str(e)}\n{traceback.format_exc()}")
return
save_feature = cfg['pipeline']['save_feature']
save_path = None
if save_feature:
try:
save_path = cfg['pipeline']['save_feature_path']
log_info(f"Features will be saved to: {save_path}")
except Exception as e:
log_info(f"No save path specified or error getting path: {str(e)}")
sources = cfg['source']
number_sources = len(sources)
log_info(f"Number of sources: {number_sources}")
log_info("Initializing GStreamer...")
Gst.init(None)
log_info("Creating Pipeline...")
pipeline = Gst.Pipeline()
if not pipeline:
log_error("Unable to create Pipeline")
return
log_info("Creating streammux...")
streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
if not streammux:
log_error("Unable to create NvStreamMux")
return
pipeline.add(streammux)
source_idx = 0
is_live = False
for k, v in sources.items():
log_info(f"Processing source {k}: {v}")
uri_name = v
if uri_name.find("rtsp://") == 0:
is_live = True
if "?" not in uri_name:
uri_name += "?latency=0&drop-on-latency=true&buffer-mode=auto"
log_info(f"Modified RTSP URI: {uri_name}")
log_info(f"Creating source_bin {source_idx}")
source_bin = create_source_bin(source_idx, uri_name)
if not source_bin:
log_error(f"Unable to create source bin {source_idx}")
continue
pipeline.add(source_bin)
padname = f"sink_{source_idx}"
log_info(f"Getting request pad {padname}")
try:
sinkpad = streammux.get_request_pad(padname)
if not sinkpad:
log_error(f"Unable to create sink pad {padname}")
continue
srcpad = source_bin.get_static_pad("src")
if not srcpad:
log_error(f"Unable to create src pad for source {source_idx}")
continue
log_info(f"Linking pads for source {source_idx}")
if srcpad.link(sinkpad) != Gst.PadLinkReturn.OK:
log_error(f"Failed to link pads for source {source_idx}")
else:
log_info(f"Successfully linked pads for source {source_idx}")
except Exception as e:
log_error(f"Exception linking source {source_idx}: {str(e)}\n{traceback.format_exc()}")
source_idx += 1
# Create queues
log_info("Creating queues...")
queues = []
for i in range(1, 8):
queue = Gst.ElementFactory.make("queue", f"queue{i}")
if not queue:
log_error(f"Unable to create queue{i}")
return
pipeline.add(queue)
queues.append(queue)
log_info(f"Created queue{i}")
log_info("Creating Pgie...")
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
if not pgie:
log_error("Unable to create pgie")
return
log_info("Creating Sgie...")
sgie = Gst.ElementFactory.make("nvinfer", "secondary-inference")
if not sgie:
log_error("Unable to create sgie")
return
log_info("Creating Tracker...")
tracker = Gst.ElementFactory.make("nvtracker", "tracker")
if not tracker:
log_error("Unable to create tracker")
return
log_info("Creating tiler...")
tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
if not tiler:
log_error("Unable to create tiler")
return
log_info("Creating nvvidconv...")
nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
if not nvvidconv:
log_error("Unable to create nvvidconv")
return
log_info("Creating nvosd...")
nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
if not nvosd:
log_error("Unable to create nvosd")
return
if not cfg['pipeline']['display']:
log_info("Creating Fakesink...")
sink = Gst.ElementFactory.make("fakesink", "fakesink")
sink.set_property('enable-last-sample', 0)
sink.set_property('sync', 0)
else:
if cfg['pipeline']['is_aarch64']:
log_info("Creating nv3dsink...")
sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
else:
log_info("Creating EGLSink...")
sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
if cfg['pipeline']['enable_rtsp_streaming']:
log_info("Creating RTSP streaming elements...")
if not sink:
log_error("Unable to create sink element")
return
if is_live:
log_info("At least one of the sources is live")
streammux.set_property('live-source', 1)
# Set properties
log_info("Setting streammux properties...")
set_property(cfg, streammux, "streammux")
log_info("Setting pgie properties...")
set_property(cfg, pgie, "pgie")
log_info("Setting sgie properties...")
set_property(cfg, sgie, "sgie")
log_info("Setting nvosd properties...")
set_property(cfg, nvosd, "nvosd")
log_info("Setting tiler properties...")
set_property(cfg, tiler, "tiler")
log_info("Setting sink properties...")
set_property(cfg, sink, "sink")
log_info("Setting tracker properties...")
set_tracker_properties(tracker, cfg['tracker']['config-file-path'])
tiler_rows = int(math.sqrt(number_sources))
tiler_columns = int(math.ceil((1.0 * number_sources) / tiler_rows))
log_info(f"Setting tiler rows: {tiler_rows}, columns: {tiler_columns}")
tiler.set_property("rows", tiler_rows)
tiler.set_property("columns", tiler_columns)
log_info("Adding elements to Pipeline...")
elements_to_add = [pgie, sgie, tracker, tiler, nvvidconv, nvosd, sink]
for element in elements_to_add:
pipeline.add(element)
log_info(f"Added {element.name} to pipeline")
log_info("Linking elements in the Pipeline...")
elements = [streammux] + queues + [pgie, tracker, sgie, tiler, nvvidconv, nvosd, sink]
# Link elements with verification
for i in range(len(elements)-1):
if not link_elements_with_checks(pipeline, elements[i], elements[i+1], elements[i].name, elements[i+1].name):
log_error("Pipeline linking failed")
return
log_info("All elements linked successfully")
log_info("Attaching probes...")
try:
pgie_src_pad = pgie.get_static_pad("src")
if pgie_src_pad:
log_info("Adding probe to pgie src pad")
pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, pgie_src_filter_probe, 0)
else:
log_error("Could not get pgie src pad")
sgie_src_pad = sgie.get_static_pad("src")
if sgie_src_pad:
log_info("Adding probe to sgie src pad")
data = [known_face_features, save_feature, save_path]
sgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, sgie_feature_extract_probe, data)
else:
log_error("Could not get sgie src pad")
except Exception as e:
log_error(f"Exception while attaching probes: {str(e)}\n{traceback.format_exc()}")
log_info("Listing sources:")
for key, value in sources.items():
log_info(f"{key}: {value}")
log_info("Starting pipeline...")
pipeline.set_state(Gst.State.PLAYING)
log_info("Pipeline state set to PLAYING")
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
log_info("Entering main loop...")
try:
loop.run()
except KeyboardInterrupt:
log_info("Received keyboard interrupt")
except Exception as e:
log_error(f"Exception in main loop: {str(e)}\n{traceback.format_exc()}")
finally:
log_info("Stopping pipeline...")
pipeline.set_state(Gst.State.NULL)
log_info("Pipeline stopped")
except Exception as e:
log_error(f"Exception in main: {str(e)}\n{traceback.format_exc()}")
if 'pipeline' in locals():
pipeline.set_state(Gst.State.NULL)
if __name__ == '__main__':
try:
log_info("Loading configuration...")
cfg = parse_args(cfg_path="config/config_pipeline.toml")
log_info("Starting main execution...")
main(cfg)
except Exception as e:
log_error(f"Exception in __main__: {str(e)}\n{traceback.format_exc()}")
i am using single rtsp as input source yet. I am Very new to deepstream. Couldn’t find solution for many days. Sometime an error of color format occurs and sometime other errors occur. I just want to capture face images of person. Can you help me by checking my code please. I will be so thankful