How to Stabilize Output RTSP Streams in DeepStream for Reliable Web Streaming?

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU) GPU
• DeepStream Version 7.0.0
• JetPack Version (valid for Jetson only)
• TensorRT Version
• NVIDIA GPU Driver Version (valid for GPU only) 535.171.04
• Issue Type( questions, new requirements, bugs) questions

I am currently running a custom model using Python code with DeepStream.

In DeepStream, I am taking 4 RTSP streams as input and outputting them as RTSP streams. The problem is that the RTSP streams being output are unstable. When I use ffmpeg to convert the RTSP streams to MPEG-DASH for live streaming on the web, the segmentation creation in ffmpeg terminates due to EOF.

When I use JSMPeg or another RTSP stream player for web streaming, the video frequently buffers or freezes.

I would like to know how to stabilize the output RTSP streams in DeepStream and what options to change to prevent these issues.

Below is the current Python code.


def decodebin_child_added(child_proxy, Object, name, user_data):
    """Recursively hook nested decodebins and tune NVIDIA decoder children.

    Connected to uridecodebin's 'child-added' signal; re-attaches itself to
    any nested decodebin so decoder elements created later are configured too.
    """
    if 'decodebin' in name:
        # A nested decodebin: watch its children as well.
        Object.connect('child-added', decodebin_child_added, user_data)
    if 'nvv4l2decoder' in name:
        # Tune the NVIDIA hardware decoder instance.
        Object.set_property('drop-frame-interval', 0)  # decode every frame
        Object.set_property('num-extra-surfaces', 1)
        Object.set_property('cudadec-memtype', 0)      # 0 = device memory
        Object.set_property('gpu-id', GPU_ID)


def cb_newpad(decodebin, pad, user_data):
    """'pad-added' handler: link a new decodebin video pad to nvstreammux.

    user_data is the pre-requested nvstreammux sink pad for this stream.
    Only NVMM (GPU-memory) video pads are linked; anything else is an error
    because it means decodebin did not pick the NVIDIA decoder.
    """
    streammux_sink_pad = user_data
    caps = pad.get_current_caps()
    if not caps:
        # Pad may not be negotiated yet; fall back to its template caps.
        caps = pad.query_caps()
    structure = caps.get_structure(0)
    name = structure.get_name()
    features = caps.get_features(0)
    if name.find('video') != -1:
        if features.contains('memory:NVMM'):
            if pad.link(streammux_sink_pad) != Gst.PadLinkReturn.OK:
                sys.stderr.write('ERROR: Failed to link source to streammux sink pad\n')
        else:
            # BUG FIX: message previously lacked a trailing newline.
            sys.stderr.write('ERROR: decodebin did not pick NVIDIA decoder plugin\n')


def _on_source_setup(_bin, source):
    """Apply RTSP tuning once uridecodebin has actually created its source.

    Only rtspsrc exposes these properties, so probe for one of them first.
    """
    if source.find_property('latency') is not None:
        source.set_property('short-header', True)
        source.set_property('ntp-sync', True)
        source.set_property('latency', 200)
        source.set_property('ntp-time-source', 3)  # 3 = automatic


def create_uridecode_bin(stream_id, uri, streammux):
    """Create a uridecodebin for *uri* and wire it to *streammux*.

    Returns the source bin; the caller must add it to the pipeline and sync
    its state with the parent.
    """
    bin_name = 'source-bin-%04d' % stream_id
    source_bin = Gst.ElementFactory.make('uridecodebin', bin_name)
    source_bin.set_property('uri', uri)
    source_bin.set_property('expose-all-streams', False)

    if 'rtsp://' in uri:
        # BUG FIX: the previous code called bin.get_by_name("source")
        # immediately after creating the uridecodebin, but the internal
        # rtspsrc is created lazily (on state change / URI processing), so
        # get_by_name returned None and the RTSP properties were silently
        # never applied.  The 'source-setup' signal fires exactly when the
        # source element exists.
        source_bin.connect('source-setup', _on_source_setup)

    pad_name = 'sink_%u' % stream_id
    streammux_sink_pad = streammux.get_request_pad(pad_name)
    source_bin.connect('pad-added', cb_newpad, streammux_sink_pad)
    source_bin.connect('child-added', decodebin_child_added, 0)

    # Register an FPS meter for this stream.
    fps_streams['stream{0}'.format(stream_id)] = GETFPS(stream_id)
    return source_bin


