The preprocessing steps are already defined in the SGIE config: the color mode is grayscale and the input/output size is 64x64. I also verified that the scale factor is 0.003921568 (1/255) by checking the model output in offline mode after dividing by 255.
enabling or disabling the parser does not make any difference anyway.
I added logs in the main deepstream.py pipeline to see if the PGIE metadata is successfully being sent to the SGIE and being read by the SGIE, but after the SGIE I am not seeing any classification output data.
here is my deepstream.py pipeline code reference:
#!/usr/bin/env python3
"""DeepStream pipeline: face detection (PGIE) -> tracker -> emotion classification (SGIE)."""
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import sys
import time
import argparse
import platform
from ctypes import *
sys.path.append('/opt/nvidia/deepstream/deepstream/lib')
import pyds

# Maximum display-meta elements one NvDsDisplayMeta pool entry can hold.
MAX_ELEMENTS_IN_DISPLAY_META = 16

# Define class IDs
PGIE_CLASS_ID_FACE = 0

# Global variables (populated by parse_args()).
SOURCE = ''
CONFIG_INFER = ''
CONFIG_SGIE = ''
STREAMMUX_BATCH_SIZE = 1
STREAMMUX_WIDTH = 1920
STREAMMUX_HEIGHT = 1080
GPU_ID = 0
PERF_MEASUREMENT_INTERVAL_SEC = 5

# Skeleton for facial landmarks (if needed)
skeleton = [[16, 14], [14, 12], [17, 15], [15, 13], [12, 13], [6, 12], [7, 13],
            [6, 7], [6, 8], [7, 9], [8, 10], [9, 11], [2, 3], [1, 2],
            [1, 3], [2, 4], [3, 5], [4, 6], [5, 7]]
start_time = time.time()
fps_streams = {}


class GETFPS:
    """Counts frames per stream and periodically prints current/average FPS."""

    def __init__(self, stream_id):
        # BUGFIX: the method was named `init`, so GETFPS(stream_id) raised a
        # TypeError (default __init__ takes no arguments). Renamed to __init__.
        global start_time
        self.start_time = start_time
        self.is_first = True
        self.frame_count = 0
        self.stream_id = stream_id
        self.total_fps_time = 0
        self.total_frame_count = 0

    def get_fps(self):
        """Register one frame; emit an FPS report once per measurement interval."""
        end_time = time.time()
        if self.is_first:
            # First call only starts the clock; no frame is counted yet.
            self.start_time = end_time
            self.is_first = False
        current_time = end_time - self.start_time
        if current_time > PERF_MEASUREMENT_INTERVAL_SEC:
            self.total_fps_time += current_time
            self.total_frame_count += self.frame_count
            current_fps = float(self.frame_count) / current_time
            avg_fps = float(self.total_frame_count) / self.total_fps_time
            sys.stdout.write('DEBUG: FPS of stream %d: %.2f (Average: %.2f)\n' %
                             (self.stream_id + 1, current_fps, avg_fps))
            self.start_time = end_time
            self.frame_count = 0
        else:
            self.frame_count += 1
def set_custom_bbox(obj_meta):
    """Style one object's OSD appearance: blue bounding box, white label text on a blue background."""
    bw = 6          # border width in pixels
    fsize = 18      # label font size

    rect = obj_meta.rect_params
    text = obj_meta.text_params

    # Clamp the label position inside the streammux canvas.
    tx = int(min(STREAMMUX_WIDTH - 1, max(0, rect.left - (bw / 2))))
    ty = int(min(STREAMMUX_HEIGHT - 1, max(0, rect.top - (fsize * 2) + 1)))

    rect.border_width = bw
    rect.border_color.red = 0.0
    rect.border_color.green = 0.0
    rect.border_color.blue = 1.0
    rect.border_color.alpha = 1.0

    text.font_params.font_name = 'Ubuntu'
    text.font_params.font_size = fsize
    text.x_offset = tx
    text.y_offset = ty
    text.font_params.font_color.red = 1.0
    text.font_params.font_color.green = 1.0
    text.font_params.font_color.blue = 1.0
    text.font_params.font_color.alpha = 1.0
    text.set_bg_clr = 1
    text.text_bg_clr.red = 0.0
    text.text_bg_clr.green = 0.0
    text.text_bg_clr.blue = 1.0
    text.text_bg_clr.alpha = 1.0
