This is my pipeline in Python.
#!/usr/bin/env python3
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys
sys.path.append(
"/opt/nvidia/deepstream/deepstream-7.0/sources/deepstream_python_apps/apps"
)
import os
import gi
import cv2
import argparse
gi.require_version("Gst", "1.0")
from gi.repository import GLib, Gst
from common.platform_info import PlatformInfo
from common.bus_call import bus_call
from common.FPS import PERF_DATA
from termcolor import cprint
from pathlib import Path
import configparser
import pandas as pd
import datetime
import pyds
import glob
import numpy as np
from camera import CAMERA
from analysis import Analysis
from utils import buffer_probe, get_env_bool
from video_utils import VideoLoader
from dotenv import load_dotenv
load_dotenv(override=True)
LEFT_VIDEO_PATH = os.getenv('LEFT_VIDEO_PATH')
RIGHT_VIDEO_PATH = os.getenv('RIGHT_VIDEO_PATH')
print(LEFT_VIDEO_PATH)
print(RIGHT_VIDEO_PATH)
current_date = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
perf_data = None
MUXER_BATCH_TIMEOUT_USEC = 40000
WIDTH = 800
HEIGHT = 600
FPS = 500
BATCH_SIZE = 8
QUEUE_MAX_SIZE_BUFFER = 4294967295
QUEUE_MAX_SIZE_BYTES = 10485760
QUEUE_MAX_SIZE_TIME = 1000000000
IS_REALTIME = get_env_bool("IS_REALTIME")
IS_TRACKER_ENABLED = get_env_bool("IS_TRACKER_ENABLED")
IS_RECORDING = get_env_bool("IS_RECORDING")
IS_LOOPING = get_env_bool("IS_LOOPING")
IS_RECORDING_INPUT = get_env_bool("IS_RECORDING_INPUT")
images = glob.glob("images/*.jpg")
dummy_frames = [cv2.imread(img) for img in images]
dummy_frames = [cv2.resize(frame, (800, 600)) for frame in dummy_frames]
# Dictionary to store timestamps for buffers entering nvvideoconvert
timestamps = {}
def sink_pad_probe_nvvideoconvert(pad, info, u_data):
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer ")
return Gst.PadProbeReturn.OK
# Get the buffer PTS (Presentation Timestamp)
buffer_pts = gst_buffer.pts
# Store the timestamp with the buffer PTS as the key
timestamps[buffer_pts] = time.time()
return Gst.PadProbeReturn.OK
def src_pad_probe_nvvideoconvert(pad, info, u_data):
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer ")
return Gst.PadProbeReturn.OK
# Get the buffer PTS (Presentation Timestamp)
buffer_pts = gst_buffer.pts
# Calculate the latency
entry_time = timestamps.pop(buffer_pts, None)
if entry_time is not None:
exit_time = time.time()
latency = exit_time - entry_time
print(f"nvvideoconvert latency: {latency:.6f} seconds")
return Gst.PadProbeReturn.OK
# Define the callback for keyboard input
def on_keyboard_input(fd, condition, pipeline):
if condition == GLib.IOCondition.IN:
input_char = os.read(fd, 1).decode('utf-8')
print(f"Input received: {input_char}")
if input_char == 'q':
print("Keyboard interrupt received, quitting...")
pipeline.send_event(Gst.Event.new_eos())
return False # Remove the event source
return True
def osd_sink_pad_buffer_probe(pad, info, nvinfer):
frame_number = 0
num_rects = 0
is_ball_detected=False
ball_detected_timer = 0
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer ")
return
# Retrieve batch metadata from the gst_buffer
# Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
# C address of gst_buffer as input, which is obtained with hash(gst_buffer)
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
try:
# Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
# The casting is done by pyds.NvDsFrameMeta.cast()
# The casting also keeps ownership of the underlying memory
# in the C code, so the Python garbage collector will leave
# it alone.
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
except StopIteration:
break
frame_number = frame_meta.frame_num
num_rects = frame_meta.num_obj_meta
raw_detection_data = []
l_obj = frame_meta.obj_meta_list
source_id = frame_meta.source_id
while l_obj is not None:
try:
# Casting l_obj.data to pyds.NvDsObjectMeta
obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
except StopIteration:
break
uniqueId = obj_meta.object_id
classId = obj_meta.class_id
objLabel = obj_meta.obj_label
tBbox = obj_meta.detector_bbox_info.org_bbox_coords
confidence = obj_meta.confidence
left = tBbox.left
width = tBbox.width
top = tBbox.top
height = tBbox.height
center_x = left + width / 2
center_y = top + height / 2
raw_detection_data.append(
[
uniqueId,
classId,
objLabel,
left,
width,
top,
height,
confidence,
]
)
# obj_counter[obj_meta.class_id] += 1
obj_meta.rect_params.border_color.set(
0.0, 0.0, 1.0, 0.8
) # 0.8 is alpha (opacity)
try:
l_obj = l_obj.next
except StopIteration:
break
# Acquiring a display meta object. The memory ownership remains in
# the C code so downstream plugins can still access it. Otherwise
# the garbage collector will claim it when this probe function exits.
display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
display_meta.num_labels = 1
py_nvosd_text_params = display_meta.text_params[0]
# Setting display text to be shown on screen
# Note that the pyds module allocates a buffer for the string, and the
# memory will not be claimed by the garbage collector.
# Reading the display_text field here will return the C address of the
# allocated string. Use pyds.get_string() to get the string content.
py_nvosd_text_params.display_text = (
"Frame Number={} Number of Objects={}".format(
frame_number,
num_rects,
)
)
# Now set the offsets where the string should appear
py_nvosd_text_params.x_offset = 10
py_nvosd_text_params.y_offset = 12
# Font , font-color and font-size
py_nvosd_text_params.font_params.font_name = "Serif"
py_nvosd_text_params.font_params.font_size = 10
# set(red, green, blue, alpha); set to White
py_nvosd_text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
# Text background color
py_nvosd_text_params.set_bg_clr = 1
# set(red, green, blue, alpha); set to Black
py_nvosd_text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
# Using pyds.get_string() to get display_text as string
# print(pyds.get_string(py_nvosd_text_params.display_text))
pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
# Update frame rate through this probe
stream_index = "stream{0}".format(frame_meta.pad_index)
global perf_data
perf_data.update_fps(stream_index)
df = pd.DataFrame(
raw_detection_data,
columns=[
"uniqueId",
"classId",
"objLabel",
"left",
"width",
"top",
"height",
"confidence",
],
)
filepath = f"./backup/{current_date}/detection/{source_id}_{frame_number}.csv"
df.to_csv(filepath, index=False)
try:
l_frame = l_frame.next
except StopIteration:
break
return Gst.PadProbeReturn.OK
# Initialize the Analysis class
analysis = Analysis(None, None, None, None, None, None)
def tracker_src_pad_buffer_probe(pad, info, u_data):
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer ")
return
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
num_frames_in_batch = batch_meta.num_frames_in_batch
# print("==============================")
# print("num_frames_in_batch=",num_frames_in_batch)
while l_frame is not None:
try:
# Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
# The casting is done by pyds.NvDsFrameMeta.cast()
# The casting also keeps ownership of the underlying memory
# in the C code, so the Python garbage collector will leave
# it alone.
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
except StopIteration:
break
frame_number = frame_meta.frame_num
l_obj = frame_meta.obj_meta_list
num_rects = frame_meta.num_obj_meta
source_id = frame_meta.source_id
pad_index = frame_meta.pad_index
# print("frame_number=",frame_number)
# print("num_rects=",num_rects)
# print("source_id=",source_id)
# print("pad_index=",pad_index)
raw_tracking_data = []
while l_obj is not None:
try:
# Casting l_obj.data to pyds.NvDsObjectMeta
obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
except StopIteration:
break
uniqueId = obj_meta.object_id
classId = obj_meta.class_id
objLabel = obj_meta.obj_label
tBbox = obj_meta.tracker_bbox_info.org_bbox_coords
confidence = obj_meta.confidence
tracker_confidence = (
obj_meta.tracker_confidence
) # This is assuming you want to use the tracker confidence
left = tBbox.left
width = tBbox.width
top = tBbox.top
height = tBbox.height
center_x = left + width / 2
center_y = top + height / 2
raw_tracking_data.append(
[
uniqueId,
classId,
objLabel,
left,
width,
top,
height,
confidence,
tracker_confidence,
]
)
analysis.add_to_trajectory(source_id, uniqueId, frame_number, center_x, center_y)
try:
l_obj = l_obj.next
except StopIteration:
break
df = pd.DataFrame(
raw_tracking_data,
columns=[
"uniqueId",
"classId",
"objLabel",
"left",
"width",
"top",
"height",
"confidence",
"age",
],
)
# print(df)
filepath = f"./backup/{current_date}/tracking/{source_id}_{frame_number}.csv"
df.to_csv(filepath, index=False)
try:
l_frame = l_frame.next
except StopIteration:
break
return Gst.PadProbeReturn.OK
def on_demux_pad_added(demuxer, pad, bin, streammux, idx):
demuxer = bin["demuxer"]
capsfilter = bin["filter"]
videoconvert = bin["video-converter"]
queue = bin["queue"]
caps = pad.get_current_caps()
name = caps.to_string()
if name.startswith("video/x-raw"):
if demuxer.link(queue):
cprint("demuxer -> queue link successful", "green")
if queue.link(capsfilter):
cprint("queue -> capsfilter link successful", "green")
if capsfilter.link(videoconvert):
cprint("capsfilter -> videoconvert link successful", "green")
srcpad_videoconvert = videoconvert.get_static_pad("src")
sinkpad_streammux = streammux.request_pad_simple(f"sink_{idx}")
if srcpad_videoconvert.link(sinkpad_streammux) == Gst.PadLinkReturn.OK:
cprint("videoconvert -> streammux link successful", "green")
else:
cprint("Failed to link videoconvert -> streammux", "red")
else:
cprint("Unknown format %s" % name, "red")
images = glob.glob("images/*.jpg")
dummy_frames = [cv2.imread(img) for img in images]
dummy_frames = [cv2.resize(frame, (WIDTH, HEIGHT)) for frame in dummy_frames]
import time
records = []
def cb_need_data(appsource, _pad, video_loader, video_idx):
if type(video_loader) == CAMERA:
start = time.time()
frame = video_loader.capture_frames()
end = time.time()
if video_idx == 0:
latency = end - start
records.append([start, end, latency])
else:
frame = video_loader.get_next_frame()
if frame is None:
appsource.emit("end-of-stream")
else:
appsource.emit("push-buffer", Gst.Buffer.new_wrapped(frame.tobytes()))
def main(args):
tracking_dir = Path(f"./backup/{current_date}/tracking/")
tracking_dir.mkdir(parents=True, exist_ok=True)
detection_dir = Path(f"./backup/{current_date}/detection/")
detection_dir.mkdir(parents=True, exist_ok=True)
video_dir = Path(f"./backup/{current_date}/videos/")
video_dir.mkdir(parents=True, exist_ok=True)
raw_dir = Path(f"./backup/{current_date}/raw/")
raw_dir.mkdir(parents=True, exist_ok=True)
config_path = args.config
# Standard GStreamer initialization
Gst.init(None)
global perf_data
perf_data = PERF_DATA(2)
# Create gstreamer elements
cprint("Creating Pipeline ...", "yellow")
pipeline = Gst.Pipeline()
if not pipeline:
cprint("Unable to create Pipeline", "red")
"""
Cameras
"""
cam1 = {
"source": Gst.ElementFactory.make("appsrc", "cam1"),
"queue": Gst.ElementFactory.make("queue", "cam1_queue"),
"filter": Gst.ElementFactory.make("capsfilter", "cam1_filter"),
"video-converter": Gst.ElementFactory.make("nvvideoconvert", "cam1_vidconv"),
"tee": Gst.ElementFactory.make("tee", "cam1_tee"),
}
cam2 = {
"source": Gst.ElementFactory.make("appsrc", "cam2"),
"queue": Gst.ElementFactory.make("queue", "cam2_queue"),
"filter": Gst.ElementFactory.make("capsfilter", "cam2_filter"),
"video-converter": Gst.ElementFactory.make("nvvideoconvert", "cam2_vidconv"),
"tee": Gst.ElementFactory.make("tee", "cam2_tee"),
}
cam1_record = {
"encoder": Gst.ElementFactory.make("nvv4l2h264enc", "cam1_encoder"),
"parser": Gst.ElementFactory.make("h264parse", "cam1_parser"),
"muxer": Gst.ElementFactory.make("qtmux", "cam1_muxer"),
"sink": Gst.ElementFactory.make("filesink", "cam1_sink"),
}
cam2_record = {
"encoder": Gst.ElementFactory.make("nvv4l2h264enc", "cam2_encoder"),
"parser": Gst.ElementFactory.make("h264parse", "cam2_parser"),
"muxer": Gst.ElementFactory.make("qtmux", "cam2_muxer"),
"sink": Gst.ElementFactory.make("filesink", "cam2_sink"),
}
if not (cam2["source"] and cam2["video-converter"] and cam2["filter"] and cam2["queue"]):
cprint("Unable to create Source", "red")
if not (cam1["source"] and cam1["video-converter"]and cam1["filter"] and cam1["queue"]):
cprint("Unable to create Source", "red")
cprint("Adding elements to Pipeline ...", "yellow")
for cam_id, cam in enumerate([cam1, cam2]):
if not pipeline.add(cam["source"]):
cprint("Failed to add source to pipeline", "red")
else:
caps = Gst.Caps.from_string("video/x-raw, format=(string)BGR, width=(int)%d, height=(int)%d, framerate=(fraction)%d/1" % (WIDTH, HEIGHT, FPS))
print(caps.to_string())
cam["source"].set_property("caps", caps)
# cam["source"].set_property('is-live', True)
# cam["source"].set_property('block', True)
# cam["source"].set_property('max-bytes', 200000)
# cam["source"].set_property('do-timestamp', True)
# cam["source"].set_property("format", Gst.Format.TIME)
if not pipeline.add(cam["queue"]):
cprint("Failed to add queue to pipeline", "red")
else:
cam["queue"].set_property("max-size-buffers", QUEUE_MAX_SIZE_BUFFER)
cam["queue"].set_property("max-size-bytes", QUEUE_MAX_SIZE_BYTES)
cam["queue"].set_property("max-size-time", QUEUE_MAX_SIZE_TIME)
if not pipeline.add(cam["video-converter"]):
cprint("Failed to add video-converter to pipeline", "red")
# else:
# if cam_id == 0:
# nvvidconv = cam["video-converter"]
# # Attach probes to the sink and src pads of nvvideoconvert
# nvvidconv_sink_pad = nvvidconv.get_static_pad("sink")
# if not nvvidconv_sink_pad:
# cprint("Unable to get sink pad of nvvideoconvert", "red")
# else:
# nvvidconv_sink_pad.add_probe(Gst.PadProbeType.BUFFER, sink_pad_probe_nvvideoconvert, 0)
# nvvidconv_src_pad = nvvidconv.get_static_pad("src")
# if not nvvidconv_src_pad:
# cprint("Unable to get src pad of nvvideoconvert", "red")
# else:
# nvvidconv_src_pad.add_probe(Gst.PadProbeType.BUFFER, src_pad_probe_nvvideoconvert, 0)
if not pipeline.add(cam["filter"]):
cprint("Failed to add capsfilter to pipeline", "red")
else:
caps = Gst.Caps.from_string("video/x-raw(memory:NVMM),format=NV12,width=%d,height=%d,framerate=%d/1" % (WIDTH, HEIGHT, FPS))
print(caps.to_string())
cam["filter"].set_property("caps", caps)
if not pipeline.add(cam["tee"]):
cprint("Failed to add tee to pipeline", "red")
if IS_RECORDING_INPUT:
for i,cam in enumerate([cam1_record, cam2_record]):
if not pipeline.add(cam["encoder"]):
cprint("Failed to add encoder to pipeline", "red")
else:
cam["encoder"].set_property("bitrate", 800000000)
if not pipeline.add(cam["parser"]):
cprint("Failed to add parser to pipeline", "red")
if not pipeline.add(cam["muxer"]):
cprint("Failed to add muxer to pipeline", "red")
if not pipeline.add(cam["sink"]):
cprint("Failed to add sink to pipeline", "red")
else:
cam["sink"].set_property("location", f"{raw_dir}/output_{i}.mp4")
cam["sink"].set_property("max-bitrate", 800000000)
cam["sink"].set_property("sync", 0)
"""
Streammux & Caps
"""
# Create nvstreammux instance to form batches from one or more sources.
streammux = Gst.ElementFactory.make("nvstreammux", "nvstreammux")
if not streammux:
cprint("Unable to create NvStreamMux", "red")
streammux.set_property("width", WIDTH)
streammux.set_property("height", HEIGHT)
streammux.set_property("batch-size", BATCH_SIZE)
streammux.set_property("nvbuf-memory-type", 2)
streammux.set_property("batched-push-timeout", 10000)
# streammux.set_property("buffer-pool-size", 16)
# streammux.set_property("sync-inputs", True)
"""
Nvinfer
"""
# Use nvinfer to run inferencing on decoder's output,
# behaviour of inferencing is set through config file
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
if not pgie:
cprint(" Unable to create pgie", "red")
pgie.set_property("config-file-path", config_path)
pgie.set_property("batch-size", BATCH_SIZE)
"""
Nvtracker
"""
tracker = Gst.ElementFactory.make("nvtracker", "tracker")
if not tracker:
cprint(" Unable to create tracker")
# Set properties of tracker
config = configparser.ConfigParser()
config.read("./config_tracker.txt")
config.sections()
for key in config["tracker"]:
if key == "tracker-width":
tracker_width = config.getint("tracker", key)
tracker.set_property("tracker-width", tracker_width)
if key == "tracker-height":
tracker_height = config.getint("tracker", key)
tracker.set_property("tracker-height", tracker_height)
if key == "gpu-id":
tracker_gpu_id = config.getint("tracker", key)
tracker.set_property("gpu_id", tracker_gpu_id)
if key == "ll-lib-file":
tracker_ll_lib_file = config.get("tracker", key)
tracker.set_property("ll-lib-file", tracker_ll_lib_file)
if key == "ll-config-file":
tracker_ll_config_file = config.get("tracker", key)
tracker.set_property("ll-config-file", tracker_ll_config_file)
"""
Nvtiler
"""
print("Creating nvdslogger ...")
nvdslogger = Gst.ElementFactory.make("nvdslogger", "nvdslogger")
if not nvdslogger:
cprint(" Unable to create nvdslogger", "red")
print("Creating tiler ...")
tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
if not tiler:
cprint(" Unable to create tiler", "red")
number_sources = 2
tiler_rows = 2
TILED_OUTPUT_WIDTH = WIDTH
TILED_OUTPUT_HEIGHT = HEIGHT * 2
tiler_columns = int(number_sources / tiler_rows)
tiler.set_property("rows", tiler_rows)
tiler.set_property("columns", tiler_columns)
tiler.set_property("width", TILED_OUTPUT_WIDTH)
tiler.set_property("height", TILED_OUTPUT_HEIGHT)
print("Creating nvvidconv ...")
nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
if not nvvidconv:
cprint("Unable to create nvvidconv", "red")
print("Creating nvosd ...")
nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
if not nvosd:
cprint("Unable to create nvosd", "red")
# nvosd.set_property("display-text", 0)
print("Creating EGLSink ...")
# sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
sink = Gst.ElementFactory.make("fakesink", "fakesink")
if not sink:
sys.stderr.write(" Unable to create egl sink")
sink.set_property("qos", 0)
sink.set_property("sync", 0)
nvdsosd_tee = Gst.ElementFactory.make("tee", "nvdsosd_tee")
if not nvdsosd_tee:
cprint("Unable to create nvdsosd_tee", "red")
queue_tee_0 = Gst.ElementFactory.make("queue", "queue_tee_0")
queue_tee_1 = Gst.ElementFactory.make("queue", "queue_tee_1")
if not queue_tee_0:
cprint("Unable to create queue_tee_0", "red")
if not queue_tee_1:
cprint("Unable to create queue_tee_1", "red")
queue_streammux_pgie = Gst.ElementFactory.make("queue", "queue1")
qeueu_pgie_nvtracker = Gst.ElementFactory.make("queue", "queue2")
queue_tracker_tiler = Gst.ElementFactory.make("queue", "queue3")
for queue in [queue_streammux_pgie, qeueu_pgie_nvtracker, queue_tracker_tiler, queue_tee_0, queue_tee_1]:
queue.set_property("max-size-buffers", QUEUE_MAX_SIZE_BUFFER)
queue.set_property("max-size-bytes", QUEUE_MAX_SIZE_BYTES)
queue.set_property("max-size-time", QUEUE_MAX_SIZE_TIME)
pipeline.add(queue_streammux_pgie)
pipeline.add(qeueu_pgie_nvtracker)
pipeline.add(queue_tracker_tiler)
pipeline.add(queue_tee_0)
pipeline.add(queue_tee_1)
"""
Adding elements to pipeline
"""
pipeline.add(streammux)
pipeline.add(pgie)
pipeline.add(tracker)
pipeline.add(nvdslogger)
pipeline.add(tiler)
pipeline.add(nvvidconv)
pipeline.add(nvosd)
pipeline.add(nvdsosd_tee)
pipeline.add(sink)
"""
Camera (1) - Queue & Tee
"""
# is_recording = False
# source -> queue
if not cam1["source"].link(cam1["queue"]):
cprint("Failed to link source to queue", "red")
if not cam2["source"].link(cam2["queue"]):
cprint("Failed to link queue_main_cam2 to demuxer", "red")
# queue -> video-converter
if not cam1["queue"].link(cam1["video-converter"]):
cprint("Failed to link queue to video-converter", "red")
if not cam2["queue"].link(cam2["video-converter"]):
cprint("Failed to link queue to video-converter", "red")
# video-converter -> capsfilter
if not cam1["video-converter"].link(cam1["filter"]):
cprint("Failed to link video-converter to capsfilter", "red")
if not cam2["video-converter"].link(cam2["filter"]):
cprint("Failed to link video-converter to capsfilter", "red")
# capsfilter -> tee
if not cam1["filter"].link(cam1["tee"]):
cprint("Failed to link capsfilter to tee", "red")
if not cam2["filter"].link(cam2["tee"]):
cprint("Failed to link capsfilter to tee", "red")
# tee -> streammux
cam1_tee_streammux = cam1["tee"].get_request_pad("src_%u")
cam1_tee_file = cam1["tee"].get_request_pad("src_%u")
if not (cam1_tee_streammux and cam1_tee_file):
cprint("Unable to get the sink pad of streammux", "red")
cam2_tee_streammux = cam2["tee"].get_request_pad("src_%u")
cam2_tee_file = cam2["tee"].get_request_pad("src_%u")
if not (cam2_tee_streammux and cam2_tee_file):
cprint("Unable to get the sink pad of streammux", "red")
sinkpad_streammux_cam1 = streammux.request_pad_simple("sink_0")
if not sinkpad_streammux_cam1:
cprint("Unable to get the sink pad of streammux", "red")
sinkpad_streammux_cam2 = streammux.request_pad_simple("sink_1")
if not sinkpad_streammux_cam2:
cprint("Unable to get the sink pad of streammux", "red")
if cam1_tee_streammux.link(sinkpad_streammux_cam1) != Gst.PadLinkReturn.OK:
cprint("Failed to link source to streammux", "red")
if cam2_tee_streammux.link(sinkpad_streammux_cam2) != Gst.PadLinkReturn.OK:
cprint("Failed to link source to streammux", "red")
if not streammux.link(queue_streammux_pgie):
cprint("Failed to link streammux -> queue_streammux_pgie", "red")
if IS_RECORDING_INPUT:
cam1_record_sink_pad = cam1_record["encoder"].get_static_pad("sink")
cam2_record_sink_pad = cam2_record["encoder"].get_static_pad("sink")
if not (cam1_record_sink_pad and cam2_record_sink_pad):
cprint("Unable to get the sink pad of encoder", "red")
if not cam1_tee_file.link(cam1_record_sink_pad):
cprint("Failed to link tee to encoder", "red")
if not cam1_record["encoder"].link(cam1_record["parser"]):
cprint("Failed to link encoder to parser", "red")
if not cam1_record["parser"].link(cam1_record["muxer"]):
cprint("Failed to link parser to muxer", "red")
if not cam1_record["muxer"].link(cam1_record["sink"]):
cprint("Failed to link muxer to sink", "red")
if not cam2_tee_file.link(cam2_record_sink_pad):
cprint("Failed to link tee to encoder", "red")
if not cam2_record["encoder"].link(cam2_record["parser"]):
cprint("Failed to link encoder to parser", "red")
if not cam2_record["parser"].link(cam2_record["muxer"]):
cprint("Failed to link parser to muxer", "red")
if not cam2_record["muxer"].link(cam2_record["sink"]):
cprint("Failed to link muxer to sink", "red")
""" Without videorate """
if not queue_streammux_pgie.link(pgie):
cprint("Failed to link queue_streammux_pgie -> pgie", "red")
if not pgie.link(qeueu_pgie_nvtracker):
cprint("Failed to link pgie -> nvtracker", "red")
if IS_TRACKER_ENABLED:
if not qeueu_pgie_nvtracker.link(tracker):
cprint("Failed to link nvtracker -> tracker", "red")
if not tracker.link(queue_tracker_tiler):
cprint("Failed to link tracker -> tiler", "red")
else:
if not qeueu_pgie_nvtracker.link(queue_tracker_tiler):
cprint("Failed to link nvtracker -> tracker", "red")
if not queue_tracker_tiler.link(tiler):
cprint("Failed to link tiler -> nvvidconv", "red")
if not tiler.link(nvvidconv):
cprint("Failed to link nvvidconv -> nvosd", "red")
if not nvvidconv.link(nvosd):
cprint("Failed to link nvosd -> sink", "red")
if not nvosd.link(nvdsosd_tee):
cprint("Failed to link nvosd -> nvdsosd_tee", "red")
if not nvdsosd_tee.link(queue_tee_0):
cprint("Failed to link nvosd -> sink", "red")
if not queue_tee_0.link(sink):
cprint("Failed to link nvosd -> sink", "red")
"""
Save video
"""
if IS_RECORDING:
h264enc = Gst.ElementFactory.make("nvv4l2h264enc", "h264enc")
if not h264enc:
cprint("Unable to create h264enc", "red")
h264enc.set_property("bitrate", 800000000)
caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=NV12, width={}, height={}, framerate={}/1".format(WIDTH, HEIGHT, FPS))
filter = Gst.ElementFactory.make("capsfilter", "filter")
filter.set_property("caps", caps)
h264parse = Gst.ElementFactory.make("h264parse", "h264parse")
if not h264parse:
cprint("Unable to create h264parse", "red")
qtmux = Gst.ElementFactory.make("qtmux", "qtmux2")
if not qtmux:
cprint("Unable to create qtmux", "red")
filesink = Gst.ElementFactory.make("filesink", "filesink")
if not filesink:
cprint("Unable to create filesink", "red")
filesink.set_property("location", f"{video_dir}/output.avi")
filesink.set_property("max-bitrate", 800000000)
filesink.set_property("sync", 0)
pipeline.add(filter)
pipeline.add(h264enc)
pipeline.add(h264parse)
pipeline.add(qtmux)
pipeline.add(filesink)
if not nvdsosd_tee.link(queue_tee_1):
cprint("Failed to link nvdsosd_tee -> h264enc", "red")
if not queue_tee_1.link(filter):
cprint("Failed to link nvdsosd_tee -> h264enc", "red")
if not filter.link(h264enc):
cprint("Failed to link nvdsosd_tee -> h264enc", "red")
if not h264enc.link(h264parse):
cprint("Failed to link h264enc -> h264parse", "red")
if not h264parse.link(qtmux):
cprint("Failed to link h264parse -> qtmux", "red")
if not qtmux.link(filesink):
cprint("Failed to link qtmux -> filesink", "red")
################################################################################
cprint("Linking complete!", "green")
# create an event loop and feed gstreamer bus mesages to it
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
# bus.connect("message::eos", on_eos, loop)
# bus.connect("message::error", on_error, loop)
# Add keyboard input listener
# GLib.io_add_watch(sys.stdin, GLib.IOCondition.IN, on_keyboard_input, pipeline)
GLib.io_add_watch(sys.stdin.fileno(), GLib.IOCondition.IN, on_keyboard_input, pipeline)
osdsinkpad = nvosd.get_static_pad("sink")
if not osdsinkpad:
cprint("Unable to get sink pad of nvosd", "red")
pgie_src_pad = pgie.get_static_pad("src")
if not pgie_src_pad:
cprint(" Unable to get src pad", "red")
else:
pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, pgie)
# perf callback function to print fps every 5 sec
GLib.timeout_add(5000, perf_data.perf_print_callback)
tracker_src_pad = tracker.get_static_pad("src")
if not tracker_src_pad:
cprint(" Unable to get src pad", "red")
else:
tracker_src_pad.add_probe(
Gst.PadProbeType.BUFFER, tracker_src_pad_buffer_probe, 0
)
# # Get the queue pads
# queue_cam1_src_pad = cam1["queue"].get_static_pad("src")
# queue_cam2_src_pad = cam2["queue"].get_static_pad("src")
# queue_tee_0_sink_pad = queue_tee_0.get_static_pad("sink")
# queue_tee_1_sink_pad = queue_tee_1.get_static_pad("sink")
# queue_streammux_pgie_sink_pad = queue_streammux_pgie.get_static_pad("sink")
# qeueu_pgie_nvtracker_sink_pad = qeueu_pgie_nvtracker.get_static_pad("sink")
# queue_tracker_tiler_sink_pad = queue_tracker_tiler.get_static_pad("sink")
# # Attach a probe to the queue pads
# queue_cam1_src_pad.add_probe(
# Gst.PadProbeType.BUFFER, buffer_probe, "cam1"
# )
# queue_cam2_src_pad.add_probe(
# Gst.PadProbeType.BUFFER, buffer_probe, "cam2"
# )
# queue_tee_0_sink_pad.add_probe(
# Gst.PadProbeType.BUFFER, buffer_probe, "queue_tee_0"
# )
# queue_tee_1_sink_pad.add_probe(
# Gst.PadProbeType.BUFFER, buffer_probe, "queue_tee_1"
# )
# queue_streammux_pgie_sink_pad.add_probe(
# Gst.PadProbeType.BUFFER, buffer_probe, "queue_streammux_pgie"
# )
# qeueu_pgie_nvtracker_sink_pad.add_probe(
# Gst.PadProbeType.BUFFER, buffer_probe, "qeueu_pgie_nvtracker"
# )
# queue_tracker_tiler_sink_pad.add_probe(
# Gst.PadProbeType.BUFFER, buffer_probe, "queue_tracker_tiler"
# )
"""
Video
"""
if IS_REALTIME:
camera0 = CAMERA(0)
camera1 = CAMERA(1)
if not camera0.initialize():
cprint("Failed to initialize camera 0", "red")
sys.exit(1)
if not camera1.initialize():
cprint("Failed to initialize camera 1", "red")
sys.exit(1)
camera0.set_gamma(0.5)
camera0.set_exposure_time(1700)
camera0.set_white_balance(1.64,1.0, 1.4)
camera0.set_fps_enable(FPS)
camera1.set_gamma(0.5)
camera1.set_exposure_time(1700)
camera1.set_white_balance(1.64,1.0, 1.4)
camera1.set_fps_enable(FPS)
if not camera0.start_grabbing():
cprint("Failed to start grabbing camera 0", "red")
sys.exit(1)
if not camera1.start_grabbing():
cprint("Failed to start grabbing camera 1", "red")
sys.exit(1)
cam1["source"].connect("need-data", cb_need_data, camera0, 0)
cam2["source"].connect("need-data", cb_need_data, camera1, 1)
else:
video_loader1 = VideoLoader(LEFT_VIDEO_PATH, 0, is_looping=IS_LOOPING)
video_loader2 = VideoLoader(RIGHT_VIDEO_PATH, 0, is_looping=IS_LOOPING)
cam1["source"].connect("need-data", cb_need_data, video_loader1, 0)
cam2["source"].connect("need-data", cb_need_data, video_loader2, 1)
print("Starting pipeline \n")
# start play back and listed to events
pipeline.set_state(Gst.State.PLAYING)
try:
loop.run()
except Exception as e:
pass
if IS_REALTIME:
camera0.stop_grabbing()
camera1.stop_grabbing()
camera0.close()
camera1.close()
# cleanup
print("Exiting app\n")
pipeline.set_state(Gst.State.NULL)
pipeline.get_state(Gst.CLOCK_TIME_NONE) # Wait until the state change is complete
# analysis.show()
# Save record as csv
df = pd.DataFrame(records, columns=["start", "end", "frame"])
df.to_csv(f"./records.csv", index=False)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="DeepStream Test")
parser.add_argument(
"-i",
"--input",
type=str,
default="../../../../samples/streams/sample_720p.h264",
help="Path to the input video file",
)
parser.add_argument(
"-c",
"--config",
type=str,
default="./models/yolov5/config_infer_primary_yoloV5_fp16_b10.txt",
# default="./yolov4_last/config_infer_primary.txt",
help="Path to the config file",
)
args = parser.parse_args()
main(args)