def bus_call(bus, message, user_data):
    """GStreamer bus handler: external quit flag plus EOS/WARNING/ERROR.

    user_data is the GLib.MainLoop to quit on shutdown conditions.
    Always returns True so the watch stays installed.
    """
    loop = user_data

    # External shutdown request: a 'quit.txt' file dropped beside the
    # process asks us to stop; acknowledge by creating 'flow_quit.txt'.
    if os.path.exists('quit.txt'):
        os.remove('quit.txt')
        with open('flow_quit.txt', 'w') as f:
            f.write('')
        loop.quit()

    msg_type = message.type
    if msg_type == Gst.MessageType.EOS:
        sys.stdout.write('DEBUG: EOS\n')
        loop.quit()
    elif msg_type == Gst.MessageType.WARNING:
        warn, dbg = message.parse_warning()
        sys.stderr.write('WARNING: %s: %s\n' % (warn, dbg))
    elif msg_type == Gst.MessageType.ERROR:
        error, dbg = message.parse_error()
        sys.stderr.write('ERROR: %s: %s\n' % (error, dbg))
        loop.quit()
    return True


def run_script_in_conda_env(conda_env_name, script_path):
    """Run *script_path* with python3 inside the named conda environment.

    BUG FIX: the previous version built the activation path but never used
    it, and ignored *conda_env_name* entirely, so the script always ran in
    the caller's environment despite the function's name and comments.
    """
    # Path to conda's 'activate' script — adjust per host.
    conda_activate = "/home/ubuntu/anaconda3/bin/activate"

    # 'source' is a bash builtin (not POSIX sh), hence shell=True with an
    # explicit bash executable.  NOTE(review): arguments are interpolated
    # into a shell string — keep them developer-controlled constants.
    command = f'source {conda_activate} {conda_env_name} && python3 {script_path}'

    result = subprocess.run(command, shell=True, capture_output=True,
                            text=True, executable='/bin/bash')

    # Surface the child's output/errors in our own stdout.
    print(result.stdout)
    if result.stderr:
        print(result.stderr)


