How to dynamically add and delete RTSP input sources and corresponding RTSP server outputs

• Hardware Platform (GPU)
• DeepStream Version : 7.0
• TensorRT Version: 12.4
• NVIDIA GPU Driver Version: 550
• Issue Type : questions


My requirement is to handle a dynamic number of RTSP inputs on one server and re-publish each of them through an RTSP server, with a one-to-one mapping between each RTSP input and its RTSP server output.

My current approach is to start multiple pipelines, where each pipeline handles one RTSP input and its corresponding RTSP server output.

The problem is that the first video runs normally, but as soon as the second pipeline starts, neither video can output images through the RTSP server.

#!/usr/bin/env python3

################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2021-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os
os.environ["GST_DEBUG_DUMP_DOT_DIR"] = "/home/edyang/code/tunnel-fire-det-ds/dot"

import argparse
import sys

sys.path.append('../')
import gi
import configparser

gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
from gi.repository import GLib, Gst, GstRtspServer
from ctypes import *
import math
from common.is_aarch_64 import is_aarch64
from common.bus_call import bus_call
from common.utils import long_to_uint64,get_md5_str,get_now_time_str

from common.FPS import PERF_DATA
import numpy as np
import pyds
import cv2
from os import path

from MediaServerAPI import MediaServer



perf_data = None
frame_count = {}
saved_count = {}


MUXER_BATCH_TIMEOUT_USEC = 33000

FRAME_W = 960
FRAME_H = 540

PGIE_CONFIG_FILE = "yolov5m-official/config_infer_primary_yoloV5-test.txt"
MSG_BROKER_CONN = '192.168.31.9;11883'
MSG_BROKER_TOPIC = 'tunnel/monitor/firework'
MSCONV_CONFIG_FILE = "yolov5m-official/mqtt/cfg_msgconv.txt"
MSG_BROKER_CONFIG_FILE = "yolov5m-official/mqtt/cfg_mqtt.txt"
DEVICE_CODE = ''

RTSP_SERVER_PORT = 8551

IS_SAVE_RESULT = True
SAVE_IMAGE_DIR = "./data/frame"
SAVE_VIDEO_DIR = "./data/video"


RECORD_VIDEO_FPS = 13
RECORD_VIDEO_WRITER = None
RECORD_VIDEO_FILE_NAME = None
RECORD_VIDEO_KEEP_FRAME = RECORD_VIDEO_FPS * 2
RECORD_VIDEO_FOURCC = cv2.VideoWriter_fourcc(*'XVID')



HAS_OBJECT_FRAME_COUNT = 0


MAX_TIME_STAMP_LEN = 32

def calculate(x1, y1, x2, y2):
    # Estimate horizontal/vertical ground distance of a detection from image
    # coordinates using fixed camera intrinsics and mounting geometry.
    u = (x1 + x1 + x1 + x2) / 4
    v = y1
    fx = 9095.541
    fy = 8969.147
    u0 = 2053.659
    v0 = 1084.515

    H = 6.577
    h = 5.670
    erfa = math.pi * 30 / 180
    delta = math.pi * 10 / 180

    d1 = float(H-h) / math.tan(math.atan((abs(v-v0)) / fy) - erfa)
    l1 = float(abs(u - u0)) * (math.sqrt(d1 * d1+(H-h)*(H-h))/ math.sqrt(fy*fy + abs((v-v0)*(v-v0))))
    d = (d1 / math.cos(math.atan(l1 / d1))) * math.cos(delta - math.atan(l1 / d1))
    l = (d1 / math.cos(math.atan(l1 / d1))) * math.sin(delta - math.atan(l1 / d1))
    horizontal_distance = d
    vertical_distance = l

    #result = [['horizontal_distance', 'vertical_distance'],[horizontal_distance, vertical_distance]]

    return horizontal_distance,vertical_distance

def create_record():
    file_name = "{}/record_video_{}.avi".format(SAVE_VIDEO_DIR, get_now_time_str())
    out = cv2.VideoWriter(file_name, fourcc=RECORD_VIDEO_FOURCC, fps=RECORD_VIDEO_FPS,frameSize=(FRAME_W,FRAME_H))
    return out,file_name

def stop_record(out):
    out.release()

def draw_bounding_boxes(image, obj_meta, confidence):
    confidence = '{0:.2f}'.format(confidence)
    rect_params = obj_meta.rect_params
    top = int(rect_params.top)
    left = int(rect_params.left)
    width = int(rect_params.width)
    height = int(rect_params.height)
    obj_name = str(obj_meta.class_id)
    image = cv2.rectangle(image, (left, top), (left + width, top + height), (0, 0, 255, 0), 2, cv2.LINE_4)

    image = cv2.putText(image, obj_name + ',C=' + str(confidence), (left - 10, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5,
                        (0, 0, 255, 0), 2)
    return image

def draw_rtsp_url(image,rtsp_url):
    image = cv2.putText(image, rtsp_url, (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.3,
                        (255, 0, 0, 100), 3)
    return image


# tiler_sink_pad_buffer_probe  will extract metadata received on tiler sink pad
# and update params for drawing rectangle, object information etc.
def tiler_sink_pad_buffer_probe(pad, info, u_data):
    frame_number = 0
    num_rects = 0
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return

    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))

    l_frame = batch_meta.frame_meta_list

    global RECORD_VIDEO_WRITER
    global RECORD_VIDEO_FILE_NAME
    global HAS_OBJECT_FRAME_COUNT

    while l_frame is not None:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
            # The casting is done by pyds.NvDsFrameMeta.cast()
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_number = frame_meta.frame_num
        l_obj = frame_meta.obj_meta_list
        num_rects = frame_meta.num_obj_meta
        is_first_obj = True
        save_image = False
        frame_copy = None

        alarm_type = "DETECT_OTHER"
        
        if IS_SAVE_RESULT:
            if l_obj is not None:
                HAS_OBJECT_FRAME_COUNT += 1
                if HAS_OBJECT_FRAME_COUNT < 0:
                    HAS_OBJECT_FRAME_COUNT = 0
                if HAS_OBJECT_FRAME_COUNT > RECORD_VIDEO_KEEP_FRAME and RECORD_VIDEO_WRITER is None:
                    RECORD_VIDEO_WRITER,RECORD_VIDEO_FILE_NAME = create_record()
                    '''
                    @param type -> alarmType : RECORD_VIDEO_START, RECORD_VIDEO_STOP , DETECT_SMOKE, DETECT_FIRE
                    @param color -> imgUrl / url(videoSetName)
                    @param model -> horizontalDistance
                    @param region -> verticalDistance
                    '''
                    add_user_meta(batch_meta,frame_meta,None,frame_number, 'RECORD_VIDEO_START',RECORD_VIDEO_FILE_NAME)
            else:
                if HAS_OBJECT_FRAME_COUNT > 0:
                    HAS_OBJECT_FRAME_COUNT = 0
                else:
                    HAS_OBJECT_FRAME_COUNT -= 1
                if HAS_OBJECT_FRAME_COUNT < -RECORD_VIDEO_KEEP_FRAME and RECORD_VIDEO_WRITER is not None:
                    stop_record(RECORD_VIDEO_WRITER)
                    
                    '''
                    @param type -> alarmType : RECORD_VIDEO_START, RECORD_VIDEO_STOP , DETECT_SMOKE, DETECT_FIRE
                    @param color -> imgUrl / url(videoSetName)
                    @param model -> horizontalDistance
                    @param region -> verticalDistance
                    '''
                    add_user_meta(batch_meta,frame_meta,None,frame_number, 'RECORD_VIDEO_STOP',RECORD_VIDEO_FILE_NAME)

                    RECORD_VIDEO_WRITER = None
                    RECORD_VIDEO_FILE_NAME = None
                

        


        while l_obj is not None:

            try:
                # Casting l_obj.data to pyds.NvDsObjectMeta
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break

            
            alarm_type = "DETECT_OTHER"
            if(obj_meta.class_id == 0):
                alarm_type = "DETECT_SMOKE"
            if(obj_meta.class_id == 1):
                alarm_type = "DETECT_FIRE"

            if IS_SAVE_RESULT:
                n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
                #draw_rtsp_url(n_frame,DEVICE_CODE)
                #n_frame = crop_object(n_frame, obj_meta)
                frame_copy = draw_bounding_boxes(n_frame.copy(), obj_meta, obj_meta.confidence)
                # convert python array into numpy array format in the copy mode.
                frame_copy = np.array(frame_copy, copy=True, order='C')
                # convert the array into cv2 default color format
                frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_RGBA2BGR)
                if is_aarch64():
                    # On Jetson the buffer is mapped to CPU for retrieval, so it must
                    # also be unmapped once work on the original array is complete;
                    # the original array cannot be accessed after that call.
                    pyds.unmap_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
                
                if saved_count["stream_{}".format(frame_meta.pad_index)] % 30 == 0:
                    save_image = True

            try:
                l_obj = l_obj.next
            except StopIteration:
                break
        # Update frame rate through this probe
        stream_index = "stream{0}".format(frame_meta.pad_index)
        global perf_data
        perf_data.update_fps(stream_index)
        if save_image and IS_SAVE_RESULT and saved_count["stream_{}".format(frame_meta.pad_index)] % (25*5) == 0:
            global SAVE_IMAGE_DIR
            img_path = "{}/frame_{}.jpg".format(SAVE_IMAGE_DIR, get_now_time_str())
            cv2.imwrite(img_path, frame_copy.copy())

            '''
            @param type -> alarmType : RECORD_VIDEO_START, RECORD_VIDEO_STOP , DETECT_SMOKE, DETECT_FIRE
            @param color -> imgUrl / url(videoSetName)
            @param model -> horizontalDistance
            @param region -> verticalDistance
            '''
            add_user_meta(batch_meta,frame_meta,obj_meta,frame_number, alarm_type,img_path)

        saved_count["stream_{}".format(frame_meta.pad_index)] += 1

        if RECORD_VIDEO_WRITER is not None and frame_copy is not None and IS_SAVE_RESULT:  
            ret = RECORD_VIDEO_WRITER.write(frame_copy)

        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK

def generate_vehicle_meta(data,class_id,type,license,color,make,model,region):
    
    obj = pyds.NvDsVehicleObject.cast(data)
    obj.type = type
    obj.license = license
    obj.color = color
    obj.make = make
    obj.model = model
    obj.region = region
    return obj


def generate_event_msg_meta(data, class_id, type,license,color,make,model,region):
    '''
    @param type -> alarmType : RECORD_VIDEO_START , DETECT_SMOKE, DETECT_FIRE
    @param license -> deviceCode
    @param color -> imgUrl / url(videoSetName)
    @param make -> monitorTime
    @param model -> horizontalDistance
    @param region -> verticalDistance
    '''
    meta = pyds.NvDsEventMsgMeta.cast(data)
    meta.sensorId = 0
    meta.placeId = 0
    meta.moduleId = 0
    meta.sensorStr = "sensor-0"
    meta.ts = pyds.alloc_buffer(MAX_TIME_STAMP_LEN + 1)
    pyds.generate_ts_rfc3339(meta.ts, MAX_TIME_STAMP_LEN)

    # This demonstrates how to attach custom objects.
    # Any custom object as per requirement can be generated and attached
    # like NvDsVehicleObject / NvDsPersonObject. Then that object should
    # be handled in payload generator library (nvmsgconv.cpp) accordingly.

    meta.type = pyds.NvDsEventType.NVDS_EVENT_MOVING
    meta.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_VEHICLE
    meta.objClassId = class_id
    obj = pyds.alloc_nvds_vehicle_object()
    obj = generate_vehicle_meta(obj,class_id,type,license,color,make,model,region)
    meta.extMsg = obj
    meta.extMsgSize = sys.getsizeof(pyds.NvDsVehicleObject)
    return meta

def add_user_meta(batch_meta,frame_meta,obj_meta,frame_number, type,color):
    '''
    @param type -> alarmType : RECORD_VIDEO_START , DETECT_SMOKE, DETECT_FIRE
    @param color -> imgUrl / url(videoSetName)
    @param model -> horizontalDistance
    @param region -> verticalDistance
    '''
    global DEVICE_CODE
    user_event_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)
    if user_event_meta:
        # Allocating an NvDsEventMsgMeta instance and getting
        # reference to it. The underlying memory is not manged by
        # Python so that downstream plugins can access it. Otherwise
        # the garbage collector will free it when this probe exits.
        class_id = 0
        msg_meta = pyds.alloc_nvds_event_msg_meta(user_event_meta)
        horizontalDistance = '0'
        verticalDistance = '0'
        if obj_meta is not None :
            class_id = obj_meta.class_id
            msg_meta.bbox.top = obj_meta.rect_params.top
            msg_meta.bbox.left = obj_meta.rect_params.left
            msg_meta.bbox.width = obj_meta.rect_params.width
            msg_meta.bbox.height = obj_meta.rect_params.height
            msg_meta.frameId = frame_number
            msg_meta.trackingId = long_to_uint64(obj_meta.object_id)
            msg_meta.confidence = obj_meta.confidence
            horizontal_distance,vertical_distance = calculate(msg_meta.bbox.left,msg_meta.bbox.top,(msg_meta.bbox.left+msg_meta.bbox.width),(msg_meta.bbox.top+msg_meta.bbox.height))
            horizontalDistance = str(horizontal_distance)
            verticalDistance = str(vertical_distance)
        msg_meta = generate_event_msg_meta(msg_meta, class_id, type,DEVICE_CODE,color,get_now_time_str(),horizontalDistance,verticalDistance)

        user_event_meta.user_meta_data = msg_meta
        user_event_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_EVENT_MSG_META
        pyds.nvds_add_user_meta_to_frame(frame_meta,
                                            user_event_meta)
    else:
        print("Error in attaching event meta to buffer\n")


def cb_newpad(decodebin, decoder_src_pad,data):
    print("In cb_newpad\n")
    caps=decoder_src_pad.get_current_caps()
    if not caps:
        caps = decoder_src_pad.query_caps()
    gststruct=caps.get_structure(0)
    gstname=gststruct.get_name()
    source_bin=data
    features=caps.get_features(0)

    # Need to check if the pad created by the decodebin is for video and not
    # audio.
    print("gstname=",gstname)
    if(gstname.find("video")!=-1):
        # Link the decodebin pad only if decodebin has picked nvidia
        # decoder plugin nvdec_*. We do this by checking if the pad caps contain
        # NVMM memory features.
        print("features=",features)
        if features.contains("memory:NVMM"):
            # Get the source bin ghost pad
            bin_ghost_pad=source_bin.get_static_pad("src")
            if not bin_ghost_pad.set_target(decoder_src_pad):
                sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
        else:
            sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")

def decodebin_child_added(child_proxy, Object, name, user_data):
    print("Decodebin child added:", name, "\n")
    if name.find("decodebin") != -1:
        Object.connect("child-added", decodebin_child_added, user_data)
    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        if source_element and source_element.find_property('drop-on-latency') is not None:
            Object.set_property("drop-on-latency", True)
        # Configure the RTSP source for NTP timestamp synchronization.
        pyds.configure_source_for_ntp_sync(hash(Object))



def create_source_bin(index,uri):
    print("Creating source bin")

    # Create a source GstBin to abstract this bin's content from the rest of the
    # pipeline
    bin_name="source-bin-%02d" %index
    print(bin_name)
    nbin=Gst.Bin.new(bin_name)
    if not nbin:
        sys.stderr.write(" Unable to create source bin \n")

    # Source element for reading from the uri.
    # We will use decodebin and let it figure out the container format of the
    # stream and the codec and plug the appropriate demux and decode plugins.

    uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
    if not uri_decode_bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
    uri_decode_bin.set_property("cudadec-memtype", 0)
    #uri_decode_bin.set_property("drop-frame-interval", 2)
    #uri_decode_bin=Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
    # We set the input uri to the source element
    uri_decode_bin.set_property("uri",uri)
    uri_decode_bin.set_property("rtsp-reconnect-interval", 5) 
    # Connect to the "pad-added" signal of the decodebin which generates a
    # callback once a new pad for raw data has been created by the decodebin
    uri_decode_bin.connect("pad-added",cb_newpad,nbin)
    uri_decode_bin.connect("child-added",decodebin_child_added,nbin)

    # We need to create a ghost pad for the source bin which will act as a proxy
    # for the video decoder src pad. The ghost pad will not have a target right
    # now. Once the decode bin creates the video decoder and generates the
    # cb_newpad callback, we will set the ghost pad target to the video decoder
    # src pad.
    Gst.Bin.add(nbin,uri_decode_bin)
    bin_pad=nbin.add_pad(Gst.GhostPad.new_no_target("src",Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write(" Failed to add ghost pad in source bin \n")
        return None
    return nbin

def main(uri_inputs,codec,bitrate,mqtt_server_ip,mqtt_server_port,rtsp_server_ip, rtsp_server_port, mip, mport, musername, mpassword):

    MSG_BROKER_CONN = f'{mqtt_server_ip};{mqtt_server_port}'

    # Check input arguments

    number_sources = len(uri_inputs)
    if number_sources <= 0:
        sys.stderr.write("Rtsp resource can not empty!")
        sys.exit(1)

    global perf_data
    perf_data = PERF_DATA(number_sources)

    global DEVICE_CODE
    global SAVE_IMAGE_DIR
    global SAVE_VIDEO_DIR
    stream_name = get_md5_str(uri_inputs[0])
    DEVICE_CODE = uri_inputs[0]
    SAVE_IMAGE_DIR = "{}/{}".format(SAVE_IMAGE_DIR,stream_name)
    SAVE_VIDEO_DIR = "{}/{}".format(SAVE_VIDEO_DIR,stream_name)
    
    if path.exists(SAVE_IMAGE_DIR) is False:
        os.makedirs(SAVE_IMAGE_DIR)
        print("Frames will be saved in ", SAVE_IMAGE_DIR)
    
    if path.exists(SAVE_VIDEO_DIR) is False:
        os.makedirs(SAVE_VIDEO_DIR)
        print("Video will be saved in ", SAVE_VIDEO_DIR)

    # Standard GStreamer initialization
    Gst.init(None)

    # Create gstreamer elements */
    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    is_live = False

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
    print("Creating streamux \n ")

    # Create nvstreammux instance to form batches from one or more sources.
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")

    pipeline.add(streammux)
    for i in range(number_sources):

        frame_count["stream_" + str(i)] = 0
        saved_count["stream_" + str(i)] = 0
        sys.stdout.write("Creating source_bin")
        uri_name = uri_inputs[i]
        if uri_name.find("rtsp://") == 0:
            is_live = True
        source_bin = create_source_bin(i, uri_name)
        if not source_bin:
            sys.stderr.write("Unable to create source bin \n")
        pipeline.add(source_bin)
        padname = "sink_%u" % i
        sinkpad = streammux.get_request_pad(padname)
        if not sinkpad:
            sys.stderr.write("Unable to create sink pad bin \n")
        srcpad = source_bin.get_static_pad("src")
        if not srcpad:
            sys.stderr.write("Unable to create src pad bin \n")
        srcpad.link(sinkpad)
    print("Creating Pgie \n ")
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    #pgie = Gst.ElementFactory.make("nvinferserver", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie \n")
    # Add nvvidconv1 and filter1 to convert the frames to RGBA
    # which is easier to work with in Python.
    print("Creating nvvidconv1 \n ")
    nvvidconv1 = Gst.ElementFactory.make("nvvideoconvert", "convertor1")
    if not nvvidconv1:
        sys.stderr.write(" Unable to create nvvidconv1 \n")
    print("Creating filter1 \n ")
    caps1 = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
    filter1 = Gst.ElementFactory.make("capsfilter", "filter1")
    if not filter1:
        sys.stderr.write(" Unable to get the caps filter1 \n")
    filter1.set_property("caps", caps1)
    print("Creating tiler \n ")
    tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
    if not tiler:
        sys.stderr.write(" Unable to create tiler \n")
    print("Creating nvvidconv \n ")
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    if not nvvidconv:
        sys.stderr.write(" Unable to create nvvidconv \n")
    print("Creating nvosd \n ")
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")
    nvvidconv_postosd = Gst.ElementFactory.make("nvvideoconvert", "convertor_postosd")
    if not nvvidconv_postosd:
        sys.stderr.write(" Unable to create nvvidconv_postosd \n")
    
    # Create a caps filter
    caps = Gst.ElementFactory.make("capsfilter", "filter")
    caps.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=I420"))
    
    # Make the encoder
    if codec == "H264":
        encoder = Gst.ElementFactory.make("nvv4l2h264enc", "encoder")
        print("Creating H264 Encoder")
    elif codec == "H265":
        encoder = Gst.ElementFactory.make("nvv4l2h265enc", "encoder")
        print("Creating H265 Encoder")
    if not encoder:
        sys.stderr.write(" Unable to create encoder")
    encoder.set_property('bitrate', bitrate)
    if is_aarch64():
        encoder.set_property('preset-level', 1)
        encoder.set_property('insert-sps-pps', 1)
        #encoder.set_property('bufapi-version', 1)
    
    # Make the payload-encode video into RTP packets
    if codec == "H264":
        rtppay = Gst.ElementFactory.make("rtph264pay", "rtppay")
        print("Creating H264 rtppay")
    elif codec == "H265":
        rtppay = Gst.ElementFactory.make("rtph265pay", "rtppay")
        print("Creating H265 rtppay")
    if not rtppay:
        sys.stderr.write(" Unable to create rtppay")


    # mqtt start
    msgconv = Gst.ElementFactory.make("nvmsgconv", "nvmsg-converter")
    if not msgconv:
        sys.stderr.write(" Unable to create msgconv \n")

    msgbroker = Gst.ElementFactory.make("nvmsgbroker", "nvmsg-broker")
    if not msgbroker:
        sys.stderr.write(" Unable to create msgbroker \n")

    tee = Gst.ElementFactory.make("tee", "nvsink-tee")
    if not tee:
        sys.stderr.write(" Unable to create tee \n")

    queue1 = Gst.ElementFactory.make("queue", "nvtee-que1")
    if not queue1:
        sys.stderr.write(" Unable to create queue1 \n")

    queue2 = Gst.ElementFactory.make("queue", "nvtee-que2")
    if not queue2:
        sys.stderr.write(" Unable to create queue2 \n")
    # mqtt end
    
    # Make the UDP sink
    updsink_port_num = 5400
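    # NOTE: the UDP port here and the "/bz-live" mount point below are fixed,
    # so a second pipeline started in another process publishes RTP to the same
    # multicast address/port and the same mount path, and the two streams
    # collide. Each pipeline instance needs its own port and mount path.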
    sink = Gst.ElementFactory.make("udpsink", "udpsink")
    if not sink:
        sys.stderr.write(" Unable to create udpsink")
    
    sink.set_property('host', '224.224.255.255')
    sink.set_property('port', updsink_port_num)
    sink.set_property('async', False)
    sink.set_property('sync', 1)
    
    print("Playing file {} ".format(uri_inputs))
    
    streammux.set_property('width', FRAME_W)
    streammux.set_property('height', FRAME_H)
    streammux.set_property('batch-size', number_sources)
    streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)
    streammux.set_property("attach-sys-ts", 0)
    
    pgie.set_property('config-file-path', PGIE_CONFIG_FILE)
    #pgie.set_property('interval', 5)
    pgie_batch_size = pgie.get_property("batch-size")
    if (pgie_batch_size != number_sources):
        print("WARNING: Overriding infer-config batch-size", pgie_batch_size, " with number of sources ",
              number_sources, " \n")
        pgie.set_property("batch-size", number_sources)
    tiler_rows = int(math.sqrt(number_sources))
    tiler_columns = int(math.ceil((1.0 * number_sources) / tiler_rows))
    tiler.set_property("rows", tiler_rows)
    tiler.set_property("columns", tiler_columns)
    tiler.set_property("width", FRAME_W)
    tiler.set_property("height", FRAME_H)

    if not is_aarch64():
        # Use CUDA unified memory in the pipeline so frames
        # can be easily accessed on CPU in Python.
        mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
        streammux.set_property("nvbuf-memory-type", mem_type)
        nvvidconv.set_property("nvbuf-memory-type", mem_type)
        nvvidconv1.set_property("nvbuf-memory-type", mem_type)
        tiler.set_property("nvbuf-memory-type", mem_type)
        nvvidconv_postosd.set_property("nvbuf-memory-type", mem_type)

    msgconv.set_property('config', MSCONV_CONFIG_FILE)

    '''
    schema = 0; Full message schema with separate payload per object (Default)
    schema = 1; Minimal message with multiple objects in single payload.
    '''
    schema_type = 0
    msgconv.set_property('payload-type', schema_type)
    
    '''
    Adaptor library can be found at /opt/nvidia/deepstream/deepstream-<version>/lib

    kafka lib           - libnvds_kafka_proto.so
    azure device client - libnvds_azure_proto.so
    AMQP lib            - libnvds_amqp_proto.so
    redis lib           - libnvds_redis_proto.so
    mosquitto lib       - libnvds_mqtt_proto.so
    '''
    msgbroker.set_property('proto-lib', '/opt/nvidia/deepstream/deepstream/lib/libnvds_mqtt_proto.so')
    msgbroker.set_property('conn-str', MSG_BROKER_CONN)
    msgbroker.set_property('config', MSG_BROKER_CONFIG_FILE)
    msgbroker.set_property('topic', MSG_BROKER_TOPIC)
    msgbroker.set_property('sync', False)

    print("Adding elements to Pipeline \n")
    pipeline.add(pgie)
    pipeline.add(tiler)
    pipeline.add(nvvidconv)
    pipeline.add(filter1)
    pipeline.add(nvvidconv1)
    pipeline.add(nvosd)
    pipeline.add(nvvidconv_postosd)
    pipeline.add(caps)
    pipeline.add(encoder)
    pipeline.add(rtppay)
    pipeline.add(sink)
    # mqtt start
    pipeline.add(tee)
    pipeline.add(queue1)
    pipeline.add(queue2)
    pipeline.add(msgconv)
    pipeline.add(msgbroker)
    # mqtt end

    print("Linking elements in the Pipeline \n")
    streammux.link(pgie)
    pgie.link(nvvidconv1)
    nvvidconv1.link(filter1)
    filter1.link(tiler)
    tiler.link(nvvidconv)
    nvvidconv.link(nvosd)
    # mqtt start
    nvosd.link(tee)
    queue1.link(msgconv)
    msgconv.link(msgbroker)
    queue2.link(nvvidconv_postosd)
    # mqtt end
    #nvosd.link(nvvidconv_postosd)
    nvvidconv_postosd.link(caps)
    caps.link(encoder)
    encoder.link(rtppay)
    rtppay.link(sink)

    # mqtt start
    
    tee_msg_pad = tee.get_request_pad('src_%u')
    tee_render_pad = tee.get_request_pad("src_%u")
    if not tee_msg_pad or not tee_render_pad:
        sys.stderr.write("Unable to get request pads\n")
    
    sink_pad = queue1.get_static_pad("sink")
    tee_msg_pad.link(sink_pad)

    sink_pad = queue2.get_static_pad("sink")
    tee_render_pad.link(sink_pad)
    # mqtt end
    
    # create an event loop and feed gstreamer bus mesages to it
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)
    
    # Start streaming
    server = GstRtspServer.RTSPServer.new()
    server.props.service = "%d" % rtsp_server_port
    server.attach(None)
    
    factory = GstRtspServer.RTSPMediaFactory.new()
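    # The factory's udpsrc consumes the RTP packets published by the udpsink
    # above, which is why updsink_port_num and codec are interpolated into the
    # launch string: the port and encoding-name must match the udpsink port
    # and the selected encoder.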
    factory.set_launch( "(udpsrc name=pay0 port=%d buffer-size=524288 caps=\"application/x-rtp, media=video, clock-rate=90000, encoding-name=(string)%s, payload=96 \" )" % (updsink_port_num, codec))
    factory.set_shared(True)
    server.get_mount_points().add_factory("/bz-live", factory)

    rtsp_url = 'rtsp://{}:{}/bz-live'.format(rtsp_server_ip,rtsp_server_port)
    #print("\n *** DeepStream: Launched RTSP Streaming at %s ***\n\n" % rtsp_url)
    sys.stdout.write("\n *** DeepStream: Launched RTSP Streaming at %s ***\n\n" % rtsp_url)
    
    tiler_sink_pad = tiler.get_static_pad("sink")
    if not tiler_sink_pad:
        sys.stderr.write(" Unable to get sink pad \n")
    else:
        tiler_sink_pad.add_probe(Gst.PadProbeType.BUFFER, tiler_sink_pad_buffer_probe, 0)
        # perf callback function to print fps every 5 sec
        GLib.timeout_add(5000, perf_data.perf_print_callback)


    # Gst.debug_bin_to_dot_file(pipeline, Gst.DebugGraphDetails.ALL, "baize-ds-pipeline.dot")

    print("Starting pipeline \n")
    # start playback and listen to events
    pipeline.set_state(Gst.State.PLAYING)

    # start media live 
    #media_server = MediaServer(mip, mport, musername, mpassword)
    #add_media_result = media_server.addPushProxy(rtsp_url)
    #print(f"Media server add stream:{add_media_result}")

    try:
        loop.run()
    except:
        pass
    # cleanup
    print("Exiting app\n")
    pipeline.set_state(Gst.State.NULL)

def parse_args():
    parser = argparse.ArgumentParser(description='RTSP Output Sample Application Help ')

    parser.add_argument("-i", "--uri_inputs", metavar='N', type=str, nargs='+',
                    help='Path to input URIs e.g. rtsp:// ... or file:// separated by spaces')

    parser.add_argument("-c", "--codec", default="H265",
                  help="RTSP Streaming Codec H264/H265, default=H265", choices=['H264','H265'])
    
    parser.add_argument("-b", "--bitrate", default=2000000,
                  help="Set the encoding bitrate ", type=int)
    
    parser.add_argument("--mqtt_server_ip", default='127.0.0.1',required=False,
                  help="Set the mqtt server ip ")
    
    parser.add_argument("--mqtt_server_port", default=11883,required=False,
                  help="Set the mqtt server ip ", type=int)
    
    parser.add_argument("--rtsp_server_ip", default='127.0.0.1',required=False,
                  help="Set the rtsp server ip ")
    
    parser.add_argument("--rtsp_server_port", default=8551,required=False,
                  help="Set the rtsp server port ", type=int)
    
    parser.add_argument("--media_ip", default='127.0.0.1',required=False,
                  help="Set the live server ip ")
    
    parser.add_argument( "--media_port", default=18080,required=False,
                  help="Set the live server port ", type=int)
    
    parser.add_argument("--media_username", default='admin', required=False,
                  help="Set the live server username ")
    
    parser.add_argument("--media_password", default='Stevejobs_',required=False,
                  help="Set the live server password ")
    
    # Check input arguments
    if len(sys.argv)==1:
        parser.print_help(sys.stderr)
        sys.exit(1)
    args = parser.parse_args()
        
    print("URI Inputs: " + str(args.uri_inputs ))
    
    return args.uri_inputs , args.codec, args.bitrate , args.mqtt_server_ip, args.mqtt_server_port,  args.rtsp_server_ip, args.rtsp_server_port, args.media_ip, args.media_port, args.media_username, args.media_password

if __name__ == '__main__':
    '''
    use:
    python Main.py -i 'rtsp://admin:hk123456@192.168.31.100/cam/realmonitor?channel=1&subtype=0' \
        --rtsp_server_ip 192.168.31.9 --rtsp_server_port 8551 \
        --media_ip 192.168.31.9 --media_port 18080 --media_username admin --media_password Stevejobs_
    '''
    uri_inputs , out_codec, out_bitrate, mqtt_server_ip, mqtt_server_port, rtsp_server_ip, rtsp_server_port, mip, mport, musername, mpassword = parse_args()
    sys.exit(main(uri_inputs, out_codec, out_bitrate,mqtt_server_ip,mqtt_server_port,rtsp_server_ip,rtsp_server_port,mip, mport, musername, mpassword ))

Please refer to this sample for how to add and delete sources dynamically.

Can I dynamically add and remove input sources and the corresponding RTSP server URIs?

@2045251631 Could you elaborate on your requirement? What is the whole media pipeline? How many sources and sinks do you have?
Regarding dynamically adding and removing input sources, please refer to my last comment: nvstreammux already supports adding and removing sources at runtime. If you are using the tiler, there is only one sink; you can recreate the RTSP server with a new RTSP URL.
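
For reference, here is a minimal, untested sketch of that pattern. It reuses the create_source_bin helper from the script above; the add_source/remove_source names and the per-stream mount paths are illustrative, not DeepStream APIs, and it assumes each stream has its own encode/udpsink branch on a distinct port (e.g. behind an nvstreamdemux) so that every mount point has a matching RTP stream:

def add_source(pipeline, streammux, rtsp_server, codec, source_id, uri):
    # Create the new source bin and link it to a fresh nvstreammux request
    # pad while the pipeline keeps running.
    source_bin = create_source_bin(source_id, uri)
    pipeline.add(source_bin)
    sinkpad = streammux.get_request_pad("sink_%u" % source_id)
    source_bin.get_static_pad("src").link(sinkpad)
    # Bring the new bin up to the state of the running pipeline.
    source_bin.sync_state_with_parent()
    # Publish a per-stream mount point on the already-attached RTSP server.
    factory = GstRtspServer.RTSPMediaFactory.new()
    factory.set_launch('( udpsrc name=pay0 port=%d buffer-size=524288 '
                       'caps="application/x-rtp, media=video, clock-rate=90000, '
                       'encoding-name=(string)%s, payload=96" )'
                       % (5400 + source_id, codec))
    factory.set_shared(True)
    rtsp_server.get_mount_points().add_factory("/stream-%d" % source_id, factory)
    return source_bin

def remove_source(pipeline, streammux, rtsp_server, source_bin, source_id):
    # Stop the source branch, flush it, release the muxer pad and drop the bin.
    source_bin.set_state(Gst.State.NULL)
    sinkpad = streammux.get_static_pad("sink_%u" % source_id)
    if sinkpad:
        sinkpad.send_event(Gst.Event.new_flush_stop(False))
        streammux.release_request_pad(sinkpad)
    pipeline.remove(source_bin)
    # Unpublish the matching RTSP mount point.
    rtsp_server.get_mount_points().remove_factory("/stream-%d" % source_id)

Sending flush-stop on the request pad before releasing it is the same sequence the deepstream_python_apps runtime_source_add_delete sample uses to detach a source from a playing nvstreammux.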

There has been no update from you for a while, so we assume this is no longer an issue and are closing this topic. If you need further support, please open a new one. Thanks.

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.