Using nvmsgconv and RTSP output on the same DEEPSTREAM pipeline

Hello!

I am trying to use a pipeline that sends metadata regarding the inference results using the nvmsgconv plugin through MQTT/Kafka. To do this I followed the Deepstream-test-4 from “deepstream python apps” examples. However, the example shows how to see the feed on screen, and I would like to “send” it through RTSP. I have done this on other pipelines, but I can’t seem to do it on this one, and I don’t know why. I will leave my code below to see if anyone can help me! Thank you!

#!/usr/bin/env python3


import sys 
import configparser                                                                                                                                                                                                                                                                                                                                                             
sys.path.append('../')
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst, GstRtspServer
from gi.repository import GLib
import sys
import platform
from optparse import OptionParser
from common.is_aarch_64 import is_aarch64
from common.bus_call import bus_call
from common.utils import long_to_int
import pyds

MAX_DISPLAY_LEN=64
MAX_TIME_STAMP_LEN=32
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
past_tracking_meta=[0]
MUXER_OUTPUT_WIDTH=1920
MUXER_OUTPUT_HEIGHT=1080
MUXER_BATCH_TIMEOUT_USEC=4000000
input_file = None
schema_type = 0
frame_number = 0
proto_lib = None
#conn_str="localhost;9092;quickstart-events"
conn_str="192.168.0.25;1883;test"
cfg_file = None
topic = None
no_display = False

#PGIE_CONFIG_FILE="/opt/nvidia/deepstream/deepstream-5.1/samples/configs/tlt_pretrained_models/config_infer_primary_trafficcamnet.txt"
PGIE_CONFIG_FILE="dstest4_pgie_config.txt"
MSCONV_CONFIG_FILE="dstest4_msgconv_config.txt"


pgie_classes_str=["Car", "TwoWheeler", "Person","Roadsign"]

# Callback function for deep-copying an NvDsEventMsgMeta struct
def meta_copy_func(data,user_data):
    # Cast data to pyds.NvDsUserMeta
    user_meta=pyds.NvDsUserMeta.cast(data)
    src_meta_data=user_meta.user_meta_data
    # Cast src_meta_data to pyds.NvDsEventMsgMeta
    srcmeta=pyds.NvDsEventMsgMeta.cast(src_meta_data)
    # Duplicate the memory contents of srcmeta to dstmeta
    # First use pyds.get_ptr() to get the C address of srcmeta, then
    # use pyds.memdup() to allocate dstmeta and copy srcmeta into it.
    # pyds.memdup returns C address of the allocated duplicate.
    dstmeta_ptr=pyds.memdup(pyds.get_ptr(srcmeta), sys.getsizeof(pyds.NvDsEventMsgMeta))
    # Cast the duplicated memory to pyds.NvDsEventMsgMeta
    dstmeta=pyds.NvDsEventMsgMeta.cast(dstmeta_ptr)

    # Duplicate contents of ts field. Note that reading srcmeat.ts
    # returns its C address. This allows to memory operations to be
    # performed on it.
    dstmeta.ts=pyds.memdup(srcmeta.ts, MAX_TIME_STAMP_LEN+1)

    # Copy the sensorStr. This field is a string property.
    # The getter (read) returns its C address. The setter (write)
    # takes string as input, allocates a string buffer and copies
    # the input string into it.
    # pyds.get_string() takes C address of a string and returns
    # the reference to a string object and the assignment inside the binder copies content.
    
    dstmeta.sensorStr=pyds.get_string(srcmeta.sensorStr)

    if(srcmeta.objSignature.size>0):
        dstmeta.objSignature.signature=pyds.memdup(srcmeta.objSignature.signature,srcMeta.objSignature.size)
        dstmeta.objSignature.size = srcmeta.objSignature.size

    if(srcmeta.extMsgSize>0):
        if(srcmeta.objType==pyds.NvDsObjectType.NVDS_OBJECT_TYPE_VEHICLE):
            srcobj = pyds.NvDsVehicleObject.cast(srcmeta.extMsg)
            obj = pyds.alloc_nvds_vehicle_object()
            obj.type=pyds.get_string(srcobj.type)
            obj.make=pyds.get_string(srcobj.make)
            obj.model=pyds.get_string(srcobj.model)
            obj.color=pyds.get_string(srcobj.color)
            obj.license = pyds.get_string(srcobj.license)
            obj.region = pyds.get_string(srcobj.region)
            dstmeta.extMsg = obj
            dstmeta.extMsgSize = sys.getsizeof(pyds.NvDsVehicleObject)
        if(srcmeta.objType==pyds.NvDsObjectType.NVDS_OBJECT_TYPE_BICYCLE):
            srcobj = pyds.NvDsVehicleObject.cast(srcmeta.extMsg)
            obj = pyds.alloc_nvds_vehicle_object()
            obj.type=pyds.get_string(srcobj.type)
            obj.make=pyds.get_string(srcobj.make)
            obj.model=pyds.get_string(srcobj.model)
            obj.color=pyds.get_string(srcobj.color)
            obj.license = pyds.get_string(srcobj.license)
            obj.region = pyds.get_string(srcobj.region)
            dstmeta.extMsg = obj
        dstmeta.extMsgSize = sys.getsizeof(pyds.NvDsVehicleObject)
    if(srcmeta.objType==pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON):
        srcobj = pyds.NvDsPersonObject.cast(srcmeta.extMsg)
        obj = pyds.alloc_nvds_person_object()
        obj.age = srcobj.age
        obj.gender = pyds.get_string(srcobj.gender)
        obj.cap = pyds.get_string(srcobj.cap)
        obj.hair = pyds.get_string(srcobj.hair)
        obj.apparel = pyds.get_string(srcobj.apparel)
        dstmeta.extMsg = obj
        dstmeta.extMsgSize = sys.getsizeof(pyds.NvDsVehicleObject)