def main():
    """Build and run the DeepStream pipeline.

    4 RTSP sources -> nvstreammux -> nvmultistreamtiler -> nvinfer ->
    nvdsosd -> H264 encode -> RTP over UDP (localhost), re-served to
    clients through GstRtspServer at rtsp://localhost:18555/ds-test.
    """
    global ip, port, cctv_id, cctv_pw

    # Run the companion detection/tracking script in a background thread.
    flow_thread = Thread(target=run_script_in_conda_env, args=("count", "detect_and_track_sub.py"))
    flow_thread.daemon = True
    flow_thread.start()

    Gst.init(None)

    loop = GLib.MainLoop()

    pipeline = Gst.Pipeline()
    if not pipeline:
        sys.stderr.write('ERROR: Failed to create pipeline\n')
        sys.exit(1)

    streammux = Gst.ElementFactory.make('nvstreammux', 'nvstreammux')
    if not streammux:
        sys.stderr.write('ERROR: Failed to create nvstreammux\n')
        sys.exit(1)
    pipeline.add(streammux)

    tiler = Gst.ElementFactory.make('nvmultistreamtiler', 'nvtiler')
    if not tiler:
        sys.stderr.write('ERROR: Failed to create nvmultistreamtiler\n')
        sys.exit(1)
    tiler.set_property('rows', ROWS)
    tiler.set_property('columns', COLUMNS)
    # BUG FIX: '/' yields a float, but width/height are integer properties —
    # use integer division.  NOTE(review): width/ROWS and height/COLUMNS
    # looks swapped (usually width/COLUMNS, height/ROWS); harmless when
    # ROWS == COLUMNS — confirm for asymmetric layouts.
    tiler.set_property('width', STREAMMUX_WIDTH // ROWS)
    tiler.set_property('height', STREAMMUX_HEIGHT // COLUMNS)
    pipeline.add(tiler)

    pgie = Gst.ElementFactory.make('nvinfer', 'pgie')
    if not pgie:
        sys.stderr.write('ERROR: Failed to create nvinfer\n')
        sys.exit(1)

    converter = Gst.ElementFactory.make('nvvideoconvert', 'nvvideoconvert')
    if not converter:
        sys.stderr.write('ERROR: Failed to create nvvideoconvert\n')
        sys.exit(1)

    osd = Gst.ElementFactory.make('nvdsosd', 'nvdsosd')
    if not osd:
        sys.stderr.write('ERROR: Failed to create nvdsosd\n')
        sys.exit(1)

    nvvidconv_postosd = Gst.ElementFactory.make("nvvideoconvert", "convertor_postosd")
    if not nvvidconv_postosd:
        # FIX: previously missing — a None element would crash later anyway.
        sys.stderr.write('ERROR: Failed to create convertor_postosd\n')
        sys.exit(1)

    caps = Gst.ElementFactory.make("capsfilter", "filter")
    if not caps:
        sys.stderr.write('ERROR: Failed to create capsfilter\n')
        sys.exit(1)
    # Fixed output format for the encoder (f-string prefix removed — no
    # placeholders were used).
    caps.set_property(
        "caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=I420, width=(int)1920, height=(int)1080")
    )

    # Hardware H264 encoder.
    encoder = Gst.ElementFactory.make("nvv4l2h264enc", "encoder")
    if not encoder:
        sys.stderr.write('ERROR: Unable to create encoder\n')
        sys.exit(1)
    encoder.set_property("bitrate", bitrate)
    encoder.set_property("iframeinterval", 60)  # one IDR per 60 frames
    encoder.set_property("profile", 2)          # High profile

    # Payload the encoded video into RTP packets.
    rtppay = Gst.ElementFactory.make("rtph264pay", "rtppay")
    if not rtppay:
        sys.stderr.write('ERROR: Unable to create rtph264pay\n')
        sys.exit(1)
    # FIX for unstable RTSP clients: resend SPS/PPS with every IDR frame so
    # clients (ffmpeg, web players) that join mid-stream can start decoding;
    # without this, late joiners may never sync and drop the connection.
    rtppay.set_property("config-interval", -1)
    print("Creating H264 rtppay")

    # UDP sink feeding the local RTSP server.
    udpsink_port_num = 25400
    sink = Gst.ElementFactory.make("udpsink", "udpsink")
    if not sink:
        sys.stderr.write('ERROR: Unable to create udpsink\n')
        sys.exit(1)
    sink.set_property("host", "127.0.0.1")
    sink.set_property("port", udpsink_port_num)
    sink.set_property("async", False)
    sink.set_property("sync", True)
    sink.set_property("buffer-size", buf_size)

    streammux.set_property('batch-size', STREAMMUX_BATCH_SIZE)
    streammux.set_property('batched-push-timeout', 25000)
    streammux.set_property('width', STREAMMUX_WIDTH)
    streammux.set_property('height', STREAMMUX_HEIGHT)
    streammux.set_property('enable-padding', 0)
    streammux.set_property('live-source', True)
    streammux.set_property('attach-sys-ts', True)
    pgie.set_property('config-file-path', CONFIG_INFER)

    osd.set_property('process-mode', int(pyds.MODE_GPU))
    osd.set_property('display-bbox', 1)
    osd.set_property('display-text', 0)

    pipeline.add(pgie)
    pipeline.add(converter)
    pipeline.add(osd)
    pipeline.add(nvvidconv_postosd)
    pipeline.add(caps)
    pipeline.add(encoder)
    pipeline.add(rtppay)
    pipeline.add(sink)

    streammux.link(tiler)
    tiler.link(pgie)
    pgie.link(converter)
    converter.link(osd)
    osd.link(nvvidconv_postosd)
    nvvidconv_postosd.link(caps)
    caps.link(encoder)
    encoder.link(rtppay)
    rtppay.link(sink)

    # Build the four RTSP source URIs from the per-camera credentials.
    sources = [
        'rtsp://' + str(cctv_id[idx]) + ':' + str(cctv_pw[idx]) + '@' +
        str(ip[idx]) + ':' + str(port[idx]) + '/profile2/media.smp'
        for idx in range(4)
    ]

    for idx, source in enumerate(sources):
        source_bin = create_uridecode_bin(idx, source, streammux)
        pipeline.add(source_bin)
        source_bin.sync_state_with_parent()

    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect('message', bus_call, loop)

    # Attach the custom probe to the pgie src pad.
    pgie_src_pad = pgie.get_static_pad('src')
    if not pgie_src_pad:
        sys.stderr.write('ERROR: Failed to get pgie src pad\n')
        sys.exit(1)
    else:
        pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, tracker_src_pad_buffer_probe, 0)

    # RTSP server re-exposing the local RTP/UDP stream to clients.
    rtsp_port_num = 18555

    server = GstRtspServer.RTSPServer.new()
    server.props.service = "%d" % rtsp_port_num
    server.attach(None)

    factory = GstRtspServer.RTSPMediaFactory.new()
    factory.set_launch(
        '( udpsrc name=pay0 port=%d buffer-size=%d caps="application/x-rtp, media=video, clock-rate=90000, '
        'encoding-name=(string)%s, payload=96 " )'
        % (udpsink_port_num, buf_size, 'H264')
    )
    factory.set_shared(True)
    server.get_mount_points().add_factory("/ds-test", factory)

    print(
        "\n *** DeepStream: Launched RTSP Streaming at rtsp://localhost:%d/ds-test ***\n\n"
        % rtsp_port_num
    )

    pipeline.set_state(Gst.State.PLAYING)

    try:
        loop.run()
    except Exception as e:
        print(e)

    pipeline.set_state(Gst.State.NULL)

  1. What bitrate value are you using? 8,000,000 bps is too high for a 1920x1080 stream.
  2. To narrow down this issue, you can replace udpsink with nveglglessink to check whether the output is fine.

