Using v4l2sink with DeepStream

Please provide complete information as applicable to your setup.

⇾ Hardware Platform GPU (deepstream:6.1.1-devel)
⇾ NVIDIA GPU Driver Version Driver Version: 520.61.05
⇾ GPU: NVIDIA GeForce GTX 1650

I’m working on DeepStream code to pass rtsp streams to virtual V4L2 devices (I used v4l2loopback to create the virtual devices). I have a code that works without errors, however, I can’t read the V4L2 device.

Does anyone know of a working DeepStream code where v4l2sink is used? It can be in Python or C++. I have tried to find an example, without success.

Here is my code. The writing part to v4l2sink is in the function: create_v4l2sink_branch()

import sys
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
import math
import sys
import common.utils as DS_UTILS
import pyds
from common.bus_call import bus_call
from common.FPS import PERF_DATA
from common.is_aarch_64 import is_aarch64
from gi.repository import GLib, Gst, GstRtspServer

CODEC="H264"
BITRATE=4000000
MAX_DISPLAY_LEN = 64
MUXER_OUTPUT_WIDTH = 1920
MUXER_OUTPUT_HEIGHT = 1080
MUXER_BATCH_TIMEOUT_USEC = 400000
TILED_OUTPUT_WIDTH = 1920
TILED_OUTPUT_HEIGHT = 1080
GST_CAPS_FEATURES_NVMM = "memory:NVMM"
OSD_PROCESS_MODE = 0
OSD_DISPLAY_TEXT = 1
MUX_SYNC_INPUTS = 0

ds_loop=None
perf_data = None

def terminate_pipeline(u_data):
    global ds_loop
    pass
    # if global_config.request_to_stop == True:
    #     print("Aborting pipeline by request")
    #     ds_loop.quit()
    #     return False
    return True 

def create_onscreen_branch(pipeline, gst_elem, index):
    print("Creating EGLSink")
    sink = DS_UTILS.create_gst_element("nveglglessink", f"nvvideo-renderer-{index}")
    sink.set_property('sync', 0)
    sink.set_property('async', 1)
    pipeline.add(sink)

    if is_aarch64():
        transform = DS_UTILS.create_gst_element("nvegltransform", f"nvegl-transform{index}")
        pipeline.add(transform)
        gst_elem.link(transform)
        transform.link(sink)
    else:
        gst_elem.link(sink)
    sink.set_property("qos", 0)

def create_v4l2sink_branch(pipeline, gst_elem, index, output_video_device):
    # Create a caps filter
    caps = DS_UTILS.create_gst_element("capsfilter", f"filter-{index}")
    #caps.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=I420"))
    #caps.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=NV12"))

    identity = DS_UTILS.create_gst_element("identity", f"identity-{index}")
    identity.set_property("drop-allocation", 1)

    nvvidconv = DS_UTILS.create_gst_element("nvvideoconvert", f"convertor-{index}")    

    sink = DS_UTILS.create_gst_element("v4l2sink", f"v4l2sink-{index}")
    sink.set_property('device', output_video_device)
    sink.set_property("sync", 0)
    sink.set_property("async", 1)

    pipeline.add(caps)
    pipeline.add(nvvidconv)
    pipeline.add(identity)
    pipeline.add(sink)

    gst_elem.link(caps)
    caps.link(nvvidconv)
    nvvidconv.link(identity)
    identity.link(sink)

def run_pipeline(rtsp_v4l2_pairs):
    # Check input arguments
    number_sources = len(rtsp_v4l2_pairs)
    perf_data = PERF_DATA(number_sources)

    # Standard GStreamer initialization
    Gst.init(None)

    # Create gstreamer elements */
    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline")
    pipeline = Gst.Pipeline()
    is_live = False
    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
        return 

    # Create nvstreammux instance to form batches from one or more sources.
    streammux = DS_UTILS.create_gst_element("nvstreammux", "Stream-muxer")
    pipeline.add(streammux)
    for i in range(number_sources):
        uri_name = rtsp_v4l2_pairs[i][0]
        print("  Creating source_bin {} --> {}".format(i, uri_name))
        is_live = uri_name.find("rtsp://") == 0
        source_bin = DS_UTILS.create_source_bin(i, uri_name)

        pipeline.add(source_bin)
        padname = "sink_%u" % i
        sinkpad = streammux.get_request_pad(padname)
        if not sinkpad:
            sys.stderr.write("Unable to create sink pad bin \n")
        srcpad = source_bin.get_static_pad("src")
        if not srcpad:
            sys.stderr.write("Unable to create src pad bin \n")
        srcpad.link(sinkpad)

    # streammux setup 
    if is_live:
        print("  At least one of the sources is live")
        streammux.set_property('live-source', 1)   
    streammux.set_property('width', MUXER_OUTPUT_WIDTH)
    streammux.set_property('height', MUXER_OUTPUT_HEIGHT)
    streammux.set_property('batch-size', number_sources)
    streammux.set_property("batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC)
    #streammux.set_property("sync-inputs", MUX_SYNC_INPUTS)


    queue = DS_UTILS.create_gst_element("queue", "queue1")
    pipeline.add(queue)
    nvstreamdemux = DS_UTILS.create_gst_element("nvstreamdemux", "nvstreamdemux")
    pipeline.add(nvstreamdemux)

    # linking
    streammux.link(queue)
    queue.link(nvstreamdemux)


    for i in range(number_sources):
        queue = DS_UTILS.create_gst_element("queue", f"queue{2+i}")
        pipeline.add(queue)

        demuxsrcpad = nvstreamdemux.get_request_pad(f"src_{i}")
        if not demuxsrcpad:
            sys.stderr.write("Unable to create demux src pad \n")

        queuesinkpad = queue.get_static_pad("sink")
        if not queuesinkpad:
            sys.stderr.write("Unable to create queue sink pad \n")
        demuxsrcpad.link(queuesinkpad)

        #create_onscreen_branch(pipeline=pipeline, gst_elem=queue, index=i)
        create_v4l2sink_branch(pipeline=pipeline, gst_elem=queue, index=i, output_video_device=rtsp_v4l2_pairs[i][1])

    # for termate the pipeline 
    GLib.timeout_add_seconds(1, terminate_pipeline, 0)
    # display FPS 
    GLib.timeout_add(5000, perf_data.perf_print_callback)

    # create an event loop and feed gstreamer bus mesages to it
    loop = GLib.MainLoop()
    ds_loop = loop
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)
 

    print("Starting pipeline")
    # start play back and listed to events      
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except:
        pass
    # cleanup
    print("Pipeline ended")
    pipeline.set_state(Gst.State.NULL)

