Input-selector in DeepStream pipeline

Please provide complete information as applicable to your setup.

• Hardware Platform: RTX 3080
• DeepStream Version: 6.1.1-devel
• TensorRT Version: 8.4.1
• NVIDIA GPU Driver Version: 515.105.01

Issue: I'm trying to use input-selector to choose between two different videos every 5 seconds. The pipeline looks like

videofile → streammux → nvinfer → nvtracker → nvvidconver → nvstreamdemux →

src_0 → queue → nvvidconver → nvdsosd → identity → input-selector → identity → nveglglessink

src_1 → queue → nvvidconver → nvdsosd → identity → input-selector → identity → nveglglessink

The first time the change from one stream to the other works perfectly. But once it tries to change again, the other stream is frozen. I set up the input-selector (video_switch) with the following properties.

# input-selector configuration: keep streams synchronized to the running
# time of the active pad, and do not replay cached buffers on switch.
video_switch.set_property("sync-streams", True)
video_switch.set_property("cache-buffers", False)
video_switch.set_property("sync-mode", "active-segment")

Can you help me, please?

Can you provide the whole code and configurations? There are so many details needed to identify whether the problem is caused by DeepStream or your own implementation.

Thanks for your response. Well, I simplified the pipeline in order to reproduce the problem.

Starting code

import logging
import sys
from threading import Thread, Event

import gi
# require_version must run before importing from gi.repository.
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst

# nvdsosd process mode (0 = default/CPU mode).
OSD_PROCESS_MODE = 0

class Main:
    def __init__(self, args):
        """Build the repro pipeline:

        two uridecodebin source bins -> nvstreammux -> identity ->
        nvstreamdemux -> (queue -> nvvideoconvert) per stream ->
        input-selector -> nveglglessink

        Args:
            args: sys.argv; args[1] and args[2] are the two input URIs.
        """
        self.number_sources = len(args) - 1
        print("Number of sources: ", self.number_sources)
        self.pipeline = Gst.Pipeline.new("pipeline")
        self.bus = self.pipeline.get_bus()

        # Create the two input source bins.
        self.videosrc1_uri_name = args[1]
        self.videosrc1 = self.__create_source_bin(0, self.videosrc1_uri_name)
        self.videosrc1_src_pad = self.videosrc1.get_static_pad("src")
        self.pipeline.add(self.videosrc1)

        self.videosrc2_uri_name = args[2]
        self.videosrc2 = self.__create_source_bin(1, self.videosrc2_uri_name)
        self.videosrc2_src_pad = self.videosrc2.get_static_pad("src")
        self.pipeline.add(self.videosrc2)

        # nvstreammux batches both streams into a single batched buffer.
        self.streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
        self.streammux_sink_pad_0 = self.streammux.get_request_pad("sink_0")
        self.streammux_sink_pad_1 = self.streammux.get_request_pad("sink_1")
        self.streammux.set_property('width', 1920)
        self.streammux.set_property('height', 1080)
        self.streammux.set_property('batch-size', self.number_sources)
        self.streammux.set_property('batched-push-timeout', 4000000)
        self.pipeline.add(self.streammux)

        # identity stands in for nvinfer (+ tracker) in this repro.
        self.identity = Gst.ElementFactory.make("identity", "identity_0")
        self.pipeline.add(self.identity)

        # nvstreamdemux splits the batch back into per-stream buffers.
        self.demux = Gst.ElementFactory.make("nvstreamdemux", "nvstreamdemux")
        self.demux_src_pad_0 = self.demux.get_request_pad("src_0")
        self.demux_src_pad_1 = self.demux.get_request_pad("src_1")
        self.pipeline.add(self.demux)

        # Leaky queues (leaky=1: drop oldest when full) decouple the branches.
        self.queue_0 = Gst.ElementFactory.make("queue", "queue_0")
        self.queue_0_sink_pad = self.queue_0.get_static_pad("sink")
        self.queue_0.set_property("leaky", 1)
        self.pipeline.add(self.queue_0)

        self.queue_1 = Gst.ElementFactory.make("queue", "queue_1")
        self.queue_1_sink_pad = self.queue_1.get_static_pad("sink")
        self.queue_1.set_property("leaky", 1)
        self.pipeline.add(self.queue_1)

        # Per-branch video converters.
        self.videoconvert_0 = Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert_0")
        self.videoconvert_0_src_pad = self.videoconvert_0.get_static_pad("src")
        self.pipeline.add(self.videoconvert_0)

        self.videoconvert_1 = Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert_1")
        self.videoconvert_1_src_pad = self.videoconvert_1.get_static_pad("src")
        self.pipeline.add(self.videoconvert_1)

        # input-selector forwards exactly one of its sink pads downstream.
        self.input_selector = Gst.ElementFactory.make("input-selector", "input-selector")
        self.input_selector_sink_pad_0 = self.input_selector.get_request_pad("sink_0")
        self.input_selector_sink_pad_1 = self.input_selector.get_request_pad("sink_1")
        self.input_selector.set_property("active-pad", self.input_selector_sink_pad_0)
        self.pipeline.add(self.input_selector)

        # Renderer.
        self.videosink = Gst.ElementFactory.make("nveglglessink", "videosink")
        self.pipeline.add(self.videosink)

        # Link everything together.
        self.videosrc1_src_pad.link(self.streammux_sink_pad_0)
        self.videosrc2_src_pad.link(self.streammux_sink_pad_1)
        self.streammux.link(self.identity)
        self.identity.link(self.demux)
        self.demux_src_pad_0.link(self.queue_0_sink_pad)
        self.queue_0.link(self.videoconvert_0)
        self.videoconvert_0_src_pad.link(self.input_selector_sink_pad_0)
        self.demux_src_pad_1.link(self.queue_1_sink_pad)
        self.queue_1.link(self.videoconvert_1)
        self.videoconvert_1_src_pad.link(self.input_selector_sink_pad_1)
        self.input_selector.link(self.videosink)

