I am working on a pipeline that runs on many RTSP streams. Following the official Python tutorial for Deepstream 6.0 , I used a source bin that uses a uridecodebin
to decode the elements. I am working with a Tesla T4 and the official container Deepstream 6.0 devel. However, I noticed that this leads to a memory leak when the pipeline is too slow because the source bin keeps accumulating video buffers. I tried to add a leaky queue after the source bin with goal of dropping frames. But this seems to stall the pipeline. I know that Deepstream provides some drop-interval
configuration, but I’d rather drop frames when they are in excess rather than having a fixed values which could drop too many or too few frames.
The following code get in inputs a list of url and it create a Gstreamer pipeline. Specifically, for each url, a uridecodebin
element is initialized and attached to a queue
element. Each queue has the following properties: leaky=2
, max-size-buffers=1
and flush-on-eos=1
. When I start the pipeline, I can see from nvidia-smi dmon
that some video is decoded (NVDEC is used). After a second, everything stops.
I would expect that decoding to keep going, and to push frames into the queue with each queue dropping the old frame every time it receives a new one. Am I wrong?
Code to initialize a Bin
to decode the video (source_bin.py
). You probably don’t need to read it, but here it is:
import sys
from gi.repository import Gst
from pipeline.utils.pipeline import create_gst_elemement
class SourceBin:
@classmethod
def create_source_bin(cls, index: int, uri: str):
# Create a source GstBin to abstract this bin's content from the rest of the pipeline
bin_name = "source-bin-%02d" % index
nbin = Gst.Bin.new(bin_name)
if not nbin:
sys.stderr.write(" Unable to create source bin \n")
# Source element for reading from the uri.
# We will use decodebin and let it figure out the container format of the
# stream and the codec and plug the appropriate demux and decode plugins.
uri_decode_bin = create_gst_elemement("uridecodebin", f"uridecodebin_{index}")
# We set the input uri to the source element
uri_decode_bin.set_property("uri", uri)
# Connect to the "pad-added" signal of the decodebin which generates a
# callback once a new pad for raw data has been created by the decodebin
uri_decode_bin.connect("pad-added", cls.callback_newpad, nbin)
uri_decode_bin.connect("pad-removed", cls.callback_pad_removed, nbin)
uri_decode_bin.connect("no-more-pads", cls.callback_no_more_pads, nbin)
uri_decode_bin.connect("child-added", cls.decodebin_child_added, nbin)
# We need to create a ghost pad for the source bin which will act as a proxy
# for the video decoder src pad. The ghost pad will not have a target right
# now. Once the decode bin creates the video decoder and generates the
# cb_newpad callback, we will set the ghost pad target to the video decoder
# src pad.
Gst.Bin.add(nbin, uri_decode_bin)
bin_pad = nbin.add_pad(
Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC)
)
if not bin_pad:
sys.stderr.write(" Failed to add ghost pad in source bin \n")
return None
# Connect bus
return nbin
@classmethod
def callback_newpad(cls, uridecodebin, uridecodebin_new_src_pad, data):
print(f"SourceBin: added pad {uridecodebin_new_src_pad.name} to {uridecodebin.name}")
caps = uridecodebin_new_src_pad.get_current_caps()
gststruct = caps.get_structure(0)
gstname = gststruct.get_name()
source_bin = data
features = caps.get_features(0)
# Need to check if the pad created by the decodebin is for video and not audio.
if gstname.find("video") != -1:
# Link the decodebin pad only if decodebin has picked nvidia decoder plugin nvdec_*.
# We do this by checking if the pad caps contain NVMM memory features.
if features.contains("memory:NVMM"):
# Get the source bin ghost pad
bin_ghost_pad = source_bin.get_static_pad("src")
if not bin_ghost_pad.set_target(uridecodebin_new_src_pad):
sys.stderr.write(
"Failed to link decoder src pad to source bin ghost pad\n"
)
else:
sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")
@classmethod
def decodebin_child_added(cls, child_proxy, Object, name, user_data):
if name.find("decodebin") != -1:
Object.connect("child-added", cls.decodebin_child_added, user_data)
@classmethod
def callback_pad_removed(cls, uridecodebin, uridecodebin_removed_src_pad, data):
print(f"SourceBin: Removed pad {uridecodebin_removed_src_pad.name} from {uridecodebin.name}")
@classmethod
def callback_no_more_pads(cls, uridecodebin, data):
print(f"SourceBin: No more pads for {uridecodebin.name}")
Pipeline:
import sys
sys.path.append("../")
import gi
gi.require_version("Gst", "1.0")
gi.require_version("GstRtspServer", "1.0")
from gi.repository import GObject, Gst
from source_bin import SourceBin
def create_gst_elemement(factory_name, instance_name):
element = Gst.ElementFactory.make(factory_name, instance_name)
if not element:
sys.stderr.write(f" Unable to create {factory_name} {instance_name} \n")
return element
urls = [
"my rtsp url..."
]
GObject.threads_init()
Gst.init(None)
pipeline = Gst.Pipeline()
source_bins = [
SourceBin.create_source_bin(i, url)
for i, url in enumerate(urls)
]
frames_queues = list()
for i in range(len(source_bins)):
frames_queue = create_gst_elemement("queue", f"frame_queue_{i}")
frames_queue.set_property("leaky", 2)
frames_queue.set_property("max-size-buffers", 1)
frames_queue.set_property("flush-on-eos", 1)
frames_queues.append(frames_queue)
for source_bin, frames_queue in zip(source_bins, frames_queues):
pipeline.add(source_bin)
pipeline.add(frames_queue)
source_bin.link(frames_queue)
# loop = GObject.MainLoop()
# bus = pipeline.get_bus()
# bus.add_signal_watch()
# bus.connect("message", bus_call, loop)
pipeline.set_state(Gst.State.PLAYING)
For your convenience, you can find a graph of the pipeline: pipeline.pdf - Google Drive . Everything you see on the left of a queue
is the Bin
to decode the video. Each row represent a stream and it’s identical to the other ones.