if __name__ == '__main__':
    import json
    import sys 

    pairs = [
        ("rtsp://192.168.1.88:554/22", "/dev/video6")
    ]
    

    run_pipeline(rtsp_v4l2_pairs=pairs)

1.Did you run it on the GPU directly or in a docker?
2.Could you try to use the Gstreamer command to run the pipeline first? You can find a gstreamer command that can be used first, and then it is easier to write code. Like:

gst-launch-1.0 -v filesrc location=test.mp4 ! qtdemux ! h264parse ! avdec_h264 ! videoconvert ! v4l2sink device=/dev/xxx

Thank you for your answer.
1→ I ran this inside the container.
To ensure the container has access to the rtsp stream, I did (inside the container):

gst-launch-1.0 rtspsrc location=rtsp://192.168.1.88:554/11 latency=10 ! rtph265depay ! h265parse ! decodebin ! nvvideoconvert ! autovideosink
and I was able to see the video.

To ensure that the container can write the v4l2 device, I did (inside the container):
gst-launch-1.0 videotestsrc ! videoconvert ! v4l2sink device=/dev/video5
and I was able to see the video pattern reading from /dev/video5 in both case, from the container and from the host, using this:

gst-launch-1.0 v4l2src device=/dev/video5 ! videoconvert  ! xvimagesink

2→ Yes, I have tried many combinations and none work for me. In particular, this one does not give me any errors:

(inside the container)

gst-launch-1.0 rtspsrc location=rtsp://192.168.1.88:554/11 latency=10 ! rtph265depay ! h265parse ! decodebin ! nvvideoconvert  ! v4l2sink device=/dev/video5 

this is the output:

Setting pipeline to PAUSED ...
Pipeline is live and does not need PREROLL ...
Progress: (open) Opening Stream
Progress: (connect) Connecting to rtsp://192.168.1.88:554/11
Progress: (open) Retrieving server options
Progress: (open) Retrieving media info
Progress: (request) SETUP stream 0
Progress: (request) SETUP stream 1
Progress: (open) Opened Stream
Setting pipeline to PLAYING ...
New clock: GstSystemClock
Progress: (request) Sending PLAY request
Progress: (request) Sending PLAY request
Progress: (request) Sent PLAY request

The problem is I cannot read from the device /dev/video5 as in the case of video pattern. When I tried I got:

gst-launch-1.0 v4l2src device=/dev/video5 ! videoconvert  ! xvimagesink
Setting pipeline to PAUSED ...
ERROR: from element /GstPipeline:pipeline0/GstV4l2Src:v4l2src0: Device '/dev/video5' is not a capture device.
Additional debug info:
../sys/v4l2/v4l2_calls.c(629): gst_v4l2_open (): /GstPipeline:pipeline0/GstV4l2Src:v4l2src0:
Capabilities: 0x5200002
ERROR: pipeline doesn't want to preroll.
Failed to set pipeline to PAUSED.
Setting pipeline to NULL ...
Freeing pipeline ...

Maybe I solve the problem if I use pure gstreamer, but I need to use DeepStream, since I plan to run ML models over the video betwen rtsp and v4l2

Could you help to try to use the command below and check whether there are any problems?

gst-launch-1.0 rtspsrc location=rtsp://192.168.1.88:554/11 latency=10 ! rtph265depay ! h265parse ! decodebin ! videoconvert  ! v4l2sink device=/dev/video5 
gst-launch-1.0 v4l2src device=/dev/video5 ! videoconvert  ! xvimagesink