return dstmeta

# Callback function for freeing an NvDsEventMsgMeta instance
def meta_free_func(data,user_data):
    user_meta=pyds.NvDsUserMeta.cast(data)
    srcmeta=pyds.NvDsEventMsgMeta.cast(user_meta.user_meta_data)

# pyds.free_buffer takes C address of a buffer and frees the memory
# It's a NOP if the address is NULL
pyds.free_buffer(srcmeta.ts)
pyds.free_buffer(srcmeta.sensorStr)

if(srcmeta.objSignature.size > 0):
    pyds.free_buffer(srcmeta.objSignature.signature)
    srcmeta.objSignature.size = 0

if(srcmeta.extMsgSize > 0):
    if(srcmeta.objType == pyds.NvDsObjectType.NVDS_OBJECT_TYPE_VEHICLE):
        obj =pyds.NvDsVehicleObject.cast(srcmeta.extMsg)
        pyds.free_buffer(obj.type)
        pyds.free_buffer(obj.color)
        pyds.free_buffer(obj.make)
        pyds.free_buffer(obj.model)
        pyds.free_buffer(obj.license)
        pyds.free_buffer(obj.region)
    if(srcmeta.objType == pyds.NvDsObjectType.NVDS_OBJECT_TYPE_BICYCLE):
        obj =pyds.NvDsVehicleObject.cast(srcmeta.extMsg)
        pyds.free_buffer(obj.type)
        pyds.free_buffer(obj.color)
        pyds.free_buffer(obj.make)
        pyds.free_buffer(obj.model)
        pyds.free_buffer(obj.license)
        pyds.free_buffer(obj.region)
    if(srcmeta.objType == pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON):
        obj = pyds.NvDsPersonObject.cast(srcmeta.extMsg)
        pyds.free_buffer(obj.gender)
        pyds.free_buffer(obj.cap)
        pyds.free_buffer(obj.hair)
        pyds.free_buffer(obj.apparel)
    pyds.free_gbuffer(srcmeta.extMsg)
    srcmeta.extMsgSize = 0