Thank you for your response.

Initially, we used a bitrate of 1000000 and a buffer size of 262144. We tested replacing udpsink with nveglglessink. When using nveglglessink, we confirmed that the video output was smooth and uninterrupted.

The problem is that we need RTSP output for web streaming. To generate RTSP output, we understand that udpsink must be used. We are curious if there is any alternative method for this.

If there are no alternatives, is there a way to stabilize the RTSP stream when using udpsink?

Below is the Python code using nveglglessink.


def decodebin_child_added(child_proxy, Object, name, user_data):
    """Recursively hook nested decodebins and tune NVIDIA decoder children.

    Connected to uridecodebin's 'child-added' signal; re-attaches itself to
    any nested decodebin so decoder elements created later are configured too.
    """
    if 'decodebin' in name:
        # A nested decodebin: watch its children as well.
        Object.connect('child-added', decodebin_child_added, user_data)
    if 'nvv4l2decoder' in name:
        # Tune the NVIDIA hardware decoder instance.
        Object.set_property('drop-frame-interval', 0)  # decode every frame
        Object.set_property('num-extra-surfaces', 1)
        Object.set_property('cudadec-memtype', 0)      # 0 = device memory
        Object.set_property('gpu-id', GPU_ID)


def cb_newpad(decodebin, pad, user_data):
    """'pad-added' handler: link a new decodebin video pad to nvstreammux.

    user_data is the pre-requested nvstreammux sink pad for this stream.
    Only NVMM (GPU-memory) video pads are linked; anything else is an error
    because it means decodebin did not pick the NVIDIA decoder.
    """
    streammux_sink_pad = user_data
    caps = pad.get_current_caps()
    if not caps:
        # Pad may not be negotiated yet; fall back to its template caps.
        caps = pad.query_caps()
    structure = caps.get_structure(0)
    name = structure.get_name()
    features = caps.get_features(0)
    if name.find('video') != -1:
        if features.contains('memory:NVMM'):
            if pad.link(streammux_sink_pad) != Gst.PadLinkReturn.OK:
                sys.stderr.write('ERROR: Failed to link source to streammux sink pad\n')
        else:
            # BUG FIX: message previously lacked a trailing newline.
            sys.stderr.write('ERROR: decodebin did not pick NVIDIA decoder plugin\n')


def _on_source_setup(_bin, source):
    """Apply RTSP tuning once uridecodebin has actually created its source.

    Only rtspsrc exposes these properties, so probe for one of them first.
    """
    if source.find_property('latency') is not None:
        source.set_property('short-header', True)
        source.set_property('ntp-sync', True)
        source.set_property('latency', 200)
        source.set_property('ntp-time-source', 3)  # 3 = automatic


