Using deepstream_test1_rtsp_in_rtsp_out.py for TCP output instead of UDP

Hello,

I have been testing with the deepstream_test1_rtsp_in_rtsp_out.py script, feeding it an RTSP input stream and receiving an RTSP stream as output. However, during our deployment in a vehicle with external cameras, the UDP output stream is quite laggy. I discovered that the input RTSP stream uses the TCP protocol; could that cause any disruption in the output stream?
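(A note on the mechanism, as a minimal sketch rather than a confirmed fix: uridecodebin emits a "source-setup" signal when it creates its source element, and rtspsrc exposes a "protocols" flags property where, per GstRTSPLowerTrans, the value 4 means TCP-only. Connecting a callback like this in create_source_bin would force the input to TCP:)

def on_source_setup(decodebin, source):
    # Only rtspsrc exposes "protocols"; 4 == GST_RTSP_LOWER_TRANS_TCP
    if source.find_property("protocols") is not None:
        source.set_property("protocols", 4)

uri_decode_bin.connect("source-setup", on_source_setup)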

This is the config file; I do not see any parameter for UDP/TCP:

[property]
gpu-id=0
net-scale-factor=0.0039215697906911373
model-color-format=0
onnx-file=./waste-12-12-2023.onnx
model-engine-file=./waste-12-12-2023.onnx_b1_gpu0_fp16.engine
infer-dims=3;640;640
labelfile-path=labels.txt
batch-size=2
workspace-size=1024
network-mode=2
num-detected-classes=10
interval=0
gie-unique-id=1
process-mode=1
network-type=0
cluster-mode=2
maintain-aspect-ratio=1
parse-bbox-func-name=NvDsInferParseYolo
custom-lib-path=./yolov5_decode.so

[class-attrs-all]
nms-iou-threshold=0.45
pre-cluster-threshold=0.85
topk=300

The program script:

import sys
sys.path.append("../")
from common.bus_call import bus_call
from common.is_aarch_64 import is_aarch64
import pyds
import platform
import math
import time
from ctypes import *
import gi
gi.require_version("Gst", "1.0")
gi.require_version("GstRtspServer", "1.0")
from gi.repository import Gst, GstRtspServer, GLib
import configparser
import datetime

import argparse

#MAX_DISPLAY_LEN = 64
#PGIE_CLASS_ID_VEHICLE = 0
#PGIE_CLASS_ID_BICYCLE = 1
#PGIE_CLASS_ID_PERSON = 2
#PGIE_CLASS_ID_ROADSIGN = 3
#PGIE_CLASS_ID_TEXTAREA = 0
#PGIE_CLASS_ID_SEAL = 1
#PGIE_CLASS_ID_CP_MIDDLE = 2
#PGIE_CLASS_ID_CP_WITH_TRAILER = 3
MUXER_OUTPUT_WIDTH = 1920
MUXER_OUTPUT_HEIGHT = 1080
MUXER_BATCH_TIMEOUT_USEC = 4000000  # batched-push-timeout, in microseconds
TILED_OUTPUT_WIDTH = 1280
TILED_OUTPUT_HEIGHT = 720
#GST_CAPS_FEATURES_NVMM = "memory:NVMM"
#OSD_PROCESS_MODE = 0
#OSD_DISPLAY_TEXT = 0
#pgie_classes_str = ["TEXTAREA", "SEAL"]

# pgie_src_pad_buffer_probe will extract metadata received on the pgie src pad
# and update params for drawing rectangles, object information, etc.


def pgie_src_pad_buffer_probe(pad, info, u_data):
    frame_number = 0
    num_rects = 0
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return

    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
            # The casting is done by pyds.NvDsFrameMeta.cast()
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_number = frame_meta.frame_num
        print(
            "Frame Number=",
            frame_number
        )
        if ts_from_rtsp:
            ts = frame_meta.ntp_timestamp / 1000000000  # convert nanoseconds to seconds (Unix epoch)
            print("RTSP Timestamp:", datetime.datetime.utcfromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'))  # convert timestamp to UTC

        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK


def cb_newpad(decodebin, decoder_src_pad, data):
    print("In cb_newpad\n")
    caps = decoder_src_pad.get_current_caps()
    gststruct = caps.get_structure(0)
    gstname = gststruct.get_name()
    source_bin = data
    features = caps.get_features(0)

    # Need to check if the pad created by the decodebin is for video and not
    # audio.
    print("gstname=", gstname)
    if gstname.find("video") != -1:
        # Link the decodebin pad only if decodebin has picked nvidia
        # decoder plugin nvdec_*. We do this by checking if the pad caps contain
        # NVMM memory features.
        print("features=", features)
        if features.contains("memory:NVMM"):
            # Get the source bin ghost pad
            bin_ghost_pad = source_bin.get_static_pad("src")
            if not bin_ghost_pad.set_target(decoder_src_pad):
                sys.stderr.write(
                    "Failed to link decoder src pad to source bin ghost pad\n"
                )
        else:
            sys.stderr.write(
                " Error: Decodebin did not pick nvidia decoder plugin.\n")


def decodebin_child_added(child_proxy, Object, name, user_data):
    print("Decodebin child added:", name, "\n")
    if name.find("decodebin") != -1:
        Object.connect("child-added", decodebin_child_added, user_data)

    if ts_from_rtsp:
        if name.find("source") != -1:
            pyds.configure_source_for_ntp_sync(hash(Object))


def create_source_bin(index, uri):
    print("Creating source bin")

    # Create a source GstBin to abstract this bin's content from the rest of the
    # pipeline
    bin_name = "source-bin-%02d" % index
    print(bin_name)
    nbin = Gst.Bin.new(bin_name)
    if not nbin:
        sys.stderr.write(" Unable to create source bin \n")

    # Source element for reading from the uri.
    # We will use decodebin and let it figure out the container format of the
    # stream and the codec and plug the appropriate demux and decode plugins.
    uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
    if not uri_decode_bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
    # We set the input uri to the source element
    uri_decode_bin.set_property("uri", uri)
    # Connect to the "pad-added" signal of the decodebin which generates a
    # callback once a new pad for raw data has been created by the decodebin
    uri_decode_bin.connect("pad-added", cb_newpad, nbin)
    uri_decode_bin.connect("child-added", decodebin_child_added, nbin)

    # We need to create a ghost pad for the source bin which will act as a proxy
    # for the video decoder src pad. The ghost pad will not have a target right
    # now. Once the decode bin creates the video decoder and generates the
    # cb_newpad callback, we will set the ghost pad target to the video decoder
    # src pad.
    Gst.Bin.add(nbin, uri_decode_bin)
    bin_pad = nbin.add_pad(
        Gst.GhostPad.new_no_target(
            "src", Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write(" Failed to add ghost pad in source bin \n")
        return None
    return nbin


def main(args):
    # Check input arguments
    number_sources = len(args)

    # Standard GStreamer initialization
    Gst.init(None)

    # Create GStreamer elements
    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    is_live = False

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
    print("Creating streamux \n ")

    # Create nvstreammux instance to form batches from one or more sources.
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")

    pipeline.add(streammux)
    for i in range(number_sources):
        print("Creating source_bin ", i, " \n ")
        uri_name = args[i]
        if uri_name.find("rtsp://") == 0:
            is_live = True
        source_bin = create_source_bin(i, uri_name)
        if not source_bin:
            sys.stderr.write("Unable to create source bin \n")
        pipeline.add(source_bin)
        padname = "sink_%u" % i
        sinkpad = streammux.get_request_pad(padname)
        if not sinkpad:
            sys.stderr.write("Unable to create sink pad bin \n")
        srcpad = source_bin.get_static_pad("src")
        if not srcpad:
            sys.stderr.write("Unable to create src pad bin \n")
        srcpad.link(sinkpad)

    print("Creating Pgie \n ")
    if gie=="nvinfer":
        pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    else:
        pgie = Gst.ElementFactory.make("nvinferserver", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie \n")
    print("Creating tiler \n ")
    tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
    if not tiler:
        sys.stderr.write(" Unable to create tiler \n")
    print("Creating nvvidconv \n ")
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    if not nvvidconv:
        sys.stderr.write(" Unable to create nvvidconv \n")
    print("Creating nvosd \n ")
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")
    nvvidconv_postosd = Gst.ElementFactory.make(
        "nvvideoconvert", "convertor_postosd")
    if not nvvidconv_postosd:
        sys.stderr.write(" Unable to create nvvidconv_postosd \n")

    # Create a caps filter
    caps = Gst.ElementFactory.make("capsfilter", "filter")
    caps.set_property(
        "caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=I420")
    )

    # Make the encoder
    if codec == "H264":
        encoder = Gst.ElementFactory.make("nvv4l2h264enc", "encoder")
        print("Creating H264 Encoder")
    elif codec == "H265":
        encoder = Gst.ElementFactory.make("nvv4l2h265enc", "encoder")
        print("Creating H265 Encoder")
    if not encoder:
        sys.stderr.write(" Unable to create encoder")
    encoder.set_property("bitrate", bitrate)
    if is_aarch64():
        encoder.set_property("preset-level", 1)
        encoder.set_property("insert-sps-pps", 1)
        #encoder.set_property("bufapi-version", 1)

    # Make the payload-encode video into RTP packets
    if codec == "H264":
        rtppay = Gst.ElementFactory.make("rtph264pay", "rtppay")
        print("Creating H264 rtppay")
    elif codec == "H265":
        rtppay = Gst.ElementFactory.make("rtph265pay", "rtppay")
        print("Creating H265 rtppay")
    if not rtppay:
        sys.stderr.write(" Unable to create rtppay")

    # Make the UDP sink
    updsink_port_num = 5400
    sink = Gst.ElementFactory.make("udpsink", "udpsink")
    if not sink:
        sys.stderr.write(" Unable to create udpsink")

    sink.set_property("host", "224.224.255.255")
    sink.set_property("port", updsink_port_num)
    sink.set_property("async", False)
    sink.set_property("sync", 1)

    streammux.set_property("width", MUXER_OUTPUT_WIDTH)
    streammux.set_property("height", MUXER_OUTPUT_HEIGHT)
    streammux.set_property("batch-size", number_sources)
    streammux.set_property("batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC)
    
    if ts_from_rtsp:
        streammux.set_property("attach-sys-ts", 0)

    if gie=="nvinfer":
        pgie.set_property("config-file-path", "dstest1_pgie_config.txt")
    else:
        pgie.set_property("config-file-path", "dstest1_pgie_inferserver_config.txt")


    pgie_batch_size = pgie.get_property("batch-size")
    if pgie_batch_size != number_sources:
        print(
            "WARNING: Overriding infer-config batch-size",
            pgie_batch_size,
            " with number of sources ",
            number_sources,
            " \n",
        )
        pgie.set_property("batch-size", number_sources)

    print("Adding elements to Pipeline \n")
    tiler_rows = int(math.sqrt(number_sources))
    tiler_columns = int(math.ceil((1.0 * number_sources) / tiler_rows))
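    # e.g. 2 sources -> 1 row x 2 columns; 5 sources -> 2 rows x 3 columns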
    tiler.set_property("rows", tiler_rows)
    tiler.set_property("columns", tiler_columns)
    tiler.set_property("width", TILED_OUTPUT_WIDTH)
    tiler.set_property("height", TILED_OUTPUT_HEIGHT)
    sink.set_property("qos", 0)

    pipeline.add(pgie)
    pipeline.add(tiler)
    pipeline.add(nvvidconv)
    pipeline.add(nvosd)
    pipeline.add(nvvidconv_postosd)
    pipeline.add(caps)
    pipeline.add(encoder)
    pipeline.add(rtppay)
    pipeline.add(sink)

    streammux.link(pgie)
    pgie.link(nvvidconv)
    nvvidconv.link(tiler)
    tiler.link(nvosd)
    nvosd.link(nvvidconv_postosd)
    nvvidconv_postosd.link(caps)
    caps.link(encoder)
    encoder.link(rtppay)
    rtppay.link(sink)

    # create an event loop and feed gstreamer bus messages to it
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    pgie_src_pad=pgie.get_static_pad("src")
    if not pgie_src_pad:
        sys.stderr.write(" Unable to get src pad \n")
    else:
        pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, pgie_src_pad_buffer_probe, 0)

    # Start streaming
    rtsp_port_num = 8554

    server = GstRtspServer.RTSPServer.new()
    server.props.service = "%d" % rtsp_port_num
    server.attach(None)

    factory = GstRtspServer.RTSPMediaFactory.new()
    factory.set_launch(
        '( udpsrc name=pay0 port=%d buffer-size=524288 caps="application/x-rtp, media=video, clock-rate=90000, encoding-name=(string)%s, payload=96 " )'
        % (updsink_port_num, codec)
    )
    factory.set_shared(True)
    server.get_mount_points().add_factory("/ds-test", factory)
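    # The factory's udpsrc receives the RTP packets that this pipeline's
    # udpsink pushes to port 5400 and re-serves them to clients that
    # connect to rtsp://<host>:8554/ds-test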

    print(
        "\n *** DeepStream: Launched RTSP Streaming at rtsp://localhost:%d/ds-test ***\n\n"
        % rtsp_port_num
    )

    # start play back and listen to events
    print("Starting pipeline \n")
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except BaseException:
        pass
    # cleanup
    pipeline.set_state(Gst.State.NULL)


def parse_args():
    parser = argparse.ArgumentParser(description='RTSP Output Sample Application Help ')
    parser.add_argument("-i", "--input",
                  help="Path to input H264 elementry stream", nargs="+", default=["a"], required=True)
    parser.add_argument("-g", "--gie", default="nvinfer",
                  help="choose GPU inference engine type nvinfer or nvinferserver , default=nvinfer", choices=['nvinfer','nvinferserver'])
    parser.add_argument("-c", "--codec", default="H264",
                  help="RTSP Streaming Codec H264/H265 , default=H264", choices=['H264','H265'])
    parser.add_argument("-b", "--bitrate", default=4000000,
                  help="Set the encoding bitrate ", type=int)
    parser.add_argument("--rtsp-ts", action="store_true", default=False, dest='rtsp_ts', help="Attach NTP timestamp from RTSP source",
    )
    # Check input arguments
    if len(sys.argv)==1:
        parser.print_help(sys.stderr)
        sys.exit(1)
    args = parser.parse_args()
    global codec
    global bitrate
    global stream_path
    global gie
    global ts_from_rtsp
    gie = args.gie
    codec = args.codec
    bitrate = args.bitrate
    stream_path = args.input
    ts_from_rtsp = args.rtsp_ts
    return stream_path

if __name__ == '__main__':
    stream_path = parse_args()
    sys.exit(main(stream_path))

Any help on this matter would be appreciated.

Board: Orin NX 8GB
DeepStream: 6.3
Jetpack: 5.1.1

Thanks,
Neville

diff --git a/apps/deepstream-rtsp-in-rtsp-out/deepstream_test1_rtsp_in_rtsp_out.py b/apps/deepstream-rtsp-in-rtsp-out/deepstream_test1_rtsp_in_rtsp_out.py
index d8af91a..ead64d5 100755
--- a/apps/deepstream-rtsp-in-rtsp-out/deepstream_test1_rtsp_in_rtsp_out.py
+++ b/apps/deepstream-rtsp-in-rtsp-out/deepstream_test1_rtsp_in_rtsp_out.py
@@ -131,6 +131,14 @@ def decodebin_child_added(child_proxy, Object, name, user_data):
         if name.find("source") != -1:
             pyds.configure_source_for_ntp_sync(hash(Object))
 
+def deep_element_added_callback (self, sub_bin, element, user_data):
+    #python callback for the 'deep-element-added' signal
+    factory = Gst.Element.get_factory(element)
+    print(f"deep {factory.name} element child added")
+    if factory.name.find("rtspsrc") != -1:
+        print("set rtspsrc use tcp")
+        element.set_property("protocols", 4)
+
 
 def create_source_bin(index, uri):
     print("Creating source bin")
@@ -155,6 +163,7 @@ def create_source_bin(index, uri):
     # callback once a new pad for raw data has beed created by the decodebin
     uri_decode_bin.connect("pad-added", cb_newpad, nbin)
     uri_decode_bin.connect("child-added", decodebin_child_added, nbin)
+    uri_decode_bin.connect("deep-element-added", deep_element_added_callback, nbin)
 
     # We need to create a ghost pad for the source bin which will act as a proxy
     # for the video decoder src pad. The ghost pad will not have a target right
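(For reference, the value 4 written to "protocols" corresponds to GST_RTSP_LOWER_TRANS_TCP in the GstRTSPLowerTrans flags, i.e. it restricts rtspsrc to RTP-over-TCP interleaved transport.)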

Thanks for the reply, @junshengy. The code runs fine; however, I do not see any change in the stream. For example, when I view the RTSP stream in VLC, it shows at most a one-second delay and no frames get stuck. With this code, though, the stream keeps getting stuck, then continuing, then getting stuck again, in a cycle. Could anything be changed in the config params? I have changed a few params such as interval, network-mode, etc., but the stream still gets stuck.

Are there any other params that could be added to the config file? Additionally, we have connected the camera directly to the Orin NX device through Ethernet, so there should be no network latency.
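(Two knobs that are commonly experimented with for this kind of jitter, sketched here as assumptions rather than confirmed fixes: shrinking the rtspsrc jitterbuffer from its 2000 ms default, and letting the live output free-run instead of clock-pacing it.)

# inside deep_element_added_callback, next to the "protocols" setting:
element.set_property("latency", 200)  # rtspsrc jitterbuffer, in milliseconds

# inside main(), replacing sink.set_property("sync", 1):
sink.set_property("sync", 0)  # do not throttle the live output on the clock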

Edit:

Adding onto the issue: there is still jitter in the frames with both a standard YOLOv5s COCO model and our custom YOLOv5 model.

Here is a video sample:

[video attachment]

1. How do you measure the latency?

The latency cannot be directly compared with VLC. There are many elements in the DeepStream pipeline, such as infer, encoder, etc., which add delay.

VLC alone:

rtsp camera --> vlc

Guessing the pipeline from your video:

rtsp camera --> infer --> encoder --> rtsp sink --> vlc

Each step adds delay.

2. The stuck issue may be caused by the downstream network mentioned above. You can watch the output on your monitor using the patch below:

diff --git a/apps/deepstream-rtsp-in-rtsp-out/deepstream_test1_rtsp_in_rtsp_out.py b/apps/deepstream-rtsp-in-rtsp-out/deepstream_test1_rtsp_in_rtsp_out.py
index 022784e..ecffee5 100755
--- a/apps/deepstream-rtsp-in-rtsp-out/deepstream_test1_rtsp_in_rtsp_out.py
+++ b/apps/deepstream-rtsp-in-rtsp-out/deepstream_test1_rtsp_in_rtsp_out.py
@@ -270,6 +270,7 @@ def main(args):
     # Make the UDP sink
     updsink_port_num = 5400
     sink = Gst.ElementFactory.make("udpsink", "udpsink")
+    nv3dsink = Gst.ElementFactory.make("nv3dsink", "udpsink")
     if not sink:
         sys.stderr.write(" Unable to create udpsink")
 
@@ -316,21 +317,23 @@ def main(args):
     pipeline.add(tiler)
     pipeline.add(nvvidconv)
     pipeline.add(nvosd)
-    pipeline.add(nvvidconv_postosd)
-    pipeline.add(caps)
-    pipeline.add(encoder)
-    pipeline.add(rtppay)
-    pipeline.add(sink)
+    # pipeline.add(nvvidconv_postosd)
+    # pipeline.add(caps)
+    # pipeline.add(encoder)
+    # pipeline.add(rtppay)
+    # pipeline.add(sink)
+    pipeline.add(nv3dsink)
 
     streammux.link(pgie)
     pgie.link(nvvidconv)
     nvvidconv.link(tiler)
     tiler.link(nvosd)
-    nvosd.link(nvvidconv_postosd)
-    nvvidconv_postosd.link(caps)
-    caps.link(encoder)
-    encoder.link(rtppay)
-    rtppay.link(sink)
+    nvosd.link(nv3dsink)
+    # nvosd.link(nvvidconv_postosd)
+    # nvvidconv_postosd.link(caps)
+    # caps.link(encoder)
+    # encoder.link(rtppay)
+    # rtppay.link(sink)
 
     # create an event loop and feed gstreamer bus mesages to it
     loop = GLib.MainLoop()
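(The point of the patch is to bypass the encoder, RTP payloader, and network entirely and render locally with nv3dsink, which isolates whether the stutter comes from the inference pipeline or from the downstream streaming path.)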

So comparing VLC with the DeepStream pipeline is not a fair measure of the delay; some delay is bound to come into play. I tried running your patch, but I am getting the following error:

Creating nvosd 
 
Creating H264 Encoder
Creating H264 rtppay
Adding elements to Pipeline 


 *** DeepStream: Launched RTSP Streaming at rtsp://localhost:8554/ds-test ***


Starting pipeline 

Error: gst-core-error-quark: GStreamer error: state change failed and some element failed to post a proper error message with the reason for the failure. (4): gstbasesink.c(5367): gst_base_sink_change_state (): /GstPipeline:pipeline0/GstNv3dSink:udpsink:
Failed to start

The additions:

updsink_port_num = 5400
sink = Gst.ElementFactory.make("udpsink", "udpsink")
nv3dsink = Gst.ElementFactory.make("nv3dsink", "udpsink")

pipeline.add(pgie)
pipeline.add(tiler)
pipeline.add(nvvidconv)
pipeline.add(nvosd)
#pipeline.add(nvvidconv_postosd)
#pipeline.add(caps)
#pipeline.add(encoder)
#pipeline.add(rtppay)
#pipeline.add(sink)
pipeline.add(nv3dsink)


streammux.link(pgie)
pgie.link(nvvidconv)
nvvidconv.link(tiler)
tiler.link(nvosd)
nvosd.link(nv3dsink)
#nvosd.link(nvvidconv_postosd)
#nvvidconv_postosd.link(caps)
#caps.link(encoder)
#encoder.link(rtppay)
#rtppay.link(sink)

# create an event loop and feed gstreamer bus mesages to it
loop = GLib.MainLoop()

Are you running over SSH, and is the Orin plugged into a monitor?

What is the output of the following command?

cat /proc/$(pidof "gnome-terminal-server")/environ | tr '\0' '\n' | grep ^DISPLAY=

Run export DISPLAY=:x in your SSH session (where x is the value output by the above command).

Apologies for the late reply. I am running over SSH and ran your command:

I got a file-not-found error (presumably because pidof found no "gnome-terminal-server" process, leaving the PID empty in the path):

cat: /proc//environ: No such file or directory

Thanks,
Neville

Aren't you using JetPack 5.1? The above command should be valid on Ubuntu 20.04. If you have upgraded to Ubuntu 22.04 and JetPack 6.0, please use:

cat /proc/$(pidof "gnome-shell")/environ | tr '\0' '\n' | grep ^DISPLAY=

Yes, I am on JetPack 5.1.1.

The command did not work over SSH for some reason, but when I connected to the display I got a response.

x means the value you get from the command, so it should be export DISPLAY=:0

Sorry for the late reply; it seems I was able to run the command fine. On a side note, after a camera change I was able to run the whole program with little latency, so the live detections are showing up properly.