def parse_face_from_meta(frame_meta, obj_meta):
    """Draw landmark circles for one object from its mask_params buffer.

    The mask buffer is interpreted as num_joints * (x, y, confidence) floats
    in network coordinates; letterbox padding/scale is undone to map each
    point back onto the streammux canvas. Points below 0.5 confidence are
    skipped. New display-meta entries are acquired whenever one fills up.
    """
    # Each landmark occupies 3 floats: x, y, confidence.
    num_joints = int(obj_meta.mask_params.size / (sizeof(c_float) * 3))
    # Letterbox scale/pad used when the frame was resized for the network.
    gain = min(obj_meta.mask_params.width / STREAMMUX_WIDTH,
               obj_meta.mask_params.height / STREAMMUX_HEIGHT)
    pad_x = (obj_meta.mask_params.width - STREAMMUX_WIDTH * gain) / 2.0
    pad_y = (obj_meta.mask_params.height - STREAMMUX_HEIGHT * gain) / 2.0
    batch_meta = frame_meta.base_meta.batch_meta
    display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
    pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
    # PERF FIX: get_mask_array() is loop-invariant — fetch it once instead of
    # re-reading the whole mask buffer on every joint iteration.
    data = obj_meta.mask_params.get_mask_array()
    for i in range(num_joints):
        xc = int((data[i * 3 + 0] - pad_x) / gain)
        yc = int((data[i * 3 + 1] - pad_y) / gain)
        confidence = data[i * 3 + 2]
        if confidence < 0.5:
            continue
        if display_meta.num_circles == MAX_ELEMENTS_IN_DISPLAY_META:
            # Current display meta is full; acquire a fresh one.
            display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
            pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
        circle_params = display_meta.circle_params[display_meta.num_circles]
        circle_params.xc = xc
        circle_params.yc = yc
        circle_params.radius = 6
        circle_params.circle_color.red = 1.0
        circle_params.circle_color.green = 1.0
        circle_params.circle_color.blue = 1.0
        circle_params.circle_color.alpha = 1.0
        circle_params.has_bg_color = 1
        circle_params.bg_color.red = 0.0
        circle_params.bg_color.green = 0.0
        circle_params.bg_color.blue = 1.0
        circle_params.bg_color.alpha = 1.0
        display_meta.num_circles += 1
# NOTE(review): these imports duplicate the ones at the top of the file
# (likely two scripts pasted together); kept for safety, harmless at runtime.
import sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
import pyds