def generate_vehicle_meta(data):
    obj = pyds.NvDsVehicleObject.cast(data)
    obj.type =""
    obj.color=""
    obj.make =""
    obj.model = ""
    obj.license =""
    obj.region =""
    return obj

def generate_bicycle_meta(data):
    obj = pyds.NvDsVehicleObject.cast(data)
    obj.type =""
    obj.color=""
    obj.make =""
    obj.model = ""
    obj.license =""
    obj.region =""
    return obj

def generate_person_meta(data):
    obj = pyds.NvDsPersonObject.cast(data)
    obj.age = 0
    obj.cap = ""
    obj.hair = ""
    obj.gender = ""
    obj.apparel= ""
    return obj

def generate_event_msg_meta(data, class_id):
    meta = pyds.NvDsEventMsgMeta.cast(data)
    meta.sensorId = 0
    meta.placeId = 0
    meta.moduleId = 0
    meta.sensorStr = "CAMERA_NAME:MAC_ADDRESS"
    meta.ts = pyds.alloc_buffer(MAX_TIME_STAMP_LEN + 1)
    pyds.generate_ts_rfc3339(meta.ts, MAX_TIME_STAMP_LEN)

# This demonstrates how to attach custom objects.
# Any custom object as per requirement can be generated and attached
# like NvDsVehicleObject / NvDsPersonObject. Then that object should
# be handled in payload generator library (nvmsgconv.cpp) accordingly.
if(class_id==PGIE_CLASS_ID_VEHICLE):
    meta.type = pyds.NvDsEventType.NVDS_EVENT_MOVING
    meta.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_VEHICLE
    meta.objClassId = PGIE_CLASS_ID_VEHICLE
    obj = pyds.alloc_nvds_vehicle_object()
    obj = generate_vehicle_meta(obj)
    meta.extMsg = obj
    meta.extMsgSize = sys.getsizeof(pyds.NvDsVehicleObject)
if(class_id==PGIE_CLASS_ID_BICYCLE):
    meta.type = pyds.NvDsEventType.NVDS_EVENT_MOVING
    meta.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_BICYCLE
    meta.objClassId = PGIE_CLASS_ID_BICYCLE
    obj = pyds.alloc_nvds_vehicle_object()
    obj = generate_bicycle_meta(obj)
    meta.extMsg = obj
    meta.extMsgSize = sys.getsizeof(pyds.NvDsVehicleObject)
if(class_id == PGIE_CLASS_ID_PERSON):
    meta.type =pyds.NvDsEventType.NVDS_EVENT_ENTRY
    meta.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON
    meta.objClassId = PGIE_CLASS_ID_PERSON
    obj = pyds.alloc_nvds_person_object()
    obj=generate_person_meta(obj)
    meta.extMsg = obj
    meta.extMsgSize = sys.getsizeof(pyds.NvDsPersonObject)
return meta

# osd_sink_pad_buffer_probe  will extract metadata received on OSD sink pad
# and update params for drawing rectangle, object information etc.
# IMPORTANT NOTE:
# a) probe() callbacks are synchronous and thus holds the buffer
#    (info.get_buffer()) from traversing the pipeline until user return.
# b) loops inside probe() callback could be costly in python.
#    So users shall optimize according to their use-case.
def osd_sink_pad_buffer_probe(pad,info,u_data):
frame_number=0
#Intiallizing object counter with 0.
obj_counter = {
    PGIE_CLASS_ID_VEHICLE:0,
    PGIE_CLASS_ID_PERSON:0,
    PGIE_CLASS_ID_BICYCLE:0,
    PGIE_CLASS_ID_ROADSIGN:0
}
num_rects = 0
is_first_object=True
gst_buffer = info.get_buffer()
if not gst_buffer:
    print("Unable to get GstBuffer ")
    return