def change_source(self):
    """Toggle the input-selector's active pad every 5 s until stopped."""
    while not self.stop_event.is_set():
        self.stop_event.wait(5)
        print(self.queue_0.get_property("current-level-buffers"))

        if self.stop_event.is_set():
            break

        # Flip to whichever sink pad is currently inactive.
        pad_0_is_active = self.input_selector_sink_pad_0.get_property("active")
        target_pad = (self.input_selector_sink_pad_1 if pad_0_is_active
                      else self.input_selector_sink_pad_0)
        self.input_selector.set_property("active-pad", target_pad)

def run(self):
    """Start the pipeline and poll the bus until EOS, ERROR or Ctrl-C."""
    self.pipeline.set_state(Gst.State.PLAYING)

    self.stop_event = Event()
    self.change_source_thread = Thread(target=self.change_source)
    self.change_source_thread.start()

    while True:
        try:
            # Poll with a 1 s timeout so a KeyboardInterrupt is noticed
            # between messages.
            message = self.bus.timed_pop(Gst.SECOND)
            if message is None:
                continue
            if message.type in (Gst.MessageType.EOS, Gst.MessageType.ERROR):
                break
        except KeyboardInterrupt:
            break

    self.stop_event.set()
    # Join the switcher thread before tearing the pipeline down so it
    # cannot touch elements that are already in NULL state.
    self.change_source_thread.join()
    self.pipeline.set_state(Gst.State.NULL)