def create_uridecode_bin(stream_id, uri, streammux):
    """Create a uridecodebin for *uri* and wire it to *streammux*.

    Returns the source bin; the caller must add it to the pipeline and sync
    its state with the parent.
    """
    bin_name = 'source-bin-%04d' % stream_id
    source_bin = Gst.ElementFactory.make('uridecodebin', bin_name)
    source_bin.set_property('uri', uri)
    source_bin.set_property('expose-all-streams', False)

    # NTP synchronization setup for RTSP sources, when required.
    if 'rtsp://' in uri:
        # BUG FIX: the previous code called bin.get_by_name("source")
        # immediately after creating the uridecodebin, but the internal
        # rtspsrc is created lazily (on state change / URI processing), so
        # get_by_name returned None and the RTSP properties were silently
        # never applied.  The 'source-setup' signal fires exactly when the
        # source element exists.
        source_bin.connect('source-setup', _on_source_setup)

    pad_name = 'sink_%u' % stream_id
    streammux_sink_pad = streammux.get_request_pad(pad_name)
    source_bin.connect('pad-added', cb_newpad, streammux_sink_pad)
    source_bin.connect('child-added', decodebin_child_added, 0)

    # Create and store a GETFPS instance keyed by stream ID.
    fps_streams['stream{0}'.format(stream_id)] = GETFPS(stream_id)
    return source_bin


def bus_call(bus, message, user_data):
    """GStreamer bus handler for EOS/WARNING/ERROR messages.

    user_data is the GLib.MainLoop to quit on EOS or ERROR.
    Always returns True so the watch stays installed.
    """
    loop = user_data

    msg_type = message.type
    if msg_type == Gst.MessageType.EOS:
        sys.stdout.write('DEBUG: EOS\n')
        loop.quit()
    elif msg_type == Gst.MessageType.WARNING:
        warn, dbg = message.parse_warning()
        sys.stderr.write('WARNING: %s: %s\n' % (warn, dbg))
    elif msg_type == Gst.MessageType.ERROR:
        error, dbg = message.parse_error()
        sys.stderr.write('ERROR: %s: %s\n' % (error, dbg))
        loop.quit()
    return True