# Retrieve batch metadata from the gst_buffer
# Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
# C address of gst_buffer as input, which is obtained with hash(gst_buffer)
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
if not batch_meta:
    return Gst.PadProbeReturn.OK
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
    try:
        # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
        # The casting is done by pyds.NvDsFrameMeta.cast()
        # The casting also keeps ownership of the underlying memory
        # in the C code, so the Python garbage collector will leave
        # it alone.
        frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
    except StopIteration:
        continue
    is_first_object = True

    '''
    print("Frame Number is ", frame_meta.frame_num)
    print("Source id is ", frame_meta.source_id)
    print("Batch id is ", frame_meta.batch_id)
    print("Source Frame Width ", frame_meta.source_frame_width)
    print("Source Frame Height ", frame_meta.source_frame_height)
    print("Num object meta ", frame_meta.num_obj_meta)
    '''
    frame_number=frame_meta.frame_num
    num_rects = frame_meta.num_obj_meta
    l_obj=frame_meta.obj_meta_list
    while l_obj is not None:
        try:
            obj_meta=pyds.NvDsObjectMeta.cast(l_obj.data)
        except StopIteration:
            continue

        # Update the object text display
        txt_params=obj_meta.text_params

        # Set display_text. Any existing display_text string will be
        # freed by the bindings module.
        txt_params.display_text = pgie_classes_str[obj_meta.class_id]

        obj_counter[obj_meta.class_id] += 1

        # Font , font-color and font-size
        txt_params.font_params.font_name = "Serif"
        txt_params.font_params.font_size = 10
        # set(red, green, blue, alpha); set to White
        txt_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)

        # Text background color
        txt_params.set_bg_clr = 1
        # set(red, green, blue, alpha); set to Black
        txt_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)

        # Ideally NVDS_EVENT_MSG_META should be attached to buffer by the
        # component implementing detection / recognition logic.
        # Here it demonstrates how to use / attach that meta data.
        #if(is_first_object and not (frame_number%30)):
        if (not (frame_number % 30)):
            # Frequency of messages to be send will be based on use case.
            # Here message is being sent for first object every 30 frames.

            # Allocating an NvDsEventMsgMeta instance and getting reference
            # to it. The underlying memory is not manged by Python so that
            # downstream plugins can access it. Otherwise the garbage collector
            # will free it when this probe exits.
            msg_meta=pyds.alloc_nvds_event_msg_meta()
            msg_meta.bbox.top =  obj_meta.rect_params.top
            msg_meta.bbox.left =  obj_meta.rect_params.left
            msg_meta.bbox.width = obj_meta.rect_params.width
            msg_meta.bbox.height = obj_meta.rect_params.height
            msg_meta.frameId = frame_number
            msg_meta.trackingId = long_to_int(obj_meta.object_id)
            msg_meta.confidence = obj_meta.confidence
            msg_meta = generate_event_msg_meta(msg_meta, obj_meta.class_id)
            user_event_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)
            if(user_event_meta):
                user_event_meta.user_meta_data = msg_meta
                user_event_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_EVENT_MSG_META
                # Setting callbacks in the event msg meta. The bindings layer
                # will wrap these callables in C functions. Currently only one
                # set of callbacks is supported.
                pyds.user_copyfunc(user_event_meta, meta_copy_func)
                pyds.user_releasefunc(user_event_meta, meta_free_func)
                pyds.nvds_add_user_meta_to_frame(frame_meta, user_event_meta)
            else:
                print("Error in attaching event meta to buffer\n")

            is_first_object = False
        try:
            l_obj=l_obj.next
        except StopIteration:
            break
    try:
        l_frame=l_frame.next
    except StopIteration:
        break
