Please provide complete information as applicable to your setup.
• Hardware Platform (GPU) - GPU A100
• DeepStream Version - 7.1
• JetPack Version (valid for Jetson only) - N/A
• TensorRT Version - 10.3.0.26-1+cuda12.5
• NVIDIA GPU Driver Version (valid for GPU only) - 550.90.07
• Issue Type( questions, new requirements, bugs) - bugs
• How to reproduce the issue ? (This is for bugs. Including which sample app is using, the configuration files content, the command line used and other details for reproducing)
We have built an app, which uses nvstreammux
and nvurisrcbin
for processing multiple RTSP streams. Also, it uses Smart Record feature to keep capture moments of interest.
Streams are being fed to the pipeline at their original FPS, but the inferencing rate is capped at 5 FPS.
It may be too complex to provide a runnable snippet here.
Problem: In majority of cases we get EOS propagated to bus callback, but sometimes it’s not.
When it doesn’t happen the streams are stuck with no activity.
It impacts the rest of the business logic.
So far we’ve implemented a workaround to monitor for stuck streams and remove them from pipeline. But there’s a suspicion that it may lead to crashes if we try to remove a stream (nvurisrcbin
), if it’s stuck in undefined state.
Also, we experience strange behavior of SR: sometimes video duration may be way longer than we anticipate: minutes vs 45s (our config)
Memory type used - Unified memory. We use it to create a copy of a frame for when the moment of interest happens.
Here’s the function of how we create a pipeline
def _create(self) -> bool:
    """Build the static portion of the pipeline:

        nvstreammux -> nvinferserver -> tee -> queue -> nvvideoconvert
                    -> capsfilter (RGBA, NVMM) -> fakesink

    Sources (nvurisrcbin) are attached later through request pads on the muxer.
    Returns True on success, False when any element fails to create or link.
    """
    if not self._check_config():
        return False
    self._platform_info = PlatformInfo()
    self._pipeline = Gst.Pipeline()
    if self._pipeline is None:
        logger.error("Unable to create Pipeline")
        return False
    batch_size = self._total_capacity if self._total_capacity > 0 else self.DEFAULT_MUX_BATCH_SIZE
    # Create nvstreammux instance to form batches from one or more sources.
    self._muxer = self._create_element("nvstreammux", "Stream-muxer")
    if self._muxer is None:
        return False
    self._muxer.set_property("batched-push-timeout", 33000)
    self._muxer.set_property("batch-size", batch_size)
    self._muxer.set_property("gpu_id", self._gpu_id)
    self._muxer.set_property("live-source", 1)
    # Set streammux width and height
    self._muxer.set_property('width', self.MUXER_OUTPUT_WIDTH)
    self._muxer.set_property('height', self.MUXER_OUTPUT_HEIGHT)
    self._pgie = self._create_element("nvinferserver", "primary-inference")
    if self._pgie is None:
        return False
    tee_infer = self._create_element("tee", "tee_infer")
    if tee_infer is None:
        return False
    queue_infer = self._create_element("queue", "queue_infer")
    # BUG FIX: the original re-checked `tee_infer` here (copy-paste), so a
    # failed queue creation would only surface later as pipeline.add(None).
    if queue_infer is None:
        return False
    logger.info("Creating nv_video_convert_infer")
    self.nv_video_convert_infer = self._create_element("nvvideoconvert", "nv_video_convert_infer")
    if self.nv_video_convert_infer is None:
        logger.error("Unable to create nv_video_convert_infer")
        return False
    video_src_pad = self.nv_video_convert_infer.get_static_pad("src")
    if video_src_pad is None:
        logger.error("Unable to get source pad for nvvideoconv")
        return False
    # Annotation/frame-copy probe runs on every buffer leaving the converter.
    video_src_pad.add_probe(Gst.PadProbeType.BUFFER, self._videoconv_src_pad_buffer_probe, 0)
    logger.info("Creating caps filter")
    video_convert_caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
    if video_convert_caps is None:
        return False
    video_convert_caps_filter = self._create_element("capsfilter", "video_convert_caps_filter")
    if not video_convert_caps_filter:
        logger.error("Unable to create video_convert_caps_filter")
        return False
    video_convert_caps_filter.set_property("caps", video_convert_caps)
    sink = self._create_element("fakesink", "fakesink")
    if sink is None:
        return False
    sink.set_property('enable-last-sample', 0)
    sink.set_property('sync', 0)
    # Set configuration file path
    self._pgie.set_property('config-file-path', self.client_config)
    if not application_environment.DEBUG:
        # Unified memory so the probe can copy frames to the CPU with numpy.
        mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
        self._muxer.set_property("nvbuf-memory-type", mem_type)
        self.nv_video_convert_infer.set_property("nvbuf-memory-type", mem_type)
    logger.info("Adding elements to Pipeline...")
    elements = [self._muxer, self._pgie, tee_infer, queue_infer,
                self.nv_video_convert_infer, video_convert_caps_filter, sink]
    for element in elements:
        self._pipeline.add(element)
    # link elements:
    logger.info("Linking elements in the Pipeline...")
    for upstream, downstream in zip(elements, elements[1:]):
        # ROBUSTNESS: the original ignored link() results; a silent link
        # failure leaves a half-built pipeline that never produces EOS.
        if not upstream.link(downstream):
            logger.error(f"Failed to link {upstream.get_name()} -> {downstream.get_name()}")
            return False
    return True