# Define label map for the SGIE emotion classifier output classes.
label_map = [
    "Angry",
    "Disgust",
    "Fear",
    "Happy",
    "Sad",
    "Surprise",
    "Neutral"
]
PGIE_CLASS_ID_FACE = 0  # Assuming PGIE assigns class ID 0 to faces
# <<< ADDED: Additional Pad Probe for SGIE >>>
def sgie_src_pad_buffer_probe(pad, info, u_data):
    """
    Probe to confirm classification metadata is attached AFTER SGIE runs.
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("[SGIE Probe] Unable to get GstBuffer")
        return Gst.PadProbeReturn.OK
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    if not batch_meta:
        return Gst.PadProbeReturn.OK
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break
        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            # Print out the object class_id so we know which IDs are coming through
            print(f"[SGIE Probe] Object ID={obj_meta.object_id}, class_id={obj_meta.class_id}")
            # If classification metadata is attached, this list won't be None
            if obj_meta.classifier_meta_list is not None:
                print(f"[SGIE Probe] Classifier meta found for object ID={obj_meta.object_id}")
            else:
                print(f"[SGIE Probe] No classifier meta for object ID={obj_meta.object_id}")
            l_obj = l_obj.next
        l_frame = l_frame.next
    return Gst.PadProbeReturn.OK
def tracker_src_pad_buffer_probe(pad, info, u_data):
    """
    Existing tracker probe.
    Shows when faces are detected, reads classification meta attached by the SGIE
    and overlays the predicted emotion label on the object's display text.
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer")
        return Gst.PadProbeReturn.OK
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break
        l_obj_meta = frame_meta.obj_meta_list
        while l_obj_meta is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj_meta.data)
            except StopIteration:
                break
            # <<< ADDED: Print out the class_id after PGIE >>>
            print(f"[Tracker Probe] Object ID={obj_meta.object_id}, class_id={obj_meta.class_id}")
            if obj_meta.class_id == PGIE_CLASS_ID_FACE:
                try:
                    if obj_meta.classifier_meta_list is not None:
                        classifier_meta = pyds.NvDsClassifierMeta.cast(obj_meta.classifier_meta_list.data)
                        if classifier_meta is not None:
                            # BUGFIX: classifier results are stored in
                            # NvDsClassifierMeta.label_info_list as NvDsLabelInfo
                            # entries (result_class_id / result_prob), NOT in a
                            # 'class_info_list' of NvDsInferObjectDetectionInfo.
                            # The old attribute access raised AttributeError,
                            # which the broad except below silently swallowed —
                            # making it look like the SGIE produced no output.
                            label_info_list = classifier_meta.label_info_list
                            if label_info_list is None:
                                print(f"[Python Probe] No class info list for object ID {obj_meta.object_id}")
                            else:
                                while label_info_list is not None:
                                    try:
                                        label_info = pyds.NvDsLabelInfo.cast(label_info_list.data)
                                    except StopIteration:
                                        break
                                    emotion_id = label_info.result_class_id
                                    confidence = label_info.result_prob
                                    if 0 <= emotion_id < len(label_map):
                                        emotion_label = label_map[emotion_id]
                                    else:
                                        emotion_label = "Unknown"
                                    # Update the display text with emotion_label
                                    obj_meta.text_params.display_text = f"ID:{obj_meta.object_id} {emotion_label}"
                                    print(f"[Python Probe] Object ID {obj_meta.object_id}: {emotion_label} ({confidence:.2f})")
                                    label_info_list = label_info_list.next
                    else:
                        print(f"[Python Probe] No classifier meta list for object ID {obj_meta.object_id}")
                except Exception as e:
                    print(f"Error accessing classifier meta: {e}")
            l_obj_meta = l_obj_meta.next
        l_frame = l_frame.next
    return Gst.PadProbeReturn.OK
def decodebin_child_added(child_proxy, Object, name, user_data):
    """Configure elements spawned inside uridecodebin as they appear.

    Recurses into nested decodebins and tunes the NVIDIA HW decoder
    (no frame dropping, one extra surface, platform-specific memory/perf).
    """
    if name.find('decodebin') != -1:
        # Nested decodebin: keep listening for its own children.
        Object.connect('child-added', decodebin_child_added, user_data)
    if name.find('nvv4l2decoder') != -1:
        Object.set_property('drop-frame-interval', 0)
        Object.set_property('num-extra-surfaces', 1)
        if is_aarch64():
            Object.set_property('enable-max-performance', 1)
        else:
            Object.set_property('cudadec-memtype', 0)
            Object.set_property('gpu-id', GPU_ID)