#past traking meta data
if(past_tracking_meta[0]==1):
    l_user=batch_meta.batch_user_meta_list
    while l_user is not None:
        try:
            # Note that l_user.data needs a cast to pyds.NvDsUserMeta
            # The casting is done by pyds.NvDsUserMeta.cast()
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone
            user_meta=pyds.NvDsUserMeta.cast(l_user.data)
        except StopIteration:
            break
        if(user_meta and user_meta.base_meta.meta_type==pyds.NvDsMetaType.NVDS_TRACKER_PAST_FRAME_META):
            try:
                # Note that user_meta.user_meta_data needs a cast to pyds.NvDsPastFrameObjBatch
                # The casting is done by pyds.NvDsPastFrameObjBatch.cast()
                # The casting also keeps ownership of the underlying memory
                # in the C code, so the Python garbage collector will leave
                # it alone
                pPastFrameObjBatch = pyds.NvDsPastFrameObjBatch.cast(user_meta.user_meta_data)
            except StopIteration:
                break
#print("Frame Number =",frame_number,"Vehicle Count =",obj_counter[PGIE_CLASS_ID_VEHICLE],"Person Count =",obj_counter[PGIE_CLASS_ID_PERSON])
return Gst.PadProbeReturn.OK


def main(args):
    GObject.threads_init()
    Gst.init(None)

codec = 'H264'
bitrate = 4000000

# Standard GStreamer initialization
if(len(args)==3):
    past_tracking_meta[0]=int(args[2])

#registering callbacks
pyds.register_user_copyfunc(meta_copy_func)
pyds.register_user_releasefunc(meta_free_func)

print("Creating Pipeline \n ")

pipeline = Gst.Pipeline()

if not pipeline:
    sys.stderr.write(" Unable to create Pipeline \n")

print("Creating Source \n ")
source = Gst.ElementFactory.make("filesrc", "file-source")
if not source:
    sys.stderr.write(" Unable to create Source \n")

print("Creating H264Parser \n")
h264parser = Gst.ElementFactory.make("h264parse", "h264-parser")
if not h264parser:
    sys.stderr.write(" Unable to create h264 parser \n")

print("Creating Decoder \n")
decoder = Gst.ElementFactory.make("nvv4l2decoder", "nvv4l2-decoder")
if not decoder:
    sys.stderr.write(" Unable to create Nvv4l2 Decoder \n")

streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
if not streammux:
    sys.stderr.write(" Unable to create NvStreamMux \n")

pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
if not pgie:
    sys.stderr.write(" Unable to create pgie \n")

tracker = Gst.ElementFactory.make("nvtracker", "tracker")
if not tracker:
    sys.stderr.write(" Unable to create tracker \n")
    
nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
if not nvvidconv:
    sys.stderr.write(" Unable to create nvvidconv \n")

nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
if not nvosd:
    sys.stderr.write(" Unable to create nvosd \n")

msgconv=Gst.ElementFactory.make("nvmsgconv", "nvmsg-converter")
if not msgconv:
    sys.stderr.write(" Unable to create msgconv \n")

msgbroker=Gst.ElementFactory.make("nvmsgbroker", "nvmsg-broker")
if not msgbroker:
    sys.stderr.write(" Unable to create msgbroker \n")

tee=Gst.ElementFactory.make("tee", "nvsink-tee")
if not tee:
    sys.stderr.write(" Unable to create tee \n")

queue1=Gst.ElementFactory.make("queue", "nvtee-que1")
if not queue1:
    sys.stderr.write(" Unable to create queue1 \n")

queue2=Gst.ElementFactory.make("queue", "nvtee-que2")
if not queue2:
    sys.stderr.write(" Unable to create queue2 \n")

encoder = Gst.ElementFactory.make("nvv4l2h264enc", "encoder")
print("Creating H264 Encoder")
if not encoder:
    sys.stderr.write(" Unable to create encoder")
encoder.set_property('bitrate', bitrate)
if is_aarch64():
    encoder.set_property('preset-level', 1)
    encoder.set_property('insert-sps-pps', 1)
    encoder.set_property('bufapi-version', 1)

rtppay = Gst.ElementFactory.make("rtph264pay", "rtppay")
print("Creating H264 rtppay")

