Gstreamer leaky queue stops the pipeline

I am working on a pipeline that runs on many RTSP streams. Following the official Python tutorial for Deepstream 6.0 , I used a source bin that uses a uridecodebin to decode the elements. I am working with a Tesla T4 and the official container Deepstream 6.0 devel. However, I noticed that this leads to a memory leak when the pipeline is too slow because the source bin keeps accumulating video buffers. I tried to add a leaky queue after the source bin with goal of dropping frames. But this seems to stall the pipeline. I know that Deepstream provides some drop-interval configuration, but I’d rather drop frames when they are in excess rather than having a fixed values which could drop too many or too few frames.

The following code get in inputs a list of url and it create a Gstreamer pipeline. Specifically, for each url, a uridecodebin element is initialized and attached to a queue element. Each queue has the following properties: leaky=2, max-size-buffers=1 and flush-on-eos=1. When I start the pipeline, I can see from nvidia-smi dmon that some video is decoded (NVDEC is used). After a second, everything stops.
I would expect that decoding to keep going, and to push frames into the queue with each queue dropping the old frame every time it receives a new one. Am I wrong?

Code to initialize a Bin to decode the video (source_bin.py). You probably don’t need to read it, but here it is:

import sys

from gi.repository import Gst

from pipeline.utils.pipeline import create_gst_elemement


class SourceBin:

    @classmethod
    def create_source_bin(cls, index: int, uri: str):
        # Create a source GstBin to abstract this bin's content from the rest of the pipeline
        bin_name = "source-bin-%02d" % index
        nbin = Gst.Bin.new(bin_name)
        if not nbin:
            sys.stderr.write(" Unable to create source bin \n")
        # Source element for reading from the uri.
        # We will use decodebin and let it figure out the container format of the
        # stream and the codec and plug the appropriate demux and decode plugins.
        uri_decode_bin = create_gst_elemement("uridecodebin", f"uridecodebin_{index}")
        # We set the input uri to the source element
        uri_decode_bin.set_property("uri", uri)
        # Connect to the "pad-added" signal of the decodebin which generates a
        # callback once a new pad for raw data has been created by the decodebin
        uri_decode_bin.connect("pad-added", cls.callback_newpad, nbin)
        uri_decode_bin.connect("pad-removed", cls.callback_pad_removed, nbin)
        uri_decode_bin.connect("no-more-pads", cls.callback_no_more_pads, nbin)
        uri_decode_bin.connect("child-added", cls.decodebin_child_added, nbin)
        # We need to create a ghost pad for the source bin which will act as a proxy
        # for the video decoder src pad. The ghost pad will not have a target right
        # now. Once the decode bin creates the video decoder and generates the
        # cb_newpad callback, we will set the ghost pad target to the video decoder
        # src pad.
        Gst.Bin.add(nbin, uri_decode_bin)
        bin_pad = nbin.add_pad(
            Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC)
        )
        if not bin_pad:
            sys.stderr.write(" Failed to add ghost pad in source bin \n")
            return None
        # Connect bus
        return nbin

    @classmethod
    def callback_newpad(cls, uridecodebin, uridecodebin_new_src_pad, data):
        print(f"SourceBin: added pad {uridecodebin_new_src_pad.name} to {uridecodebin.name}")
        caps = uridecodebin_new_src_pad.get_current_caps()
        gststruct = caps.get_structure(0)
        gstname = gststruct.get_name()
        source_bin = data
        features = caps.get_features(0)

        # Need to check if the pad created by the decodebin is for video and not audio.
        if gstname.find("video") != -1:
            # Link the decodebin pad only if decodebin has picked nvidia decoder plugin nvdec_*.
            # We do this by checking if the pad caps contain NVMM memory features.
            if features.contains("memory:NVMM"):
                # Get the source bin ghost pad
                bin_ghost_pad = source_bin.get_static_pad("src")
                if not bin_ghost_pad.set_target(uridecodebin_new_src_pad):
                    sys.stderr.write(
                        "Failed to link decoder src pad to source bin ghost pad\n"
                    )
            else:
                sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")

    @classmethod
    def decodebin_child_added(cls, child_proxy, Object, name, user_data):
        if name.find("decodebin") != -1:
            Object.connect("child-added", cls.decodebin_child_added, user_data)

    @classmethod
    def callback_pad_removed(cls, uridecodebin, uridecodebin_removed_src_pad, data):
        print(f"SourceBin: Removed pad {uridecodebin_removed_src_pad.name} from {uridecodebin.name}")

    @classmethod
    def callback_no_more_pads(cls, uridecodebin, data):
        print(f"SourceBin: No more pads for {uridecodebin.name}")

Pipeline:

import sys
sys.path.append("../")
import gi
gi.require_version("Gst", "1.0")
gi.require_version("GstRtspServer", "1.0")
from gi.repository import GObject, Gst

from source_bin import SourceBin


def create_gst_elemement(factory_name, instance_name):
    element = Gst.ElementFactory.make(factory_name, instance_name)
    if not element:
        sys.stderr.write(f" Unable to create {factory_name} {instance_name} \n")
    return element



urls = [
    "my rtsp url..."
]

GObject.threads_init()
Gst.init(None)
pipeline = Gst.Pipeline()

source_bins = [
    SourceBin.create_source_bin(i, url)
    for i, url in enumerate(urls)
]
frames_queues = list()
for i in range(len(source_bins)):
    frames_queue = create_gst_elemement("queue", f"frame_queue_{i}")
    frames_queue.set_property("leaky", 2)
    frames_queue.set_property("max-size-buffers", 1)
    frames_queue.set_property("flush-on-eos", 1)
    frames_queues.append(frames_queue)

for source_bin, frames_queue in zip(source_bins, frames_queues):
    pipeline.add(source_bin)
    pipeline.add(frames_queue)
    source_bin.link(frames_queue)

# loop = GObject.MainLoop()
# bus = pipeline.get_bus()
# bus.add_signal_watch()
# bus.connect("message", bus_call, loop)

pipeline.set_state(Gst.State.PLAYING)

For your convenience, you can find a graph of the pipeline: pipeline.pdf - Google Drive . Everything you see on the left of a queue is the Bin to decode the video. Each row represent a stream and it’s identical to the other ones.

Why is there no sink in your pipeline?

I wanted to find a minimum reproducible sample to understand the queue behavior. I’d like to use a the queue to drop data in a more complex pipeline when the pipeline can’t process the frames fast enough. However, I noticed that the queue was not working in that situation, as the pipeline was producing a memory leak.

This is related: Memory leak in DeepStream - #12 by marcoslucianops

Related question: Deepstream RTSP memory leak (with code to reproduce the issue)