Bus callback implementation:
def _bus_callback(self, bus, message, loop):
    """GStreamer bus watch.

    Handles EOS (stop pipeline), warnings (log), errors (per-stream teardown
    when a stream id can be parsed out of the message structure) and the
    per-source "stream-eos" element message posted by nvstreammux.

    Always returns True so the watch stays installed.
    """
    msg_type = message.type  # FIX: don't shadow the `type` builtin
    if msg_type == Gst.MessageType.EOS:
        logger.info("End-of-stream reached. Stopping pipeline...")
        self._stop_pipeline()
    elif msg_type == Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        logger.warning(f"Warning: {err}: {debug}")
    elif msg_type == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        logger.error(f"Error: {err}: {debug}")
        logger.error(f"Message structure: {message.get_structure()}")
        error_message = f"Error: {err}: debug info: {debug}"
        match = self.stream_id_pattern.match(str(message.get_structure()))
        if match is not None:
            try:
                stream_id = int(match.group(1))
                source = self._get_source(stream_id)
                stream = self.get_stream(stream_id)
                if source is None:
                    historian.detection_point_error(None, stream_id, error_message)
                else:
                    # BUG FIX: the original dereferenced stream.detection_point_id
                    # whenever source was present, crashing with AttributeError
                    # when the stream had already been removed.
                    historian.detection_point_error(
                        stream.detection_point_id if stream is not None else None,
                        stream_id, error_message)
                if stream is not None:
                    logger.info(f"Got an error for stream id: {stream_id}. Camera URL is: '{stream.url}' "
                                f"Detection point ID {stream.detection_point_id}, "
                                f"name '{stream.detection_point_name}'. "
                                "Stopping it. Coordinator should restart this stream.")
                    historian.detection_point_stop(stream.detection_point_id,
                                                   stream.detection_point_name,
                                                   stream.url, stream_id,
                                                   f"STOP BY ERROR: {error_message}")
                    self._release_source(stream_id)
                    super().remove_source(stream_id)
                elif source is not None:
                    historian.detection_point_stop(None, None, None,
                                                   stream_id, f"OFF-PIPELINE STOP BY ERROR: {error_message}")
                    self._release_source(stream_id)
            except ValueError:
                logger.warning("Got an error, but wasn't able to parse related stream id.")
                historian.session_error(f"Got an error without stream id. Error: {err}: debug info: {debug}")
        else:
            logger.error("Got an error, but wasn't able to extract related stream id.")
            historian.session_error(f"Got an error without stream id. Error: {err}: debug info: {debug}")
    elif msg_type == Gst.MessageType.ELEMENT:
        struct = message.get_structure()
        # Check for the per-source stream-eos message posted by nvstreammux
        if struct is not None and struct.has_name("stream-eos"):
            parsed, stream_id = struct.get_uint("stream-id")
            if parsed:
                logger.info(f"Got End-of-stream for source {stream_id}")
                stream = self.get_stream(stream_id)
                source = self._get_source(stream_id)
                if stream is None:
                    # (typo fixed: "no on" -> "not on")
                    historian.session_error(f"Got EOS for stream {stream_id}, but it's not on the pipeline already. "
                                            f"Is source also gone?: {source is None}")
                else:
                    historian.detection_point_stop(stream.detection_point_id,
                                                   stream.detection_point_name,
                                                   stream.url,
                                                   stream_id,
                                                   "Removed because of EOS")
                self._release_source(stream_id)
                super().remove_source(stream_id)
    return True
