# NOTE: This sample is based on deepstream_test_2.py with an added GTK GUI.
# Clicking on an object in the video makes its bounding box thicker. The
# bbox-selection feature itself does not depend on any DeepStream functionality.
#!/usr/bin/env python3
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys
sys.path.append("../")
import platform
import configparser
import gi
gi.require_version("Gst", "1.0")
gi.require_version("Gtk", "3.0")
gi.require_version("GdkX11", "3.0")
gi.require_version("GstVideo", "1.0")
from gi.repository import GLib, Gst, Gtk, Gdk, GdkX11, GstVideo
from common.platform_info import PlatformInfo
from common.bus_call import bus_call
import pyds
# Class IDs produced by the primary detector (see dstest2_pgie_config.txt).
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
# Max time (microseconds) nvstreammux waits to form a batch (~one 30 fps frame).
MUXER_BATCH_TIMEOUT_USEC = 33000
# Global statistics tracking
# Written by osd_sink_pad_buffer_probe (streaming thread), read by the
# GUI's 100 ms update_stats timer on the GTK main loop.
global_stats = {
    "frame_count": 0,
    "vehicle_count": 0,
    "person_count": 0,
    "bicycle_count": 0,
    "roadsign_count": 0,
    "total_objects": 0,
}
# Global variables for bbox selection
current_bboxes = []  # Store current frame bboxes as (left, top, width, height, obj_id, class_id)
selected_object_id = None  # ID of selected object (None = no selection)
video_scale = 1.0  # Video scaling factor (uniform, aspect-preserving)
video_offset_x = 0.0  # Video X offset in widget (letterbox centering)
video_offset_y = 0.0  # Video Y offset in widget (letterbox centering)
video_display_width = 0.0  # Actual video display width in widget
video_display_height = 0.0  # Actual video display height in widget
class DeepStreamGUI:
    """GTK front-end for the DeepStream pipeline.

    Hosts a control panel (file chooser / clear-selection buttons), a
    DrawingArea that the GStreamer video sink renders into through
    GstVideoOverlay, and a statistics panel refreshed every 100 ms from the
    module-level ``global_stats`` dict. Clicks on the video area select the
    bounding box under the cursor by setting ``selected_object_id``.
    """
    def __init__(self, pipeline=None):
        """Build the window and all child widgets.

        Args:
            pipeline: optional Gst.Pipeline; main() also assigns it later
                via ``gui.pipeline`` so delete-event can stop playback.
        """
        self.pipeline = pipeline
        self.is_playing = False
        # Create the main window
        self.window = Gtk.Window()
        self.window.set_title("DeepStream Test2 - Auto-Play GUI")
        self.window.set_default_size(1200, 800)
        self.window.connect("delete-event", self.on_window_delete)
        # Create main layout
        main_box = Gtk.VBox(spacing=10)
        self.window.add(main_box)
        # Create control panel
        control_frame = Gtk.Frame(label="Control Panel")
        control_box = Gtk.HBox(spacing=10)
        control_frame.add(control_box)
        # File chooser button
        self.file_button = Gtk.Button(label="Select File")
        self.file_button.connect("clicked", self.on_file_choose_clicked)
        control_box.pack_start(self.file_button, False, False, 0)
        # Clear selection button
        self.clear_button = Gtk.Button(label="Clear Selection")
        self.clear_button.connect("clicked", self.on_clear_selection_clicked)
        control_box.pack_start(self.clear_button, False, False, 0)
        main_box.pack_start(control_frame, False, False, 0)
        # Create video area
        video_frame = Gtk.Frame(label="Video Display")
        self.video_area = Gtk.DrawingArea()
        self.video_area.set_size_request(640, 480)
        self.video_area.set_double_buffered(
            False
        ) # Important: disable double buffering for video overlay
        # Enable mouse events for bbox selection
        self.video_area.set_events(Gdk.EventMask.BUTTON_PRESS_MASK)
        self.video_area.connect("button-press-event", self.on_video_click)
        video_frame.add(self.video_area)
        main_box.pack_start(video_frame, True, True, 0)
        # Create statistics panel
        stats_frame = Gtk.Frame(label="Statistics")
        stats_box = Gtk.VBox(spacing=5)
        stats_frame.add(stats_box)
        # Statistics labels
        self.frame_label = Gtk.Label(label="Frames: 0")
        self.vehicle_label = Gtk.Label(label="Vehicles: 0")
        self.person_label = Gtk.Label(label="Persons: 0")
        self.bicycle_label = Gtk.Label(label="Bicycles: 0")
        self.roadsign_label = Gtk.Label(label="Road Signs: 0")
        self.total_label = Gtk.Label(label="Total Objects: 0")
        # Add separator
        separator = Gtk.Separator(orientation=Gtk.Orientation.HORIZONTAL)
        # Selected object info
        self.selected_label = Gtk.Label(label="Selected: None")
        self.selected_label.set_markup("<b>Selected: None</b>")
        stats_box.pack_start(self.frame_label, False, False, 0)
        stats_box.pack_start(self.vehicle_label, False, False, 0)
        stats_box.pack_start(self.person_label, False, False, 0)
        stats_box.pack_start(self.bicycle_label, False, False, 0)
        stats_box.pack_start(self.roadsign_label, False, False, 0)
        stats_box.pack_start(self.total_label, False, False, 0)
        stats_box.pack_start(separator, False, False, 5)
        stats_box.pack_start(self.selected_label, False, False, 0)
        main_box.pack_start(stats_frame, False, False, 0)
        # Update timer: refresh the statistics panel every 100 ms.
        GLib.timeout_add(100, self.update_stats)
        self.selected_file = None
    def on_window_delete(self, widget, event):
        """Window close handler: stop the pipeline and quit the GTK loop."""
        if self.pipeline:
            self.pipeline.set_state(Gst.State.NULL)
        Gtk.main_quit()
        return False
    def on_file_choose_clicked(self, button):
        """Open a modal file-chooser dialog and remember the chosen path."""
        dialog = Gtk.FileChooserDialog(
            title="Select Video File",
            parent=self.window,
            action=Gtk.FileChooserAction.OPEN,
        )
        dialog.add_buttons(
            Gtk.STOCK_CANCEL,
            Gtk.ResponseType.CANCEL,
            Gtk.STOCK_OPEN,
            Gtk.ResponseType.OK,
        )
        # Add file filters
        filter_video = Gtk.FileFilter()
        filter_video.set_name("Video Files")
        filter_video.add_mime_type("video/*")
        filter_video.add_pattern("*.h264")
        dialog.add_filter(filter_video)
        response = dialog.run()
        if response == Gtk.ResponseType.OK:
            self.selected_file = dialog.get_filename()
            print(f"Selected file: {self.selected_file}")
        dialog.destroy()
    def update_stats(self):
        """100 ms GLib timer callback: mirror global_stats into the labels.

        Also refreshes the "Selected" label from selected_object_id and
        current_bboxes. Returns True so GLib keeps the timer running.
        """
        global selected_object_id, current_bboxes
        self.frame_label.set_text(f"Frames: {global_stats['frame_count']}")
        self.vehicle_label.set_text(f"Vehicles: {global_stats['vehicle_count']}")
        self.person_label.set_text(f"Persons: {global_stats['person_count']}")
        self.bicycle_label.set_text(f"Bicycles: {global_stats['bicycle_count']}")
        self.roadsign_label.set_text(f"Road Signs: {global_stats['roadsign_count']}")
        self.total_label.set_text(f"Total Objects: {global_stats['total_objects']}")
        # Update selected object info
        if selected_object_id is not None:
            # Find the selected object in current bboxes
            selected_info = None
            for bbox_info in current_bboxes:
                left, top, width, height, obj_id, class_id = bbox_info
                if obj_id == selected_object_id:
                    selected_info = bbox_info
                    break
            if selected_info:
                left, top, width, height, obj_id, class_id = selected_info
                class_names = {
                    PGIE_CLASS_ID_VEHICLE: "Vehicle",
                    PGIE_CLASS_ID_PERSON: "Person",
                    PGIE_CLASS_ID_BICYCLE: "Bicycle",
                    PGIE_CLASS_ID_ROADSIGN: "Road Sign",
                }
                class_name = class_names.get(class_id, "Unknown")
                self.selected_label.set_markup(
                    f"<b>Selected: {class_name} (ID: {obj_id})</b>"
                )
            else:
                self.selected_label.set_markup("<b>Selected: None (object lost)</b>")
        else:
            self.selected_label.set_markup("<b>Selected: None</b>")
        return True
    def on_video_click(self, widget, event):
        """Handle mouse click on video area for bbox selection"""
        global selected_object_id, current_bboxes, video_scale, video_offset_x, video_offset_y
        # Get click coordinates in widget
        click_x = event.x
        click_y = event.y
        # Check if click is within the actual video display area
        if (
            click_x < video_offset_x
            or click_x > video_offset_x + video_display_width
            or click_y < video_offset_y
            or click_y > video_offset_y + video_display_height
        ):
            print(f"Click outside video area: ({click_x:.1f}, {click_y:.1f})")
            return True
        # Convert widget coordinates to video original coordinates (1920x1080)
        # Step 1: Remove offset to get coordinates relative to video display area
        video_display_x = click_x - video_offset_x
        video_display_y = click_y - video_offset_y
        # Step 2: Convert from video display area to original video coordinates
        # video_display_width = 1920 * video_scale
        # So: video_original_x = video_display_x * 1920 / video_display_width
        # video_original_x = video_display_x / video_scale
        video_original_x = (
            (video_display_x * 1920.0) / video_display_width
            if video_display_width > 0
            else 0
        )
        video_original_y = (
            (video_display_y * 1080.0) / video_display_height
            if video_display_height > 0
            else 0
        )
        print(f"Click at widget: ({click_x:.1f}, {click_y:.1f})")
        print(f"Video display: ({video_display_x:.1f}, {video_display_y:.1f})")
        print(
            f"Video original (1920x1080): ({video_original_x:.1f}, {video_original_y:.1f})"
        )
        print(
            f"Video area: offset=({video_offset_x:.1f}, {video_offset_y:.1f}), size=({video_display_width:.1f}x{video_display_height:.1f}), scale={video_scale:.3f}"
        )
        # Find bbox that contains the click point (bbox coordinates are in original video space)
        selected_object_id = None
        for bbox_info in current_bboxes:
            left, top, width, height, obj_id, class_id = bbox_info
            right = left + width
            bottom = top + height
            if left <= video_original_x <= right and top <= video_original_y <= bottom:
                selected_object_id = obj_id
                print(f"Selected object ID: {obj_id}, class: {class_id}")
                print(
                    f"Object bbox (original coords): ({left:.1f}, {top:.1f}, {width:.1f}x{height:.1f})"
                )
                break
        if selected_object_id is None:
            print("No object selected, showing all bboxes")
        return True
    def on_clear_selection_clicked(self, button):
        """Clear the selected object and show all bboxes"""
        global selected_object_id
        selected_object_id = None
        print("Selection cleared, showing all objects")
    def show(self):
        """Show the window and all of its children."""
        self.window.show_all()
    def get_video_widget(self):
        """Return the DrawingArea the video sink should render into."""
        return self.video_area
    def get_window_handle(self):
        """Get video_area window handle for GstVideoOverlay"""
        # Ensure window is realized
        if self.video_area.get_window():
            return self.video_area.get_window().get_xid()
        return None
