Gstreamer leaky queue stops the pipeline

I am working on a pipeline that runs on many RTSP streams. Following the official Python tutorial for Deepstream 6.0, I used a source bin that uses a uridecodebin to decode the streams. I am working with a Tesla T4 and the official Deepstream 6.0 devel container. However, I noticed that this leads to a memory leak when the pipeline is too slow, because the source bin keeps accumulating video buffers. I tried to add a leaky queue after the source bin with the goal of dropping frames, but this seems to stall the pipeline. I know that Deepstream provides a drop-interval configuration, but I’d rather drop frames only when they are in excess than use a fixed value, which could drop too many or too few frames.

The following code takes a list of URLs as input and creates a GStreamer pipeline. Specifically, for each URL, a uridecodebin element is initialized and attached to a queue element. Each queue has the following properties: leaky=2, max-size-buffers=1 and flush-on-eos=1. When I start the pipeline, I can see from nvidia-smi dmon that some video is decoded (NVDEC is used). After a second, everything stops.
I would expect decoding to keep going and frames to be pushed into the queues, with each queue dropping the old frame every time it receives a new one. Am I wrong?
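
To make the expected behavior concrete, here is a toy model in plain Python (no GStreamer involved) of a queue with leaky=2 (downstream, i.e. the oldest buffers are dropped) and max-size-buffers=1. The class and function names are hypothetical and exist only for this illustration; the sketch also assumes a consumer is actually pulling from the queue, which is not the case in the code below.

```python
from collections import deque


class LeakyDownstreamQueue:
    """Toy model of a bounded queue that drops its oldest item when full."""

    def __init__(self, max_size_buffers: int = 1):
        self._buffers = deque()
        self._max = max_size_buffers
        self.dropped = 0

    def push(self, buf):
        # When full, leak on the downstream side: discard the oldest buffer.
        if len(self._buffers) >= self._max:
            self._buffers.popleft()
            self.dropped += 1
        self._buffers.append(buf)

    def pop(self):
        # Return the oldest remaining buffer, or None when the queue is empty.
        return self._buffers.popleft() if self._buffers else None


def simulate(num_frames: int, consume_every: int):
    """Producer pushes one frame per tick; consumer pops every `consume_every` ticks."""
    queue = LeakyDownstreamQueue(max_size_buffers=1)
    consumed = []
    for frame in range(num_frames):
        queue.push(frame)
        if (frame + 1) % consume_every == 0:
            consumed.append(queue.pop())
    return consumed, queue.dropped
```

With a consumer three times slower than the producer, simulate(9, 3) hands frames 2, 5 and 8 to the consumer and drops the other six, so memory use stays bounded at one buffer.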

Code to initialize a Bin to decode the video (you probably don’t need to read it, but here it is):

import sys

import gi

gi.require_version("Gst", "1.0")
from gi.repository import Gst

from pipeline.utils.pipeline import create_gst_elemement

class SourceBin:

    @classmethod
    def create_source_bin(cls, index: int, uri: str):
        # Create a source GstBin to abstract this bin's content from the rest of the pipeline
        bin_name = "source-bin-%02d" % index
        nbin = Gst.Bin.new(bin_name)
        if not nbin:
            sys.stderr.write(" Unable to create source bin \n")
            return None
        # Source element for reading from the uri.
        # We will use decodebin and let it figure out the container format of the
        # stream and the codec and plug the appropriate demux and decode plugins.
        uri_decode_bin = create_gst_elemement("uridecodebin", f"uridecodebin_{index}")
        # We set the input uri to the source element
        uri_decode_bin.set_property("uri", uri)
        # Connect to the "pad-added" signal of the decodebin which generates a
        # callback once a new pad for raw data has been created by the decodebin
        uri_decode_bin.connect("pad-added", cls.callback_newpad, nbin)
        uri_decode_bin.connect("pad-removed", cls.callback_pad_removed, nbin)
        uri_decode_bin.connect("no-more-pads", cls.callback_no_more_pads, nbin)
        uri_decode_bin.connect("child-added", cls.decodebin_child_added, nbin)
        # We need to create a ghost pad for the source bin which will act as a proxy
        # for the video decoder src pad. The ghost pad will not have a target right
        # now. Once the decode bin creates the video decoder and generates the
        # cb_newpad callback, we will set the ghost pad target to the video decoder
        # src pad.
        Gst.Bin.add(nbin, uri_decode_bin)
        bin_pad = nbin.add_pad(
            Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC)
        )
        if not bin_pad:
            sys.stderr.write(" Failed to add ghost pad in source bin \n")
            return None
        # Connect bus
        return nbin

    @classmethod
    def callback_newpad(cls, uridecodebin, uridecodebin_new_src_pad, data):
        print(
            f"SourceBin: added pad {uridecodebin_new_src_pad.get_name()} "
            f"to {uridecodebin.get_name()}"
        )
        caps = uridecodebin_new_src_pad.get_current_caps()
        gststruct = caps.get_structure(0)
        gstname = gststruct.get_name()
        source_bin = data
        features = caps.get_features(0)

        # Need to check if the pad created by the decodebin is for video and not audio.
        if gstname.find("video") != -1:
            # Link the decodebin pad only if decodebin has picked nvidia decoder plugin nvdec_*.
            # We do this by checking if the pad caps contain NVMM memory features.
            if features.contains("memory:NVMM"):
                # Get the source bin ghost pad
                bin_ghost_pad = source_bin.get_static_pad("src")
                if not bin_ghost_pad.set_target(uridecodebin_new_src_pad):
                    sys.stderr.write(
                        "Failed to link decoder src pad to source bin ghost pad\n"
                    )
            else:
                sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")

    @classmethod
    def decodebin_child_added(cls, child_proxy, Object, name, user_data):
        if name.find("decodebin") != -1:
            Object.connect("child-added", cls.decodebin_child_added, user_data)

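    @classmethod
    def callback_pad_removed(cls, uridecodebin, uridecodebin_removed_src_pad, data):
        print(
            f"SourceBin: Removed pad {uridecodebin_removed_src_pad.get_name()} "
            f"from {uridecodebin.get_name()}"
        )

    @classmethod
    def callback_no_more_pads(cls, uridecodebin, data):
        print(f"SourceBin: No more pads for {uridecodebin.get_name()}")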


import sys
import gi
gi.require_version("Gst", "1.0")
gi.require_version("GstRtspServer", "1.0")
from gi.repository import GObject, Gst

from source_bin import SourceBin

def create_gst_elemement(factory_name, instance_name):
    element = Gst.ElementFactory.make(factory_name, instance_name)
    if not element:
        sys.stderr.write(f" Unable to create {factory_name} {instance_name} \n")
    return element

urls = [
    "my rtsp url..."
]

# GStreamer must be initialized before any element or pipeline is created.
Gst.init(None)
pipeline = Gst.Pipeline()

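source_bins = [
    SourceBin.create_source_bin(i, url)
    for i, url in enumerate(urls)
]
frames_queues = list()
for i in range(len(source_bins)):
    frames_queue = create_gst_elemement("queue", f"frame_queue_{i}")
    frames_queue.set_property("leaky", 2)  # 2 = downstream: drop the oldest buffers
    frames_queue.set_property("max-size-buffers", 1)
    frames_queue.set_property("flush-on-eos", 1)
    frames_queues.append(frames_queue)

# Attach each source bin to its queue, as described in the post above.
for source_bin, frames_queue in zip(source_bins, frames_queues):
    pipeline.add(source_bin)
    pipeline.add(frames_queue)
    source_bin.link(frames_queue)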

# loop = GObject.MainLoop()
# bus = pipeline.get_bus()
# bus.add_signal_watch()
# bus.connect("message", bus_call, loop)


For your convenience, you can find a graph of the pipeline: pipeline.pdf - Google Drive . Everything you see to the left of a queue is the Bin that decodes the video. Each row represents a stream and is identical to the others.

Why is there no sink in your pipeline?

I wanted to find a minimal reproducible sample to understand the queue behavior. I’d like to use the queue to drop data in a more complex pipeline when the pipeline can’t process the frames fast enough. However, I noticed that the queue was not working in that situation, as the pipeline was leaking memory.

This is related: Memory leak in DeepStream - #12 by marcoslucianops

Related question: Deepstream RTSP memory leak (with code to reproduce the issue)

I’ve run the following pipeline for 5 hours, there is no apparent memory leak.

gst-launch-1.0 uridecodebin uri=rtsp://xxxx ! queue ! fakesink
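
The same test can be sketched in Python with the queue settings from the original post and a fakesink terminating each stream. One possible explanation for the stall, not confirmed in this thread, is that the queue's src pad is left unlinked in the original code, so the first buffer pushed downstream returns a not-linked flow error and streaming stops. The helper names `build_description` and `run`, and the `sink_{i}` element names, are hypothetical; the sketch also skips the NVMM caps check done by the source bin.

```python
def build_description(urls):
    """Build a gst-launch style description with one decode chain per URL."""
    chains = []
    for i, url in enumerate(urls):
        chains.append(
            f'uridecodebin uri="{url}" name=uridecodebin_{i} ! '
            f"queue name=frame_queue_{i} leaky=downstream max-size-buffers=1 "
            f"flush-on-eos=true ! "
            # Assumption: a sink gives every queue a consumer to push into.
            f"fakesink name=sink_{i} sync=false"
        )
    return " ".join(chains)


def run(description):
    """Play the pipeline until the first error or end-of-stream message."""
    # Imported here so the description builder can be used without GStreamer.
    import gi

    gi.require_version("Gst", "1.0")
    from gi.repository import Gst

    Gst.init(None)
    pipeline = Gst.parse_launch(description)
    pipeline.set_state(Gst.State.PLAYING)
    bus = pipeline.get_bus()
    # Block until the pipeline reports an error or reaches end of stream.
    msg = bus.timed_pop_filtered(
        Gst.CLOCK_TIME_NONE, Gst.MessageType.ERROR | Gst.MessageType.EOS
    )
    if msg is not None and msg.type == Gst.MessageType.ERROR:
        err, debug = msg.parse_error()
        print(f"Error from {msg.src.get_name()}: {err.message}\n{debug}")
    pipeline.set_state(Gst.State.NULL)
```

Calling run(build_description(urls)) with real RTSP URLs plays all streams at once. If a pipeline stops early, the error printed from the bus should name the element that failed, which is also what the commented-out bus watch in the original code would have reported.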

There has been no update from you for a while, so we assume this is no longer an issue.
Hence we are closing this topic. If you need further support, please open a new one.

Hey @yingliu , the memory leak was caused by trying to decode too many video streams. So that’s not a problem anymore.
However, do you know why the code above would stop? I am not sure I understood completely the behavior of the queue component in this context.
Thank you

How many streams were you using when the pipeline stopped?

That happened every time. I don’t think it was a bug. It’s probably just a misuse of the queue.

Hello @mfoglio, as the original memory issue no longer exists and this is now purely a GStreamer usage topic, we suggest discussing it in the GStreamer forum. Thanks.