The following is how we setup the source bins
@staticmethod
def _create_element(element_name, bin_name):
    """Instantiate a GStreamer element, logging and returning None on failure."""
    logger.info(f"Creating bin {bin_name}\n")
    element = Gst.ElementFactory.make(element_name, bin_name)
    if element is not None:
        return element
    logger.error(f"Unable to create {element_name}\n")
    return None
@staticmethod
def _get_source_bin_name(source_id: int):
return "source-bin-%02d" % source_id
def _create_source_bin(self, uri, source_id):
    """Create and configure an nvurisrcbin for one RTSP uri.

    Configures app-triggered smart record, forces RTP over TCP and wires the
    pad-added / child-added / start-sr / sr-done handlers.

    Returns the configured bin, or None when the element or the stream record
    is missing (the original crashed with AttributeError in those cases).
    """
    bin_name = self._get_source_bin_name(source_id)
    logger.info(f"Creating {bin_name} for {uri}")
    # nvurisrcbin wraps uridecodebin: it detects the container/codec and
    # plugs the appropriate demux and decode plugins itself.
    source_bin = self._create_element("nvurisrcbin", f"{bin_name}")
    if source_bin is None:
        # ROBUSTNESS: _create_element already logged; bail out before
        # set_property() dereferences None.
        return None
    stream = self.get_stream(source_id)
    if stream is None:
        logger.error(f"Cannot create {bin_name}: stream {source_id} is unknown")
        return None
    # We set the input uri to the source element
    source_bin.set_property("uri", uri)
    source_bin.set_property("smart-record", 2)  # record on app-emitted start-sr
    source_bin.set_property("smart-rec-dir-path", self._recordings_path)
    source_bin.set_property("smart-rec-file-prefix", f"recording_dp_{stream.detection_point_id}")
    source_bin.set_property("smart-rec-cache", self.recording_cache_size + 1)
    source_bin.set_property("smart-rec-default-duration", self.recording_default_duration)
    source_bin.set_property("smart-rec-mode", 1)  # video only
    source_bin.set_property("select-rtp-protocol", 4)  # force tcp
    source_bin.set_property("drop-on-latency", False)
    # Connect to the "pad-added" signal of the decodebin which generates a
    # callback once a new pad for raw data has been created by the decodebin
    source_bin.connect("pad-added", self._on_pad_added, source_id)
    source_bin.connect("child-added", self._on_decodebin_child_added, source_id)
    source_bin.connect("start-sr", self._on_recording_started, source_id)
    source_bin.connect("sr-done", self._on_recording_stopped, source_id)
    return source_bin
