Issue with nvjpegdec decoder and MJPEG encoded RTSP stream

Hello everyone!
I’m trying to build a pipeline that takes multiple RTSP streams as input and writes the results to a video file (later I want to switch to an output RTSP stream).
The input stream comes from an ESP32 microcontroller, which encodes the video as MJPEG.

Right now I’m getting the following error message:

Creating Pipeline 
 
Creating streamux 
 
Creating source_bin  0  
 
Creating source bin
source-bin-00
Creating Pgie 
 
Creating nvtracker 
 
Creating nvdsanalytics 
 
Creating tiler 
 
Creating nvvidconv 
 
Creating nvosd 
 
Creating H264 Encoder
Atleast one of the sources is live
Adding elements to Pipeline 

Linking elements in the Pipeline 

Now playing...
1 :  rtsp://192.168.1.245:8554/mjpeg/1
Starting pipeline 

Opening in BLOCKING MODE 
0:00:00.275678113 12409     0x380bc360 WARN                    v4l2 gstv4l2object.c:2388:gst_v4l2_object_add_interlace_mode:0x38082c50 Failed to determine interlace mode
0:00:00.275759729 12409     0x380bc360 WARN                    v4l2 gstv4l2object.c:2388:gst_v4l2_object_add_interlace_mode:0x38082c50 Failed to determine interlace mode
0:00:00.275800616 12409     0x380bc360 WARN                    v4l2 gstv4l2object.c:2388:gst_v4l2_object_add_interlace_mode:0x38082c50 Failed to determine interlace mode
0:00:00.275844731 12409     0x380bc360 WARN                    v4l2 gstv4l2object.c:2388:gst_v4l2_object_add_interlace_mode:0x38082c50 Failed to determine interlace mode
0:00:00.275938118 12409     0x380bc360 WARN                    v4l2 gstv4l2object.c:4476:gst_v4l2_object_probe_caps:<encoder:src> Failed to probe pixel aspect ratio with VIDIOC_CROPCAP: Unknown error -1
gstnvtracker: Loading low-level lib at /opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so
gstnvtracker: Batch processing is ON
gstnvtracker: Past frame output is OFF
[NvMultiObjectTracker] Initialized
0:00:00.388030031 12409     0x380bc360 WARN                 nvinfer gstnvinfer.cpp:635:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Warning from NvDsInferContextImpl::initialize() <nvdsinfer_context_impl.cpp:1161> [UID = 1]: Warning, OpenCV has been deprecated. Using NMS for clustering instead of cv::groupRectangles with topK = 20 and NMS Threshold = 0.5
0:00:05.814812890 12409     0x380bc360 INFO                 nvinfer gstnvinfer.cpp:638:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::deserializeEngineAndBackend() <nvdsinfer_context_impl.cpp:1900> [UID = 1]: deserialized trt engine from :/opt/nvidia/deepstream/deepstream-6.0/samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_fp16.engine
INFO: [Implicit Engine Info]: layers num: 3
0   INPUT  kFLOAT input_1         3x368x640       
1   OUTPUT kFLOAT conv2d_bbox     16x23x40        
2   OUTPUT kFLOAT conv2d_cov/Sigmoid 4x23x40         

0:00:05.816895687 12409     0x380bc360 INFO                 nvinfer gstnvinfer.cpp:638:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::generateBackendContext() <nvdsinfer_context_impl.cpp:2004> [UID = 1]: Use deserialized engine model: /opt/nvidia/deepstream/deepstream-6.0/samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_fp16.engine
0:00:05.825741831 12409     0x380bc360 INFO                 nvinfer gstnvinfer_impl.cpp:313:notifyLoadModelStatus:<primary-inference> [UID 1]: Load new model:config_pgie.txt sucessfully
Decodebin child added: source 

0:00:09.037875395 12409     0x3777af70 FIXME                default gstutils.c:3981:gst_pad_create_stream_id_internal:<fakesrc0:src> Creating random stream-id, consider implementing a deterministic way of creating a stream-id
Decodebin child added: decodebin0 

Decodebin child added: rtpjpegdepay0 

Decodebin child added: nvjpegdec0 

0:00:12.096881109 12409     0x3777b8f0 FIXME           videodecoder gstvideodecoder.c:933:gst_video_decoder_drain_out:<nvjpegdec0> Sub-class should implement drain()
0:00:12.097226740 12409     0x3777b8f0 FIXME           videodecoder gstvideodecoder.c:933:gst_video_decoder_drain_out:<nvjpegdec0> Sub-class should implement drain()
0:00:12.097772999 12409     0x3777b8f0 FIXME           videodecoder gstvideodecoder.c:933:gst_video_decoder_drain_out:<nvjpegdec0> Sub-class should implement drain()
In cb_newpad

gstname= video/x-raw
features= <Gst.CapsFeatures object at 0x7f7d170b28 (GstCapsFeatures at 0x7eec064760)>
0:00:12.102154847 12409     0x3777b8f0 WARN                GST_PADS gstpad.c:4226:gst_pad_peer_query:<nvjpegdec0:src> could not send sticky events
0:00:12.102552927 12409     0x3777b8f0 WARN                GST_PADS gstpad.c:4226:gst_pad_peer_query:<nvjpegdec0:src> could not send sticky events
0:00:12.168172253 12409     0x377580f0 WARN                 basesrc gstbasesrc.c:3055:gst_base_src_loop:<udpsrc1> error: Internal data stream error.
0:00:12.168260432 12409     0x377580f0 WARN                 basesrc gstbasesrc.c:3055:gst_base_src_loop:<udpsrc1> error: streaming stopped, reason not-negotiated (-4)
Error: gst-stream-error-quark: Internal data stream error. (1): gstbasesrc.c(3055): gst_base_src_loop (): /GstPipeline:pipeline0/GstBin:source-bin-00/GstURIDecodeBin:uri-decode-bin/GstRTSPSrc:source/GstUDPSrc:udpsrc1:
streaming stopped, reason not-negotiated (-4)
Exiting app

[NvMultiObjectTracker] De-initialized
0:00:12.200167345 12409     0x37755cf0 WARN                 rtspsrc gstrtspsrc.c:5919:gst_rtsp_src_receive_response:<source> receive interrupted
0:00:12.200216564 12409     0x37755cf0 WARN                 rtspsrc gstrtspsrc.c:8246:gst_rtspsrc_pause:<source> PAUSE interrupted

What I can gather from this is that the uridecodebin is set up correctly with the nvjpegdec decoder, but the stream is opened and then immediately shut down with a not-negotiated error.
Does anyone know what the issue could be?
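
For reference, a quick way to compare what the decoder advertises against what the downstream elements accept is to dump the pad templates from Python (a minimal sketch, equivalent to running gst-inspect-1.0 on each element):

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

Gst.init(None)
# Print each element's pad templates and the caps they advertise;
# a not-negotiated error means two adjacent pads have no caps in common.
for name in ("nvjpegdec", "nvstreammux"):
    factory = Gst.ElementFactory.find(name)
    for tmpl in factory.get_static_pad_templates():
        print(name, tmpl.direction.value_nick, tmpl.name_template,
              tmpl.get_caps().to_string())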

This is the Python code:

#!/usr/bin/env python3

################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2020-2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import argparse
import sys

sys.path.append('../')
import gi
import configparser

gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
from gi.repository import GLib
from ctypes import *
import time
import sys
import math
import platform
from common.is_aarch_64 import is_aarch64
from common.bus_call import bus_call
from common.FPS import GETFPS
from kafka import KafkaProducer
from json import dumps

import pyds

fps_streams = {}

MAX_DISPLAY_LEN = 64
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
MUXER_OUTPUT_WIDTH = 1920
MUXER_OUTPUT_HEIGHT = 1080
MUXER_BATCH_TIMEOUT_USEC = 4000000
TILED_OUTPUT_WIDTH = 1920*2
TILED_OUTPUT_HEIGHT = 1080
GST_CAPS_FEATURES_NVMM = "memory:NVMM"
OSD_PROCESS_MODE = 0
OSD_DISPLAY_TEXT = 1
pgie_classes_str = ["Vehicle", "TwoWheeler", "Person", "RoadSign"]


# nvanalytics_src_pad_buffer_probe will extract metadata received on the nvdsanalytics src pad
# and update params for drawing rectangle, object information etc.
def nvanalytics_src_pad_buffer_probe(pad, info, u_data):
    frame_number = 0
    num_rects = 0
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return Gst.PadProbeReturn.OK  # probe callbacks must return a PadProbeReturn

    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list

    while l_frame:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
            # The casting is done by pyds.NvDsFrameMeta.cast()
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_number = frame_meta.frame_num
        l_obj = frame_meta.obj_meta_list
        num_rects = frame_meta.num_obj_meta
        obj_counter = {
            PGIE_CLASS_ID_VEHICLE: 0,
            PGIE_CLASS_ID_PERSON: 0,
            PGIE_CLASS_ID_BICYCLE: 0,
            PGIE_CLASS_ID_ROADSIGN: 0
        }
        print("#" * 50)
        while l_obj:
            try:
                # Note that l_obj.data needs a cast to pyds.NvDsObjectMeta
                # The casting is done by pyds.NvDsObjectMeta.cast()
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            obj_counter[obj_meta.class_id] += 1
            l_user_meta = obj_meta.obj_user_meta_list
            # Extract object level meta data from NvDsAnalyticsObjInfo
            while l_user_meta:
                try:
                    user_meta = pyds.NvDsUserMeta.cast(l_user_meta.data)
                    if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type("NVIDIA.DSANALYTICSOBJ.USER_META"):
                        user_meta_data = pyds.NvDsAnalyticsObjInfo.cast(user_meta.user_meta_data)
                        if user_meta_data.dirStatus: print(
                            "Object {0} moving in direction: {1}".format(obj_meta.object_id, user_meta_data.dirStatus))
                        if user_meta_data.lcStatus: print(
                            "Object {0} line crossing status: {1}".format(obj_meta.object_id, user_meta_data.lcStatus))
                        if user_meta_data.ocStatus: print(
                            "Object {0} overcrowding status: {1}".format(obj_meta.object_id, user_meta_data.ocStatus))
                        if user_meta_data.roiStatus:
                            print("Object {0} roi status: {1}".format(obj_meta.object_id, user_meta_data.roiStatus))
                except StopIteration:
                    break

                try:
                    l_user_meta = l_user_meta.next
                except StopIteration:
                    break
            try:
                l_obj = l_obj.next
            except StopIteration:
                break

        # Get meta data from NvDsAnalyticsFrameMeta
        l_user = frame_meta.frame_user_meta_list
        while l_user:
            try:
                user_meta = pyds.NvDsUserMeta.cast(l_user.data)
                if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type("NVIDIA.DSANALYTICSFRAME.USER_META"):
                    user_meta_data = pyds.NvDsAnalyticsFrameMeta.cast(user_meta.user_meta_data)
                    if user_meta_data.objInROIcnt:
                        print("Objs in ROI: {0}".format(user_meta_data.objInROIcnt))
                        data = "Objs in ROI: {0}".format(user_meta_data.objInROIcnt)
                        producer.send('quickstart-events', value=data)
                    if user_meta_data.objLCCumCnt:
                        print("Linecrossing Cumulative: {0}".format(user_meta_data.objLCCumCnt))
                        data = "Linecrossing Cumulative: {0}".format(user_meta_data.objLCCumCnt)
                        producer.send('quickstart-events', value=data)
                    if user_meta_data.objLCCurrCnt:
                        print("Linecrossing Current Frame: {0}".format(user_meta_data.objLCCurrCnt))
                        data = "Linecrossing Current Frame: {0}".format(user_meta_data.objLCCurrCnt)
                        producer.send('quickstart-events', value=data)
                    if user_meta_data.ocStatus:
                        print("Overcrowding status: {0}".format(user_meta_data.ocStatus))
                        data = "Overcrowding status: {0}".format(user_meta_data.ocStatus)
                        producer.send('quickstart-events', value=data)
            except StopIteration:
                break
            try:
                l_user = l_user.next
            except StopIteration:
                break

        print("Frame Number=", frame_number, "stream id=", frame_meta.pad_index, "Number of Objects=", num_rects,
              "Person_count=", obj_counter[PGIE_CLASS_ID_PERSON])
        # Get frame rate through this probe
        fps_streams["stream{0}".format(frame_meta.pad_index)].get_fps()
        try:
            l_frame = l_frame.next
        except StopIteration:
            break
        print("#" * 50)

    return Gst.PadProbeReturn.OK


def cb_newpad(decodebin, decoder_src_pad, data):
    print("In cb_newpad\n")
    caps = decoder_src_pad.get_current_caps()
    gststruct = caps.get_structure(0)
    gstname = gststruct.get_name()
    source_bin = data
    features = caps.get_features(0)

    # Need to check if the pad created by the decodebin is for video and not
    # audio.
    print("gstname=", gstname)
    if (gstname.find("video") != -1):
        # Link the decodebin pad only if decodebin has picked nvidia
        # decoder plugin nvdec_*. We do this by checking if the pad caps contain
        # NVMM memory features.
        print("features=", features)
        if features.contains("memory:NVMM"):
            # Get the source bin ghost pad
            bin_ghost_pad = source_bin.get_static_pad("src")
            if not bin_ghost_pad.set_target(decoder_src_pad):
                sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
        else:
            sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")


def decodebin_child_added(child_proxy, Object, name, user_data):
    print("Decodebin child added:", name, "\n")
    if (name.find("decodebin") != -1):
        Object.connect("child-added", decodebin_child_added, user_data)


def create_source_bin(index, uri):
    print("Creating source bin")

    # Create a source GstBin to abstract this bin's content from the rest of the
    # pipeline
    bin_name = "source-bin-%02d" % index
    print(bin_name)
    nbin = Gst.Bin.new(bin_name)
    if not nbin:
        sys.stderr.write(" Unable to create source bin \n")

    # Source element for reading from the uri.
    # We will use decodebin and let it figure out the container format of the
    # stream and the codec and plug the appropriate demux and decode plugins.
    uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
    if not uri_decode_bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
    # We set the input uri to the source element
    uri_decode_bin.set_property("uri", uri)
    # Connect to the "pad-added" signal of the decodebin which generates a
    # callback once a new pad for raw data has been created by the decodebin
    uri_decode_bin.connect("pad-added", cb_newpad, nbin)
    uri_decode_bin.connect("child-added", decodebin_child_added, nbin)

    # We need to create a ghost pad for the source bin which will act as a proxy
    # for the video decoder src pad. The ghost pad will not have a target right
    # now. Once the decode bin creates the video decoder and generates the
    # cb_newpad callback, we will set the ghost pad target to the video decoder
    # src pad.
    Gst.Bin.add(nbin, uri_decode_bin)
    bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write(" Failed to add ghost pad in source bin \n")
        return None
    return nbin


def main(args):
    # Check input arguments
    if len(args) < 2:
        sys.stderr.write("usage: %s <uri1> [uri2] ... [uriN]\n" % args[0])
        sys.exit(1)

    for i in range(0, len(args) - 1):
        fps_streams["stream{0}".format(i)] = GETFPS(i)
    number_sources = len(args) - 1

    # Standard GStreamer initialization
    GObject.threads_init()
    Gst.init(None)

    # Create gstreamer elements
    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    is_live = False

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
    print("Creating streamux \n ")

    # Create nvstreammux instance to form batches from one or more sources.
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")

    pipeline.add(streammux)
    for i in range(number_sources):
        print("Creating source_bin ", i, " \n ")
        uri_name = args[i + 1]
        if uri_name.find("rtsp://") == 0:
            is_live = True
        source_bin = create_source_bin(i, uri_name)
        if not source_bin:
            sys.stderr.write("Unable to create source bin \n")
        pipeline.add(source_bin)
        padname = "sink_%u" % i
        sinkpad = streammux.get_request_pad(padname)
        if not sinkpad:
            sys.stderr.write("Unable to create sink pad bin \n")
        srcpad = source_bin.get_static_pad("src")
        if not srcpad:
            sys.stderr.write("Unable to create src pad bin \n")
        srcpad.link(sinkpad)
    queue1 = Gst.ElementFactory.make("queue", "queue1")
    queue2 = Gst.ElementFactory.make("queue", "queue2")
    queue3 = Gst.ElementFactory.make("queue", "queue3")
    queue4 = Gst.ElementFactory.make("queue", "queue4")
    queue5 = Gst.ElementFactory.make("queue", "queue5")
    queue6 = Gst.ElementFactory.make("queue", "queue6")
    queue7 = Gst.ElementFactory.make("queue", "queue7")
    pipeline.add(queue1)
    pipeline.add(queue2)
    pipeline.add(queue3)
    pipeline.add(queue4)
    pipeline.add(queue5)
    pipeline.add(queue6)
    pipeline.add(queue7)

    print("Creating Pgie \n ")
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie \n")

    print("Creating nvtracker \n ")
    tracker = Gst.ElementFactory.make("nvtracker", "tracker")
    if not tracker:
        sys.stderr.write(" Unable to create tracker \n")

    print("Creating nvdsanalytics \n ")
    nvanalytics = Gst.ElementFactory.make("nvdsanalytics", "analytics")
    if not nvanalytics:
        sys.stderr.write(" Unable to create nvanalytics \n")
    nvanalytics.set_property("config-file", "config_nvdsanalytics.txt")

    print("Creating tiler \n ")
    tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
    if not tiler:
        sys.stderr.write(" Unable to create tiler \n")

    print("Creating nvvidconv \n ")
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    if not nvvidconv:
        sys.stderr.write(" Unable to create nvvidconv \n")

    print("Creating nvosd \n ")
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")
    nvosd.set_property('process-mode', OSD_PROCESS_MODE)
    nvosd.set_property('display-text', OSD_DISPLAY_TEXT)

    # added for mp4-out

    nvvidconv_postosd = Gst.ElementFactory.make("nvvideoconvert", "convertor_postosd")
    if not nvvidconv_postosd:
        sys.stderr.write(" Unable to create nvvidconv_postosd \n")

    # Create a caps filter
    caps = Gst.ElementFactory.make("capsfilter", "filter")
    caps.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=I420"))

    # Make the encoder
    if codec == "H264":
        encoder = Gst.ElementFactory.make("nvv4l2h264enc", "encoder")
        print("Creating H264 Encoder")
    elif codec == "H265":
        encoder = Gst.ElementFactory.make("nvv4l2h265enc", "encoder")
        print("Creating H265 Encoder")
    if not encoder:
        sys.stderr.write(" Unable to create encoder")
    encoder.set_property('bitrate', bitrate)
    if is_aarch64():
        encoder.set_property('preset-level', 1)
        encoder.set_property('insert-sps-pps', 1)
        encoder.set_property('bufapi-version', 1)

    if codec == "H264":
        codecparse = Gst.ElementFactory.make("h264parse", "h264_parse")
    else:
        codecparse = Gst.ElementFactory.make("h265parse", "h265_parse")
    if not codecparse:
        sys.stderr.write(" Unable to create codecparse \n")

    mux = Gst.ElementFactory.make("mp4mux", "mux")
    if not mux:
        sys.stderr.write(" Unable to create mux \n")

    sink = Gst.ElementFactory.make("filesink", "filesink")
    # sink = Gst.ElementFactory.make("fakesink", "fakesink")
    if not sink:
        sys.stderr.write(" Unable to create filesink \n")
    sink.set_property('location', output_path)
    sink.set_property("sync", 0)
    sink.set_property("async", 1)
    #################

    if is_live:
        print("Atleast one of the sources is live")
        streammux.set_property('live-source', 1)

    streammux.set_property('width', 1920)
    streammux.set_property('height', 1080)
    streammux.set_property('batch-size', number_sources)
    streammux.set_property('batched-push-timeout', 4000000)
    pgie.set_property('config-file-path', "config_pgie.txt")
    pgie_batch_size = pgie.get_property("batch-size")
    if (pgie_batch_size != number_sources):
        print("WARNING: Overriding infer-config batch-size", pgie_batch_size, " with number of sources ",
              number_sources, " \n")
        pgie.set_property("batch-size", number_sources)
    tiler_rows = int(math.sqrt(number_sources))
    tiler_columns = int(math.ceil((1.0 * number_sources) / tiler_rows))
    tiler.set_property("rows", tiler_rows)
    tiler.set_property("columns", tiler_columns)
    tiler.set_property("width", TILED_OUTPUT_WIDTH)
    tiler.set_property("height", TILED_OUTPUT_HEIGHT)
    sink.set_property("qos", 0)

    # Set properties of tracker
    config = configparser.ConfigParser()
    config.read('config_tracker.txt')
    config.sections()

    for key in config['tracker']:
        if key == 'tracker-width':
            tracker_width = config.getint('tracker', key)
            tracker.set_property('tracker-width', tracker_width)
        if key == 'tracker-height':
            tracker_height = config.getint('tracker', key)
            tracker.set_property('tracker-height', tracker_height)
        if key == 'gpu-id':
            tracker_gpu_id = config.getint('tracker', key)
            tracker.set_property('gpu_id', tracker_gpu_id)
        if key == 'll-lib-file':
            tracker_ll_lib_file = config.get('tracker', key)
            tracker.set_property('ll-lib-file', tracker_ll_lib_file)
        if key == 'll-config-file':
            tracker_ll_config_file = config.get('tracker', key)
            tracker.set_property('ll-config-file', tracker_ll_config_file)
        if key == 'enable-batch-process':
            tracker_enable_batch_process = config.getint('tracker', key)
            tracker.set_property('enable_batch_process', tracker_enable_batch_process)
        if key == 'enable-past-frame':
            tracker_enable_past_frame = config.getint('tracker', key)
            tracker.set_property('enable_past_frame', tracker_enable_past_frame)

    print("Adding elements to Pipeline \n")
    pipeline.add(pgie)
    pipeline.add(tracker)
    pipeline.add(nvanalytics)
    pipeline.add(tiler)
    pipeline.add(nvvidconv)
    pipeline.add(nvosd)
    pipeline.add(nvvidconv_postosd)
    pipeline.add(caps)
    pipeline.add(encoder)
    pipeline.add(codecparse)
    pipeline.add(mux)
    pipeline.add(sink)

    # We link elements in the following order (with queues in between):
    # sourcebin -> streammux -> nvinfer -> nvtracker -> nvdsanalytics ->
    # nvtiler -> nvvideoconvert -> nvdsosd -> nvvideoconvert -> capsfilter ->
    # encoder -> h264parse -> mp4mux -> filesink
    print("Linking elements in the Pipeline \n")
    streammux.link(queue1)
    queue1.link(pgie)
    pgie.link(queue2)
    queue2.link(tracker)
    tracker.link(queue3)
    queue3.link(nvanalytics)
    nvanalytics.link(queue4)
    queue4.link(tiler)
    tiler.link(queue5)
    queue5.link(nvvidconv)
    nvvidconv.link(queue6)
    queue6.link(nvosd)
    nvosd.link(queue7)
    queue7.link(nvvidconv_postosd)
    nvvidconv_postosd.link(caps)
    caps.link(encoder)
    encoder.link(codecparse)
    codecparse.link(mux)
    mux.link(sink)

    # create an event loop and feed gstreamer bus messages to it
    loop = GObject.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)
    nvanalytics_src_pad = nvanalytics.get_static_pad("src")
    if not nvanalytics_src_pad:
        sys.stderr.write(" Unable to get src pad \n")
    else:
        nvanalytics_src_pad.add_probe(Gst.PadProbeType.BUFFER, nvanalytics_src_pad_buffer_probe, 0)

    # List the sources
    print("Now playing...")
    for i, source in enumerate(args):
        if (i != 0):
            print(i, ": ", source)

    print("Starting pipeline \n")
    # start playback and listen to events
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except:
        pass
    # cleanup
    print("Exiting app\n")
    pipeline.set_state(Gst.State.NULL)


if __name__ == '__main__':
    global codec
    codec = "H264"
    global bitrate
    bitrate = 4000000
    global output_path
    output_path = "out.mp4"

    # set Kafka producer and encode messages in json
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))

    sys.exit(main(sys.argv))

In addition, here is the full log with GST_DEBUG=4 set:

GST_DEBUG_4_output.txt (302.5 KB)

• Hardware Platform (Jetson / GPU)
Jetson Nano
• DeepStream Version
DeepStream 6.0
• JetPack Version (valid for Jetson only)
4.6
• TensorRT Version
8.2.1-1

Unfortunately I still couldn’t get it to work. I ran the following command before running my code:

export GST_DEBUG=uridecodebin*:5
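
(The same threshold can also be set from inside the script after Gst.init, instead of exporting the environment variable — a small sketch:)

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

Gst.init(None)
# Raise the log level for the uridecodebin category only,
# equivalent to GST_DEBUG=uridecodebin*:5.
Gst.debug_set_threshold_from_string("uridecodebin*:5", True)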

And this is the error log of the uridecodebin:

python3 main.py rtsp://192.168.1.115:8554/mjpeg/1
Creating Pipeline 
 
Creating streamux 
 
Creating source_bin  0  
 
Creating source bin
source-bin-00
Creating Pgie 
 
Creating nvtracker 
 
Creating nvdsanalytics 
 
Creating tiler 
 
Creating nvvidconv 
 
Creating nvosd 
 
Creating H264 Encoder
Atleast one of the sources is live
Adding elements to Pipeline 

Linking elements in the Pipeline 

Now playing...
1 :  rtsp://192.168.1.115:8554/mjpeg/1
Starting pipeline 

Opening in BLOCKING MODE 
gstnvtracker: Loading low-level lib at /opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so
gstnvtracker: Batch processing is ON
gstnvtracker: Past frame output is OFF
[NvMultiObjectTracker] Initialized
0:00:00.396243782 15540     0x235eb550 WARN                 nvinfer gstnvinfer.cpp:635:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Warning from NvDsInferContextImpl::initialize() <nvdsinfer_context_impl.cpp:1161> [UID = 1]: Warning, OpenCV has been deprecated. Using NMS for clustering instead of cv::groupRectangles with topK = 20 and NMS Threshold = 0.5
0:00:05.671450549 15540     0x235eb550 INFO                 nvinfer gstnvinfer.cpp:638:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::deserializeEngineAndBackend() <nvdsinfer_context_impl.cpp:1900> [UID = 1]: deserialized trt engine from :/opt/nvidia/deepstream/deepstream-6.0/samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_fp16.engine
INFO: [Implicit Engine Info]: layers num: 3
0   INPUT  kFLOAT input_1         3x368x640       
1   OUTPUT kFLOAT conv2d_bbox     16x23x40        
2   OUTPUT kFLOAT conv2d_cov/Sigmoid 4x23x40         

0:00:05.672692708 15540     0x235eb550 INFO                 nvinfer gstnvinfer.cpp:638:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::generateBackendContext() <nvdsinfer_context_impl.cpp:2004> [UID = 1]: Use deserialized engine model: /opt/nvidia/deepstream/deepstream-6.0/samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_fp16.engine
0:00:05.681737927 15540     0x235eb550 INFO                 nvinfer gstnvinfer_impl.cpp:313:notifyLoadModelStatus:<primary-inference> [UID 1]: Load new model:config_pgie.txt sucessfully
0:00:05.682089600 15540     0x235eb550 DEBUG           uridecodebin gsturidecodebin.c:2811:gst_uri_decode_bin_change_state: ready to paused
0:00:05.682124392 15540     0x235eb550 DEBUG           uridecodebin gsturidecodebin.c:2184:setup_source:<uri-decode-bin> setup source
0:00:05.688771963 15540     0x235eb550 DEBUG           uridecodebin gsturidecodebin.c:1362:gen_source_element:<uri-decode-bin> setting connection-speed=0 to source element
Decodebin child added: source 

0:00:05.689619791 15540     0x235eb550 DEBUG           uridecodebin gsturidecodebin.c:2238:setup_source:<uri-decode-bin> Source has dynamic output pads
0:00:05.689849483 15540     0x235eb550 DEBUG           uridecodebin gsturidecodebin.c:899:do_async_done:<uri-decode-bin> posting ASYNC_DONE
0:00:11.902192841 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:2110:source_new_pad:<uri-decode-bin> Found new pad source.recv_rtp_src_0_335117927_26 in source element source
0:00:11.902804050 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:1444:has_all_raw_caps:<source:recv_rtp_src_0_335117927_26> have caps application/x-rtp, media=(string)video, payload=(int)26, clock-rate=(int)90000, npt-start=(guint64)0, play-speed=(double)1, play-scale=(double)1, ssrc=(uint)335117927
Decodebin child added: decodebin0 

0:00:11.906754540 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:2135:source_new_pad:<uri-decode-bin> linked decoder to new pad
0:00:11.907222569 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:1013:source_no_more_pads:<uri-decode-bin> No more pads in source element source.
0:00:11.907354811 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:946:no_more_pads_full:<source> no more pads, 2 pending
0:00:11.907954666 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:1695:proxy_autoplug_continue_signal:<uri-decode-bin> autoplug-continue returned 1
0:00:11.908086075 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:336:gst_uri_decode_bin_autoplug_factories:<uri-decode-bin> finding factories
0:00:11.916104765 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:358:gst_uri_decode_bin_autoplug_factories:<uri-decode-bin> autoplug-factories returns 0x7ee404fa20
0:00:11.916169714 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:1710:proxy_autoplug_factories_signal:<uri-decode-bin> autoplug-factories returned 0x7ee404fa80
0:00:11.916215131 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:1725:proxy_autoplug_sort_signal:<uri-decode-bin> autoplug-sort returned (nil)
0:00:11.916387166 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:374:gst_uri_decode_bin_autoplug_select:<uri-decode-bin> default autoplug-select returns TRY
0:00:11.916424927 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:1740:proxy_autoplug_select_signal:<uri-decode-bin> autoplug-select returned 0
Decodebin child added: rtpjpegdepay0 

0:00:11.917871360 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:1695:proxy_autoplug_continue_signal:<uri-decode-bin> autoplug-continue returned 1
0:00:11.918363765 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:1695:proxy_autoplug_continue_signal:<uri-decode-bin> autoplug-continue returned 1
0:00:11.918413089 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:336:gst_uri_decode_bin_autoplug_factories:<uri-decode-bin> finding factories
0:00:11.919003881 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:358:gst_uri_decode_bin_autoplug_factories:<uri-decode-bin> autoplug-factories returns 0x7ee4053ac0
0:00:11.919046277 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:1710:proxy_autoplug_factories_signal:<uri-decode-bin> autoplug-factories returned 0x7ee4053a40
0:00:11.919080966 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:1725:proxy_autoplug_sort_signal:<uri-decode-bin> autoplug-sort returned (nil)
0:00:11.919205239 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:374:gst_uri_decode_bin_autoplug_select:<uri-decode-bin> default autoplug-select returns TRY
0:00:11.919235760 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:1740:proxy_autoplug_select_signal:<uri-decode-bin> autoplug-select returned 0
Decodebin child added: nvjpegdec0 

0:00:11.922672074 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:1755:proxy_autoplug_query_signal:<uri-decode-bin> autoplug-query returned 0
0:00:11.922934370 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:1755:proxy_autoplug_query_signal:<uri-decode-bin> autoplug-query returned 0
0:00:11.926403080 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:1755:proxy_autoplug_query_signal:<uri-decode-bin> autoplug-query returned 0
0:00:11.927002987 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:1140:new_decoded_pad_added_cb:<decodebin0> new decoded pad, name: <src_0>
0:00:11.927524819 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:1124:copy_sticky_events:<'':src_0> store sticky event stream-start event: 0x7ee40049f0, time 99:99:99.999999999, seq-num 292, GstEventStreamStart, stream-id=(string)b9049c323800fa1dbf0c9c2f5d6dcf0e63b50fc2c5030d1c14e44a893d14e333/video:0:0:RTP:AVP:26, flags=(GstStreamFlags)GST_STREAM_FLAG_NONE;
0:00:11.927636957 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:1124:copy_sticky_events:<'':src_0> store sticky event caps event: 0x7ee4059c90, time 99:99:99.999999999, seq-num 334, GstEventCaps, caps=(GstCaps)"video/x-raw\(memory:NVMM\)\,\ format\=\(string\)I420\,\ width\=\(int\)640\,\ height\=\(int\)480\,\ interlace-mode\=\(string\)progressive\,\ multiview-mode\=\(string\)mono\,\ multiview-flags\=\(GstVideoMultiviewFlagsSet\)0:ffffffff:/right-view-first/left-flipped/left-flopped/right-flipped/right-flopped/half-aspect/mixed-mono\,\ pixel-aspect-ratio\=\(fraction\)1/1\,\ chroma-site\=\(string\)jpeg\,\ colorimetry\=\(string\)1:4:0:0\,\ framerate\=\(fraction\)0/1";
In cb_newpad

gstname= video/x-raw
features= <Gst.CapsFeatures object at 0x7f9ae5bb28 (GstCapsFeatures at 0x7ee407f500)>
0:00:11.929227299 15540     0x227de8f0 DEBUG           uridecodebin gsturidecodebin.c:946:no_more_pads_full:<decodebin0> no more pads, 1 pending
Error: gst-stream-error-quark: Internal data stream error. (1): gstbasesrc.c(3055): gst_base_src_loop (): /GstPipeline:pipeline0/GstBin:source-bin-00/GstURIDecodeBin:uri-decode-bin/GstRTSPSrc:source/GstUDPSrc:udpsrc0:
streaming stopped, reason not-negotiated (-4)
Exiting app

[NvMultiObjectTracker] De-initialized
0:00:12.144362525 15540     0x235eb550 DEBUG           uridecodebin gsturidecodebin.c:1206:pad_removed_cb:<decodebin0> pad removed name: <decodebin0:src_0>
0:00:12.145265719 15540     0x235eb550 DEBUG           uridecodebin gsturidecodebin.c:2837:gst_uri_decode_bin_change_state: paused to ready
0:00:12.145310772 15540     0x235eb550 DEBUG           uridecodebin gsturidecodebin.c:1632:remove_decoders:<uri-decode-bin> removing old decoder element
0:00:12.145486244 15540     0x235eb550 DEBUG           uridecodebin gsturidecodebin.c:2068:remove_source:<uri-decode-bin> removing old src element
0:00:32.182319585 15540     0x235eb550 DEBUG           uridecodebin gsturidecodebin.c:2846:gst_uri_decode_bin_change_state: ready to null
0:00:32.185025416 15540     0x235eb550 DEBUG           uridecodebin gsturidecodebin.c:1206:pad_removed_cb:<decodebin0> pad removed name: <decodebin0:sink>

Are you using deepstream-app? Please provide your deepstream-app configuration file.

Hey Fiona.Chen,

Yes, I’m using the DeepStream SDK in combination with the Python bindings.
Right now the app takes RTSP streams or files as input and creates an H264-encoded output file.

The goal is to have multiple RTSP streams coming from ESP32 microcontrollers as input, but I get the errors I mentioned above.

In the meantime I figured out something strange. If I run the following gst-launch command, everything works fine:

gst-launch-1.0 rtspsrc location=rtsp://192.168.1.115:8554/mjpeg/1 ! queue ! rtpjpegdepay ! nvjpegdec ! matroskamux ! filesink location=received_h264.mkv

This command takes the input from the ESP32 and writes it to a video file that I can play.
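
For completeness, the same known-good pipeline can be driven from Python with Gst.parse_launch (a minimal sketch for isolating the decode path; the URL and output filename are the ones from the command above):

#!/usr/bin/env python3
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

Gst.init(None)
# Same elements as the working gst-launch command, so a failure here
# would point at the app, not at the rtpjpegdepay -> nvjpegdec path.
pipeline = Gst.parse_launch(
    "rtspsrc location=rtsp://192.168.1.115:8554/mjpeg/1 ! queue ! "
    "rtpjpegdepay ! nvjpegdec ! matroskamux ! "
    "filesink location=received_h264.mkv")

loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()

def on_message(bus, msg):
    # Quit on error or end-of-stream so the file gets finalized.
    if msg.type in (Gst.MessageType.ERROR, Gst.MessageType.EOS):
        loop.quit()

bus.connect("message", on_message)
pipeline.set_state(Gst.State.PLAYING)
try:
    loop.run()
except KeyboardInterrupt:
    pass
pipeline.set_state(Gst.State.NULL)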

I have attached the code for my deepstream app here:

deepstream_app.zip (11.4 KB)

Hey, any success with this issue? I also haven’t had any luck getting MJPEG RTSP stream decoding to work with DeepStream on a Jetson Nano.

I have a similar problem with my pipeline: I get "nvv4l2decoder0:src could not send sticky events".