@staticmethod
def __cb_newpad(decodebin, decoder_src_pad, data):
    """pad-added handler: ghost the decoder's NVMM video pad out of the bin."""
    print("In cb_newpad\n")
    caps = decoder_src_pad.get_current_caps()
    if not caps:
        caps = decoder_src_pad.query_caps()
    structure_name = caps.get_structure(0).get_name()
    features = caps.get_features(0)
    source_bin = data

    print("gstname=", structure_name)
    # Only link video pads (decodebin also exposes audio pads).
    if structure_name.find("video") == -1:
        return

    print("features=", features)
    # Only accept pads backed by NVMM memory, i.e. an nvdec_* decoder.
    if not features.contains("memory:NVMM"):
        sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")
        return

    bin_ghost_pad = source_bin.get_static_pad("src")
    if not bin_ghost_pad.set_target(decoder_src_pad):
        sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")


def __decodebin_child_added(self, child_proxy, Object, name, user_data):
    """child-added handler: recurse into nested decodebins and tune the source."""
    print("Decodebin child added:", name, "\n")
    if "decodebin" in name:
        Object.connect("child-added", self.__decodebin_child_added, user_data)

    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        # Only some sources (e.g. rtspsrc) expose drop-on-latency.
        if source_element.find_property('drop-on-latency') is not None:
            Object.set_property("drop-on-latency", True)

def __create_source_bin(self, index, uri):
    """Create a source GstBin wrapping uridecodebin for *uri*.

    The bin exposes an initially-untargeted ghost "src" pad; __cb_newpad
    retargets it to the decoder's src pad once decodebin has created it.

    Returns the bin, or None on failure.
    """
    print("Creating source bin")
    bin_name = "source-bin-%02d" % index
    print(bin_name)
    nbin = Gst.Bin.new(bin_name)
    if not nbin:
        sys.stderr.write(" Unable to create source bin \n")
        return None

    # file_loop selects nvurisrcbin (which can loop a file) instead of
    # the generic uridecodebin.
    file_loop = False
    if file_loop:
        uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
        uri_decode_bin.set_property("file-loop", 1)
        uri_decode_bin.set_property("cudadec-memtype", 0)
    else:
        uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
    if not uri_decode_bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
        return None

    uri_decode_bin.set_property("uri", uri)
    # The decoder pad only exists after decodebin has inspected the
    # stream, so linking happens lazily in the pad-added callback.
    uri_decode_bin.connect("pad-added", self.__cb_newpad, nbin)
    uri_decode_bin.connect("child-added", self.__decodebin_child_added, nbin)

    Gst.Bin.add(nbin, uri_decode_bin)
    bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write(" Failed to add ghost pad in source bin \n")
        return None
    return nbin

if __name__ == '__main__':
    # GObject.threads_init() is a deprecated no-op since PyGObject 3.11
    # and is intentionally omitted.
    Gst.init(None)
    if len(sys.argv) < 3:
        sys.stderr.write("Usage: %s <uri1> <uri2>\n" % sys.argv[0])
        sys.exit(1)
    start = Main(sys.argv)
    start.run()

The idea is simple — the pipeline is:

2 video source files → nvstreammux → identity → nvstreamdemux → queue → nvvidconvert → inputselector → nveglglessink

Later, I want to replace the identity with an nvinfer + tracker. The pipeline runs, but after some changes on the screen (nveglglessink), it freezes and sometimes runs faster.

I can run your code with two input local file sources. You’d better add capsfilter after nvvideoconvert.

Are you running with live sources?

Hi Fiona.

I added the capsfilter. This is the new code

Start code

import logging
import sys
from threading import Thread, Event

import gi
# require_version must run before importing from gi.repository.
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst

# nvdsosd process mode (0 = default/CPU mode).
OSD_PROCESS_MODE = 0

