Artifacts in RTSP Stream with nvurisrcbin in DeepStreamSDK 7.0

Thanks @fanzh
Finally, with these streammux properties, the artifacts issue is resolved.

[property]
algorithm-type=1
batch-size=30
max-fps-control=0
overall-max-fps-n=25
overall-max-fps-d=1
overall-min-fps-n=25
overall-min-fps-d=1
max-same-source-frames=1
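As a side note, the same [property] section can be generated from Python with the standard-library configparser, so the FPS caps stay in sync with the pipeline code. This is just a sketch; the file name streammux_config.txt and the helper name are examples, not from this thread:

```python
import configparser

def write_streammux_config(path, batch_size=30, fps=25):
    """Write the nvstreammux [property] section shown above to `path`."""
    cfg = configparser.ConfigParser()
    cfg["property"] = {
        "algorithm-type": "1",
        "batch-size": str(batch_size),
        "max-fps-control": "0",
        "overall-max-fps-n": str(fps),
        "overall-max-fps-d": "1",
        "overall-min-fps-n": str(fps),
        "overall-min-fps-d": "1",
        "max-same-source-frames": "1",
    }
    with open(path, "w") as f:
        # Emit key=value with no spaces around '=', matching the file above
        cfg.write(f, space_around_delimiters=False)

write_streammux_config("streammux_config.txt")
```

The resulting file is the one the new nvstreammux reads via its config-file-path property.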

Hi @fanzh, there is an issue.
When I set file_loop to True and use nvurisrcbin with only MP4 files as input, the MP4 files decode slowly, i.e. the FPS is reduced compared to running on uridecodebin. The issue also appears whenever RTSP channels get into the mix.
All the channels are giving normal frames to Triton.

Thanks for sharing! It seems the original problem is fixed… could you help to open a new topic to focus on the new issue?

No @fanzh,
the original issue is not resolved. Everything is related.

It happens when the file-loop property is turned on and a mixture of RTSP and MP4 files is used as input.


I am getting smooth RTSP videos if there are only RTSP channels in the input.
But whenever I add an MP4 file, artifacts appear in the RTSP output. At the same time, the MP4 file stutters as well.

Also, this artifacts issue happens only when I use nvurisrcbin; it does not happen when I use uridecodebin.

I can’t reproduce this issue with the modified deepstream-test3.py on DS7.0. Here are the details.
code: 1.diff (2.9 KB)
command-line: python3 deepstream_test_3.py -i rtsp://xxx/media/video1 file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4 --file-loop
media info: 1080p, 30fps.
result: no artifacts and no mp4 stuttering after the mp4 loops. log: log-0820.txt (9.8 KB)

I will help reproduce the issue. I will share all the code, configurations, and steps to reproduce it here shortly.

Please note the new streammux does not support scaling: the output of your RTSP and MP4 sources should have the same resolution. Please refer to the doc.
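Since the new streammux does not scale, a common workaround (a sketch, not something from this thread) is to place an nvvideoconvert plus a capsfilter after each source bin so every stream reaches the mux at the same resolution. A tiny helper for building the caps string; the element wiring around it is an assumption:

```python
def nvmm_caps_string(width, height):
    # Caps for NVMM-backed frames; set this on a capsfilter that sits
    # between a per-source nvvideoconvert and its streammux sink pad,
    # so the mux receives a uniform resolution from every source.
    return f"video/x-raw(memory:NVMM), width={width}, height={height}"
```

In the pipeline this would become something like `caps_filter.set_property("caps", Gst.Caps.from_string(nvmm_caps_string(1920, 1080)))` for each source.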

That's okay.

Hi @fanzh

Please find the code and configurations to replicate the issue

#!/usr/bin/env python3

################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import sys
sys.path.append('../')
from pathlib import Path
import gi
import configparser
import argparse
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from ctypes import *
import time
import math
import platform
from collections import OrderedDict
from common.is_aarch_64 import is_aarch64
from common.FPS import PERF_DATA

from ssd_parser import nvds_infer_parse_custom_tf_ssd, DetectionParam, NmsParam, BoxSizeParam

import os
import pyds

import pickle
import socket
from queue import Queue
import threading
import numpy as np

import logging
import traceback

import json

logging.basicConfig(
    filename='app.log',      # Specify the file name
    filemode='a',            # Append mode
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', # Log format
    level=logging.INFO       # Set the logging level
)

# Get a logger instance
FILE_LOGGER = logging.getLogger('FILE_LOGGER')

no_display = False
silent = False
file_loop = True
perf_data = None


MAX_DISPLAY_LEN = 64
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
MUXER_OUTPUT_WIDTH = 1920
MUXER_OUTPUT_HEIGHT = 1080
MUXER_BATCH_TIMEOUT_USEC = 4000000
TILED_OUTPUT_WIDTH = 1280
TILED_OUTPUT_HEIGHT = 720
GST_CAPS_FEATURES_NVMM = "memory:NVMM"
OSD_PROCESS_MODE = 0
OSD_DISPLAY_TEXT = 0
pgie_classes_str = ["Vehicle", "TwoWheeler", "Person", "RoadSign"]


CLASS_NB = 91
ACCURACY_ALL_CLASS = 0.5
UNTRACKED_OBJECT_ID = 0xffffffffffffffff
IMAGE_HEIGHT = 1080
IMAGE_WIDTH = 1920
MIN_BOX_WIDTH = 32
MIN_BOX_HEIGHT = 32
TOP_K = 20
IOU_THRESHOLD = 0.3
OUTPUT_VIDEO_NAME = "./out.mp4"
MUXER_BATCH_TIMEOUT_USEC = 33000  # final value; overrides the 4000000 set above

MAX_NUM_SOURCES = 3



class TCPConnectionThread(threading.Thread):
    def __init__(self, host, port):
        super().__init__()
        self.host = host
        self.port = port
        self.queue = Queue(maxsize=100)
        self.stop_event = threading.Event()
        self.sock = None

    def connect(self):
        while not self.stop_event.is_set():
            try:
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect((self.host, self.port))
                return True
            except Exception as e:
                print("From DS-7 pipeline- Error connecting:", e)
                time.sleep(1)  # Retry after 1 second

        return False

    def run(self):
        while not self.stop_event.is_set():
            if not self.sock:
                if not self.connect():
                    # If unable to connect, wait before retrying
                    time.sleep(1)
                    continue

            try:
                while not self.stop_event.is_set():
                    if not self.queue.empty():
                        data = self.queue.get()
                        # Pickle the data
                        serialized_data = pickle.dumps(data)
                        # Calculate the length of the data
                        data_length = len(serialized_data)
                        # Convert data length to bytes (4 bytes for an integer)
                        length_bytes = data_length.to_bytes(4, byteorder='big')
                        # Send the length of the data
                        self.sock.sendall(length_bytes)
                        # Send the data
                        self.sock.sendall(serialized_data)
                    else:
                        time.sleep(0.1)  # Sleep briefly to avoid busy waiting
            except Exception as e:
                print("Error sending data:", e)
                self.sock.close()
                self.sock = None

    def stop(self):
        self.stop_event.set()
        if self.sock:
            self.sock.close()

    def send_message(self, obj):
        try:
            self.queue.put(obj,block=False)
        except Exception as e:
            pass



def get_label_names_from_file(filepath):
    """ Placeholder label source: returns a one-entry label list """
    labels = ["connected components"]
    return labels



def osd_sink_pad_buffer_probe(pad, info, u_data):
    frame_number = 0
    # Initializing object counter with 0.
    obj_counter = dict(enumerate([0] * CLASS_NB))
    num_rects = 0

    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return

    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_number = frame_meta.frame_num
        num_rects = frame_meta.num_obj_meta
        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                # Casting l_obj.data to pyds.NvDsObjectMeta
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            obj_counter[obj_meta.class_id] += 1
            try:
                l_obj = l_obj.next
            except StopIteration:
                break

        # Acquiring a display meta object. The memory ownership remains in
        # the C code so downstream plugins can still access it. Otherwise
        # the garbage collector will claim it when this probe function exits.
        display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
        display_meta.num_labels = 1
        py_nvosd_text_params = display_meta.text_params[0]
        # Setting display text to be shown on screen
        # Note that the pyds module allocates a buffer for the string, and the
        # memory will not be claimed by the garbage collector.
        # Reading the display_text field here will return the C address of the
        # allocated string. Use pyds.get_string() to get the string content.
        id_dict = {
            val: index
            for index, val in enumerate(get_label_names_from_file("labels.txt"))
        }
        disp_string = "Frame Number={} Number of Objects={} Vehicle_count={} Person_count={}"
        py_nvosd_text_params.display_text = disp_string.format(
            frame_number,
            num_rects,
            obj_counter[id_dict["car"]],
            obj_counter[id_dict["person"]],
        )

        # Now set the offsets where the string should appear
        py_nvosd_text_params.x_offset = 10
        py_nvosd_text_params.y_offset = 12

        # Font , font-color and font-size
        py_nvosd_text_params.font_params.font_name = "Serif"
        py_nvosd_text_params.font_params.font_size = 10
        # set(red, green, blue, alpha); set to White
        py_nvosd_text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)

        # Text background color
        py_nvosd_text_params.set_bg_clr = 1
        # set(red, green, blue, alpha); set to Black
        py_nvosd_text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
        # Using pyds.get_string() to get display_text as string
        print(pyds.get_string(py_nvosd_text_params.display_text))
        pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK


def add_obj_meta_to_frame(frame_object, batch_meta, frame_meta, label_names):
    """ Inserts an object into the metadata """
    # this is a good place to insert objects into the metadata.
    # Here's an example of inserting a single object.
    obj_meta = pyds.nvds_acquire_obj_meta_from_pool(batch_meta)
    # Set bbox properties. These are in input resolution.
    rect_params = obj_meta.rect_params
    rect_params.left = int(frame_object.left)
    rect_params.top = int(frame_object.top)
    rect_params.width = int(frame_object.width)
    rect_params.height = int(frame_object.height)

    # print(rect_params.height)

    # Semi-transparent yellow background
    rect_params.has_bg_color = 0
    rect_params.bg_color.set(1, 1, 0, 0.4)

    # Red border of width 3
    rect_params.border_width = 3
    rect_params.border_color.set(1, 0, 0, 1)

    # Set object info including class, detection confidence, etc.
    obj_meta.confidence = frame_object.detectionConfidence
    obj_meta.class_id = frame_object.classId

    # There is no tracking ID upon detection. The tracker will
    # assign an ID.
    obj_meta.object_id = UNTRACKED_OBJECT_ID

    lbl_id = frame_object.classId
    if lbl_id >= len(label_names):
        lbl_id = 0

    # Set the object classification label.
    obj_meta.obj_label = label_names[lbl_id]

    # Set display text for the object.
    txt_params = obj_meta.text_params
    if txt_params.display_text:
        pyds.free_buffer(txt_params.display_text)

    txt_params.x_offset = int(rect_params.left)
    txt_params.y_offset = max(0, int(rect_params.top) - 10)
    txt_params.display_text = (
        label_names[lbl_id] + " " + "{:04.3f}".format(frame_object.detectionConfidence)
    )
    # Font , font-color and font-size
    txt_params.font_params.font_name = "Serif"
    txt_params.font_params.font_size = 10
    # set(red, green, blue, alpha); set to White
    txt_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)

    # Text background color
    txt_params.set_bg_clr = 1
    # set(red, green, blue, alpha); set to Black
    txt_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)

    # Insert the object into the current frame meta.
    # This object has no parent.
    pyds.nvds_add_obj_meta_to_frame(frame_meta, obj_meta, None)


def pgie_sink_pad_buffer_probe(pad, info, u_data,q,c,t0):
    #print(['Data_Probed',time.ctime()])  # c+=1
    # print(['from sink probes ',c,(time.time()-t0)//c])
    ordered_dict = OrderedDict()
    # print("\n\n")
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return

    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list

    while l_frame is not None:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone.
            
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
            source_id = frame_meta.source_id
            frame_number = frame_meta.frame_num
            # print(f"frame number sending from sink pad={frame_number}")
            if source_id not in ordered_dict:
                ordered_dict[source_id]=[time.time(),frame_number]
            l_frame = l_frame.next
        except StopIteration:
            break

    
    #print(ordered_dict)
    # Define the host and port of the server
    host = 'localhost'
    port = 12345

    
    # q.put(ordered_dict, host, port)
    return Gst.PadProbeReturn.OK

def pgie_src_pad_buffer_probe(pad, info, u_data,connection_thread):
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return

    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list

    detection_params = DetectionParam(CLASS_NB, ACCURACY_ALL_CLASS)
    box_size_param = BoxSizeParam(IMAGE_HEIGHT, IMAGE_WIDTH,
                                  MIN_BOX_WIDTH, MIN_BOX_HEIGHT)
    nms_param = NmsParam(TOP_K, IOU_THRESHOLD)

    label_names = get_label_names_from_file("labels.txt")
    source_id_list=[]
    while l_frame is not None:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
            source_id = frame_meta.source_id
            source_id_list.append(source_id)
            
            # print(f"source is is {source_id}")
        except StopIteration:
            break

        l_user = frame_meta.frame_user_meta_list
        while l_user is not None:
            try:
                # Note that l_user.data needs a cast to pyds.NvDsUserMeta
                # The casting also keeps ownership of the underlying memory
                # in the C code, so the Python garbage collector will leave
                # it alone.
                user_meta = pyds.NvDsUserMeta.cast(l_user.data)
            except StopIteration:
                break

            if (
                    user_meta.base_meta.meta_type
                    != pyds.NvDsMetaType.NVDSINFER_TENSOR_OUTPUT_META
            ):
                # Advance before skipping, otherwise this loop never terminates
                try:
                    l_user = l_user.next
                except StopIteration:
                    break
                continue

            tensor_meta = pyds.NvDsInferTensorMeta.cast(user_meta.user_meta_data)

            # Boxes in the tensor meta should be in network resolution which is
            # found in tensor_meta.network_info. Use this info to scale boxes to
            # the input frame resolution.
            layers_info = []

            for i in range(tensor_meta.num_output_layers):
                layer = pyds.get_nvds_LayerInfo(tensor_meta, i)
                layers_info.append(layer)

            frame_object_list = nvds_infer_parse_custom_tf_ssd(
                layers_info, detection_params, box_size_param, nms_param
            )
            try:
                l_user = l_user.next
            except StopIteration:
                break

            for frame_object in frame_object_list:
                add_obj_meta_to_frame(frame_object, batch_meta, frame_meta, label_names)

        try:
            # indicate inference is performed on the frame
            frame_meta.bInferDone = True
            l_frame = l_frame.next
        except StopIteration:
            break

    connection_thread.send_message(np.array(source_id_list))
    return Gst.PadProbeReturn.OK



def cb_newpad(decodebin, decoder_src_pad,data):
    print("In cb_newpad\n")
    caps=decoder_src_pad.get_current_caps()
    if not caps:
        caps = decoder_src_pad.query_caps()
    gststruct=caps.get_structure(0)
    gstname=gststruct.get_name()
    source_bin=data
    features=caps.get_features(0)

    # Need to check if the pad created by the decodebin is for video and not
    # audio.
    print("gstname=",gstname)
    if(gstname.find("video")!=-1):
        # Link the decodebin pad only if decodebin has picked nvidia
        # decoder plugin nvdec_*. We do this by checking if the pad caps contain
        # NVMM memory features.
        print("features=",features)
        if features.contains("memory:NVMM"):
            # Get the source bin ghost pad
            bin_ghost_pad=source_bin.get_static_pad("src")
            if not bin_ghost_pad.set_target(decoder_src_pad):
                sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
        else:
            sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")

def decodebin_child_added(child_proxy,Object,name,user_data):
    print("Decodebin child added:", name, "\n")
    if(name.find("decodebin") != -1):
        Object.connect("child-added",decodebin_child_added,user_data)

    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        if source_element.find_property('drop-on-latency') != None:
            Object.set_property("drop-on-latency", True)



def create_source_bin(index,uri):
    print("Creating source bin")

    # Create a source GstBin to abstract this bin's content from the rest of the
    # pipeline
    bin_name="source-bin-%02d" %index
    print(bin_name)
    nbin=Gst.Bin.new(bin_name)
    if not nbin:
        sys.stderr.write(" Unable to create source bin \n")

    # Source element for reading from the uri.
    # We will use decodebin and let it figure out the container format of the
    # stream and the codec and plug the appropriate demux and decode plugins.
    if file_loop:
        # use nvurisrcbin to enable file-loop
        uri_decode_bin=Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
        uri_decode_bin.set_property("file-loop", 1)
        uri_decode_bin.set_property("rtsp-reconnect-interval", 10)
        uri_decode_bin.set_property("rtsp-reconnect-attempts", 2160)
        # uri_decode_bin.set_property("cudadec-memtype", 2)

    else:
        uri_decode_bin=Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
    if not uri_decode_bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
    # We set the input uri to the source element
    uri_decode_bin.set_property("uri",uri)
    # Connect to the "pad-added" signal of the decodebin which generates a
    # callback once a new pad for raw data has been created by the decodebin
    uri_decode_bin.connect("pad-added",cb_newpad,nbin)
    uri_decode_bin.connect("child-added",decodebin_child_added,nbin)

    # We need to create a ghost pad for the source bin which will act as a proxy
    # for the video decoder src pad. The ghost pad will not have a target right
    # now. Once the decode bin creates the video decoder and generates the
    # cb_newpad callback, we will set the ghost pad target to the video decoder
    # src pad.
    Gst.Bin.add(nbin,uri_decode_bin)
    bin_pad=nbin.add_pad(Gst.GhostPad.new_no_target("src",Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write(" Failed to add ghost pad in source bin \n")
        return None
    return nbin

def kill_the_pipeline(loop,pipeline,connection_thread):
    pipeline.set_state(Gst.State.NULL)
    connection_thread.stop()
    loop.quit()
    sys.exit(1)                    


def bus_call(bus, message, loop,pipeline,connection_thread):
    """
    This is call back from gstreamer
    
    This method takes 1 parameter and access the messages that is posted on pipeline's bus.
    
    :param loop: this is added from mainloop so that u can kill the loop to kill entire pipeline from messges
    :return: None
    """

    t = message.type
    if t == Gst.MessageType.EOS:
        sys.stdout.write("End-of-stream\n")
        FILE_LOGGER.info("I am here at the EOS part")
        FILE_LOGGER.error(f"The EOS recieved from the triton Pipeline, Exiting the application")
        kill_the_pipeline(loop,pipeline,connection_thread)


    elif t==Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        FILE_LOGGER.warning(f"Warning: {err}: {debug}")
    elif t == Gst.MessageType.ERROR:

        err, debug = message.parse_error()
        FILE_LOGGER.error(f"Error: {err}: {debug}")
        print(f"Error: {err}: {debug}")
        src_element = message.src
        parent_obj = src_element.get_parent()
        if parent_obj:
            try:
                parent_obj_name = parent_obj.name
                FILE_LOGGER.info(f"parent object name is {parent_obj_name}")
                print(f"parent object name is {parent_obj.name}")
                # kill_the_pipeline(loop,pipeline,connection_thread)
            except Exception as e:
                FILE_LOGGER.info(traceback.print_exc())
                print("Error! exiting the app ")
                FILE_LOGGER.error("Error! exiting the app ",exc_info=True)
                # kill_the_pipeline(loop,pipeline,connection_thread)
    

    elif t == Gst.MessageType.ELEMENT:
        struct = message.get_structure()
        #Check for stream-eos message
        if struct is not None and struct.has_name("stream-eos"):
            parsed, stream_id = struct.get_uint("stream-id")
            if parsed:
                #Set eos status of stream to True, to be deleted in delete-sources
                print("Got EOS from stream %d" % stream_id)
                FILE_LOGGER.info("Got EOS from stream %d" % stream_id)
                # Uncomment the below code if you need to restart the entire system
                # kill_the_pipeline(loop,pipeline,connection_thread) 


    return True




def main(args, requested_pgie=None, config=None, disable_probe=False):

    host = '127.0.0.1'
    port = 54321     

    connection_thread = TCPConnectionThread(host, port)
    connection_thread.start()
    #connection_thread.send_message(stats)

    global perf_data
    perf_data = PERF_DATA(len(args))

    number_sources=len(args)

    # Standard GStreamer initialization
    Gst.init(None)

    # Create gstreamer elements */
    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    is_live = False

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
    print("Creating streamux \n ")

    # Create nvstreammux instance to form batches from one or more sources.
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")

    pipeline.add(streammux)
    for i in range(number_sources):
        print("Creating source_bin ",i," \n ")
        uri_name=args[i]
        if uri_name.find("rtsp://") == 0 :
            is_live = True
        source_bin=create_source_bin(i, uri_name)
        if not source_bin:
            sys.stderr.write("Unable to create source bin \n")
        pipeline.add(source_bin)
        padname="sink_%u" %i
        sinkpad= streammux.get_request_pad(padname) 
        if not sinkpad:
            sys.stderr.write("Unable to create sink pad bin \n")
        srcpad=source_bin.get_static_pad("src")
        if not srcpad:
            sys.stderr.write("Unable to create src pad bin \n")
        srcpad.link(sinkpad)
    queue1=Gst.ElementFactory.make("queue","queue1")
    queue2=Gst.ElementFactory.make("queue","queue2")
    queue3=Gst.ElementFactory.make("queue","queue3")
    queue4=Gst.ElementFactory.make("queue","queue4")
    queue5=Gst.ElementFactory.make("queue","queue5")
    pipeline.add(queue1)
    pipeline.add(queue2)
    pipeline.add(queue3)
    pipeline.add(queue4)
    pipeline.add(queue5)

    nvdslogger = None

    print("Creating Pgie \n ")
    if True :
        pgie = Gst.ElementFactory.make("nvinferserver", "primary-inference")
    # elif requested_pgie != None and requested_pgie == 'nvinfer':
    #     pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    # else:
    #     pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")

    if not pgie:
        sys.stderr.write(" Unable to create pgie :  %s\n" % requested_pgie)

    if disable_probe:
        # Use nvdslogger for perf measurement instead of probe function
        print ("Creating nvdslogger \n")
        nvdslogger = Gst.ElementFactory.make("nvdslogger", "nvdslogger")

    print("Creating tiler \n ")
    tiler=Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
    if not tiler:
        sys.stderr.write(" Unable to create tiler \n")
    print("Creating nvvidconv \n ")
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    if not nvvidconv:
        sys.stderr.write(" Unable to create nvvidconv \n")
    print("Creating nvosd \n ")
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")
    nvosd.set_property('process-mode',OSD_PROCESS_MODE)
    nvosd.set_property('display-text',OSD_DISPLAY_TEXT)

    if file_loop:
        if is_aarch64():
            # Set nvbuf-memory-type=4 for aarch64 for file-loop (nvurisrcbin case)
            streammux.set_property('nvbuf-memory-type', 4)
        else:
            # Set nvbuf-memory-type=2 for x86 for file-loop (nvurisrcbin case)
            # streammux.set_property('nvbuf-memory-type', 1)
            pass

    if no_display:
        print("Creating Fakesink \n")
        sink = Gst.ElementFactory.make("fakesink", "fakesink")
        sink.set_property('enable-last-sample', 0)
        sink.set_property('sync', 0)
    else:
        if is_aarch64():
            print("Creating nv3dsink \n")
            sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
            if not sink:
                sys.stderr.write(" Unable to create nv3dsink \n")
        else:
            print("Creating EGLSink \n")
            sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
            if not sink:
                sys.stderr.write(" Unable to create egl sink \n")

    if not sink:
        sys.stderr.write(" Unable to create sink element \n")

    # if is_live:
    #     print("At least one of the sources is live")
    #     streammux.set_property('live-source', 1)

    if os.environ["USE_NEW_NVSTREAMMUX"]=="yes":
        streammux.set_property("config-file-path", "/opt/nvidia/deepstream/deepstream-7.0/sources/src/streammux.txt")
        streammux.set_property("batch-size", 30)
        print("###################Using the new SteramMUX########################\n\n\n\n")
    else:
        streammux.set_property("width", 640) #640 3840
        streammux.set_property("height", 480) #480 2160
        streammux.set_property("batch-size", 30)
    streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)
    if requested_pgie == "nvinferserver" and config != None:
        pgie.set_property('config-file-path', config)
    elif requested_pgie == "nvinferserver-grpc" and config != None:
        pgie.set_property('config-file-path', config)
    elif requested_pgie == "nvinfer" and config != None:
        pgie.set_property('config-file-path', config)
    else:
        #pgie.set_property('config-file-path', "/opt/nvidia/deepstream/deepstream-7.0/sources/src/dstest1_pgie_inferserver_config_fake_1080.txt") # dstest1_pgie_inferserver_config_torch_resnet50.txt
        # pgie.set_property('config-file-path', "/opt/nvidia/deepstream/deepstream-7.0/sources/src/dstest1_pgie_inferserver_config_fake_640_10.txt") # dstest1_pgie_inferserver_config_torch_resnet50.txt

        print('########################################OK')
        # pgie.set_property('config-file-path', "/opt/nvidia/deepstream/deepstream-7.0/sources/src/dstest1_pgie_inferserver_config_torch_resnet50.txt")
    pgie_batch_size=pgie.get_property("batch-size")
    if(pgie_batch_size != number_sources):
        print("WARNING: Overriding infer-config batch-size",pgie_batch_size," with number of sources ", number_sources," \n")
        pgie.set_property("batch-size",number_sources)
    tiler_rows=int(math.sqrt(number_sources))
    tiler_columns=int(math.ceil((1.0*number_sources)/tiler_rows))
    tiler.set_property("rows",tiler_rows)
    tiler.set_property("columns",tiler_columns)
    tiler.set_property("width", TILED_OUTPUT_WIDTH)
    tiler.set_property("height", TILED_OUTPUT_HEIGHT)
    sink.set_property("qos",0)

    print("Adding elements to Pipeline \n")
    # pipeline.add(pgie)
    if nvdslogger:
        pipeline.add(nvdslogger)
    pipeline.add(tiler)
    pipeline.add(nvvidconv)
    pipeline.add(nvosd)
    pipeline.add(sink)

    print("Linking elements in the Pipeline \n")
    streammux.link(queue1)
    # queue1.link(pgie)
    queue1.link(queue2)
    if nvdslogger:
        queue2.link(nvdslogger)
        nvdslogger.link(tiler)
    else:
        queue2.link(tiler)
    tiler.link(queue3)
    queue3.link(nvvidconv)
    nvvidconv.link(queue4)
    queue4.link(nvosd)
    nvosd.link(queue5)
    queue5.link(sink)   

    # create an event loop and feed gstreamer bus mesages to it
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect ("message", bus_call, loop,pipeline,connection_thread)
    pgie_src_pad=pgie.get_static_pad("src")
    if not pgie_src_pad:
        sys.stderr.write(" Unable to get src pad \n")
    else:
        if not disable_probe:
            pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, pgie_src_pad_buffer_probe, 0,connection_thread)
            # perf callback function to print fps every 5 sec
            # GLib.timeout_add(5000, perf_data.perf_print_callback)

    # List the sources
    print("Now playing...")
    for i, source in enumerate(args):
        print(i, ": ", source)

    print("Starting pipeline \n")
    # start playback and listen to events
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except:
        pass
    # cleanup
    connection_thread.stop()
    print("Exiting app\n")
    pipeline.set_state(Gst.State.NULL)

def parse_args():

    parser = argparse.ArgumentParser(prog="deepstream_test_3",
                    description="deepstream-test3 multi stream, multi model inference reference app")
    # parser.add_argument(
    #     "-i",
    #     "--input",
    #     help="Path to input streams",
    #     nargs="+",
    #     metavar="URIs",
    #     default=["a"],
    #     required=True,
    # )
    # parser.add_argument(
    #     "-c",
    #     "--configfile",
    #     metavar="config_location.txt",
    #     default=None,
    #     help="Choose the config-file to be used with specified pgie",
    # )
    # parser.add_argument(
    #     "-g",
    #     "--pgie",
    #     default=None,
    #     help="Choose Primary GPU Inference Engine",
    #     choices=["nvinfer", "nvinferserver", "nvinferserver-grpc"],
    # )
    # parser.add_argument(
    #     "--no-display",
    #     action="store_true",
    #     default=False,
    #     dest='no_display',
    #     help="Disable display of video output",
    # )
    # parser.add_argument(
    #     "--file-loop",
    #     action="store_true",
    #     default=False,
    #     dest='file_loop',
    #     help="Loop the input file sources after EOS",
    # )
    # parser.add_argument(
    #     "--disable-probe",
    #     action="store_true",
    #     default=False,
    #     dest='disable_probe',
    #     help="Disable the probe function and use nvdslogger for FPS",
    # )
    # parser.add_argument(
    #     "-s",
    #     "--silent",
    #     action="store_true",
    #     default=False,
    #     dest='silent',
    #     help="Disable verbose output",
    # )
    # # Check input arguments
    # # if len(sys.argv) == 1:
    # #     parser.print_help(sys.stderr)
    # #     sys.exit(1)
    # args = parser.parse_args()

    # # stream_paths = args.input
    # pgie = args.pgie
    # config = args.configfile
    # disable_probe = args.disable_probe
    global no_display
    global silent
    global file_loop
    pgie = None
    config = None
    disable_probe = False
    # no_display = args.no_display
    # silent = args.silent
    # file_loop = args.file_loop

    # if config and not pgie or pgie and not config:
    #     sys.stderr.write ("\nEither pgie or configfile is missing. Please specify both! Exiting...\n\n\n\n")
    #     parser.print_help()
    #     sys.exit(1)
    # if config:
    #     config_path = Path(config)
    #     if not config_path.is_file():
    #         sys.stderr.write ("Specified config-file: %s doesn't exist. Exiting...\n\n" % config)
    #         sys.exit(1)

    # print(vars(args))
    return pgie, config, disable_probe



def write_json(D,filename):
    with open(filename, 'w') as f:
        json.dump(D, f, indent=4)

def read_json(filename):
    with open(filename, 'r') as f:
        D = json.load(f)
    return D


def save_dict(di_, filename_):
    with open(filename_, 'wb') as f:
        pickle.dump(di_, f)


def load_dict(filename_):
    with open(filename_, 'rb') as f:
        ret_di = pickle.load(f)
    return ret_di


def extract_addresses(cam_list):
    addresses = []
    if isinstance(cam_list[0], dict):
        for key, cam in cam_list[0].items():
            if isinstance(cam, dict) and 'Address' in cam:
                addresses.append(cam['Address'])
    return addresses

if __name__ == '__main__':

    root_directory = '/opt/nvidia/deepstream/deepstream-7.0/sources/src/SP2023'
    FILE_LOGGER.info("root dir is read as {}".format(root_directory))
    Meta_File = os.path.join(root_directory, "Config/Meta.json")
    Config_File = os.path.join(root_directory, "Config/Config.pkl")
    meta_params = read_json(Meta_File)
    Config=load_dict(Config_File)
    list_of_cams = []
    for key in Config:
        if key != 'Set':
            list_of_cams.append(Config[key])
    stream_paths = extract_addresses(list_of_cams)

    pgie, config, disable_probe = parse_args()
    stream_paths = ['rtsp://192.168.1.4:8555/video1',
                    'file:///opt/nvidia/deepstream/deepstream-7.0/sources/src/2.mp4',
                    'rtsp://192.168.1.4:8555/video1']
    sys.exit(main(stream_paths, pgie, config, disable_probe))

streammux.txt

[property]
algorithm-type=1
batch-size=30
max-fps-control=1
overall-max-fps-n=30
overall-max-fps-d=1
overall-min-fps-n=30
overall-min-fps-d=1
max-same-source-frames=1

This is one of the MP4 files used as input.

You can use any RTSP camera URL in place of the given ones.
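If no camera is at hand, a local RTSP server replaying the same MP4 can stand in for one. A sketch, assuming an H.264 MP4 and the gir1.2-gst-rtsp-server-1.0 package from the Dockerfile further down; the /video1 mount point and port 8555 match the URLs used in this thread:

```python
def mp4_rtsp_launch(path):
    """gst-rtsp-server launch string that replays an H.264 MP4 over RTP.

    Assumes the file contains an H.264 video track in an MP4 container.
    """
    return (f"( filesrc location={path} ! qtdemux ! h264parse "
            "! rtph264pay name=pay0 pt=96 )")

# Minimal server wiring (requires python3-gi + gir1.2-gst-rtsp-server-1.0):
#   import gi
#   gi.require_version('Gst', '1.0')
#   gi.require_version('GstRtspServer', '1.0')
#   from gi.repository import Gst, GLib, GstRtspServer
#   Gst.init(None)
#   server = GstRtspServer.RTSPServer()
#   server.set_service("8555")
#   factory = GstRtspServer.RTSPMediaFactory()
#   factory.set_launch(mp4_rtsp_launch(
#       "/opt/nvidia/deepstream/deepstream-7.0/sources/src/2.mp4"))
#   factory.set_shared(True)
#   server.get_mount_points().add_factory("/video1", factory)
#   server.attach(None)
#   GLib.MainLoop().run()
```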

docker-compose.yml

services:
  aivs_service_triton_70:
    container_name: aivs_deepstream_70_container
    build:
      context: .
      args:
        - NODE_ENV=local
      dockerfile: Dockerfile
    network_mode: "host"
    runtime: nvidia
    shm_size: 6g
    volumes:
      - "./src:/opt/nvidia/deepstream/deepstream-7.0/sources/src"
      - "/tmp/.X11-unix:/tmp/.X11-unix:rw"
      - "//var/run/docker.sock://var/run/docker.sock"
      - "/etc/localtime:/etc/localtime:ro"
    environment:
      - USE_NEW_NVSTREAMMUX=yes
      - DISPLAY=$DISPLAY
      - QT_X11_NO_MITSHM=1
      - ENABLE_ALERTS=TRUE
      - PRINT_OUT=FALSE
      - ROOT_DIR=/workspace/SP2023/
      # - GST_DEBUG=1
      # - GST_DEBUG_LEVEL=3
    restart: always
    # command: ["deepstream_python_apps/apps/deepstream-demux-multi-in-multi-out/deepstream_demux_multi_in_multi_out.py","-i", "file:///opt/nvidia/deepstream/deepstream-5.1/workspace/1.mp4"]
    command: ["deepstream_test_3.py"]
    entrypoint: ["python3", "-u"]

Dockerfile

FROM nvcr.io/nvidia/deepstream:7.0-triton-multiarch
RUN apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64/3bf863cc.pub
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
    python3-pip \
    cmake \
    pkg-config \
    gstreamer-1.0 \
    gstreamer1.0-rtsp \
    gir1.2-gst-rtsp-server-1.0 \
    gobject-introspection \
    libgirepository1.0-dev \
    python3-gi \
    python3-gst-1.0 \
    python3-numpy \
    iputils-ping \
    libgstreamer1.0-dev \
    libgstreamer-plugins-base1.0-dev \
    libgl1-mesa-glx \
    ffmpeg \
    libsm6 \
    libxext6
RUN python3 -m pip install scikit-build numpy pyds_ext

WORKDIR /opt/nvidia/deepstream/deepstream-7.0/sources/src

issue seen from my end

Noticing there is a lot of custom code and the app can't run as-is, could you simplify the code? It would be good if the app could run directly after being copied to the deepstream-test3 directory. Thanks!

Ok, I will remove the extra parts and send the code here.

The --file-loop option caused:

Traceback (most recent call last):
  File "/opt/nvidia/deepstream/deepstream-7.0/sources/src/driver5.py", line 521, in <module>
    sys.exit(main(stream_paths, pgie, config, disable_probe))
  File "/opt/nvidia/deepstream/deepstream-7.0/sources/src/driver5.py", line 338, in main
    streammux.set_property('live-source', 1)
TypeError: object of type 'GstNvStreamMux' does not have property 'live-source'

So I commented out those lines, since they are unnecessary.
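Instead of commenting the line out, one option is to probe for the property first, so the same script runs against both the old and new nvstreammux. A small sketch (set_property_if_exists is a hypothetical helper name; find_property is available on any GObject-derived Gst element):

```python
def set_property_if_exists(element, name, value):
    """Set a GObject property only if the element actually exposes it.

    Avoids the TypeError on elements, like the new nvstreammux, that
    dropped properties such as 'live-source'.
    """
    if element.find_property(name) is not None:
        element.set_property(name, value)
        return True
    return False

# Usage: silently skipped on the new nvstreammux
#   set_property_if_exists(streammux, 'live-source', 1)
```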

I replicated the issue in the code.
Please find the code below:

#!/usr/bin/env python3

################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import sys
sys.path.append('../')
from pathlib import Path
from os import environ
import gi
import configparser
import argparse
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from ctypes import *
import time
import sys
import math
import platform
from common.platform_info import PlatformInfo
from common.bus_call import bus_call
from common.FPS import PERF_DATA

import os

import pyds

no_display = False
silent = False
file_loop = False
perf_data = None
measure_latency = False

MAX_DISPLAY_LEN=64
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
MUXER_OUTPUT_WIDTH=1920
MUXER_OUTPUT_HEIGHT=1080
MUXER_BATCH_TIMEOUT_USEC = 33000
TILED_OUTPUT_WIDTH=1280
TILED_OUTPUT_HEIGHT=720
GST_CAPS_FEATURES_NVMM="memory:NVMM"
OSD_PROCESS_MODE= 0
OSD_DISPLAY_TEXT= 1
pgie_classes_str= ["Vehicle", "TwoWheeler", "Person","RoadSign"]

# pgie_src_pad_buffer_probe  will extract metadata received on tiler sink pad
# and update params for drawing rectangle, object information etc.
def pgie_src_pad_buffer_probe(pad,info,u_data):
    frame_number=0
    num_rects=0
    got_fps = False
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return
    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)

    # Enable latency measurement via probe if environment variable NVDS_ENABLE_LATENCY_MEASUREMENT=1 is set.
    # To enable component level latency measurement, please set environment variable
    # NVDS_ENABLE_COMPONENT_LATENCY_MEASUREMENT=1 in addition to the above.
    global measure_latency
    if measure_latency:
        num_sources_in_batch = pyds.nvds_measure_buffer_latency(hash(gst_buffer))
        if num_sources_in_batch == 0:
            print("Unable to get number of sources in GstBuffer for latency measurement")

    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
            # The casting is done by pyds.NvDsFrameMeta.cast()
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_number=frame_meta.frame_num
        l_obj=frame_meta.obj_meta_list
        num_rects = frame_meta.num_obj_meta
        obj_counter = {
        PGIE_CLASS_ID_VEHICLE:0,
        PGIE_CLASS_ID_PERSON:0,
        PGIE_CLASS_ID_BICYCLE:0,
        PGIE_CLASS_ID_ROADSIGN:0
        }
        while l_obj is not None:
            try: 
                # Casting l_obj.data to pyds.NvDsObjectMeta
                obj_meta=pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            obj_counter[obj_meta.class_id] += 1
            try: 
                l_obj=l_obj.next
            except StopIteration:
                break
        if not silent:
            print("Frame Number=", frame_number, "Number of Objects=",num_rects,"Vehicle_count=",obj_counter[PGIE_CLASS_ID_VEHICLE],"Person_count=",obj_counter[PGIE_CLASS_ID_PERSON])

        # Update frame rate through this probe
        stream_index = "stream{0}".format(frame_meta.pad_index)
        global perf_data
        perf_data.update_fps(stream_index)

        try:
            l_frame=l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK



def cb_newpad(decodebin, decoder_src_pad,data):
    print("In cb_newpad\n")
    caps=decoder_src_pad.get_current_caps()
    if not caps:
        caps = decoder_src_pad.query_caps()
    gststruct=caps.get_structure(0)
    gstname=gststruct.get_name()
    source_bin=data
    features=caps.get_features(0)

    # Need to check if the pad created by the decodebin is for video and not
    # audio.
    print("gstname=",gstname)
    if(gstname.find("video")!=-1):
        # Link the decodebin pad only if decodebin has picked nvidia
        # decoder plugin nvdec_*. We do this by checking if the pad caps contain
        # NVMM memory features.
        print("features=",features)
        if features.contains("memory:NVMM"):
            # Get the source bin ghost pad
            bin_ghost_pad=source_bin.get_static_pad("src")
            if not bin_ghost_pad.set_target(decoder_src_pad):
                sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
        else:
            sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")

def decodebin_child_added(child_proxy,Object,name,user_data):
    print("Decodebin child added:", name, "\n")
    if(name.find("decodebin") != -1):
        Object.connect("child-added",decodebin_child_added,user_data)

    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        if source_element.find_property('drop-on-latency') != None:
            Object.set_property("drop-on-latency", True)



def create_source_bin(index,uri):
    print("Creating source bin")

    # Create a source GstBin to abstract this bin's content from the rest of the
    # pipeline
    bin_name="source-bin-%02d" %index
    print(bin_name)
    nbin=Gst.Bin.new(bin_name)
    if not nbin:
        sys.stderr.write(" Unable to create source bin \n")

    # Source element for reading from the uri.
    # We will use decodebin and let it figure out the container format of the
    # stream and the codec and plug the appropriate demux and decode plugins.
    if file_loop:
        # use nvurisrcbin to enable file-loop
        uri_decode_bin=Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
        uri_decode_bin.set_property("file-loop", 1)
        uri_decode_bin.set_property("cudadec-memtype", 0)
        uri_decode_bin.set_property("rtsp-reconnect-interval", 10)
        uri_decode_bin.set_property("rtsp-reconnect-attempts", 2160)        
    else:
        uri_decode_bin=Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
    if not uri_decode_bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
    # We set the input uri to the source element
    uri_decode_bin.set_property("uri",uri)
    # Connect to the "pad-added" signal of the decodebin which generates a
    # callback once a new pad for raw data has been created by the decodebin
    uri_decode_bin.connect("pad-added",cb_newpad,nbin)
    uri_decode_bin.connect("child-added",decodebin_child_added,nbin)

    # We need to create a ghost pad for the source bin which will act as a proxy
    # for the video decoder src pad. The ghost pad will not have a target right
    # now. Once the decode bin creates the video decoder and generates the
    # cb_newpad callback, we will set the ghost pad target to the video decoder
    # src pad.
    Gst.Bin.add(nbin,uri_decode_bin)
    bin_pad=nbin.add_pad(Gst.GhostPad.new_no_target("src",Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write(" Failed to add ghost pad in source bin \n")
        return None
    return nbin

def main(args, requested_pgie=None, config=None, disable_probe=False):
    global perf_data
    perf_data = PERF_DATA(len(args))

    number_sources=len(args)

    platform_info = PlatformInfo()
    # Standard GStreamer initialization
    Gst.init(None)

    # Create gstreamer elements */
    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    is_live = False

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
    print("Creating streammux \n ")

    # Create nvstreammux instance to form batches from one or more sources.
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")

    pipeline.add(streammux)
    for i in range(number_sources):
        print("Creating source_bin ",i," \n ")
        uri_name=args[i]
        if uri_name.find("rtsp://") == 0 :
            is_live = True
        source_bin=create_source_bin(i, uri_name)
        if not source_bin:
            sys.stderr.write("Unable to create source bin \n")
        pipeline.add(source_bin)
        padname="sink_%u" %i
        sinkpad= streammux.request_pad_simple(padname) 
        if not sinkpad:
            sys.stderr.write("Unable to create sink pad bin \n")
        srcpad=source_bin.get_static_pad("src")
        if not srcpad:
            sys.stderr.write("Unable to create src pad bin \n")
        srcpad.link(sinkpad)
    queue1=Gst.ElementFactory.make("queue","queue1")
    queue2=Gst.ElementFactory.make("queue","queue2")
    queue3=Gst.ElementFactory.make("queue","queue3")
    queue4=Gst.ElementFactory.make("queue","queue4")
    queue5=Gst.ElementFactory.make("queue","queue5")
    pipeline.add(queue1)
    pipeline.add(queue2)
    pipeline.add(queue3)
    pipeline.add(queue4)
    pipeline.add(queue5)

    nvdslogger = None

    # print("Creating Pgie \n ")
    # if requested_pgie != None and (requested_pgie == 'nvinferserver' or requested_pgie == 'nvinferserver-grpc') :
    #     pgie = Gst.ElementFactory.make("nvinferserver", "primary-inference")
    # elif requested_pgie != None and requested_pgie == 'nvinfer':
    #     pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    # else:
    #     pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")

    # if not pgie:
    #     sys.stderr.write(" Unable to create pgie :  %s\n" % requested_pgie)

    if disable_probe:
        # Use nvdslogger for perf measurement instead of probe function
        print ("Creating nvdslogger \n")
        nvdslogger = Gst.ElementFactory.make("nvdslogger", "nvdslogger")

    print("Creating tiler \n ")
    tiler=Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
    if not tiler:
        sys.stderr.write(" Unable to create tiler \n")
    print("Creating nvvidconv \n ")
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    if not nvvidconv:
        sys.stderr.write(" Unable to create nvvidconv \n")
    print("Creating nvosd \n ")
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")
    nvosd.set_property('process-mode',OSD_PROCESS_MODE)
    nvosd.set_property('display-text',OSD_DISPLAY_TEXT)

    if file_loop:
        if platform_info.is_integrated_gpu():
            # Set nvbuf-memory-type=4 for integrated gpu for file-loop (nvurisrcbin case)
            streammux.set_property('nvbuf-memory-type', 4)
        else:
            # Set nvbuf-memory-type=2 for x86 for file-loop (nvurisrcbin case)
            # streammux.set_property('nvbuf-memory-type', 2)
            pass

    if no_display:
        print("Creating Fakesink \n")
        sink = Gst.ElementFactory.make("fakesink", "fakesink")
        sink.set_property('enable-last-sample', 0)
        sink.set_property('sync', 0)
    else:
        if platform_info.is_integrated_gpu():
            print("Creating nv3dsink \n")
            sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
            if not sink:
                sys.stderr.write(" Unable to create nv3dsink \n")
        else:
            if platform_info.is_platform_aarch64():
                print("Creating nv3dsink \n")
                sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
            else:
                print("Creating EGLSink \n")
                sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
            if not sink:
                sys.stderr.write(" Unable to create egl sink \n")

    if not sink:
        sys.stderr.write(" Unable to create sink element \n")

    if is_live:
        print("At least one of the sources is live")
        # streammux.set_property('live-source', 1)
    if os.environ.get("USE_NEW_NVSTREAMMUX") == "yes":
        streammux.set_property("config-file-path", "/opt/nvidia/deepstream/deepstream-7.0/sources/src/streammux.txt")
        streammux.set_property("batch-size", 30)
        print("###################Using the new StreamMux########################\n\n\n\n")
    else:
        streammux.set_property("batched-push-timeout", 25000)
        streammux.set_property("width", 640) #640 3840
        streammux.set_property("height", 480) #480 2160
        streammux.set_property("batch-size", 30)

    # if requested_pgie == "nvinferserver" and config != None:
    #     pgie.set_property('config-file-path', config)
    # elif requested_pgie == "nvinferserver-grpc" and config != None:
    #     pgie.set_property('config-file-path', config)
    # elif requested_pgie == "nvinfer" and config != None:
    #     pgie.set_property('config-file-path', config)
    # else:
    #     pgie.set_property('config-file-path', "dstest3_pgie_config.txt")
    # pgie_batch_size=pgie.get_property("batch-size")
    # if(pgie_batch_size != number_sources):
    #     print("WARNING: Overriding infer-config batch-size",pgie_batch_size," with number of sources ", number_sources," \n")
    #     pgie.set_property("batch-size",number_sources)
    tiler_rows=int(math.sqrt(number_sources))
    tiler_columns=int(math.ceil((1.0*number_sources)/tiler_rows))
    tiler.set_property("rows",tiler_rows)
    tiler.set_property("columns",tiler_columns)
    tiler.set_property("width", TILED_OUTPUT_WIDTH)
    tiler.set_property("height", TILED_OUTPUT_HEIGHT)
    sink.set_property("qos",0)

    print("Adding elements to Pipeline \n")
    # pipeline.add(pgie)
    if nvdslogger:
        pipeline.add(nvdslogger)
    pipeline.add(tiler)
    pipeline.add(nvvidconv)
    pipeline.add(nvosd)
    pipeline.add(sink)

    print("Linking elements in the Pipeline \n")
    streammux.link(queue1)
    # queue1.link(pgie)
    # pgie.link(queue2)
    queue1.link(queue2)
    if nvdslogger:
        queue2.link(nvdslogger)
        nvdslogger.link(tiler)
    else:
        queue2.link(tiler)
    tiler.link(queue3)
    queue3.link(nvvidconv)
    nvvidconv.link(queue4)
    queue4.link(nvosd)
    nvosd.link(queue5)
    queue5.link(sink)   

    # create an event loop and feed gstreamer bus messages to it
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect ("message", bus_call, loop)
    # pgie_src_pad=pgie.get_static_pad("src")
    # if not pgie_src_pad:
    #     sys.stderr.write(" Unable to get src pad \n")
    # else:
    #     if not disable_probe:
    #         pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, pgie_src_pad_buffer_probe, 0)
    #         # perf callback function to print fps every 5 sec
    #         GLib.timeout_add(5000, perf_data.perf_print_callback)

    # Enable latency measurement via probe if environment variable NVDS_ENABLE_LATENCY_MEASUREMENT=1 is set.
    # To enable component level latency measurement, please set environment variable
    # NVDS_ENABLE_COMPONENT_LATENCY_MEASUREMENT=1 in addition to the above.
    if environ.get('NVDS_ENABLE_LATENCY_MEASUREMENT') == '1':
        print ("Pipeline Latency Measurement enabled!\nPlease set env var NVDS_ENABLE_COMPONENT_LATENCY_MEASUREMENT=1 for Component Latency Measurement")
        global measure_latency
        measure_latency = True

    # List the sources
    print("Now playing...")
    for i, source in enumerate(args):
        print(i, ": ", source)

    print("Starting pipeline \n")
    # start playback and listen to events
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except:
        pass
    # cleanup
    print("Exiting app\n")
    pipeline.set_state(Gst.State.NULL)

def parse_args():

    parser = argparse.ArgumentParser(prog="deepstream_test_3",
                    description="deepstream-test3 multi stream, multi model inference reference app")
    parser.add_argument(
        "-i",
        "--input",
        help="Path to input streams",
        nargs="+",
        metavar="URIs",
        default=["a"],
        required=True,
    )
    parser.add_argument(
        "-c",
        "--configfile",
        metavar="config_location.txt",
        default=None,
        help="Choose the config-file to be used with specified pgie",
    )
    parser.add_argument(
        "-g",
        "--pgie",
        default=None,
        help="Choose Primary GPU Inference Engine",
        choices=["nvinfer", "nvinferserver", "nvinferserver-grpc"],
    )
    parser.add_argument(
        "--no-display",
        action="store_true",
        default=False,
        dest='no_display',
        help="Disable display of video output",
    )
    parser.add_argument(
        "--file-loop",
        action="store_true",
        default=False,
        dest='file_loop',
        help="Loop the input file sources after EOS",
    )
    parser.add_argument(
        "--disable-probe",
        action="store_true",
        default=False,
        dest='disable_probe',
        help="Disable the probe function and use nvdslogger for FPS",
    )
    parser.add_argument(
        "-s",
        "--silent",
        action="store_true",
        default=False,
        dest='silent',
        help="Disable verbose output",
    )
    # Check input arguments
    if len(sys.argv) == 1:
        parser.print_help(sys.stderr)
        sys.exit(1)
    args = parser.parse_args()

    stream_paths = args.input
    pgie = args.pgie
    config = args.configfile
    disable_probe = args.disable_probe
    global no_display
    global silent
    global file_loop
    no_display = args.no_display
    silent = args.silent
    file_loop = args.file_loop

    if config and not pgie or pgie and not config:
        sys.stderr.write ("\nEither pgie or configfile is missing. Please specify both! Exiting...\n\n\n\n")
        parser.print_help()
        sys.exit(1)
    # if config:
    #     config_path = Path(config)
    #     if not config_path.is_file():
    #         sys.stderr.write ("Specified config-file: %s doesn't exist. Exiting...\n\n" % config)
    #         sys.exit(1)

    print(vars(args))
    return stream_paths, pgie, config, disable_probe

if __name__ == '__main__':
    stream_paths, pgie, config, disable_probe = parse_args()
    sys.exit(main(stream_paths, pgie, config, disable_probe))

These are the changes required, FYI.
I removed the new-streammux-related parts and the file-loop parts, and also removed the pgie.

etc etc

ALSO FYI: streammux.txt

[property]
algorithm-type=1
batch-size=30
max-fps-control=1
overall-max-fps-n=30
overall-max-fps-d=1
overall-min-fps-n=30
overall-min-fps-d=1
max-same-source-frames=1

This is the command I used:

python3 driver5.py -i file:///opt/nvidia/deepstream/deepstream-7.0/sources/src/2.mp4  file:///opt/nvidia/deepstream/deepstream-7.0/sources/src/3.mp4 rtsp://192.168.1.6:8555/video1 rtsp://192.168.1.6:8555/video1 --pgie nvinfer -c config_infer_primary_peoplenet.txt  --silent --file-loop 

Using your code without any modification, I can only reproduce the stuttering issue. Here is the command line: python3 1.py -i file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4 file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4 rtsp://10.19.226.223/media/video1 rtsp://10.19.226.223/media/video1 --pgie nvinfer -c config_infer_primary_peoplenet.txt --silent --file-loop
and the log: log-0820-02.txt (55.3 KB)

About the artifacts: you can add this code: uri_decode_bin.set_property("latency", 4000). The default value is 100 ms.
About the stuttering: please add sink.set_property("sync", 0), which means "play as soon as possible".
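The two tweaks above can be grouped into one small helper applied where the source bin and sink are created. A sketch following the thread's suggestions (apply_stream_tuning is a hypothetical name, and 4000 ms is the value suggested above, not a verified optimum):

```python
def apply_stream_tuning(uri_decode_bin, sink, latency_ms=4000):
    """Apply the two fixes suggested above.

    - Raise the RTSP jitterbuffer latency on nvurisrcbin (default 100 ms)
      so late packets are less likely to show up as artifacts.
    - Disable clock sync on the sink so frames render as soon as possible,
      which reduces stuttering when file and RTSP inputs are mixed.
    """
    uri_decode_bin.set_property("latency", latency_ms)
    sink.set_property("sync", 0)
```

Called once per source bin (and once for the sink), e.g. right after creating nvurisrcbin in create_source_bin.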

I was getting artifacts even with more latency configured.
Anyway, I will check more, and will let you know.

Hi @fanzh

These artifacts and missing frames in batches from the new streammux depend on the variable frame rate of the specific input videos, right?
We should tune the params in streammux.txt to fix those issues, right?

Did you confirm any bug? As I said on Aug 20, the new streammux does not support scaling; your RTSP and MP4 sources should output the same resolution. Do your rtsp://192.168.1.6:8555/video1, 2.mp4, and 3.mp4 have the same resolution? If not, please refer to the solution in the link from Aug 20.
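Since the new nvstreammux does not scale, mismatched sources have to be normalized upstream, e.g. with an nvvideoconvert + capsfilter pair inside each source bin, before the streammux sink pad. A sketch of the caps involved (the helper name and the 1920x1080 target are illustrative):

```python
def nvmm_caps(width, height):
    """Caps string pinning one common resolution in NVMM device memory,
    for a capsfilter placed between the decoder and nvstreammux."""
    return f"video/x-raw(memory:NVMM), width={width}, height={height}"

# Inside create_source_bin, conceptually:
#   conv = Gst.ElementFactory.make("nvvideoconvert", "conv-%u" % index)
#   capsf = Gst.ElementFactory.make("capsfilter", "caps-%u" % index)
#   capsf.set_property("caps", Gst.Caps.from_string(nvmm_caps(1920, 1080)))
#   # link decoder src -> conv -> capsf, then ghost-pad capsf's src pad
#   # instead of the decoder's, so streammux only ever sees one resolution
```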