def osd_sink_pad_buffer_probe(pad, info, u_data):
    """Buffer probe installed on the sink pad of nvdsosd.

    For every batched buffer this probe:
      * counts objects per class and publishes the totals in ``global_stats``,
      * records each object's bbox in ``current_bboxes`` for click hit-testing,
      * recolors and re-borders bboxes depending on ``selected_object_id``
        (selected object: thick red; others: thin class color),
      * attaches per-frame summary text via display meta,
      * dumps past-frame tracker data to stdout.

    Returns:
        Gst.PadProbeReturn.OK so the buffer always continues downstream.
    """
    global global_stats, current_bboxes, selected_object_id
    frame_number = 0
    # Initializing object counter with 0.
    obj_counter = {
        PGIE_CLASS_ID_VEHICLE: 0,
        PGIE_CLASS_ID_PERSON: 0,
        PGIE_CLASS_ID_BICYCLE: 0,
        PGIE_CLASS_ID_ROADSIGN: 0,
    }
    num_rects = 0
    # Clear current bboxes for this frame
    current_bboxes = []
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        # A pad probe must return a Gst.PadProbeReturn value; a bare
        # `return` (None) is not a valid probe return.
        return Gst.PadProbeReturn.OK
    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
            # The casting is done by pyds.NvDsFrameMeta.cast()
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break
        frame_number = frame_meta.frame_num
        num_rects = frame_meta.num_obj_meta
        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                # Casting l_obj.data to pyds.NvDsObjectMeta
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            obj_counter[obj_meta.class_id] += 1
            # Store bbox information for click detection
            bbox_info = (
                obj_meta.rect_params.left,
                obj_meta.rect_params.top,
                obj_meta.rect_params.width,
                obj_meta.rect_params.height,
                obj_meta.object_id,
                obj_meta.class_id,
            )
            current_bboxes.append(bbox_info)
            # Control bbox visibility based on selection
            if selected_object_id is not None:
                if obj_meta.object_id == selected_object_id:
                    # Show selected object with highlighted color and thick border
                    obj_meta.rect_params.border_color.set(1.0, 0.0, 0.0, 1.0)  # Red
                    obj_meta.rect_params.border_width = 6
                    obj_meta.text_params.font_params.font_color.set(
                        1.0, 0.0, 0.0, 1.0
                    )  # Red text
                else:
                    # Show other objects with normal colors but thinner border
                    if obj_meta.class_id == PGIE_CLASS_ID_PERSON:
                        obj_meta.rect_params.border_color.set(
                            0.0, 1.0, 0.0, 1.0
                        )  # Green
                    elif obj_meta.class_id == PGIE_CLASS_ID_VEHICLE:
                        obj_meta.rect_params.border_color.set(
                            0.0, 0.0, 1.0, 1.0
                        )  # Blue
                    elif obj_meta.class_id == PGIE_CLASS_ID_BICYCLE:
                        obj_meta.rect_params.border_color.set(
                            1.0, 1.0, 0.0, 1.0
                        )  # Yellow
                    elif obj_meta.class_id == PGIE_CLASS_ID_ROADSIGN:
                        obj_meta.rect_params.border_color.set(
                            1.0, 0.0, 1.0, 1.0
                        )  # Magenta
                    obj_meta.rect_params.border_width = (
                        1  # Thinner border for non-selected objects
                    )
            else:
                # Show all objects with default colors and normal border width
                if obj_meta.class_id == PGIE_CLASS_ID_PERSON:
                    obj_meta.rect_params.border_color.set(0.0, 1.0, 0.0, 1.0)  # Green
                elif obj_meta.class_id == PGIE_CLASS_ID_VEHICLE:
                    obj_meta.rect_params.border_color.set(0.0, 0.0, 1.0, 1.0)  # Blue
                elif obj_meta.class_id == PGIE_CLASS_ID_BICYCLE:
                    obj_meta.rect_params.border_color.set(1.0, 1.0, 0.0, 1.0)  # Yellow
                elif obj_meta.class_id == PGIE_CLASS_ID_ROADSIGN:
                    obj_meta.rect_params.border_color.set(1.0, 0.0, 1.0, 1.0)  # Magenta
                obj_meta.rect_params.border_width = 2
            try:
                l_obj = l_obj.next
            except StopIteration:
                break
        # Acquiring a display meta object. The memory ownership remains in
        # the C code so downstream plugins can still access it. Otherwise
        # the garbage collector will claim it when this probe function exits.
        display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
        display_meta.num_labels = 1
        py_nvosd_text_params = display_meta.text_params[0]
        # Setting display text to be shown on screen
        # Note that the pyds module allocates a buffer for the string, and the
        # memory will not be claimed by the garbage collector.
        # Reading the display_text field here will return the C address of the
        # allocated string. Use pyds.get_string() to get the string content.
        py_nvosd_text_params.display_text = "Frame Number={} Number of Objects={} Vehicle_count={} Person_count={}".format(
            frame_number,
            num_rects,
            obj_counter[PGIE_CLASS_ID_VEHICLE],
            obj_counter[PGIE_CLASS_ID_PERSON],
        )
        # Now set the offsets where the string should appear
        py_nvosd_text_params.x_offset = 10
        py_nvosd_text_params.y_offset = 12
        # Font , font-color and font-size
        py_nvosd_text_params.font_params.font_name = "Serif"
        py_nvosd_text_params.font_params.font_size = 10
        # set(red, green, blue, alpha); set to White
        py_nvosd_text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
        # Text background color
        py_nvosd_text_params.set_bg_clr = 1
        # set(red, green, blue, alpha); set to Black
        py_nvosd_text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
        # Using pyds.get_string() to get display_text as string
        print(pyds.get_string(py_nvosd_text_params.display_text))
        pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
        try:
            l_frame = l_frame.next
        except StopIteration:
            break
    # past tracking meta data
    l_user = batch_meta.batch_user_meta_list
    while l_user is not None:
        try:
            # Note that l_user.data needs a cast to pyds.NvDsUserMeta
            # The casting is done by pyds.NvDsUserMeta.cast()
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone
            user_meta = pyds.NvDsUserMeta.cast(l_user.data)
        except StopIteration:
            break
        if (
            user_meta
            and user_meta.base_meta.meta_type
            == pyds.NvDsMetaType.NVDS_TRACKER_PAST_FRAME_META
        ):
            try:
                # Note that user_meta.user_meta_data needs a cast to pyds.NvDsTargetMiscDataBatch
                # The casting is done by pyds.NvDsTargetMiscDataBatch.cast()
                # The casting also keeps ownership of the underlying memory
                # in the C code, so the Python garbage collector will leave
                # it alone
                pPastDataBatch = pyds.NvDsTargetMiscDataBatch.cast(
                    user_meta.user_meta_data
                )
            except StopIteration:
                break
            for miscDataStream in pyds.NvDsTargetMiscDataBatch.list(pPastDataBatch):
                print("streamId=", miscDataStream.streamID)
                print("surfaceStreamID=", miscDataStream.surfaceStreamID)
                for miscDataObj in pyds.NvDsTargetMiscDataStream.list(miscDataStream):
                    print("numobj=", miscDataObj.numObj)
                    print("uniqueId=", miscDataObj.uniqueId)
                    print("classId=", miscDataObj.classId)
                    print("objLabel=", miscDataObj.objLabel)
                    for miscDataFrame in pyds.NvDsTargetMiscDataObject.list(
                        miscDataObj
                    ):
                        print("frameNum:", miscDataFrame.frameNum)
                        print("tBbox.left:", miscDataFrame.tBbox.left)
                        print("tBbox.width:", miscDataFrame.tBbox.width)
                        print("tBbox.top:", miscDataFrame.tBbox.top)
                        # Label fixed: the value printed is the bbox height,
                        # but the old label claimed it was "tBbox.right".
                        print("tBbox.height:", miscDataFrame.tBbox.height)
                        print("confidence:", miscDataFrame.confidence)
                        print("age:", miscDataFrame.age)
        try:
            l_user = l_user.next
        except StopIteration:
            break
    # Update global statistics
    global_stats["frame_count"] = frame_number
    global_stats["vehicle_count"] = obj_counter[PGIE_CLASS_ID_VEHICLE]
    global_stats["person_count"] = obj_counter[PGIE_CLASS_ID_PERSON]
    global_stats["bicycle_count"] = obj_counter[PGIE_CLASS_ID_BICYCLE]
    global_stats["roadsign_count"] = obj_counter[PGIE_CLASS_ID_ROADSIGN]
    global_stats["total_objects"] = num_rects
    return Gst.PadProbeReturn.OK