class Main:
    def __init__(self, args):
        """Build the pipeline (capsfilter variant):

        two uridecodebin source bins -> nvstreammux -> nvinfer ->
        nvstreamdemux -> (queue -> nvvideoconvert -> capsfilter) per
        stream -> input-selector -> nveglglessink

        Args:
            args: sys.argv; args[1] and args[2] are the two input URIs.
        """
        self.number_sources = len(args) - 1
        print("Number of sources: ", self.number_sources)
        self.pipeline = Gst.Pipeline.new("pipeline")
        self.bus = self.pipeline.get_bus()

        # Create the two input source bins.
        self.videosrc1_uri_name = args[1]
        self.videosrc1 = self.__create_source_bin(0, self.videosrc1_uri_name)
        self.videosrc1_src_pad = self.videosrc1.get_static_pad("src")
        self.pipeline.add(self.videosrc1)

        self.videosrc2_uri_name = args[2]
        self.videosrc2 = self.__create_source_bin(1, self.videosrc2_uri_name)
        self.videosrc2_src_pad = self.videosrc2.get_static_pad("src")
        self.pipeline.add(self.videosrc2)

        # nvstreammux batches both streams into a single batched buffer.
        self.streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
        self.streammux_sink_pad_0 = self.streammux.get_request_pad("sink_0")
        self.streammux_sink_pad_1 = self.streammux.get_request_pad("sink_1")
        self.streammux.set_property('width', 1920)
        self.streammux.set_property('height', 1080)
        self.streammux.set_property('batch-size', self.number_sources)
        self.streammux.set_property('batched-push-timeout', 4000000)
        self.pipeline.add(self.streammux)

        # Primary GIE (inference).
        self.pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
        self.pgie.set_property('config-file-path', "elvistech_yolo_v2.txt")
        self.pipeline.add(self.pgie)

        # nvstreamdemux splits the batch back into per-stream buffers.
        self.demux = Gst.ElementFactory.make("nvstreamdemux", "nvstreamdemux")
        self.demux_src_pad_0 = self.demux.get_request_pad("src_0")
        self.demux_src_pad_1 = self.demux.get_request_pad("src_1")
        self.pipeline.add(self.demux)

        # Leaky queues (leaky=1: drop oldest when full) decouple the branches.
        self.queue_0 = Gst.ElementFactory.make("queue", "queue_0")
        self.queue_0_sink_pad = self.queue_0.get_static_pad("sink")
        self.queue_0.set_property("leaky", 1)
        self.pipeline.add(self.queue_0)

        self.queue_1 = Gst.ElementFactory.make("queue", "queue_1")
        self.queue_1_sink_pad = self.queue_1.get_static_pad("sink")
        self.queue_1.set_property("leaky", 1)
        self.pipeline.add(self.queue_1)

        # Per-branch video converters.
        self.videoconvert_0 = Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert_0")
        self.videoconvert_0_src_pad = self.videoconvert_0.get_static_pad("src")
        self.pipeline.add(self.videoconvert_0)

        self.videoconvert_1 = Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert_1")
        self.videoconvert_1_src_pad = self.videoconvert_1.get_static_pad("src")
        self.pipeline.add(self.videoconvert_1)

        # Capsfilters pin the converter output to system-memory I420 so
        # both input-selector branches carry identical caps.
        self.capsfilter_0 = Gst.ElementFactory.make("capsfilter", "capsfilter_0")
        self.capsfilter_0_caps = Gst.Caps.from_string("video/x-raw, format=I420")
        self.capsfilter_0.set_property("caps", self.capsfilter_0_caps)
        self.capsfilter_0_src_pad = self.capsfilter_0.get_static_pad("src")
        self.pipeline.add(self.capsfilter_0)

        self.capsfilter_1 = Gst.ElementFactory.make("capsfilter", "capsfilter_1")
        self.capsfilter_1_caps = Gst.Caps.from_string("video/x-raw, format=I420")
        self.capsfilter_1.set_property("caps", self.capsfilter_1_caps)
        self.capsfilter_1_src_pad = self.capsfilter_1.get_static_pad("src")
        self.pipeline.add(self.capsfilter_1)

        # input-selector forwards exactly one of its sink pads downstream.
        self.input_selector = Gst.ElementFactory.make("input-selector", "input-selector")
        self.input_selector_sink_pad_0 = self.input_selector.get_request_pad("sink_0")
        self.input_selector_sink_pad_1 = self.input_selector.get_request_pad("sink_1")
        self.input_selector.set_property("active-pad", self.input_selector_sink_pad_0)
        self.pipeline.add(self.input_selector)

        # Renderer.
        self.videosink = Gst.ElementFactory.make("nveglglessink", "videosink")
        self.pipeline.add(self.videosink)

        # Link everything together.
        self.videosrc1_src_pad.link(self.streammux_sink_pad_0)
        self.videosrc2_src_pad.link(self.streammux_sink_pad_1)
        self.streammux.link(self.pgie)
        self.pgie.link(self.demux)

        # First branch.
        self.demux_src_pad_0.link(self.queue_0_sink_pad)
        self.queue_0.link(self.videoconvert_0)
        self.videoconvert_0.link(self.capsfilter_0)
        self.capsfilter_0_src_pad.link(self.input_selector_sink_pad_0)

        # Second branch.
        self.demux_src_pad_1.link(self.queue_1_sink_pad)
        self.queue_1.link(self.videoconvert_1)
        self.videoconvert_1.link(self.capsfilter_1)
        self.capsfilter_1_src_pad.link(self.input_selector_sink_pad_1)

        # Videosink.
        self.input_selector.link(self.videosink)

        # Dump the pipeline graph (requires GST_DEBUG_DUMP_DOT_DIR).
        Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "switch2")

