Python send video to multiple sinks

Continuing the discussion from How to create an rtsp sink with deepstream python program?:

Basically I want to create two sinks, one for an RTSP stream and one for the local display. Maybe in the future I'll also create a third to save to MP4.

This link gives a good explanation of how to create multiple sinks in Python through a tee and multiple queues.

I found an example in the Python sample apps, deepstream_test_4.py, which uses a tee to split the pipeline to multiple sinks, but I am having difficulty understanding how the tee is linked to the two queues. Can someone help explain the code clearly?

The relevant code from the NVIDIA example app (deepstream_python_apps/deepstream_test_4.py at master · NVIDIA-AI-IOT/deepstream_python_apps · GitHub) is included here for reference.

tee=Gst.ElementFactory.make("tee", "nvsink-tee")
if not tee:
    sys.stderr.write(" Unable to create tee \n")

queue1=Gst.ElementFactory.make("queue", "nvtee-que1")
if not queue1:
    sys.stderr.write(" Unable to create queue1 \n")

queue2=Gst.ElementFactory.make("queue", "nvtee-que2")
if not queue2:
    sys.stderr.write(" Unable to create queue2 \n")

if (no_display) :
    print("Creating FakeSink \n")
    sink = Gst.ElementFactory.make("fakesink", "fakesink")
    if not sink:
        sys.stderr.write(" Unable to create fakesink \n")
else:
    if is_aarch64():
        transform = Gst.ElementFactory.make("nvegltransform", "nvegl-transform")

    print("Creating EGLSink \n")
    sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
    if not sink:
        sys.stderr.write(" Unable to create egl sink \n")

print("Playing file %s " %input_file)
source.set_property('location', input_file)
streammux.set_property('width', 1920)
streammux.set_property('height', 1080)
streammux.set_property('batch-size', 1)
streammux.set_property('batched-push-timeout', 4000000)
pgie.set_property('config-file-path', PGIE_CONFIG_FILE)
msgconv.set_property('config',MSCONV_CONFIG_FILE)
msgconv.set_property('payload-type', schema_type)
msgbroker.set_property('proto-lib', proto_lib)
msgbroker.set_property('conn-str', conn_str)
if cfg_file is not None:
    msgbroker.set_property('config', cfg_file)
if topic is not None:
    msgbroker.set_property('topic', topic)
msgbroker.set_property('sync', False)

print("Adding elements to Pipeline \n")
pipeline.add(source)
pipeline.add(h264parser)
pipeline.add(decoder)
pipeline.add(streammux)
pipeline.add(pgie)
pipeline.add(nvvidconv)
pipeline.add(nvosd)
pipeline.add(tee)
pipeline.add(queue1)
pipeline.add(queue2)
pipeline.add(msgconv)
pipeline.add(msgbroker)
pipeline.add(sink)
if is_aarch64() and not no_display:
    pipeline.add(transform)

print("Linking elements in the Pipeline \n")
source.link(h264parser)
h264parser.link(decoder)

sinkpad = streammux.get_request_pad("sink_0")
if not sinkpad:
    sys.stderr.write(" Unable to get the sink pad of streammux \n")
srcpad = decoder.get_static_pad("src")
if not srcpad:
    sys.stderr.write(" Unable to get source pad of decoder \n")
srcpad.link(sinkpad)

streammux.link(pgie)
pgie.link(nvvidconv)
nvvidconv.link(nvosd)
nvosd.link(tee)
queue1.link(msgconv)
msgconv.link(msgbroker)
if is_aarch64() and not no_display:
    queue2.link(transform)
    transform.link(sink)
else:
    queue2.link(sink)
# The tee duplicates every buffer from nvosd to all of its src pads.
# Request one src pad per branch and link each to a queue's sink pad.
sink_pad = queue1.get_static_pad("sink")
tee_msg_pad = tee.get_request_pad("src_%u")
tee_render_pad = tee.get_request_pad("src_%u")
if not tee_msg_pad or not tee_render_pad:
    sys.stderr.write("Unable to get request pads\n")
# Branch 1: tee -> queue1 -> msgconv -> msgbroker
tee_msg_pad.link(sink_pad)
# Branch 2: tee -> queue2 -> (transform ->) sink
sink_pad = queue2.get_static_pad("sink")
tee_render_pad.link(sink_pad)

# create an event loop and feed GStreamer bus messages to it
loop = GObject.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)

osdsinkpad = nvosd.get_static_pad("sink")
if not osdsinkpad:
    sys.stderr.write(" Unable to get sink pad of nvosd \n")

osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)

Hi,
The tee element duplicates incoming buffers to all of its src pads, and additional src pads can be created on request (see the GStreamer tee element documentation).

The example requests two pads with get_request_pad and links each one to a different branch. You can repeat this process to add another sink, as in the sketch below.
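
For instance, a minimal sketch of a third branch, assuming the pipeline and tee from deepstream_test_4.py above (queue3 and the extra sink are illustrative names, not part of the sample):

# Sketch only: queue3 and sink3 are hypothetical names added for illustration
queue3 = Gst.ElementFactory.make("queue", "nvtee-que3")
sink3 = Gst.ElementFactory.make("fakesink", "third-sink")  # could be swapped for an MP4 branch later
pipeline.add(queue3)
pipeline.add(sink3)
queue3.link(sink3)
sink3.set_property("sync", False)  # keep the extra branch from stalling the others

# Request one more src pad from the tee and link it to the new branch
tee_third_pad = tee.get_request_pad("src_%u")
if not tee_third_pad:
    sys.stderr.write("Unable to get request pad\n")
tee_third_pad.link(queue3.get_static_pad("sink"))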

In our own pipelines we use GstInterpipe instead, because it isolates each sub-pipeline so that stop, play, and errors can be handled individually, but it is the same idea as using a tee.
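
For reference, the interpipe pattern looks roughly like this (a sketch assuming RidgeRun's gst-interpipe plugin is installed; the pipeline contents and the "analytics-out" name are just placeholders):

# Rough sketch of the GstInterpipe idea; assumes the gst-interpipe plugin
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

Gst.init(None)
# The producer publishes buffers under the name "analytics-out"
producer = Gst.parse_launch(
    "videotestsrc ! interpipesink name=analytics-out sync=false")
# Each consumer is its own pipeline, so it can stop or fail independently
consumer = Gst.parse_launch(
    "interpipesrc listen-to=analytics-out is-live=true ! autovideosink")
producer.set_state(Gst.State.PLAYING)
consumer.set_state(Gst.State.PLAYING)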

Thank you Miguel, this is an excellent answer with helpful links.

Great sharing, @miguel.taylor!

Thanks!

@tttuser Were you able to do it? I am trying to do something similar, but instead of the local display, I am trying one queue for msgconv and the other for an RTSP sink.

print("Adding elements to Pipeline \n")
pipeline.add(pgie)
pipeline.add(tracker)
pipeline.add(tiler)
pipeline.add(nvvidconv)
pipeline.add(nvosd)

pipeline.add(tee)
pipeline.add(queue1)
pipeline.add(queue2)
pipeline.add(msgconv)
pipeline.add(msgbroker)

pipeline.add(nvvidconv_postosd)
pipeline.add(caps)
pipeline.add(encoder)
pipeline.add(rtppay)
if is_aarch64():
    pipeline.add(transform)
pipeline.add(sink)
pipeline.add(fakesink)

print("Linking elements in the Pipeline \n")
streammux.link(pgie)
pgie.link(tracker)
tracker.link(tiler)
tiler.link(nvvidconv)
nvvidconv.link(nvosd)
nvosd.link(tee)

queue1.link(msgconv)
msgconv.link(msgbroker)
queue2.link(nvvidconv_postosd)
nvvidconv_postosd.link(caps)
caps.link(encoder)
encoder.link(rtppay)
# rtppay's src pad can only be linked once, so link it inside the branch below
if is_aarch64():
    rtppay.link(transform)
    transform.link(sink)
else:
    rtppay.link(sink)


tee_msg_pad = tee.get_request_pad('src_%u')
tee_render_pad = tee.get_request_pad("src_%u")
if not tee_msg_pad or not tee_render_pad :
    sys.stderr.write("Unable to get request pads \n")

msg_sink_pad = queue1.get_static_pad("sink")
tee_msg_pad.link(msg_sink_pad)
vid_sink_pad = queue2.get_static_pad("sink")
tee_render_pad.link(vid_sink_pad)
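
For reference, the RTSP side of such a branch in the DeepStream samples pairs a udpsink with a small GstRtspServer; a minimal sketch (the port, H264 caps, and /ds-test mount point here are assumptions to adapt):

# Sketch of the RTSP server that serves the udpsink branch, in the style of
# the DeepStream RTSP-out sample; port 5400 and /ds-test are assumptions
import gi
gi.require_version('GstRtspServer', '1.0')
from gi.repository import GstRtspServer

server = GstRtspServer.RTSPServer.new()
server.props.service = "8554"
server.attach(None)

factory = GstRtspServer.RTSPMediaFactory.new()
factory.set_launch(
    '( udpsrc name=pay0 port=5400 buffer-size=524288 '
    'caps="application/x-rtp, media=video, clock-rate=90000, '
    'encoding-name=(string)H264, payload=96" )')
factory.set_shared(True)
server.get_mount_points().add_factory("/ds-test", factory)
# The stream is then available at rtsp://<host>:8554/ds-test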

I have a tee splitting to rtsp_server and video_render, and both work perfectly. I tried to add another element to the tee for JPEG encoding, but my pipeline chokes on the first image: I can see the display appear with an image, but it stops immediately.

To investigate, I removed all the convert and JPEG encode elements and put a simple fakesink element in their place.

But this also stops my pipeline… I've tried setting the fakesink property ('sync', False), but it makes no difference.
If anyone has hints for why this is the case, it would be appreciated.
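
One difference from the earlier examples in this thread: each tee branch there has its own queue in front of the sink. A minimal sketch of that pattern for the fakesink branch, reusing the element names from the code below ("vw_queue" is an illustrative name):

# Sketch: put a queue between the tee and the fakesink, matching the
# queue-per-branch pattern from the earlier posts
vw_queue = Gst.ElementFactory.make("queue", "vw_queue")
pipeline.add(vw_queue)
vw_queue.link(vw_fake_sink)
tee_stream_pad_mjpg.link(vw_queue.get_static_pad("sink"))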

In the meantime I'm going to try Interpipes. Below is my code:

vw_fake_sink = Gst.ElementFactory.make("fakesink", "vw_fakesink")
vw_fake_sink.set_property('sync', False)

pgie.link(tracker)
tracker.link(analytics)
analytics.link(nvvidconv1)
nvvidconv1.link(filter1)
filter1.link(tiler)
tiler.link(nvvidconv)
nvvidconv.link(nvosd)
nvosd.link(tee)

tee_stream_pad = tee.get_request_pad('src_%u')
print("Obtained request pad {0} for stream branch".format(tee_stream_pad.get_name()))
tee_render_pad = tee.get_request_pad("src_%u")
print("Obtained request pad {0} for rend branch".format(tee_render_pad.get_name()))
tee_stream_pad_mjpg = tee.get_request_pad('src_%u')
print("Obtained request pad {0} for stream MJPEG branch".format(tee_stream_pad_mjpg.get_name()))
if not tee_stream_pad or not tee_render_pad or not tee_stream_pad_mjpg:
    sys.stderr.write("Unable to get request pads\n")
    sys.exit(-1)

########## RTSP Streaming #########
pipeline.add(nvvidconv_postosd)
pipeline.add(caps)
pipeline.add(encoder)
pipeline.add(rtppay)
pipeline.add(stream_sink)

tee_stream_pad.link(nvvidconv_postosd.get_static_pad("sink"))
nvvidconv_postosd.link(caps)
caps.link(encoder)
encoder.link(rtppay)
rtppay.link(stream_sink)

########## Fakesink Output  #########
pipeline.add(vw_fake_sink)  # <------ Problems
tee_stream_pad_mjpg.link(vw_fake_sink.get_static_pad("sink"))  # <------ Problems

########## Render Output #########
pipeline.add(render_sink)

if is_aarch64():
    tee_render_pad.link(transform.get_static_pad("sink"))
    transform.link(render_sink)
else:
    tee_render_pad.link(render_sink.get_static_pad("sink"))