def custom_bus_call(bus, message, loop, gui, sink):
    """
    Custom bus callback function for handling GstVideoOverlay window setup.

    Handles EOS/ERROR (quit the loop), WARNING (log), and ELEMENT messages
    carrying "prepare-window-handle", which triggers a deferred window-handle
    hand-off to the sink. Returns True to keep the bus watch installed.
    """
    t = message.type
    if t == Gst.MessageType.EOS:
        print("End-of-stream\n")
        loop.quit()
    elif t == Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        print("Warning: %s: %s\n" % (err, debug))
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        print("Error: %s: %s\n" % (err, debug))
        loop.quit()
    elif t == Gst.MessageType.ELEMENT:
        # Handle GstVideoOverlay prepare-window-handle message.
        # get_structure() can return None for some ELEMENT messages, so
        # guard before dereferencing to avoid an AttributeError here.
        structure = message.get_structure()
        if structure is not None and structure.get_name() == "prepare-window-handle":
            print("Setting window handle for video overlay...")
            # Wait a bit to ensure window is realized
            GLib.timeout_add(100, set_window_handle, gui, sink)
    return True
def set_window_handle(gui, sink):
    """Hand the GTK video widget's native window handle to the video sink.

    Designed as a GLib timeout callback: returns True to be called again
    while the widget is not yet realized, False once the handle is set.
    """
    handle = gui.get_window_handle()
    if not handle:
        print("Window handle not ready, retrying...")
        return True  # Continue retrying
    print(f"Setting window handle: {handle}")
    sink.set_window_handle(handle)
    return False  # Execute only once