def change_source(self):
    """Toggle the input-selector's active pad every 10 s until stopped."""
    while not self.stop_event.is_set():
        self.stop_event.wait(10)
        print(self.queue_0.get_property("current-level-buffers"))

        if self.stop_event.is_set():
            break

        # Flip to whichever sink pad is currently inactive.
        pad_0_is_active = self.input_selector_sink_pad_0.get_property("active")
        target_pad = (self.input_selector_sink_pad_1 if pad_0_is_active
                      else self.input_selector_sink_pad_0)
        self.input_selector.set_property("active-pad", target_pad)

def run(self):
    """Start the pipeline and poll the bus until EOS, ERROR or Ctrl-C."""
    self.pipeline.set_state(Gst.State.PLAYING)

    self.stop_event = Event()
    self.change_source_thread = Thread(target=self.change_source)
    self.change_source_thread.start()

    while True:
        try:
            # Poll with a 1 s timeout so a KeyboardInterrupt is noticed
            # between messages.
            message = self.bus.timed_pop(Gst.SECOND)
            if message is None:
                continue
            if message.type in (Gst.MessageType.EOS, Gst.MessageType.ERROR):
                break
        except KeyboardInterrupt:
            break

    self.stop_event.set()
    # Join the switcher thread before tearing the pipeline down so it
    # cannot touch elements that are already in NULL state.
    self.change_source_thread.join()
    self.pipeline.set_state(Gst.State.NULL)

@staticmethod
def __cb_newpad(decodebin, decoder_src_pad, data):
    """pad-added handler: ghost the decoder's NVMM video pad out of the bin."""
    print("In cb_newpad\n")
    caps = decoder_src_pad.get_current_caps()
    if not caps:
        caps = decoder_src_pad.query_caps()
    structure_name = caps.get_structure(0).get_name()
    features = caps.get_features(0)
    source_bin = data

    print("gstname=", structure_name)
    # Only link video pads (decodebin also exposes audio pads).
    if structure_name.find("video") == -1:
        return

    print("features=", features)
    # Only accept pads backed by NVMM memory, i.e. an nvdec_* decoder.
    if not features.contains("memory:NVMM"):
        sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")
        return

    bin_ghost_pad = source_bin.get_static_pad("src")
    if not bin_ghost_pad.set_target(decoder_src_pad):
        sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")


def __decodebin_child_added(self, child_proxy, Object, name, user_data):
    """child-added handler: recurse into nested decodebins and tune the source."""
    print("Decodebin child added:", name, "\n")
    if "decodebin" in name:
        Object.connect("child-added", self.__decodebin_child_added, user_data)

    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        # Only some sources (e.g. rtspsrc) expose drop-on-latency.
        if source_element.find_property('drop-on-latency') is not None:
            Object.set_property("drop-on-latency", True)