def cb_newpad(decodebin, pad, user_data):
    """pad-added handler: link the decoder's NVMM video pad to streammux.

    user_data is the pre-requested streammux sink pad for this stream.
    """
    streammux_sink_pad = user_data
    caps = pad.get_current_caps()
    if not caps:
        caps = pad.query_caps()
    structure = caps.get_structure(0)
    name = structure.get_name()
    features = caps.get_features(0)
    if name.find('video') != -1:
        # Only link GPU (NVMM) buffers; anything else means the NVIDIA
        # decoder was not selected by decodebin.
        if features.contains('memory:NVMM'):
            if pad.link(streammux_sink_pad) != Gst.PadLinkReturn.OK:
                sys.stderr.write('ERROR: Failed to link source to streammux sink pad\n')
        else:
            # BUGFIX: added missing trailing newline to the error message.
            sys.stderr.write('ERROR: decodebin did not pick NVIDIA decoder plugin\n')
def create_uridecode_bin(stream_id, uri, streammux):
    """Create a uridecodebin source for `uri`, wired to streammux sink_<stream_id>.

    Also registers an FPS counter for the stream. Returns the new bin
    (caller must add it to the pipeline).
    """
    bin_name = 'source-bin-%04d' % stream_id
    bin = Gst.ElementFactory.make('uridecodebin', bin_name)
    if 'rtsp://' in uri:
        # Live RTSP sources need NTP timestamp sync configuration.
        pyds.configure_source_for_ntp_sync(bin)
    bin.set_property('uri', uri)
    pad_name = 'sink_%u' % stream_id
    streammux_sink_pad = streammux.get_request_pad(pad_name)
    bin.connect('pad-added', cb_newpad, streammux_sink_pad)
    bin.connect('child-added', decodebin_child_added, 0)
    fps_streams['stream{0}'.format(stream_id)] = GETFPS(stream_id)
    return bin
def bus_call(bus, message, user_data):
    """GStreamer bus watch: log warnings, quit the main loop on EOS or error."""
    loop = user_data
    msg_type = message.type
    if msg_type == Gst.MessageType.EOS:
        sys.stdout.write('DEBUG: EOS\n')
        loop.quit()
    elif msg_type == Gst.MessageType.WARNING:
        warn, dbg = message.parse_warning()
        sys.stderr.write('WARNING: %s: %s\n' % (warn, dbg))
    elif msg_type == Gst.MessageType.ERROR:
        err, dbg = message.parse_error()
        sys.stderr.write('ERROR: %s: %s\n' % (err, dbg))
        loop.quit()
    return True
def is_aarch64():
    """Return True when running on an aarch64 (Jetson) machine."""
    machine = platform.machine()
    return machine == 'aarch64'