def _on_pad_added(self, _, pad, data):
    """'pad-added' handler: link a new video pad from nvurisrcbin to the
    streammux request pad reserved for this source, then enable inference.

    `data` carries the source_id the bin was created with. Audio pads are
    ignored.
    """
    caps = pad.get_current_caps()
    if not caps:
        caps = pad.query_caps()
    gst_struct = caps.get_structure(0)
    gst_name = gst_struct.get_name()
    # Need to check if the pad created by the decodebin is for video and not
    # audio.
    logger.info(f"Pad added: {gst_name}")
    if gst_name.find("video") != -1:
        source_id = data
        pad_name = self._get_sink_pad_name(source_id)
        # Get a sink pad from the streammux, link to decodebin
        sink_pad = self._muxer.request_pad_simple(pad_name)
        if not sink_pad:
            logger.error(f"Unable to create sink pad bin {pad_name}")
            # BUG FIX: the original fell through and called pad.link(None),
            # raising TypeError inside the signal handler.
            return
        if pad.link(sink_pad) == Gst.PadLinkReturn.OK:
            logger.info(f"Decodebin {pad_name} linked to pipeline. User data: {data}")
            self._set_inference_for_source(data, True)
            if self._max_fps is not None:
                self._set_infer_fps(data, self._max_fps)
        else:
            logger.error(f"Failed to link decodebin {pad_name} to pipeline")
def _on_decodebin_child_added(self, child_proxy: Gst.ChildProxy, obj: GObject.Object, name: str, user_data):
    """'child-added' handler: recurse into nested decodebins and configure
    every nvv4l2decoder instance (GPU id, CUDA memory type)."""
    logger.info(f"Decodebin child added {name}")
    if "decodebin" in name:
        # Nested decodebins add their own children later; watch them too.
        obj.connect("child-added", self._on_decodebin_child_added, user_data)
    if "nvv4l2decoder" in name:
        logger.info(f"Found nvv4l2decoder: {name}")
        obj.set_property("gpu_id", self._gpu_id)
        # Use CUDA unified memory in the pipeline so frames can be easily accessed on CPU in Python.
        # 0: NVBUF_MEM_CUDA_DEVICE, 1: NVBUF_MEM_CUDA_PINNED, 2: NVBUF_MEM_CUDA_UNIFIED
        # Don't use direct macro here like NVBUF_MEM_CUDA_UNIFIED since nvv4l2decoder uses a
        # different enum internally
        if not application_environment.DEBUG:
            obj.set_property("cudadec-memtype", 2)
            logger.info(f"Set cudadec-memtype to NVBUF_MEM_CUDA_UNIFIED for {name}")
Aux struct we use for managing additional properties of the stream:
class SourceControlStruct:
    """Per-stream bookkeeping that lives alongside one nvurisrcbin source."""

    def __init__(self, source_bin=None):
        # The nvurisrcbin backing this stream (may be absent).
        self.source_bin = source_bin
        # Whether frames from this source are currently routed to inference.
        self.inference_enabled: bool = False
        # time.time() captured when smart record actually became active.
        self.recording_start_timestamp: float | None = None

    @property
    def is_recording(self) -> bool:
        """True while the underlying bin reports an active smart-record session."""
        if self.source_bin is None:
            return False
        return self.source_bin.get_property("smart-rec-status")
