• Hardware Platform (Jetson / GPU): GPU
• DeepStream Version: 6.4
• JetPack Version (valid for Jetson only): -
• TensorRT Version: 8.6.1.6-1+cuda12.0
• NVIDIA GPU Driver Version (valid for GPU only): 535.183.01
• Issue Type (questions, new requirements, bugs): questions, bugs
• How to reproduce the issue? (This is for bugs. Including which sample app is used, the configuration files content, the command line used, and other details for reproducing): runtime_source_add_delete
and deepstream_test_4
• Requirement details (This is for the new requirement. Including the module name which plugin or for which sample application, and the function description): uridecodebin → nvstreammux → nvinfer → nvtracker → nvinfer → nvinfer → nvvideoconvert → capsfilter → nvmsgconv → nvmsgbroker
I have a pipeline implemented in Python by combining the Deepstream samples runtime_source_add_delete
and deepstream_test_4
. It adds multiple sources to the pipeline, then attaches metadata to each buffer using a probe function defined on the source pad of the Capsfilter, and finally publishes event messages through Redis using Nvmsgconv and Nvmsgbroker elements. You can find the code below:
# ruff: noqa: E402
# fmt: off
import sys
import gi # type: ignore
gi.require_version("Gst", "1.0")
from gi.repository import GLib, Gst # type: ignore
sys.path.append("../")
Gst.init(None)
# fmt: on
import configparser
import logging
import time
from enum import IntEnum
from threading import Thread
import cv2 # type: ignore
import numpy as np # type: ignore
import pyds # type: ignore
from common.platform_info import PlatformInfo # type: ignore
from common.utils import long_to_uint64 # type: ignore
from numpy.typing import NDArray # type: ignore
GPU_ID = 0
MAX_NUM_SOURCES = 4
UNTRACKED_OBJECT_ID = -1
MAX_TIMESTAMP_LENGTH = 32
STREAM_MUXER_RESOLUTION = (1920, 1080)
STREAM_MUXER_BATCH_SIZE = 30
STREAM_MUXER_BATCHED_PUSH_TIMEOUT = 25000
PRIMARY_INFERENCE_CONFIG_FILE = "dstest_pgie_config.txt"
TRACKER_CONFIG_FILE = "dstest_tracker_config.txt"
SECONDARY_INFERENCE_1_CONFIG_FILE = "dstest_sgie1_config.txt"
SECONDARY_INFERENCE_2_CONFIG_FILE = "dstest_sgie2_config.txt"
MESSAGE_CONVERTER_CONFIG_FILE = "dstest4_msgconv_config.txt"
MESSAGE_CONVERTER_SCHEMA_TYPE = 0
MESSAGE_BROKER_PROTO_LIB = (
"/opt/nvidia/deepstream/deepstream/lib/libnvds_redis_proto.so"
)
MESSAGE_BROKER_CONFIG = "/opt/nvidia/deepstream/deepstream/sources/libs/redis_protocol_adaptor/cfg_redis.txt"
MESSAGE_BROKER_CONN_STR = "redis;6379;test"
MESSAGE_BROKER_TOPIC = "test"
g_num_sources = 0
g_source_id_list = [-1] * MAX_NUM_SOURCES
g_eos_list = [False] * MAX_NUM_SOURCES
g_source_enabled = [False] * MAX_NUM_SOURCES
g_source_bin_list = [None] * MAX_NUM_SOURCES
g_uri = "file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4"
platform_info = None
loop = None
pipeline = None
stream_muxer = None
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
class ClassID(IntEnum):
VEHICLE = 0
BICYCLE = 1
PERSON = 2
ROADSIGN = 3
def generate_vehicle_meta() -> pyds.NvDsVehicleObject:
obj = pyds.alloc_nvds_vehicle_object()
obj = pyds.NvDsVehicleObject.cast(obj)
obj.type = "sedan"
obj.color = "blue"
obj.make = "Bugatti"
obj.model = "M"
obj.license = "XX1234"
obj.region = "CA"
return obj
def generate_person_meta() -> pyds.NvDsPersonObject:
obj = pyds.alloc_nvds_person_object()
obj = pyds.NvDsPersonObject.cast(obj)
obj.age = 45
obj.cap = "none"
obj.hair = "black"
obj.gender = "male"
obj.apparel = "formal"
return obj
def get_frame_array(gst_buffer: Gst.Buffer, batch_id: int) -> NDArray:
frame_array = pyds.get_nvds_buf_surface(hash(gst_buffer), batch_id)
frame_array = np.ascontiguousarray(frame_array)
frame_array = np.frombuffer(frame_array, dtype=np.uint8)
width, height = STREAM_MUXER_RESOLUTION
frame_array = frame_array.reshape(height, width, 3)
frame_array = cv2.cvtColor(frame_array, cv2.COLOR_BGR2RGB)
return frame_array
def generate_event_metadata(
source_id: int,
frame_id: int,
user_meta: pyds.NvDsUserMeta,
object_meta: pyds.NvDsObjectMeta,
) -> pyds.NvDsEventMsgMeta:
metadata = pyds.alloc_nvds_event_msg_meta(user_meta)
metadata = pyds.NvDsEventMsgMeta.cast(metadata)
metadata.sensorId = 0
metadata.placeId = 0
metadata.moduleId = 0
metadata.sensorStr = "sensor-0"
metadata.ts = pyds.alloc_buffer(MAX_TIMESTAMP_LENGTH + 1)
pyds.generate_ts_rfc3339(metadata.ts, MAX_TIMESTAMP_LENGTH)
metadata.bbox.top = object_meta.rect_params.top
metadata.bbox.left = object_meta.rect_params.left
metadata.bbox.width = object_meta.rect_params.width
metadata.bbox.height = object_meta.rect_params.height
metadata.frameId = frame_id
metadata.trackingId = long_to_uint64(object_meta.object_id)
metadata.confidence = object_meta.confidence
match object_meta.class_id:
case ClassID.VEHICLE:
metadata.type = pyds.NvDsEventType.NVDS_EVENT_MOVING
metadata.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_VEHICLE
metadata.objClassId = ClassID.VEHICLE
metadata.extMsg = generate_vehicle_meta()
metadata.extMsgSize = sys.getsizeof(pyds.NvDsVehicleObject)
case ClassID.PERSON:
metadata.type = pyds.NvDsEventType.NVDS_EVENT_ENTRY
metadata.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON
metadata.objClassId = ClassID.PERSON
metadata.extMsg = generate_person_meta()
metadata.extMsgSize = sys.getsizeof(pyds.NvDsPersonObject)
return metadata
def attach_metadata(
frame_meta: pyds.NvDsFrameMeta,
user_meta: pyds.NvDsUserMeta,
object_meta: pyds.NvDsObjectMeta,
frame_array: NDArray,
source_id: int,
) -> None:
event_metadata = generate_event_metadata(
source_id=source_id,
frame_id=frame_meta.frame_num,
user_meta=user_meta,
object_meta=object_meta,
)
user_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_EVENT_MSG_META
user_meta.user_meta_data = event_metadata
pyds.nvds_add_user_meta_to_frame(frame_meta, user_meta)
def iterate_object_meta_list(
batch_meta: pyds.NvDsBatchMeta,
frame_meta: pyds.NvDsFrameMeta,
frame_array: NDArray,
source_id: int,
) -> None:
l_obj = frame_meta.obj_meta_list
while l_obj is not None:
try:
object_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
tracking_id = long_to_uint64(object_meta.object_id)
if tracking_id == UNTRACKED_OBJECT_ID:
logger.debug("No tracking ID found for object")
raise ValueError
user_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)
if user_meta:
attach_metadata(
frame_meta, user_meta, object_meta, frame_array, source_id
)
else:
logger.error("Failed to acquire user meta")
except StopIteration:
logger.debug("Iterating object meta list is over")
except ValueError:
logger.debug("Skipping an invalid object meta")
finally:
try:
l_obj = l_obj.next
except StopIteration:
break
def extract_metadata(pad: Gst.Pad, info: Gst.PadProbeInfo) -> Gst.PadProbeReturn:
global g_source_id_list
gst_buffer = info.get_buffer()
if not gst_buffer:
logger.warning("Unable to get buffer from pad")
return Gst.PadProbeReturn.DROP
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
if not batch_meta:
return Gst.PadProbeReturn.OK
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
if frame_meta.pad_index not in g_source_id_list:
logger.warning(f"No source found for pad index {frame_meta.pad_index}")
raise StopIteration
source_id = g_source_id_list[frame_meta.pad_index]
logger.debug(
f"Extracting metadata for stream '{source_id}' at frame {frame_meta.frame_num}"
)
frame_array = get_frame_array(gst_buffer, frame_meta.batch_id)
iterate_object_meta_list(batch_meta, frame_meta, frame_array, source_id)
except StopIteration:
logger.debug("Iterating frame metadata list is over")
finally:
try:
l_frame = l_frame.next
except StopIteration:
break
return Gst.PadProbeReturn.OK
def source_bin_child_added(
child_proxy: Gst.ChildProxy,
emitter_object: Gst.Object,
element_name: str,
source_id: int,
) -> None:
global platform_info
logger.debug(f"Element '{element_name}' added for source '{source_id}'")
# decodebin
if element_name.startswith("decodebin"):
emitter_object.connect("child-added", source_bin_child_added, source_id)
# nvv4l2decoder
elif element_name.startswith("nvv4l2decoder"):
if platform_info.is_integrated_gpu(): # type: ignore
emitter_object.set_property("enable-max-performance", True)
emitter_object.set_property("drop-frame-interval", 0)
emitter_object.set_property("num-extra-surfaces", 0)
else:
emitter_object.set_property("gpu_id", GPU_ID)
# source
elif element_name.startswith("source"):
source_element = child_proxy.get_by_name(element_name)
if source_element.find_property("drop-on-latency") is not None:
emitter_object.set_property("drop-on-latency", True)
# rtspsrc
if source_element.get_factory().get_name() == "rtspsrc":
emitter_object.set_property("protocols", "tcp")
emitter_object.set_property("tcp-timeout", 5000000)
emitter_object.set_property("buffer-mode", "buffer")
def source_bin_pad_added(
source_bin: Gst.Element,
source_pad: Gst.Pad,
source_id: int,
) -> None:
global stream_muxer
source_caps: Gst.Caps = source_pad.get_current_caps()
source_name: str = source_caps.get_structure(0).get_name()
if not source_name.startswith("video"):
logger.debug(f"Skipped adding non-video pad [{source_name}]")
return
sink_name = f"sink_{source_id}"
sink_pad = stream_muxer.request_pad_simple(sink_name) # type: ignore
if not sink_pad:
raise ValueError("Unable to create sink pad")
result = source_pad.link(sink_pad)
if result != Gst.PadLinkReturn.OK:
raise ValueError(f"Failed to link pad to stream muxer for stream '{source_id}'")
logger.debug(f"Added new '{sink_name}' pad for stream '{source_id}'")
def remove_source(source_id: int) -> bool:
global g_num_sources
global g_source_bin_list
global stream_muxer
global pipeline
logger.info(f"Removing source '{source_id}'")
source_bin = g_source_bin_list[source_id]
result = source_bin.set_state(Gst.State.NULL) # type: ignore
if result == Gst.StateChangeReturn.FAILURE:
logger.error(f"Failed to set source state to NULL for stream '{source_id}'")
return False
sink_name = f"sink_{source_id}"
sink_pad = stream_muxer.get_static_pad(sink_name) # type: ignore
if sink_pad is None:
logger.error(f"Sink pad '{sink_name}' for source '{source_id}' not found")
return False
sink_pad.send_event(Gst.Event.new_flush_stop(False))
stream_muxer.release_request_pad(sink_pad) # type: ignore
pipeline.remove(source_bin) # type: ignore
g_source_enabled[source_id] = False
g_source_id_list[source_id] = -1
g_source_bin_list[source_id] = None
g_num_sources -= 1
return True
def add_source(source_id: int) -> bool:
global g_num_sources
global g_source_enabled
global g_source_bin_list
global g_source_id_list
global g_uri
global pipeline
logger.info(f"Adding source '{source_id}'")
logger.debug(f"Creating source bin for stream '{source_id}' [{g_uri}]")
source_bin = Gst.ElementFactory.make("uridecodebin", f"source-{source_id}")
if source_bin is None:
raise ValueError(
f"Unable to create source bin for stream '{source_id}' [{g_uri}]"
)
source_bin.set_property("uri", g_uri)
source_bin.connect("pad-added", source_bin_pad_added, source_id)
source_bin.connect("child-added", source_bin_child_added, source_id)
pipeline.add(source_bin) # type: ignore
result = source_bin.set_state(Gst.State.PLAYING)
if result == Gst.StateChangeReturn.FAILURE:
logger.error(f"Failed to set source state to PLAYING for stream '{source_id}'")
return False
g_source_enabled[source_id] = True
g_source_id_list[source_id] = source_id
g_source_bin_list[source_id] = source_bin
g_num_sources += 1
return True
def run() -> None:
global loop
global pipeline
pipeline.set_state(Gst.State.PAUSED) # type: ignore
pipeline.set_state(Gst.State.PLAYING) # type: ignore
try:
loop.run() # type: ignore
except Exception:
print("Error happend while processing pipeline")
pipeline.set_state(Gst.State.NULL) # type: ignore
def start() -> Thread:
thread = Thread(target=run, daemon=True)
thread.start()
return thread
def stop() -> None:
global loop
loop.quit() # type: ignore
def handle_message(bus: Gst.Bus, message: Gst.Message) -> bool:
global g_eos_list
global loop
logger.debug("Handle received message")
match message.type:
case Gst.MessageType.EOS:
logger.info("Pipeline recieved EOS! Stopping pipeline...")
stop()
case Gst.MessageType.WARNING:
warning, debug = message.parse_warning()
logger.warning(f"Pipeline Warning: {warning}")
logger.debug(f"Pipeline Warning Debug: {debug}")
case Gst.MessageType.ERROR:
error, debug = message.parse_error()
logger.error(f"Pipeline Error: {error}")
logger.debug(f"Pipeline Error Debug: {debug}")
logger.info("Pipeline recieved an error! Stopping pipeline...")
stop()
case Gst.MessageType.ELEMENT:
struct = message.get_structure()
if struct is not None and struct.has_name("stream-eos"):
parsed, stream_id = struct.get_uint("stream-id")
if parsed:
logger.info(f"Received EOS for stream {stream_id}")
g_eos_list[stream_id] = True
remove_source(stream_id)
return True
def create_pipeline():
global platform_info
global loop
global pipeline
global stream_muxer
loop = GLib.MainLoop()
platform_info = PlatformInfo()
pipeline = Gst.Pipeline()
if not pipeline:
raise ValueError("Unable to create Pipeline")
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", handle_message)
logger.debug("Creating stream muxer")
stream_muxer = Gst.ElementFactory.make("nvstreammux", "stream-muxer")
if stream_muxer is None:
raise ValueError("Unable to create stream muxer")
width, height = STREAM_MUXER_RESOLUTION
stream_muxer.set_property("gpu_id", GPU_ID)
stream_muxer.set_property("batch-size", STREAM_MUXER_BATCH_SIZE)
stream_muxer.set_property("batched-push-timeout", STREAM_MUXER_BATCHED_PUSH_TIMEOUT)
stream_muxer.set_property("live-source", 1)
stream_muxer.set_property("width", width)
stream_muxer.set_property("height", height)
stream_muxer.set_property("nvbuf-memory-type", int(pyds.NVBUF_MEM_CUDA_UNIFIED))
logger.debug("Creating primary inference")
primary_inference = Gst.ElementFactory.make("nvinfer", "primary-inference")
if primary_inference is None:
raise ValueError("Unable to create primary inference")
primary_inference.set_property("config-file-path", PRIMARY_INFERENCE_CONFIG_FILE)
primary_inference.set_property("gpu_id", GPU_ID)
batch_size = primary_inference.get_property("batch-size")
if batch_size < MAX_NUM_SOURCES:
logger.warning(
f"Overriding infer-config batch-size {batch_size} with number of sources {MAX_NUM_SOURCES}"
)
primary_inference.set_property("batch-size", MAX_NUM_SOURCES)
logger.debug("Creating tracker")
tracker = Gst.ElementFactory.make("nvtracker", "tracker")
if tracker is None:
raise ValueError("Unable to create tracker")
config = configparser.ConfigParser()
config.read(TRACKER_CONFIG_FILE)
config.sections()
for key in config["tracker"]:
if key == "tracker-width":
tracker_width = config.getint("tracker", key)
tracker.set_property("tracker-width", tracker_width)
if key == "tracker-height":
tracker_height = config.getint("tracker", key)
tracker.set_property("tracker-height", tracker_height)
if key == "gpu-id":
tracker_gpu_id = config.getint("tracker", key)
tracker.set_property("gpu_id", tracker_gpu_id)
if key == "ll-lib-file":
tracker_ll_lib_file = config.get("tracker", key)
tracker.set_property("ll-lib-file", tracker_ll_lib_file)
if key == "ll-config-file":
tracker_ll_config_file = config.get("tracker", key)
tracker.set_property("ll-config-file", tracker_ll_config_file)
if key == "enable-batch-process":
tracker_enable_batch_process = config.getint("tracker", key)
tracker.set_property("enable_batch_process", tracker_enable_batch_process)
logger.debug("Creating secondary inference 1")
secondary_inference_1 = Gst.ElementFactory.make(
"nvinfer", "secondary1-nvinference-engine"
)
if secondary_inference_1 is None:
raise ValueError("Unable to create secondary inference 1")
secondary_inference_1.set_property("gpu_id", GPU_ID)
secondary_inference_1.set_property(
"config-file-path", SECONDARY_INFERENCE_1_CONFIG_FILE
)
logger.debug("Creating secondary inference 2")
secondary_inference_2 = Gst.ElementFactory.make(
"nvinfer", "secondary2-nvinference-engine"
)
if secondary_inference_2 is None:
raise ValueError("Unable to create secondary inference 1")
secondary_inference_2.set_property("gpu_id", GPU_ID)
secondary_inference_2.set_property(
"config-file-path", SECONDARY_INFERENCE_2_CONFIG_FILE
)
logger.debug("Creating video converter")
video_converter = Gst.ElementFactory.make("nvvideoconvert", "video-convertor")
if video_converter is None:
raise ValueError("Unable to create video converter")
video_converter.set_property("gpu_id", GPU_ID)
video_converter.set_property("nvbuf-memory-type", int(pyds.NVBUF_MEM_CUDA_UNIFIED))
logger.debug("Creating filter")
filter = Gst.ElementFactory.make("capsfilter", "filter")
if filter is None:
raise ValueError("Unable to create filter")
filter_caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGB")
filter.set_property("caps", filter_caps)
src_pad = filter.get_static_pad("src")
if src_pad is None:
raise ValueError("Unable to get src pad of filter")
src_pad.add_probe(Gst.PadProbeType.BUFFER, extract_metadata)
logger.debug("Creating message converter")
message_converter = Gst.ElementFactory.make("nvmsgconv", "message-converter")
if message_converter is None:
raise ValueError("Unable to create message converter")
message_converter.set_property("config", MESSAGE_CONVERTER_CONFIG_FILE)
message_converter.set_property("payload-type", MESSAGE_CONVERTER_SCHEMA_TYPE)
message_converter.set_property("msg2p-newapi", False)
logger.debug("Creating message broker")
message_broker = Gst.ElementFactory.make("nvmsgbroker", "message-broker")
if message_broker is None:
raise ValueError("Unable to create message broker")
message_broker.set_property("sync", False)
message_broker.set_property("proto-lib", MESSAGE_BROKER_PROTO_LIB)
message_broker.set_property("conn-str", MESSAGE_BROKER_CONN_STR)
message_broker.set_property("topic", MESSAGE_BROKER_TOPIC)
if MESSAGE_BROKER_CONFIG:
message_broker.set_property("config", MESSAGE_BROKER_CONFIG)
logger.info("Adding elements to Pipeline")
pipeline.add(stream_muxer)
pipeline.add(primary_inference)
pipeline.add(tracker)
pipeline.add(secondary_inference_1)
pipeline.add(secondary_inference_2)
pipeline.add(video_converter)
pipeline.add(filter)
pipeline.add(message_converter)
pipeline.add(message_broker)
logger.info("Linking elements in the Pipeline")
stream_muxer.link(primary_inference)
primary_inference.link(tracker)
tracker.link(secondary_inference_1)
secondary_inference_1.link(secondary_inference_2)
secondary_inference_2.link(video_converter)
video_converter.link(filter)
filter.link(message_converter)
message_converter.link(message_broker)
if __name__ == "__main__":
create_pipeline()
thread = start()
time.sleep(120)
for source_id in range(4):
add_source(source_id)
time.sleep(10)
for source_id in range(4):
remove_source(source_id)
time.sleep(5)
stop()
while thread.is_alive():
time.sleep(1)
In this sample, I add 4 streams to the pipeline and then remove them one by one. However, when the last source-bin is removed from the pipeline by calling pipeline.remove(source_bin)
, I receive a segfault as follows:
WARNING:__main__:Pipeline Warning: gst-resource-error-quark: No Sources found at the input of muxer. Waiting for sources. (3)
DEBUG:__main__:Pipeline Warning Debug: gstnvstreammux.cpp(2825): gst_nvstreammux_src_push_loop (): /GstPipeline:pipeline0/GstNvStreamMux:stream-muxer
Segmentation fault (core dumped)
Furthermore, according to the core dump, it seems the Nvvideoconvert is problematic. Here is the parsed core dump of my script:
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib/x86_64-linux-gnu/libthread_db.so.1".
--Type <RET> for more, q to quit, c to continue without paging--
Core was generated by `python3 sample.py'.
Program terminated with signal SIGSEGV, Segmentation fault.
#0 0x00007f725c7569e5 in ?? () from /usr/lib/x86_64-linux-gnu/gstreamer-1.0/deepstream/libgstnvvideoconvert.so
[Current thread is 1 (Thread 0x7f71edfff640 (LWP 783))]
Any suggestions or guidance would be greatly appreciated!