def on_sync_message(bus, message, gui, sink):
    """Handle sync messages, especially prepare-window-handle"""
    structure = message.get_structure()
    if structure is None:
        return
    if structure.get_name() == "prepare-window-handle":
        print("Received prepare-window-handle, setting up video overlay...")
        # Delay setting window handle to ensure GTK window is fully realized
        GLib.timeout_add(200, set_window_handle, gui, sink)
def main():
    """Build and run the DeepStream pipeline with the GTK control GUI.

    Pipeline: filesrc -> h264parse -> nvv4l2decoder -> nvstreammux ->
    pgie -> tracker -> sgie1 -> sgie2 -> nvvideoconvert -> nvdsosd -> sink.

    The input file is chosen interactively through the GUI; playback starts
    automatically once the GTK window is realized.
    """
    # Initialize GUI for file selection
    Gtk.init(None)
    gui = DeepStreamGUI()
    gui.show()
    dialog = Gtk.MessageDialog(
        parent=gui.window,
        flags=0,
        message_type=Gtk.MessageType.INFO,
        buttons=Gtk.ButtonsType.OK,
        text="Please use the GUI to select a video file",
    )
    dialog.run()
    dialog.destroy()
    # Wait for file selection or quit (poll GTK events without blocking)
    while not gui.selected_file:
        Gtk.main_iteration_do(False)
        if not gui.window.get_visible():
            sys.exit(1)
    input_file = gui.selected_file
    platform_info = PlatformInfo()
    # Standard GStreamer initialization
    Gst.init(None)
    # Create gstreamer elements
    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
    # Source element for reading from the file
    print("Creating Source \n ")
    source = Gst.ElementFactory.make("filesrc", "file-source")
    if not source:
        sys.stderr.write(" Unable to create Source \n")
    # Since the data format in the input file is elementary h264 stream,
    # we need a h264parser
    print("Creating H264Parser \n")
    h264parser = Gst.ElementFactory.make("h264parse", "h264-parser")
    if not h264parser:
        sys.stderr.write(" Unable to create h264 parser \n")
    # Use nvdec_h264 for hardware accelerated decode on GPU
    print("Creating Decoder \n")
    decoder = Gst.ElementFactory.make("nvv4l2decoder", "nvv4l2-decoder")
    if not decoder:
        sys.stderr.write(" Unable to create Nvv4l2 Decoder \n")
    # Create nvstreammux instance to form batches from one or more sources.
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")
    # Use nvinfer to run inferencing on decoder's output,
    # behaviour of inferencing is set through config file
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie \n")
    tracker = Gst.ElementFactory.make("nvtracker", "tracker")
    if not tracker:
        sys.stderr.write(" Unable to create tracker \n")
    sgie1 = Gst.ElementFactory.make("nvinfer", "secondary1-nvinference-engine")
    if not sgie1:
        sys.stderr.write(" Unable to make sgie1 \n")
    sgie2 = Gst.ElementFactory.make("nvinfer", "secondary2-nvinference-engine")
    if not sgie2:
        sys.stderr.write(" Unable to make sgie2 \n")
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    if not nvvidconv:
        sys.stderr.write(" Unable to create nvvidconv \n")
    # Create OSD to draw on the converted RGBA buffer
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")
    # Create video sink using GstVideoOverlay for better compatibility
    print("Creating video sink with GstVideoOverlay \n")
    sink = None
    if platform_info.is_integrated_gpu():
        print("Using nv3dsink for integrated GPU \n")
        sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
        if not sink:
            sys.stderr.write(" Unable to create nv3dsink \n")
    else:
        if platform_info.is_platform_aarch64():
            print("Using nv3dsink for aarch64 platform \n")
            sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
        else:
            print("Using nveglglessink for x86 platform \n")
            sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
            if not sink:
                print("nveglglessink not available, trying nv3dsink \n")
                sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
    if not sink:
        sys.stderr.write(" Unable to create any video sink \n")
        return -1
    if not isinstance(sink, GstVideo.VideoOverlay):
        print("Warning: Sink does not support VideoOverlay interface")
    else:
        print("Sink supports VideoOverlay interface")
    print("Playing file %s " % input_file)
    source.set_property("location", input_file)
    # Set GUI pipeline reference so the window-close handler can stop playback
    gui.pipeline = pipeline
    streammux.set_property("width", 1920)
    streammux.set_property("height", 1080)
    streammux.set_property("batch-size", 1)
    streammux.set_property("batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC)
    # Set properties of pgie and sgie
    pgie.set_property("config-file-path", "dstest2_pgie_config.txt")
    sgie1.set_property("config-file-path", "dstest2_sgie1_config.txt")
    sgie2.set_property("config-file-path", "dstest2_sgie2_config.txt")
    # Set properties of tracker from the tracker config file
    config = configparser.ConfigParser()
    config.read("dstest2_tracker_config.txt")
    config.sections()
    for key in config["tracker"]:
        if key == "tracker-width":
            tracker_width = config.getint("tracker", key)
            tracker.set_property("tracker-width", tracker_width)
        if key == "tracker-height":
            tracker_height = config.getint("tracker", key)
            tracker.set_property("tracker-height", tracker_height)
        if key == "gpu-id":
            tracker_gpu_id = config.getint("tracker", key)
            tracker.set_property("gpu_id", tracker_gpu_id)
        if key == "ll-lib-file":
            tracker_ll_lib_file = config.get("tracker", key)
            tracker.set_property("ll-lib-file", tracker_ll_lib_file)
        if key == "ll-config-file":
            tracker_ll_config_file = config.get("tracker", key)
            tracker.set_property("ll-config-file", tracker_ll_config_file)
    print("Adding elements to Pipeline \n")
    pipeline.add(source)
    pipeline.add(h264parser)
    pipeline.add(decoder)
    pipeline.add(streammux)
    pipeline.add(pgie)
    pipeline.add(tracker)
    pipeline.add(sgie1)
    pipeline.add(sgie2)
    pipeline.add(nvvidconv)
    pipeline.add(nvosd)
    pipeline.add(sink)
    # we link the elements together
    # file-source -> h264-parser -> nvh264-decoder ->
    # nvinfer -> nvvidconv -> nvosd -> videoconvert -> video-renderer
    print("Linking elements in the Pipeline \n")
    source.link(h264parser)
    h264parser.link(decoder)
    sinkpad = streammux.request_pad_simple("sink_0")
    if not sinkpad:
        sys.stderr.write(" Unable to get the sink pad of streammux \n")
    srcpad = decoder.get_static_pad("src")
    if not srcpad:
        sys.stderr.write(" Unable to get source pad of decoder \n")
    srcpad.link(sinkpad)
    streammux.link(pgie)
    pgie.link(tracker)
    tracker.link(sgie1)
    sgie1.link(sgie2)
    sgie2.link(nvvidconv)
    nvvidconv.link(nvosd)
    nvosd.link(sink)
    # create an event loop and feed gstreamer bus messages to it
    # NOTE(review): the app actually runs Gtk.main() below, so the
    # loop.quit() calls inside custom_bus_call do not stop the GUI on
    # EOS/error — only closing the window does. Confirm intended behavior.
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", custom_bus_call, loop, gui, sink)
    bus.enable_sync_message_emission()
    bus.connect("sync-message::element", on_sync_message, gui, sink)
    # Lets add probe to get informed of the meta data generated, we add probe to
    # the sink pad of the osd element, since by that time, the buffer would have
    # had got all the metadata.
    osdsinkpad = nvosd.get_static_pad("sink")
    if not osdsinkpad:
        sys.stderr.write(" Unable to get sink pad of nvosd \n")
    osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)
    # Show GUI
    gui.show()
    # Calculate video to widget scaling factors and positioning
    def update_scaling_factors():
        # Recompute the letterbox geometry mapping the 1920x1080 video into
        # the current widget allocation; runs every second via GLib timer.
        global video_scale, video_offset_x, video_offset_y, video_display_width, video_display_height
        widget_width = gui.video_area.get_allocated_width()
        widget_height = gui.video_area.get_allocated_height()
        # Video dimensions are 1920x1080 (set in streammux)
        video_width = 1920.0
        video_height = 1080.0
        if widget_width > 0 and widget_height > 0:
            # Calculate scaling to fit video in widget while maintaining aspect ratio
            scale_x = widget_width / video_width
            scale_y = widget_height / video_height
            video_scale = min(
                scale_x, scale_y
            )  # Use uniform scaling to maintain aspect ratio
            # Calculate actual video display size after scaling
            video_display_width = video_width * video_scale
            video_display_height = video_height * video_scale
            # Calculate offset to center the video in the widget
            video_offset_x = (widget_width - video_display_width) / 2.0
            video_offset_y = (widget_height - video_display_height) / 2.0
            print(f"Widget: {widget_width}x{widget_height}")
            print(
                f"Video display: {video_display_width:.1f}x{video_display_height:.1f}"
            )
            print(f"Offset: ({video_offset_x:.1f}, {video_offset_y:.1f})")
            print(f"Scale: {video_scale:.3f}")
        return True  # Continue calling this function
    # Update scaling factors periodically
    GLib.timeout_add(1000, update_scaling_factors)
    # Ensure GTK window is fully realized before starting pipeline
    def start_pipeline():
        # One-shot GLib timeout callback: transition the pipeline to PLAYING.
        print("Starting pipeline - Auto-play enabled \n")
        # start play back and play to completion
        pipeline.set_state(Gst.State.PLAYING)
        print("Video will play automatically to completion")
        return False  # Execute only once
    # Delay pipeline start to ensure window is realized
    GLib.timeout_add(500, start_pipeline)
    try:
        # Use GTK main loop instead of GLib.MainLoop
        Gtk.main()
    except KeyboardInterrupt:
        # Was a bare `except:` — narrowed so real errors are not hidden
        # while Ctrl+C still shuts down cleanly.
        pass
    # cleanup
    pipeline.set_state(Gst.State.NULL)
# Script entry point: run the pipeline GUI and propagate the exit status.
if __name__ == "__main__":
    sys.exit(main())