def __create_source_bin(self, index, uri):
    """Create a source GstBin wrapping uridecodebin for *uri*.

    The bin exposes an initially-untargeted ghost "src" pad; __cb_newpad
    retargets it to the decoder's src pad once decodebin has created it.

    Returns the bin, or None on failure.
    """
    print("Creating source bin")
    bin_name = "source-bin-%02d" % index
    print(bin_name)
    nbin = Gst.Bin.new(bin_name)
    if not nbin:
        sys.stderr.write(" Unable to create source bin \n")
        return None

    # file_loop selects nvurisrcbin (which can loop a file) instead of
    # the generic uridecodebin.
    file_loop = False
    if file_loop:
        uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
        uri_decode_bin.set_property("file-loop", 1)
        uri_decode_bin.set_property("cudadec-memtype", 0)
    else:
        uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
    if not uri_decode_bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
        return None

    uri_decode_bin.set_property("uri", uri)
    # The decoder pad only exists after decodebin has inspected the
    # stream, so linking happens lazily in the pad-added callback.
    uri_decode_bin.connect("pad-added", self.__cb_newpad, nbin)
    uri_decode_bin.connect("child-added", self.__decodebin_child_added, nbin)

    Gst.Bin.add(nbin, uri_decode_bin)
    bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write(" Failed to add ghost pad in source bin \n")
        return None
    return nbin

if __name__ == '__main__':
    # GObject.threads_init() is a deprecated no-op since PyGObject 3.11
    # and is intentionally omitted.
    Gst.init(None)
    if len(sys.argv) < 3:
        sys.stderr.write("Usage: %s <uri1> <uri2>\n" % sys.argv[0])
        sys.exit(1)
    start = Main(sys.argv)
    start.run()

I added a capsfilter after each nvvideoconvert. Like I mentioned before, the pipeline runs, but the video freezes after a while and has erratic behaviour. I tested with two completely different videos and got the same issue.

Not for the moment. But the videos were recorded from two different cameras; I don't know if it is a problem related to the timestamps. I tried to run the same pipeline but without the input-selector at the end — instead, I just used two nveglglessinks, and that worked. But the ultimate goal of my app is to change between inputs depending on events that happen in nvinfer. For the moment I am just trying to change every 5 seconds. Switch2.pdf didn't work, but switch3.pdf works.

switch2.pdf (31.3 KB)
switch3.pdf (31.1 KB)

Additionally, I checked with GST_DEBUG=4, and we found the following warnings when the videos start freezing:

WARN basetransform gstbasetransform.c:2167:default_generate_output:<nvvideoconvert_1> could not get buffer from pool: flushing
0:00:13.820799987 39867 0x7f5aa804c180 WARN videodecoder gstvideodecoder.c:2761:gst_video_decoder_prepare_finish_frame: decreasing timestamp (0:00:07.534333333 < 0:00:07.734666666)
0:00:13.837498942 39867 0x7f5aa804b9e0 WARN videodecoder gstvideodecoder.c:2761:gst_video_decoder_prepare_finish_frame: decreasing timestamp (0:00:07.300333333 < 0:00:07.500333333)

Yes. Seems the streams with different frame rate or timestamp will cause some problem with sink element. We need to investigate.

Thanks for your response Fiona. My guess is that the problem is the nveglglessink, but i am not quite sure. Let me know if you have any update.

It will take some time to fix the problem. We will inform when there is any progress.

This warning suggests that the nvvideoconvert_1 element is unable to retrieve a buffer from the buffer pool, which might indicate a problem with the buffer allocation. This issue could potentially lead to freezing in video playback.

Thanks Fiona for your help.

@srua The bug has been fixed. You may get it with the next DeepStream release.

1 Like

Thanks Fiona for all your help

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.