def main():
    """Build and run the pipeline:

    uridecodebin -> nvstreammux -> PGIE (face) -> nvtracker -> SGIE (emotion)
                 -> nvvideoconvert -> nvdsosd -> sink

    Probes are attached on the tracker and SGIE src pads to inspect metadata.
    Blocks in the GLib main loop until EOS, error, or Ctrl+C.
    """
    Gst.init(None)
    loop = GLib.MainLoop()
    pipeline = Gst.Pipeline()
    if not pipeline:
        sys.stderr.write('ERROR: Failed to create pipeline\n')
        sys.exit(1)

    # Create StreamMuxer
    streammux = Gst.ElementFactory.make('nvstreammux', 'Stream-muxer')
    if not streammux:
        sys.stderr.write('ERROR: Failed to create nvstreammux\n')
        sys.exit(1)
    pipeline.add(streammux)

    # Create Source Bin
    source_bin = create_uridecode_bin(0, SOURCE, streammux)
    if not source_bin:
        sys.stderr.write('ERROR: Failed to create source_bin\n')
        sys.exit(1)
    pipeline.add(source_bin)

    # Create Primary Inference (PGIE)
    pgie = Gst.ElementFactory.make('nvinfer', 'primary-inference')
    if not pgie:
        sys.stderr.write('ERROR: Failed to create PGIE\n')
        sys.exit(1)
    pgie.set_property('config-file-path', CONFIG_INFER)
    pipeline.add(pgie)

    # Create Tracker
    tracker = Gst.ElementFactory.make('nvtracker', 'tracker')
    if not tracker:
        sys.stderr.write('ERROR: Failed to create tracker\n')
        sys.exit(1)
    # Configure tracker properties
    tracker.set_property('tracker-width', 640)
    tracker.set_property('tracker-height', 384)
    tracker.set_property('gpu_id', GPU_ID)
    tracker.set_property('ll-lib-file', '/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so')
    tracker.set_property('ll-config-file',
                         '/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_tracker_NvDCF_perf.yml')
    tracker.set_property('display-tracking-id', 1)
    pipeline.add(tracker)

    # Create Secondary Inference (SGIE - Emotion Classification)
    sgie = Gst.ElementFactory.make('nvinfer', 'secondary-inference')
    if not sgie:
        sys.stderr.write('ERROR: Failed to create SGIE\n')
        sys.exit(1)
    sgie.set_property('config-file-path', CONFIG_SGIE)
    pipeline.add(sgie)

    # Link Elements: StreamMuxer -> PGIE -> Tracker -> SGIE
    streammux.link(pgie)
    pgie.link(tracker)
    tracker.link(sgie)

    # Create Video Converter for General Conversion
    converter = Gst.ElementFactory.make('nvvideoconvert', 'converter')
    if not converter:
        sys.stderr.write('ERROR: Failed to create converter\n')
        sys.exit(1)
    pipeline.add(converter)

    # Create On-Screen Display (OSD)
    osd = Gst.ElementFactory.make('nvdsosd', 'nvdsosd')
    if not osd:
        sys.stderr.write('ERROR: Failed to create nvdsosd\n')
        sys.exit(1)
    osd.set_property('process-mode', int(pyds.MODE_GPU))
    pipeline.add(osd)

    # Create Sink (nv3dsink on Jetson, EGL renderer on dGPU)
    sink = None
    if is_aarch64():
        sink = Gst.ElementFactory.make('nv3dsink', 'nv3d-sink')
        if not sink:
            sys.stderr.write('ERROR: Failed to create nv3dsink\n')
            sys.exit(1)
    else:
        sink = Gst.ElementFactory.make('nveglglessink', 'nvvideo-renderer')
        if not sink:
            sys.stderr.write('ERROR: Failed to create nveglglessink\n')
            sys.exit(1)
    sink.set_property('async', 1)
    sink.set_property('sync', 1)
    sink.set_property('qos', 1)
    pipeline.add(sink)

    # Link SGIE -> Converter -> OSD -> Sink
    sgie.link(converter)
    converter.link(osd)
    osd.link(sink)

    # Configure StreamMuxer Properties
    streammux.set_property('batch-size', STREAMMUX_BATCH_SIZE)
    streammux.set_property('batched-push-timeout', 25000)
    streammux.set_property('width', STREAMMUX_WIDTH)
    streammux.set_property('height', STREAMMUX_HEIGHT)
    streammux.set_property('enable-padding', 0)
    streammux.set_property('live-source', 1)
    streammux.set_property('attach-sys-ts', 1)

    # Additional Property Configurations for PGIE and Tracker
    if 'file://' in SOURCE:
        # File sources are not live.
        streammux.set_property('live-source', 0)
    if tracker.find_property('enable_batch_process') is not None:
        tracker.set_property('enable_batch_process', 1)
    if tracker.find_property('enable_past_frame') is not None:
        tracker.set_property('enable_past_frame', 1)
    if not is_aarch64():
        streammux.set_property('nvbuf-memory-type', 0)
        streammux.set_property('gpu_id', GPU_ID)
        pgie.set_property('gpu_id', GPU_ID)
        tracker.set_property('gpu_id', GPU_ID)
        sgie.set_property('gpu_id', GPU_ID)
        converter.set_property('gpu_id', GPU_ID)
        osd.set_property('gpu_id', GPU_ID)

    # Add Probe to Tracker's Source Pad
    tracker_src_pad = tracker.get_static_pad('src')
    if not tracker_src_pad:
        sys.stderr.write('ERROR: Failed to get tracker src pad\n')
        sys.exit(1)
    else:
        tracker_src_pad.add_probe(Gst.PadProbeType.BUFFER, tracker_src_pad_buffer_probe, 0)

    # <<< ADDED: Also add a Probe to SGIE's Source Pad >>>
    sgie_src_pad = sgie.get_static_pad('src')
    if not sgie_src_pad:
        sys.stderr.write('ERROR: Failed to get SGIE src pad\n')
        sys.exit(1)
    else:
        sgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, sgie_src_pad_buffer_probe, 0)

    # Add Bus Call Function
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect('message', bus_call, loop)

    # Start Playing
    pipeline.set_state(Gst.State.PLAYING)
    sys.stdout.write('\n')
    try:
        loop.run()
    except KeyboardInterrupt:
        # BUGFIX: was a bare `except: pass`, which silently swallowed every
        # exception (including real errors). Only Ctrl+C is expected here.
        pass

    # Cleanup
    pipeline.set_state(Gst.State.NULL)
    sys.stdout.write('\n')