Recording management:
def start_recording(self, source_id: int, timestamp: int):
    """Trigger a smart-record session on the source's nvurisrcbin.

    Validates the source and the configured durations, emits 'start-sr',
    then polls smart-rec-status (0.1s steps, ~1s cap) so that
    recording_start_timestamp reflects when recording actually became active.
    """
    source = self._get_source(source_id)
    # NOTE(review): super().start_recording() runs even when source is None,
    # and either failure is reported with the same "does not exist" message —
    # confirm that wording is intended for both failure modes.
    if not super().start_recording(source_id, timestamp) or source is None:
        logger.error(f"Cannot start recording. Source {source_id} does not exist.")
        return
    if source.is_recording:
        logger.error(f"Source {source_id} is already being recorded. Skipping record command.")
        return
    # Seconds of pre-roll served from the smart-record cache.
    seconds_before = self.recording_cache_size
    if seconds_before < 0:
        logger.warning(f"Negative seconds_before requested: {seconds_before}, using 0 instead.")
        seconds_before = 0
    if self.recording_default_duration <= 0:
        logger.error(
            f"Non-positive recording_duration requested: {self.recording_default_duration}, not recording.")
        return
    # 'start-sr' expects a pointer to a uint32 that receives the session id.
    session_id = pyds.alloc_uint32_value(0)
    result = source.source_bin.emit('start-sr', session_id, seconds_before, self.recording_default_duration, None)
    # NOTE(review): this logs the allocated buffer object, not the numeric
    # session id written into it — confirm whether the value should be read
    # back before freeing the buffer.
    logger.success(f"Started recording session {session_id}, result: {result}")
    pyds.free_gbuffer(session_id)
    # Busy-wait for the bin to report the session as active; give up after ~1s.
    start_time = time.time()
    while True:
        if source.is_recording:
            source.recording_start_timestamp = time.time()
            logger.info(f"Recording for {source_id} started @ {source.recording_start_timestamp}")
            break
        time.sleep(0.1)
        if time.time() - start_time > 1:
            logger.warning(f"Timeout while waiting for recording to start for source {source_id}.")
            break
def stop_recording(self, source_id: int):
    """Request the end of an active smart-record session and wait (up to ~1s)
    for the bin to report it stopped."""
    source = self._get_source(source_id)
    if source is None:
        logger.error(f"Source {source_id} does not exist, there is nothing to stop.")
        return
    if not source.is_recording:
        logger.info(f"Source {source_id} is not being recorded, skipping the command.")
        return
    logger.info(f"Stopping smart recording on source {source_id}...")
    source.source_bin.emit('stop-sr', 0)
    # Poll in 0.1s steps until smart-rec-status clears or the deadline passes.
    deadline = time.time() + 1
    while source.is_recording:
        time.sleep(0.1)
        if time.time() > deadline:
            logger.warning(f"Timeout while waiting for recording to stop for source {source_id}.")
            break
    else:
        logger.info(f"Recording for {source_id} stopped.")
def _on_recording_stopped(self, nvurisrcbin, recording_info, user_data, source_id):
    """'sr-done' handler: log the recording duration, clear the start
    timestamp and report the produced file from NvDsSRRecordingInfo."""
    source = self._get_source(source_id)
    if source is None:
        logger.error(f"Got recording stopped callback for non-existent source '{source_id}'")
    elif source.recording_start_timestamp is None:
        logger.warning(f"Got recording stopped callback while no recording start timestamp '{source_id}'.")
    else:
        logger.info(f"Got recording stopped callback for source '{source_id}'. "
                    f"Recording duration: {time.time() - source.recording_start_timestamp}s")
        source.recording_start_timestamp = None
    logger.success(f"Recording stopped: {source_id}, info: {recording_info}, user data: {user_data}")
    # pyds.NvDsSRRecordingInfo.cast() takes the C address obtained via hash().
    info = pyds.NvDsSRRecordingInfo.cast(hash(recording_info))
    filename = pyds.get_string(info.filename)
    folder = pyds.get_string(info.dirpath)
    # BUG FIX: `filename` was computed but never used; the original log
    # printed the literal "(unknown)" where the file name belongs.
    logger.success(f"Recording info: {filename}, {folder}")
def _on_recording_started(self, nvurisrcbin, session_id, seconds_before, duration, user_data, source_id):
    """'start-sr' handler: only logs that the recording session began."""
    exists = self._get_source(source_id) is not None
    logger.success(f"Registered 'recording_started' for source {source_id}. Source exists?: {exists}")
