• Hardware Platform (Jetson / GPU): NVIDIA Jetson AGX Orin
• DeepStream Version: 7.1
• JetPack Version (valid for Jetson only): 6.1
• TensorRT Version: 8.6.2.3
• Issue Type (questions, new requirements, bugs): question
Hello,
I would like to know how to stop a DeepStream pipeline based on a specific condition and resume it when the condition is no longer met, without restarting the pipeline. My goal is to place the pipeline in an “idle” or “paused” state and then “wake it up” when needed.
For example:
I have a GPS device connected to my NVIDIA hardware, along with a service that retrieves the current GPS location. If the device is stationary, I want the pipeline to stop processing but remain in an "awake" mode. When the device starts moving and exceeds a speed of, e.g., 10 km/h, I want the pipeline to resume processing seamlessly from where it left off.
To clarify, my question is not about handling the GPS input but about managing the state of the DeepStream pipeline. Could this be achieved by modifying the behavior of GLib.MainLoop() or using another method to transition the pipeline between states like PLAYING, PAUSED, IDLE or other?
Any suggestions or guidance would be greatly appreciated.
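For illustration, the naive approach would be to flip the whole pipeline between PLAYING and PAUSED from the condition callback. This is a minimal sketch (the function names are mine, not part of any DeepStream API), and note the caveat for live sources: PAUSED does not buffer live data for later, so frames arriving while paused are simply never delivered.

```python
def target_state(moving):
    """Name of the GStreamer state for the current condition (sketch)."""
    return "PLAYING" if moving else "PAUSED"

def apply_condition(pipeline, moving):
    """Toggle the whole pipeline between PLAYING and PAUSED.

    Caveat: for live sources, PAUSED does not buffer data for later;
    frames that arrive while paused are simply never delivered.
    """
    from gi.repository import Gst  # requires PyGObject with GStreamer installed
    pipeline.set_state(getattr(Gst.State, target_state(moving)))
```

As the rest of the thread discusses, this is usually not what you want for a live stream, which is why the later replies gate the inference instead of the pipeline state.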
What does this mean? For a live stream, once you stop or pause it, you can't resume from the same point in time. For example, if you pause the pipeline at 00:00:30, how can you resume the stream from 00:00:30 when you resume the pipeline again at 00:02:00?
In my pipeline, I have several models performing inference, but I want the inference to occur only when certain conditions are met. For example, if the condition is met from 00:00:00 to 00:00:30, inference and other processing should occur during that time. However, when the condition is no longer met (e.g., after 00:00:30 till 00:02:00), all inference and post-processing should pause, while the pipeline itself continues running.
Then, if the condition is met again at 00:02:00, the pipeline should resume inference and processing until the condition is no longer satisfied.
To clarify, I don’t want to stop the pipeline entirely or resume it from a specific timestamp (e.g., 00:00:30). Instead, I want to temporarily pause the inference and processing logic while keeping the pipeline active.
I’m not referring to nvinfer functionality here. My pipeline also includes a branch that saves files to specific locations and contains no nvinfer plugin. I would like to temporarily stop the entire pipeline, or change its state to something that indicates it is paused, without restarting it and running it again, and then resume later with all previous state preserved.
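One way to gate a branch that has no nvinfer element is a buffer pad probe that returns Gst.PadProbeReturn.DROP while a shared flag is off, so buffers never reach that branch. This is an untested sketch, not an official DeepStream mechanism; the class and function names are mine.

```python
import threading

class BranchGate:
    """Thread-safe on/off flag consulted from a pad probe."""
    def __init__(self):
        self._enabled = threading.Event()
        self._enabled.set()  # start open

    def enable(self):
        self._enabled.set()

    def disable(self):
        self._enabled.clear()

    def is_enabled(self):
        return self._enabled.is_set()

def make_gate_probe(gate):
    """Build a buffer-pad probe that drops buffers while the gate is closed."""
    def probe(pad, info, *user_data):
        from gi.repository import Gst  # deferred so the helper stays importable
        if gate.is_enabled():
            return Gst.PadProbeReturn.OK
        return Gst.PadProbeReturn.DROP  # buffer never reaches this branch
    return probe
```

You would attach it to the first pad of the file-saving branch, e.g. `branch_pad.add_probe(Gst.PadProbeType.BUFFER, make_gate_probe(gate))`, and call `gate.disable()` / `gate.enable()` from the GPS service. The pipeline itself stays in PLAYING the whole time.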
For example, in the bus callback, when a specific message type is received, I want to stop the loop without quitting it entirely. Alternatively, I’d like to modify the pipeline’s state as shown in this snippet from deepstream-test-1:
# Start playback and listen to events
print("Starting pipeline \n")
pipeline.set_state(Gst.State.PLAYING)
try:
    loop.run()
except Exception as e:
    print(f"Loop interrupted: {e}")
# Cleanup
pipeline.set_state(Gst.State.NULL)
Thank you for pointing that out, and sorry if I was not precise enough. Let me clarify my requirements:
I have a pipeline that performs inference and post-processing tasks. Alongside this, a separate service running on the Nvidia device monitors the vehicle’s speed using a GPS device.
Here’s what I want to achieve:
Pipeline Running: When the vehicle’s speed exceeds 10 km/h, the pipeline should run normally, performing inference and all other processing as expected.
Pipeline Paused or some watch state: When the speed drops below 10 km/h, I want to pause the pipeline’s processing (both inference and post-processing). However, I do not want to stop or shut down the pipeline entirely. This is to avoid having to restart the script or reinitialize the pipeline when conditions change. I want to keep all the previous states of the pipeline.
Pipeline Resumed: If the speed rises above 10 km/h again, I want the pipeline to resume processing from its paused state, without losing any of its previous configurations or states.
In essence, I need a way to change the pipeline to a “watch” or “idle” state when the speed condition is not met and bring it back to active processing when the condition is satisfied again, all without restarting the entire pipeline.
I hope this clarifies my requirements better. Thank you for your patience!
This is my idea. It is relatively simple to implement, but I have not tested it; I don’t know whether nvinfer’s properties can be modified while it is running.

When the condition is met:
    nvinfer.interval = 0
else:
    nvinfer.interval = 999999999
def nvinfer_probe(pad, info):
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return Gst.PadProbeReturn.OK
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        if frame_meta.bInferDone:  # Only process inference frames
            # your logic .....
            pass
        # otherwise the frame was not inferred; skip it and move on
        l_frame = l_frame.next
    return Gst.PadProbeReturn.OK
Thank you very much @cihn for the suggestion. However, I do not see how this should work. Where am I supposed to change nvinfer.interval from 0 to 999999999?
Is 0 for when the pipeline should run normally and 999999999 for when my condition occurs? @Fiona.Chen, could such a solution work?
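To make the "where" concrete: one place to flip the property is a background thread that polls the GPS service and calls pgie.set_property("interval", ...) when the speed crosses the threshold. This is only a sketch; get_speed_kmh is a hypothetical stand-in for your own GPS service, and whether changing interval at runtime behaves as hoped is exactly what the replies below test.

```python
import threading
import time

SLEEP_INTERVAL = 2147483647   # effectively disables inference
SPEED_THRESHOLD_KMH = 10.0

def desired_interval(speed_kmh, threshold=SPEED_THRESHOLD_KMH):
    """Map the current speed to an nvinfer 'interval' value (0 = infer every frame)."""
    return 0 if speed_kmh > threshold else SLEEP_INTERVAL

def start_speed_monitor(pgie, get_speed_kmh, poll_seconds=1.0):
    """Poll the (hypothetical) GPS service and flip pgie's interval on change."""
    def worker():
        last = None
        while True:
            interval = desired_interval(get_speed_kmh())
            if interval != last:  # only touch the element when the state flips
                pgie.set_property("interval", interval)
                last = interval
            time.sleep(poll_seconds)
    th = threading.Thread(target=worker, daemon=True)
    th.start()
    return th
```

The thread is marked as a daemon so it does not keep the process alive after the GLib main loop exits.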
@szymon.budziak.td
“Hello, I have written a simple test based on /opt/nvidia/deepstream/deepstream-7.0/sources/deepstream_python_apps/apps/deepstream-test1/deepstream_test_1.py. After starting it, you can open http://your_jetson_ip:1234/close to stop the inference or http://your_jetson_ip:1234/open to restart it.”
#!/usr/bin/env python3
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys
import threading
import time
sys.path.append('../')
import os
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from common.platform_info import PlatformInfo
from common.bus_call import bus_call
import pyds
# ---------------close or open pgie---------------
# ---------------import flask---------------
# Please install Flask for testing purposes
from flask import Flask

app = Flask(__name__)
pgie = None

# These routes control whether pgie is working or sleeping
@app.route('/close')
def close():
    pgie.set_property("interval", 2147483647)  # 2147483647 is the max value
    return "pgie sleeps for 2147483647 frames"

@app.route('/open')
def open():
    pgie.set_property("interval", 0)
    return "pgie working"

def start_web_server():
    app.run(host="0.0.0.0", port=1234)
# --------------------------------------------------
def osd_sink_pad_buffer_probe(pad, info, u_data):
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return Gst.PadProbeReturn.OK
    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
            if frame_meta.bInferDone:
                print(f"infer working:{frame_meta.frame_num}")
            l_frame = l_frame.next
        except StopIteration:
            break
    return Gst.PadProbeReturn.OK
def main(args):
    global pgie
    # Check input arguments; default to the sample stream if none is given
    if len(args) != 2:
        args.append("/opt/nvidia/deepstream/deepstream-7.0/samples/streams/sample_720p.h264")
    platform_info = PlatformInfo()
    # Standard GStreamer initialization
    Gst.init(None)

    # Create gstreamer elements
    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")

    # Source element for reading from the file
    print("Creating Source \n ")
    source = Gst.ElementFactory.make("filesrc", "file-source")
    if not source:
        sys.stderr.write(" Unable to create Source \n")

    # Since the data format in the input file is elementary h264 stream,
    # we need a h264parser
    print("Creating H264Parser \n")
    h264parser = Gst.ElementFactory.make("h264parse", "h264-parser")
    decoder = Gst.ElementFactory.make("nvv4l2decoder", "nvv4l2-decoder")
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")

    if platform_info.is_integrated_gpu():
        print("Creating nv3dsink \n")
        sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
        if not sink:
            sys.stderr.write(" Unable to create nv3dsink \n")
    else:
        if platform_info.is_platform_aarch64():
            print("Creating nv3dsink \n")
            sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
        else:
            print("Creating EGLSink \n")
            sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
        if not sink:
            sys.stderr.write(" Unable to create egl sink \n")
    sink.set_property("sync", 1)

    print("Playing file %s " % args[1])
    source.set_property('location', args[1])
    if os.environ.get('USE_NEW_NVSTREAMMUX') != 'yes':  # Only set these properties if not using new gst-nvstreammux
        streammux.set_property('width', 1920)
        streammux.set_property('height', 1080)
        streammux.set_property('batched-push-timeout', 33000)
    streammux.set_property('batch-size', 1)
    pgie.set_property('config-file-path', "/opt/nvidia/deepstream/deepstream-7.0/sources/deepstream_python_apps/apps/deepstream-test1/dstest1_pgie_config.txt")

    print("Adding elements to Pipeline \n")
    pipeline.add(source)
    pipeline.add(h264parser)
    pipeline.add(decoder)
    pipeline.add(streammux)
    pipeline.add(pgie)
    pipeline.add(nvvidconv)
    pipeline.add(nvosd)
    pipeline.add(sink)

    source.link(h264parser)
    h264parser.link(decoder)
    sinkpad = streammux.request_pad_simple("sink_0")
    if not sinkpad:
        sys.stderr.write(" Unable to get the sink pad of streammux \n")
    srcpad = decoder.get_static_pad("src")
    if not srcpad:
        sys.stderr.write(" Unable to get source pad of decoder \n")
    srcpad.link(sinkpad)
    streammux.link(pgie)
    pgie.link(nvvidconv)
    nvvidconv.link(nvosd)
    nvosd.link(sink)

    # create an event loop and feed gstreamer bus messages to it
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    # Let's add a probe to get informed of the metadata generated; we add the probe to
    # the sink pad of the osd element, since by that time the buffer would have
    # got all the metadata.
    osdsinkpad = nvosd.get_static_pad("sink")
    if not osdsinkpad:
        sys.stderr.write(" Unable to get sink pad of nvosd \n")
    osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)

    # ---------------start web server---------------
    # Used to control the start and stop of pgie
    th = threading.Thread(target=start_web_server)
    th.daemon = True
    th.start()
    # ----------------------------------------------
    print("start thread")

    # start playback and listen to events
    print("Starting pipeline \n")
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except:
        pass
    # cleanup
    pipeline.set_state(Gst.State.NULL)

if __name__ == '__main__':
    sys.exit(main(sys.argv))
If the downstream of the pipeline needs processing, you can check frame_meta.bInferDone to determine whether the upstream of the pipeline is currently in an inference state.
The meaning of pgie.set_property("interval", 2147483647) is that inference will only occur once every 2147483647 frames, which should be sufficient for your use case.
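To sketch the downstream side of this: a pad probe placed after pgie can check bInferDone per frame and only run post-processing on frames that were actually inferred. This is an untested illustration in the same style as the example above; the helper names are mine.

```python
def frame_was_inferred(frame_meta):
    """True if upstream nvinfer actually ran on this frame (bInferDone set)."""
    return bool(frame_meta.bInferDone)

def downstream_probe(pad, info, u_data):
    """Sketch of a downstream pad probe that skips non-inference frames."""
    import pyds  # DeepStream Python bindings
    from gi.repository import Gst
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        return Gst.PadProbeReturn.OK
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        if frame_was_inferred(frame_meta):
            pass  # run your post-processing here
        l_frame = l_frame.next
    return Gst.PadProbeReturn.OK
```

While interval is set to 2147483647, bInferDone stays false on almost every frame, so the post-processing branch is effectively idle without the pipeline ever leaving PLAYING.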