def main():
    """Build and run the DeepStream debug pipeline with an on-screen sink.

    4 RTSP sources -> nvstreammux -> nvmultistreamtiler -> nvinfer ->
    nvdsosd -> nveglglessink (local display, no RTSP output).
    """
    global ip, port, cctv_id, cctv_pw

    Gst.init(None)

    loop = GLib.MainLoop()

    pipeline = Gst.Pipeline()
    if not pipeline:
        sys.stderr.write('ERROR: Failed to create pipeline\n')
        sys.exit(1)

    streammux = Gst.ElementFactory.make('nvstreammux', 'nvstreammux')
    if not streammux:
        sys.stderr.write('ERROR: Failed to create nvstreammux\n')
        sys.exit(1)
    pipeline.add(streammux)

    tiler = Gst.ElementFactory.make('nvmultistreamtiler', 'nvtiler')
    if not tiler:
        sys.stderr.write('ERROR: Failed to create nvmultistreamtiler\n')
        sys.exit(1)
    tiler.set_property('rows', ROWS)
    tiler.set_property('columns', COLUMNS)
    # BUG FIX: '/' yields a float, but width/height are integer properties —
    # use integer division.  NOTE(review): width/ROWS and height/COLUMNS
    # looks swapped (usually width/COLUMNS, height/ROWS); harmless when
    # ROWS == COLUMNS — confirm for asymmetric layouts.
    tiler.set_property('width', STREAMMUX_WIDTH // ROWS)
    tiler.set_property('height', STREAMMUX_HEIGHT // COLUMNS)
    pipeline.add(tiler)

    pgie = Gst.ElementFactory.make('nvinfer', 'pgie')
    if not pgie:
        sys.stderr.write('ERROR: Failed to create nvinfer\n')
        sys.exit(1)

    converter = Gst.ElementFactory.make('nvvideoconvert', 'nvvideoconvert')
    if not converter:
        sys.stderr.write('ERROR: Failed to create nvvideoconvert\n')
        sys.exit(1)

    osd = Gst.ElementFactory.make('nvdsosd', 'nvdsosd')
    if not osd:
        sys.stderr.write('ERROR: Failed to create nvdsosd\n')
        sys.exit(1)

    nvvidconv_postosd = Gst.ElementFactory.make("nvvideoconvert", "convertor_postosd")
    if not nvvidconv_postosd:
        # FIX: previously missing — a None element would crash later anyway.
        sys.stderr.write('ERROR: Failed to create convertor_postosd\n')
        sys.exit(1)

    caps = Gst.ElementFactory.make("capsfilter", "filter")
    if not caps:
        sys.stderr.write('ERROR: Failed to create capsfilter\n')
        sys.exit(1)
    # Fixed output format for the display sink (f-string prefix removed —
    # no placeholders were used).
    caps.set_property(
        "caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=NV12, width=(int)1920, height=(int)1080")
    )

    # On-screen EGL sink for local debugging.
    sink = Gst.ElementFactory.make("nveglglessink", "nveglglessink")
    if not sink:
        sys.stderr.write('ERROR: Unable to create nveglglessink\n')
        sys.exit(1)
    sink.set_property("sync", False)  # render as fast as frames arrive

    streammux.set_property('batch-size', STREAMMUX_BATCH_SIZE)
    streammux.set_property('batched-push-timeout', 25000)
    streammux.set_property('width', STREAMMUX_WIDTH)
    streammux.set_property('height', STREAMMUX_HEIGHT)
    streammux.set_property('enable-padding', 0)
    streammux.set_property('live-source', True)
    streammux.set_property('attach-sys-ts', True)
    pgie.set_property('config-file-path', CONFIG_INFER)

    osd.set_property('process-mode', int(pyds.MODE_GPU))
    osd.set_property('display-bbox', 1)
    osd.set_property('display-text', 0)

    pipeline.add(pgie)
    pipeline.add(converter)
    pipeline.add(osd)
    pipeline.add(nvvidconv_postosd)
    pipeline.add(caps)
    pipeline.add(sink)

    streammux.link(tiler)
    tiler.link(pgie)
    pgie.link(converter)
    converter.link(osd)
    osd.link(nvvidconv_postosd)
    nvvidconv_postosd.link(caps)
    caps.link(sink)

    # Build the four RTSP source URIs from the per-camera credentials.
    sources = [
        'rtsp://' + str(cctv_id[idx]) + ':' + str(cctv_pw[idx]) + '@' +
        str(ip[idx]) + ':' + str(port[idx]) + '/profile2/media.smp'
        for idx in range(4)
    ]

    for idx, source in enumerate(sources):
        source_bin = create_uridecode_bin(idx, source, streammux)
        pipeline.add(source_bin)
        source_bin.sync_state_with_parent()

    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect('message', bus_call, loop)

    # Attach the custom probe to the pgie src pad.
    pgie_src_pad = pgie.get_static_pad('src')
    if not pgie_src_pad:
        sys.stderr.write('ERROR: Failed to get pgie src pad\n')
        sys.exit(1)
    else:
        pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, tracker_src_pad_buffer_probe, 0)

    pipeline.set_state(Gst.State.PLAYING)

    try:
        loop.run()
    except Exception as e:
        print(e)

    pipeline.set_state(Gst.State.NULL)

If you play the RTSP stream generated by the RTSPServer with VLC or ffmpeg, is the video fine?

Hello. Thank you for your response.
Firstly, regarding my previous question, I used GstRTSPStream to create an RTSP stream by referring to the Python binding example code.
The problem is that when using ffmpeg or streaming on the web, the connection gets disconnected approximately every 5-10 minutes.
In the case of ffmpeg, the logs show that it encounters an EOF (End of File) and terminates. I would like to resolve this issue.

we need to narrow down this issue.

  1. Noticing you are using GstRtspServer to set up an RTSP server: if you play this RTSP stream with VLC (or ffplay), is the video smooth for about 3 minutes?
  2. If the video in step 1 is not smooth, did a camera reconnect during playback? Can you use deepstream_test1_rtsp_in_rtsp_out.py to reproduce this issue?
  3. If the video in step 1 is smooth — noticing that you will use ffmpeg to convert the RTSP streams to MPEG-DASH for live streaming on the web — the ffmpeg command line may need to be adjusted.

There has been no update from you for a while, so we assume this is no longer an issue and are closing this topic. If you need further support, please open a new one. Thanks.

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.