Annotation of inferenced frames (videoconv probe)
def _videoconv_src_pad_buffer_probe(self, pad, info, u_data):
    """Buffer probe on nvvideoconvert's src pad.

    For every frame in the batch: registers FPS for inferred frames, copies
    the RGBA surface to a BGRA numpy array (non-DEBUG only — requires the
    CUDA unified memory configured in _create), draws detection boxes/labels
    from the object metadata onto the copy, and stores it in
    stream.last_frame for later use (e.g. at a "moment of interest").
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        logger.error("Unable to get source buffer")
        return Gst.PadProbeReturn.DROP
    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    if not batch_meta:
        return Gst.PadProbeReturn.OK
    l_frame = batch_meta.frame_meta_list
    # Per-batch cache of Stream lookups, keyed by source_id.
    streams: dict[int, Stream] = {}
    while l_frame is not None:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
            # The casting is done by pyds.NvDsFrameMeta.cast()
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break
        stream = streams.get(frame_meta.source_id, None)
        if stream is None:
            stream = self.get_stream(frame_meta.source_id)
            if stream is None:
                # NOTE(review): this `break` abandons the REMAINING frames of
                # the batch when a single source is unknown; confirm this is
                # intended — advancing l_frame and `continue` may be meant.
                break
            streams[frame_meta.source_id] = stream
        # Update frame rate through this probe
        if frame_meta.bInferDone:
            self._perf_data.register_frame_for_source_id(frame_meta.source_id)
        if not application_environment.DEBUG:
            # Surface access needs unified memory (set in _create); caps
            # filter upstream guarantees RGBA.
            n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
            if n_frame is None:
                logger.warning("Unable to get frame surface pointer")
                try:
                    l_frame = l_frame.next
                except StopIteration:
                    break
                continue
            # Deep-copy so the annotated frame outlives the GstBuffer.
            frame_copy = np.array(n_frame, copy=True, order='C')
            frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_RGBA2BGRA)
        else:
            frame_copy = None
        # Rect params are in muxer output coordinates at this point in the
        # pipeline, hence the muxer dimensions rather than the source's.
        source_frame_width = self.MUXER_OUTPUT_WIDTH #frame_meta.source_frame_width
        source_frame_height = self.MUXER_OUTPUT_HEIGHT #frame_meta.source_frame_height
        # Normalized detections accumulated per frame (currently only used
        # for drawing; kept for parity with downstream consumers).
        classes = []
        scores = []
        xs = []
        ys = []
        widths = []
        heights = []
        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            classes.append(obj_meta.class_id)
            scores.append(obj_meta.confidence)
            rect_params = obj_meta.rect_params
            # Normalize bbox to [0, 1] against the muxer frame size.
            xs.append(rect_params.left / source_frame_width)
            ys.append(rect_params.top / source_frame_height)
            widths.append(rect_params.width / source_frame_width)
            heights.append(rect_params.height / source_frame_height)
            left = int(rect_params.left)
            top = int(rect_params.top)
            w = int(rect_params.width)
            h = int(rect_params.height)
            try:
                if not application_environment.DEBUG:
                    # Draw box + label on the copy; palette cycles by class id.
                    text = f"{self.get_class_name_by_id(obj_meta.class_id)}: {obj_meta.confidence:.2f}"
                    color = OBJECTS_PALETTE[int(obj_meta.class_id) % len(OBJECTS_PALETTE)] # Assign predefined color
                    cv2.rectangle(frame_copy, (left, top), (left + w, top + h), color, 2)
                    cv2.putText(frame_copy, text, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX,
                                0.5, (255, 255, 255), 2)
            except Exception as e:
                logger.warning(f"Failed to draw detection bbox: {self.get_class_name_by_id(obj_meta.class_id)}, "
                               f"({rect_params.left}, {rect_params.top}, "
                               f"{rect_params.width}, {rect_params.height}): {e})")
            try:
                l_obj = l_obj.next
            except StopIteration:
                break
        # Publish the annotated copy (None in DEBUG mode).
        stream.last_frame = frame_copy
        try:
            l_frame = l_frame.next
        except StopIteration:
            break
    return Gst.PadProbeReturn.OK
Unfortunately we don’t have a sample app, which may run locally, as this issue happens sporadically in live environment.