if not rtppay:
    sys.stderr.write(" Unable to create rtppay")

nvvidconv_postosd = Gst.ElementFactory.make("nvvideoconvert", "convertor_postosd")
if not nvvidconv_postosd:
    sys.stderr.write(" Unable to create nvvidconv_postosd \n")

# Create a caps filter
caps = Gst.ElementFactory.make("capsfilter", "filter")
caps.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=I420"))

# Make the UDP sink
updsink_port_num = 5400
sink = Gst.ElementFactory.make("udpsink", "udpsink")
if not sink:
    sys.stderr.write(" Unable to create udpsink")

sink.set_property('host', '224.224.255.255')
sink.set_property('port', updsink_port_num)
sink.set_property('async', False)
sink.set_property('sync', 1)

if is_aarch64():
    transform = Gst.ElementFactory.make("nvegltransform", "nvegl-transform")

print("Playing file %s " %input_file)
source.set_property('location', input_file)
streammux.set_property('width', 1920)
streammux.set_property('height', 1080)
streammux.set_property('batch-size', 1)
streammux.set_property('batched-push-timeout', 4000000)


   #Set properties of tracker
    config = configparser.ConfigParser()
    config.read('dstest2_tracker_config.txt')
    config.sections()

for key in config['tracker']:
    if key == 'tracker-width' :
        tracker_width = config.getint('tracker', key)
        tracker.set_property('tracker-width', tracker_width)
    if key == 'tracker-height' :
        tracker_height = config.getint('tracker', key)
        tracker.set_property('tracker-height', tracker_height)
    if key == 'gpu-id' :
        tracker_gpu_id = config.getint('tracker', key)
        tracker.set_property('gpu_id', tracker_gpu_id)
    if key == 'll-lib-file' :
        tracker_ll_lib_file = config.get('tracker', key)
        tracker.set_property('ll-lib-file', tracker_ll_lib_file)
    if key == 'll-config-file' :
        tracker_ll_config_file = config.get('tracker', key)
        tracker.set_property('ll-config-file', tracker_ll_config_file)
    if key == 'enable-batch-process' :
        tracker_enable_batch_process = config.getint('tracker', key)
        tracker.set_property('enable_batch_process', tracker_enable_batch_process)
    if key == 'enable-past-frame' :
        tracker_enable_past_frame = config.getint('tracker', key)
        tracker.set_property('enable_past_frame', tracker_enable_past_frame)

pgie.set_property('config-file-path', PGIE_CONFIG_FILE)
msgconv.set_property('config',MSCONV_CONFIG_FILE)
msgconv.set_property('payload-type', schema_type)
msgbroker.set_property('proto-lib', proto_lib)
msgbroker.set_property('conn-str', conn_str)
if cfg_file is not None:
    msgbroker.set_property('config', cfg_file)
if topic is not None:
    msgbroker.set_property('topic', topic)
msgbroker.set_property('sync', False)

print("Adding elements to Pipeline \n")
pipeline.add(tee)
pipeline.add(queue1)
pipeline.add(queue2)
pipeline.add(msgconv)
pipeline.add(msgbroker)

pipeline.add(nvvidconv_postosd)
pipeline.add(caps)
pipeline.add(encoder)
pipeline.add(rtppay)
if is_aarch64():
    pipeline.add(transform)
pipeline.add(sink)

print("Linking elements in the Pipeline \n")
source.link(h264parser)
h264parser.link(decoder)
sinkpad = streammux.get_request_pad("sink_0")
if not sinkpad:
    sys.stderr.write(" Unable to get the sink pad of streammux \n")

srcpad = decoder.get_static_pad("src")
if not srcpad:
    sys.stderr.write(" Unable to get source pad of decoder \n")

streammux.link(pgie)
pgie.link(nvvidconv)
nvvidconv.link(nvosd)
nvosd.link(tee)
queue1.link(msgconv)
msgconv.link(msgbroker)