def parse_args():
    """Parse command-line arguments and populate the module-level config globals.

    Exits with status 1 when the source is empty or a config file is missing.
    """
    # BUGFIX: the `global` statement was split across two lines without a
    # continuation, which is a SyntaxError. Joined with a backslash.
    global SOURCE, CONFIG_INFER, CONFIG_SGIE, STREAMMUX_BATCH_SIZE, STREAMMUX_WIDTH, STREAMMUX_HEIGHT, GPU_ID, \
        PERF_MEASUREMENT_INTERVAL_SEC
    parser = argparse.ArgumentParser(description='DeepStream Face Detection with Emotion Classification')
    parser.add_argument('-s', '--source', required=True, help='Source stream/file')
    parser.add_argument('-c', '--config-infer', required=True, help='Config infer file for PGIE')
    parser.add_argument('-c_sgie', '--config-sgie', required=True, help='Config infer file for SGIE')
    parser.add_argument('-b', '--streammux-batch-size', type=int, default=1, help='Streammux batch-size (default: 1)')
    parser.add_argument('-w', '--streammux-width', type=int, default=1920, help='Streammux width (default: 1920)')
    parser.add_argument('-e', '--streammux-height', type=int, default=1080, help='Streammux height (default: 1080)')
    parser.add_argument('-g', '--gpu-id', type=int, default=0, help='GPU id (default: 0)')
    parser.add_argument('-f', '--fps-interval', type=int, default=5, help='FPS measurement interval (default: 5)')
    args = parser.parse_args()
    if args.source == '':
        sys.stderr.write('ERROR: Source not found\n')
        sys.exit(1)
    if args.config_infer == '' or not os.path.isfile(args.config_infer):
        sys.stderr.write('ERROR: Config infer not found\n')
        sys.exit(1)
    if args.config_sgie == '' or not os.path.isfile(args.config_sgie):
        sys.stderr.write('ERROR: Config infer for SGIE not found\n')
        sys.exit(1)
    SOURCE = args.source
    CONFIG_INFER = args.config_infer
    CONFIG_SGIE = args.config_sgie
    STREAMMUX_BATCH_SIZE = args.streammux_batch_size
    STREAMMUX_WIDTH = args.streammux_width
    STREAMMUX_HEIGHT = args.streammux_height
    GPU_ID = args.gpu_id
    PERF_MEASUREMENT_INTERVAL_SEC = args.fps_interval
# BUGFIX: the paste stripped the dunder underscores; `if name == 'main':`
# raises NameError and the pipeline never starts from the command line.
if __name__ == '__main__':
    parse_args()
    sys.exit(main())