queue2.link(nvvidconv_postosd)    
nvvidconv_postosd.link(caps)
caps.link(encoder)
encoder.link(rtppay)
rtppay.link(sink)

sink_pad=queue1.get_static_pad("sink")
tee_msg_pad=tee.get_request_pad('src_%u')
tee_stream_pad=tee.get_request_pad("src_%u")
if not tee_msg_pad or not tee_stream_pad:
    sys.stderr.write("Unable to get request pads\n")
tee_msg_pad.link(sink_pad)
sink_pad=queue2.get_static_pad("sink")
tee_stream_pad.link(sink_pad)

# create an event loop and feed gstreamer bus mesages to it
loop = GObject.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect ("message", bus_call, loop)

osdsinkpad = nvosd.get_static_pad("sink")
if not osdsinkpad:
    sys.stderr.write(" Unable to get sink pad of nvosd \n")

osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)

# Start streaming
rtsp_port_num = 8554

server = GstRtspServer.RTSPServer.new()
server.props.service = "%d" % rtsp_port_num
server.attach(None)

factory = GstRtspServer.RTSPMediaFactory.new()
factory.set_launch( "( udpsrc name=pay0 port=%d buffer-size=524288 caps=\"application/x-rtp, media=video, clock-rate=90000, encoding-name=(string)%s, payload=96 \" )" % (updsink_port_num, codec))
factory.set_shared(True)
server.get_mount_points().add_factory("/ds-test", factory)

print("\n *** DeepStream: Launched RTSP Streaming at rtsp://localhost:%d/ds-test ***\n\n" % rtsp_port_num)


print("Starting pipeline \n")

# start play back and listed to events
pipeline.set_state(Gst.State.PLAYING)
try:
    loop.run()
except:
    pass
# cleanup
pyds.unset_callback_funcs()
pipeline.set_state(Gst.State.NULL)

# Parse and validate input arguments
def parse_args():
    parser = OptionParser()
    parser.add_option("-c", "--cfg-file", dest="cfg_file",
                  help="Set the adaptor config file. Optional if connection string has relevant  details.", metavar="FILE")
    parser.add_option("-i", "--input-file", dest="input_file",
                  help="Set the input H264 file", metavar="FILE")
    parser.add_option("-p", "--proto-lib", dest="proto_lib",
                  help="Absolute path of adaptor library", metavar="PATH")
    parser.add_option("", "--conn-str", dest="conn_str",
                  help="Connection string of backend server. Optional if it is part of config file.", metavar="STR")
    parser.add_option("-s", "--schema-type", dest="schema_type", default="0",
                  help="Type of message schema (0=Full, 1=minimal), default=0", metavar="<0|1>")
    parser.add_option("-t", "--topic", dest="topic",
                  help="Name of message topic. Optional if it is part of connection string or config file.", metavar="TOPIC")
    parser.add_option("", "--no-display", action="store_true", dest="no_display", default=False,
                  help="Disable display")

(options, args) = parser.parse_args()

global cfg_file
global input_file
global proto_lib
global conn_str
global topic
global schema_type
global no_display
cfg_file = options.cfg_file
input_file = options.input_file
proto_lib = options.proto_lib
conn_str = options.conn_str
topic = options.topic
no_display = options.no_display

if proto_lib is None or input_file is None:
    print("Usage: python3 deepstream_test_4.py -i <H264 filename> -p <Proto adaptor library> --conn-str=<Connection string>")
    return 1

if (options.schema_type == "0"):
    schema_type = 0
else:
    schema_type = 1

return 0

if __name__ == '__main__':
    ret = parse_args()
    #If argumer parsing fail, return failure (non-zero)
    if ret == 1:
        sys.exit(1)
    sys.exit(main(sys.argv))

Hi,
We have sample code of RTSP output in
deepstream_python_apps/apps/deepstream-test1-rtsp-out at master · NVIDIA-AI-IOT/deepstream_python_apps · GitHub

You may refer to the sample and integrate it to test4.