Memory leak when triggering smart recording (start-sr)

#!/usr/bin/env python3

################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import sys
sys.path.append('../')
from pathlib import Path
import gi
import configparser
import argparse
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from ctypes import *
import time
import sys
import math
import platform

from common.platform_info import PlatformInfo
from common.bus_call import bus_call
from common.FPS import PERF_DATA
import time, threading
import pyds
import ctypes

# Run-mode flags; set from the CLI in parse_args() before main() runs.
no_display = False
silent = False
file_loop = False
# Shared PERF_DATA instance; created in main(), read by the pad probe.
perf_data = None

MAX_DISPLAY_LEN=64
# Class ids emitted by the default 4-class primary detector
# (see dstest3_pgie_config.txt).
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
# Streammux output resolution.
MUXER_OUTPUT_WIDTH=1920
MUXER_OUTPUT_HEIGHT=1080
# Timeout (us) before the muxer pushes an incomplete batch downstream.
MUXER_BATCH_TIMEOUT_USEC = 33000
TILED_OUTPUT_WIDTH=1280
TILED_OUTPUT_HEIGHT=720
GST_CAPS_FEATURES_NVMM="memory:NVMM"
OSD_PROCESS_MODE= 0
OSD_DISPLAY_TEXT= 1
# Display labels indexed by class id.
pgie_classes_str= ["Vehicle", "TwoWheeler", "Person","RoadSign"]
# nvurisrcbin instances collected by create_source_bin(); main() arms a
# smart-record timer for each.
list_for_nvurisrcbin = []



class SRUserContext(ctypes.Structure):
    """ctypes mirror of the smart-record user context passed to 'start-sr'.

    The layout must match the C-side struct consumed by nvurisrcbin: a
    4-byte session id followed by a fixed 32-byte name buffer.
    """
    _fields_ = [("sessionid", ctypes.c_int), ("name", ctypes.c_char * 32)]

# pgie_src_pad_buffer_probe extracts metadata from the batched buffer on the
# pgie src pad, counts detections per class, and feeds the FPS counter.
def pgie_src_pad_buffer_probe(pad, info, u_data):
    """Per-buffer probe: tally per-class object counts and update FPS.

    Always returns Gst.PadProbeReturn.OK so the buffer continues downstream
    (the original returned None when the buffer was missing; a pad probe
    should always return a PadProbeReturn value).
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        # Nothing to inspect; let the probe complete normally.
        return Gst.PadProbeReturn.OK

    # pyds.gst_buffer_get_nvds_batch_meta() expects the C address of
    # gst_buffer, which is obtained with hash(gst_buffer).
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            # The cast keeps ownership of the underlying memory in the C
            # code, so the Python garbage collector leaves it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_number = frame_meta.frame_num
        num_rects = frame_meta.num_obj_meta
        obj_counter = {
            PGIE_CLASS_ID_VEHICLE: 0,
            PGIE_CLASS_ID_PERSON: 0,
            PGIE_CLASS_ID_BICYCLE: 0,
            PGIE_CLASS_ID_ROADSIGN: 0,
        }
        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            # NOTE(review): raises KeyError if the model ever emits a class
            # id outside the four ids above -- assumed impossible with the
            # default detector; confirm if swapping models.
            obj_counter[obj_meta.class_id] += 1
            try:
                l_obj = l_obj.next
            except StopIteration:
                break

        if not silent:
            print("Frame Number=", frame_number, "Number of Objects=",num_rects,"Vehicle_count=",obj_counter[PGIE_CLASS_ID_VEHICLE],"Person_count=",obj_counter[PGIE_CLASS_ID_PERSON])

        # Update frame rate through this probe.
        stream_index = "stream{0}".format(frame_meta.pad_index)
        global perf_data
        perf_data.update_fps(stream_index)

        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK



def cb_newpad(decodebin, decoder_src_pad, data):
    """'pad-added' handler: point the source bin's ghost pad at the new pad.

    Only video pads backed by NVMM (NVIDIA hardware decoder) memory are
    linked; audio pads are ignored and software-decoder pads are reported
    as errors.
    """
    print("In cb_newpad\n")
    caps = decoder_src_pad.get_current_caps()
    if not caps:
        caps = decoder_src_pad.query_caps()
    pad_type = caps.get_structure(0).get_name()
    pad_features = caps.get_features(0)
    source_bin = data

    # Only video pads are of interest; decodebin also creates audio pads.
    print("gstname=", pad_type)
    if pad_type.find("video") == -1:
        return

    # Link only if decodebin picked an NVIDIA decoder (NVMM memory caps).
    print("features=", pad_features)
    if not pad_features.contains("memory:NVMM"):
        sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")
        return

    ghost_pad = source_bin.get_static_pad("src")
    if not ghost_pad.set_target(decoder_src_pad):
        sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")

def decodebin_child_added(child_proxy, Object, name, user_data):
    """'child-added' handler: recurse into nested decodebins and tune sources.

    Re-attaches itself to any nested decodebin so grandchildren are seen,
    and enables drop-on-latency on source elements that support it.
    """
    print("Decodebin child added:", name, "\n")
    # Nested decodebins spawn their own children; keep listening on them.
    if "decodebin" in name:
        Object.connect("child-added", decodebin_child_added, user_data)

    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        # Only sources such as rtspsrc expose this property.
        if source_element.find_property('drop-on-latency') is not None:
            Object.set_property("drop-on-latency", True)



def create_source_bin(index, uri):
    """Create a source GstBin wrapping an nvurisrcbin with smart record on.

    Args:
        index: stream index; used for unique element names and as the
            smart-record source-id.
        uri: input URI (file:// or rtsp://).

    Returns:
        The configured Gst.Bin, or None when element/pad creation fails
        (the original continued after failures and crashed later on None).
    """
    print("Creating source bin")

    # A GstBin abstracts this source's internals from the rest of the
    # pipeline.
    bin_name = "source-bin-%02d" % index
    print(bin_name)
    nbin = Gst.Bin.new(bin_name)
    if not nbin:
        sys.stderr.write(" Unable to create source bin \n")
        return None

    # nvurisrcbin figures out the container format and codec and plugs the
    # appropriate demux/decode plugins. Use a unique per-index name so many
    # sources can coexist (the original reused "uri-decode-bin" in the
    # file_loop branch).
    uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin" + str(index))
    if not uri_decode_bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
        return None
    if not file_loop:
        # uri_decode_bin.set_property("file-loop", 1)
        uri_decode_bin.set_property("cudadec-memtype", 0)

    # Smart record: mode 2, record into cwd, keep a 20 s cache.
    uri_decode_bin.set_property("smart-record", 2)
    uri_decode_bin.set_property("smart-rec-dir-path", ".")
    uri_decode_bin.set_property("smart-rec-cache", 20) # enable smart record.
    uri_decode_bin.set_property("source-id", index)
    # record_done() frees the SRUserContext buffer handed to 'start-sr';
    # NOTE(review): if sr-done never fires for a session, that buffer leaks.
    uri_decode_bin.connect("sr-done", record_done, nbin)

    # We set the input uri to the source element.
    uri_decode_bin.set_property("uri", uri)
    global list_for_nvurisrcbin
    # main() arms a smart-record timer for every bin collected here.
    list_for_nvurisrcbin.append(uri_decode_bin)
    # "pad-added" fires once a new pad for raw data has been created.
    uri_decode_bin.connect("pad-added", cb_newpad, nbin)
    uri_decode_bin.connect("child-added", decodebin_child_added, nbin)

    # Ghost pad acting as a proxy for the video decoder src pad. It has no
    # target yet; cb_newpad sets the target once the decoder pad appears.
    Gst.Bin.add(nbin, uri_decode_bin)
    bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write(" Failed to add ghost pad in source bin \n")
        return None
    return nbin

def start_sr_function(element):
    """GLib timeout callback: trigger a smart-record session on *element*.

    Allocates a 4-byte session-id output buffer plus an SRUserContext
    buffer and emits 'start-sr' (startTime=5 s, duration=5 s). Returns True
    so GLib keeps the timer armed and this fires again.
    """
    # 4-byte native buffer that nvurisrcbin fills with the session id.
    sessionid = pyds.get_native_ptr(pyds.alloc_buffer(4))
    print(f"sessionid {sessionid}")
    sr_user_context_size = ctypes.sizeof(SRUserContext)
    # User context handed to 'start-sr'; it is only freed in record_done()
    # when the matching sr-done signal fires.
    # NOTE(review): if sr-done never arrives for this session (recording
    # failed, or start-sr re-emitted before the previous session finished),
    # this buffer is never freed -- it leaks once per trigger. Likely the
    # growing-memory symptom reported with 60 streams.
    sr_user_context_buf = pyds.get_native_ptr(pyds.alloc_buffer(sr_user_context_size))
    sr_user_context = pyds.SRUserContext.cast(sr_user_context_buf)
    sr_user_context.sessionid = 42
    sr_user_context.name = "sr-demo " + element.get_name()
    print(f"sr_user_context_buf {sr_user_context_buf} {element.get_name()}")
    # start-sr arguments: session-id out-buffer, startTime (s), duration (s),
    # user data pointer.
    element.emit('start-sr', sessionid, 5, 5, sr_user_context_buf)
    # The session-id buffer is no longer needed once the signal returns.
    pyds.free_gbuffer(sessionid)
    print('******start sr*****')
    return True

def record_done(nvurisrcbin, recordingInfo, user_ctx, user_data):
    """'sr-done' handler: log the session context and release its buffer."""
    print('******sr done*****')
    # hash(user_ctx) yields the C address of the buffer that
    # start_sr_function() allocated and passed to 'start-sr'.
    sr = pyds.SRUserContext.cast(hash(user_ctx))
    print(f"session id {sr.sessionid} -- name {sr.name}")
    # Free the context here -- note this only runs when sr-done actually
    # fires; sessions without an sr-done leak their buffer.
    pyds.free_buffer(hash(user_ctx))

def main(args, requested_pgie=None, config=None, disable_probe=False):
    """Assemble and run the multi-stream DeepStream pipeline.

    Topology: N source bins -> nvstreammux -> queue -> pgie ->
    [nvdslogger] -> tiler -> nvvideoconvert -> nvdsosd -> sink.

    Args:
        args: list of stream URIs (file:// or rtsp://), one per source.
        requested_pgie: 'nvinfer', 'nvinferserver', 'nvinferserver-grpc'
            or None (defaults to nvinfer).
        config: pgie config-file path; falls back to
            dstest3_pgie_config.txt when None.
        disable_probe: when True, measure FPS with nvdslogger instead of
            the pgie src-pad probe.
    """
    global perf_data
    perf_data = PERF_DATA(len(args))

    number_sources=len(args)

    platform_info = PlatformInfo()
    # Standard GStreamer initialization
    Gst.init(None)

    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    is_live = False

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
    print("Creating streamux \n ")

    # Create nvstreammux instance to form batches from one or more sources.
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")

    pipeline.add(streammux)
    # One source bin per URI, each linked to a requested streammux sink pad.
    for i in range(number_sources):
        print("Creating source_bin ",i," \n ")
        uri_name=args[i]
        # An RTSP input forces live-source handling on the muxer below.
        if uri_name.find("rtsp://") == 0 :
            is_live = True
        source_bin=create_source_bin(i, uri_name)
        if not source_bin:
            sys.stderr.write("Unable to create source bin \n")
        pipeline.add(source_bin)
        padname="sink_%u" %i
        # NOTE(review): get_request_pad() is deprecated in newer GStreamer;
        # request_pad_simple() is the modern equivalent -- confirm the
        # installed GStreamer version before changing.
        sinkpad= streammux.get_request_pad(padname)
        if not sinkpad:
            sys.stderr.write("Unable to create sink pad bin \n")
        srcpad=source_bin.get_static_pad("src")
        if not srcpad:
            sys.stderr.write("Unable to create src pad bin \n")
        srcpad.link(sinkpad)
    # Queues decouple successive processing elements.
    queue1=Gst.ElementFactory.make("queue","queue1")
    queue2=Gst.ElementFactory.make("queue","queue2")
    queue3=Gst.ElementFactory.make("queue","queue3")
    queue4=Gst.ElementFactory.make("queue","queue4")
    queue5=Gst.ElementFactory.make("queue","queue5")
    pipeline.add(queue1)
    pipeline.add(queue2)
    pipeline.add(queue3)
    pipeline.add(queue4)
    pipeline.add(queue5)

    nvdslogger = None

    print("Creating Pgie \n ")
    if requested_pgie != None and (requested_pgie == 'nvinferserver' or requested_pgie == 'nvinferserver-grpc') :
        pgie = Gst.ElementFactory.make("nvinferserver", "primary-inference")
    elif requested_pgie != None and requested_pgie == 'nvinfer':
        pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    else:
        # Default inference engine when none was requested.
        pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")

    if not pgie:
        sys.stderr.write(" Unable to create pgie :  %s\n" % requested_pgie)

    if disable_probe:
        # Use nvdslogger for perf measurement instead of probe function
        print ("Creating nvdslogger \n")
        nvdslogger = Gst.ElementFactory.make("nvdslogger", "nvdslogger")

    print("Creating tiler \n ")
    tiler=Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
    if not tiler:
        sys.stderr.write(" Unable to create tiler \n")
    print("Creating nvvidconv \n ")
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    if not nvvidconv:
        sys.stderr.write(" Unable to create nvvidconv \n")
    print("Creating nvosd \n ")
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")
    nvosd.set_property('process-mode',OSD_PROCESS_MODE)
    nvosd.set_property('display-text',OSD_DISPLAY_TEXT)

    if file_loop:
        if platform_info.is_integrated_gpu():
            # Set nvbuf-memory-type=4 for aarch64 for file-loop (nvurisrcbin case)
            streammux.set_property('nvbuf-memory-type', 4)
        else:
            # Set nvbuf-memory-type=2 for x86 for file-loop (nvurisrcbin case)
            streammux.set_property('nvbuf-memory-type', 2)

    if no_display:
        print("Creating Fakesink \n")
        sink = Gst.ElementFactory.make("fakesink", "fakesink")
        sink.set_property('enable-last-sample', 0)
        sink.set_property('sync', 0)
    else:
        if platform_info.is_integrated_gpu():
            print("Creating nv3dsink \n")
            sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
            if not sink:
                sys.stderr.write(" Unable to create nv3dsink \n")
        else:
            print("Creating EGLSink \n")
            sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
            if not sink:
                sys.stderr.write(" Unable to create egl sink \n")

    if not sink:
        sys.stderr.write(" Unable to create sink element \n")

    if is_live:
        print("At least one of the sources is live")
        streammux.set_property('live-source', 1)

    streammux.set_property('width', 1920)
    streammux.set_property('height', 1080)
    streammux.set_property('batch-size', number_sources)
    streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)
    if requested_pgie == "nvinferserver" and config != None:
        pgie.set_property('config-file-path', config)
    elif requested_pgie == "nvinferserver-grpc" and config != None:
        pgie.set_property('config-file-path', config)
    elif requested_pgie == "nvinfer" and config != None:
        pgie.set_property('config-file-path', config)
    else:
        pgie.set_property('config-file-path', "dstest3_pgie_config.txt")
    pgie_batch_size=pgie.get_property("batch-size")
    if(pgie_batch_size != number_sources):
        print("WARNING: Overriding infer-config batch-size",pgie_batch_size," with number of sources ", number_sources," \n")
        pgie.set_property("batch-size",number_sources)
    # Arrange the tiler grid to be roughly square.
    tiler_rows=int(math.sqrt(number_sources))
    tiler_columns=int(math.ceil((1.0*number_sources)/tiler_rows))
    tiler.set_property("rows",tiler_rows)
    tiler.set_property("columns",tiler_columns)
    tiler.set_property("width", TILED_OUTPUT_WIDTH)
    tiler.set_property("height", TILED_OUTPUT_HEIGHT)
    sink.set_property("qos",0)

    print("Adding elements to Pipeline \n")
    pipeline.add(pgie)
    if nvdslogger:
        pipeline.add(nvdslogger)
    pipeline.add(tiler)
    pipeline.add(nvvidconv)
    pipeline.add(nvosd)
    pipeline.add(sink)

    print("Linking elements in the Pipeline \n")
    streammux.link(queue1)
    queue1.link(pgie)
    pgie.link(queue2)
    if nvdslogger:
        queue2.link(nvdslogger)
        nvdslogger.link(tiler)
    else:
        queue2.link(tiler)
    tiler.link(queue3)
    queue3.link(nvvidconv)
    nvvidconv.link(queue4)
    queue4.link(nvosd)
    nvosd.link(queue5)
    queue5.link(sink)

    # Create an event loop and feed gstreamer bus messages to it.
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect ("message", bus_call, loop)
    pgie_src_pad=pgie.get_static_pad("src")
    if not pgie_src_pad:
        sys.stderr.write(" Unable to get src pad \n")
    else:
        if not disable_probe:
            pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, pgie_src_pad_buffer_probe, 0)
            # perf callback function to print fps every 5 sec
            GLib.timeout_add(5000, perf_data.perf_print_callback)

    # List the sources
    print("Now playing...")
    for i, source in enumerate(args):
        print(i, ": ", source)

    print("Starting pipeline \n")
    # Start playback and listen to events.
    pipeline.set_state(Gst.State.PLAYING)
    global list_for_nvurisrcbin
    # Arm a repeating 10 s timer per source that emits 'start-sr'.
    # NOTE(review): timer_id is never stored/removed, so the timers keep
    # firing (and allocating SR contexts) for the lifetime of the process.
    for uribin in list_for_nvurisrcbin:
        timer_id = GLib.timeout_add_seconds(10, start_sr_function, uribin)
    try:
        loop.run()
    except:
        # NOTE(review): bare except swallows KeyboardInterrupt and everything
        # else so the cleanup below always runs; consider narrowing.
        pass
    # cleanup
    print("Exiting app\n")
    pipeline.set_state(Gst.State.NULL)

def parse_args():
    """Parse CLI arguments, set the global run-mode flags, and return
    (stream_paths, pgie, config, disable_probe).

    Exits with usage when no arguments are given, when pgie/config are not
    specified together, or when the config file does not exist.
    """
    parser = argparse.ArgumentParser(prog="deepstream_test_3",
                    description="deepstream-test3 multi stream, multi model inference reference app")
    parser.add_argument(
        "-i",
        "--input",
        help="Path to input streams",
        nargs="+",
        metavar="URIs",
        required=True,
    )
    parser.add_argument(
        "-c",
        "--configfile",
        metavar="config_location.txt",
        default=None,
        help="Choose the config-file to be used with specified pgie",
    )
    parser.add_argument(
        "-g",
        "--pgie",
        default=None,
        help="Choose Primary GPU Inference Engine",
        choices=["nvinfer", "nvinferserver", "nvinferserver-grpc"],
    )
    parser.add_argument(
        "--no-display",
        action="store_true",
        # A store_true flag must default to False, otherwise passing the
        # option (or not) makes no difference.
        default=False,
        dest='no_display',
        help="Disable display of video output",
    )
    parser.add_argument(
        "--file-loop",
        action="store_true",
        default=False,
        dest='file_loop',
        help="Loop the input file sources after EOS",
    )
    parser.add_argument(
        "--disable-probe",
        action="store_true",
        default=False,
        dest='disable_probe',
        help="Disable the probe function and use nvdslogger for FPS",
    )
    parser.add_argument(
        "-s",
        "--silent",
        action="store_true",
        default=False,
        dest='silent',
        help="Disable verbose output",
    )
    # Check input arguments
    if len(sys.argv) == 1:
        parser.print_help(sys.stderr)
        sys.exit(1)
    args = parser.parse_args()

    stream_paths = args.input
    pgie = args.pgie
    config = args.configfile
    disable_probe = args.disable_probe
    global no_display
    global silent
    global file_loop
    # Honor the parsed CLI flags (previously these were overridden with
    # hard-coded debug values, silently ignoring the user's options).
    no_display = args.no_display
    silent = args.silent
    file_loop = args.file_loop

    if config and not pgie or pgie and not config:
        sys.stderr.write ("\nEither pgie or configfile is missing. Please specify both! Exiting...\n\n\n\n")
        parser.print_help()
        sys.exit(1)
    if config:
        config_path = Path(config)
        if not config_path.is_file():
            sys.stderr.write ("Specified config-file: %s doesn't exist. Exiting...\n\n" % config)
            sys.exit(1)

    print(vars(args))
    return stream_paths, pgie, config, disable_probe

if __name__ == '__main__':
    # Parse CLI options, run the pipeline, and propagate main()'s result
    # as the process exit status.
    cli_args = parse_args()
    sys.exit(main(*cli_args))

This is the sample app code with smart recording which we have discussed in this topic.

With sr-done signal
With 20 streams, it is giving sr-done signal and recording as expected and no issues.

But the moment I test with 60 streams, it is not able to emit the sr-done signal for all the streams.

Without sr-done signal
With 60 streams, If I dont give sr-done signal it is not recording the video of expected length.
For example: `element.emit('start-sr', sessionid, 5, 5, sr_user_context_buf)`
In this case it is not recording for 10 seconds. It is recording 1 to 10 seconds with varying length.

Can you please tell what the issues might be as the number of streams increases?
Pls respond ASAP, Thanks

1.With 60 streams, how is the system load? How is the cpu/gpu usage? Please use top/nvidia-smi to monitor the cpu/gpu usage

2.Can the network bandwidth support 60 streams playback?

3.The start-sr signal triggers NvDsSRStart, which then records the duration of the starttime parameter from the cache and records the duration of the duration parameter from the current time. However, if there is no data in the cache, it can only record a video of up to the length of the duration parameter.

https://docs.nvidia.com/metropolis/deepstream/5.0DP/dev-guide/DeepStream_Development_Guide/baggage/gst-nvdssr_8h.html#ace3f9e2eb2800da76f6ffa37e6ee25a6

Additionally, sr-done is triggered by nvurisrcbin, not manually triggered by you.

  1. There is no issue with CPU/GPU and network bandwidth.
  2. Please look into the below results sheet pasted below.

Question1:
If my event triggering interval is 5 seconds, Why it is recording for 10 seconds?
Question2:
If I test with 60 streams and trigger an event for every 5 or 20 seconds, why it is giving sr-done signal for 5% of streams only.
Question3:
If I test with 60 streams and trigger an event for every 5 or 20 seconds, why the recorded video duration varying in between 1 to 10 seconds?

This is normal, and my reply above and the NvDsSRStart API documentation have explained this issue.

In general

max duration == startTime parameter + duration parameter

Is there an emit stop-sr signal before the sr-done signal arrives?

In addition, if the stop-sr signal is not emitted before sr-done arrives, then start-sr is emitted, and no video file will be stored.

Simply put, when continuously emitting start-sr, either wait for sr-done or actively emit stop-sr to ensure that the video file can be stored normally

I am continuously emitting start-sr, and waiting for sr-done signal.

Even if I use start-sr, stop-sr and sr-done, I am not able to get the sr-done signal for all 60 streams.

SCENARIO1:
With or without stop-sr before sr-done signal , If I record for 10 streams out of 60, I will get sr-done signal for all 10 streams.

SCENARIO2:
With or without stop-sr before sr-done signal, If I record for 60 streams out of 60, I will get sr-done signal for only 5% or 10% of streams only.

1.For a recording, if stop-sr is emitted, sr-done will not come.

2.For the same stream, if the last sr-done has not arrived and no stop-sr was emitted, and start-sr is sent again, then an sr-done will be dropped. Either wait for the sr-done signal or actively emit stop-sr, otherwise the smart-recording operation is invalid.

3.When you record 60 streams, are all the files normal? If the recorded file cannot get the audio and video track, it will also be considered a recording failure and will not emit sr-done.
If there is no audio or video track, this is related to the network and source

4.nvurisrcbin is open source, located in /opt/nvidia/deepstream/deepstream/sources/gst-plugins/gst-nvurisrcbin

  1. The issue is that even if I don’t give stop-sr also, I am not getting sr-done signal.
  2. According to your second point, For every 20 seconds once, I am trying to record for 10 seconds only. That means I am waiting more time to get sr-done signal, but still not getting sr-done signal with 60 streams.
  3. According to your third point, With 60 streams, even if I get sr-done signal or not, Video length is very unpredictable (1 to 10 seconds).

Can you pls test with 60 streams if possible?
Thanks.

When the program is running, is there a log like No video stream found?

This means that the recording failed and no sr-done signal will be sent.

In fact, if start-sr is triggered every 10 seconds and then 5 seconds video are recorded, this problem is likely to occur, but this problem may be caused by the inability to cache playable data in the smart recording queue, which is caused by the rtsp stream. In addition, this scenario does not make sense.

In addition, in order to be more consistent with the real scene, I recorded 20s data every 100s. The following is my test environment.

I tested 20 streams and it was completely normal. Each stream could receive the sr-done signal normally.

Start Server

./mediamtx

Publish 20 rtsp streams

#!/usr/bin/env bash

count=20
cmd="ffmpeg -re -stream_loop -1 -i sample_720p.mp4 "

while [ $count -gt 0 ]; do
    count=$(($count - 1))
    cmd="${cmd} -s 680x480 -c:v libx264 -g 30 -keyint_min 30 -preset ultrafast -tune zerolatency -profile:v baseline  -an -f rtsp -rtsp_transport tcp rtsp://ip:8554/mystream${count}"
done

echo $cmd

exec $cmd

Smart recording with 20 rtsp streams

In order to reduce the impact of UDP packet loss, use RTP over TCP

uri_decode_bin.set_property(“select-rtp-protocol”, 4)

python3 test3-sr.py -i rtsp://ip:8554/mystream19 rtsp://ip:8554/mystream18 rtsp://ip:8554/mystream17 rtsp://ip:8554/mystream16 rtsp://ip:8554/mystream15 rtsp://ip:8554/mystream14 rtsp://ip:8554/mystream13 rtsp://ip:8554/mystream12 rtsp://ip:8554/mystream11 rtsp://ip:8554/mystream10 rtsp://ip:8554/mystream9 rtsp://ip:8554/mystream8 rtsp://ip:8554/mystream7 rtsp://ip:8554/mystream6 rtsp://ip:8554/mystream5 rtsp://ip:8554/mystream4 rtsp://ip:8554/mystream3 rtsp://ip:8554/mystream2 rtsp://ip:8554/mystream1 rtsp://ip:8554/mystream0 
#!/usr/bin/env python3

################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import sys
sys.path.append('../')
from pathlib import Path
import gi
import configparser
import argparse
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst, GstNet
from ctypes import *
import time
import sys
import math
import platform

from common.platform_info import PlatformInfo
from common.bus_call import bus_call
from common.FPS import PERF_DATA
import time, threading
import pyds
import ctypes

# Run-mode flags; set from the CLI in parse_args() before main() runs.
no_display = False
silent = False
file_loop = False
# Shared PERF_DATA instance; created in main(), read by the pad probe.
perf_data = None

MAX_DISPLAY_LEN=64
# Class ids emitted by the default 4-class primary detector
# (see dstest3_pgie_config.txt).
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
# Streammux output resolution.
MUXER_OUTPUT_WIDTH=1920
MUXER_OUTPUT_HEIGHT=1080
# Timeout (us) before the muxer pushes an incomplete batch downstream.
MUXER_BATCH_TIMEOUT_USEC = 33000
TILED_OUTPUT_WIDTH=1280
TILED_OUTPUT_HEIGHT=720
GST_CAPS_FEATURES_NVMM="memory:NVMM"
OSD_PROCESS_MODE= 0
OSD_DISPLAY_TEXT= 1
# Display labels indexed by class id.
pgie_classes_str= ["Vehicle", "TwoWheeler", "Person","RoadSign"]
# nvurisrcbin instances collected by create_source_bin(); main() arms a
# smart-record timer for each.
list_for_nvurisrcbin = []



class SRUserContext(ctypes.Structure):
    """ctypes mirror of the smart-record user context passed to 'start-sr'.

    The layout must match the C-side struct consumed by nvurisrcbin: a
    4-byte session id followed by a fixed 32-byte name buffer.
    """
    _fields_ = [("sessionid", ctypes.c_int), ("name", ctypes.c_char * 32)]

# pgie_src_pad_buffer_probe extracts metadata from the batched buffer on the
# pgie src pad, counts detections per class, and feeds the FPS counter.
def pgie_src_pad_buffer_probe(pad, info, u_data):
    """Per-buffer probe: tally per-class object counts and update FPS.

    Always returns Gst.PadProbeReturn.OK so the buffer continues downstream
    (the original returned None when the buffer was missing; a pad probe
    should always return a PadProbeReturn value).
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        # Nothing to inspect; let the probe complete normally.
        return Gst.PadProbeReturn.OK

    # pyds.gst_buffer_get_nvds_batch_meta() expects the C address of
    # gst_buffer, which is obtained with hash(gst_buffer).
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            # The cast keeps ownership of the underlying memory in the C
            # code, so the Python garbage collector leaves it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_number = frame_meta.frame_num
        num_rects = frame_meta.num_obj_meta
        obj_counter = {
            PGIE_CLASS_ID_VEHICLE: 0,
            PGIE_CLASS_ID_PERSON: 0,
            PGIE_CLASS_ID_BICYCLE: 0,
            PGIE_CLASS_ID_ROADSIGN: 0,
        }
        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            # NOTE(review): raises KeyError if the model ever emits a class
            # id outside the four ids above -- assumed impossible with the
            # default detector; confirm if swapping models.
            obj_counter[obj_meta.class_id] += 1
            try:
                l_obj = l_obj.next
            except StopIteration:
                break

        if not silent:
            print("Frame Number=", frame_number, "Number of Objects=",num_rects,"Vehicle_count=",obj_counter[PGIE_CLASS_ID_VEHICLE],"Person_count=",obj_counter[PGIE_CLASS_ID_PERSON])

        # Update frame rate through this probe.
        stream_index = "stream{0}".format(frame_meta.pad_index)
        global perf_data
        perf_data.update_fps(stream_index)

        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK



def cb_newpad(decodebin, decoder_src_pad, data):
    """'pad-added' handler: point the source bin's ghost pad at the new pad.

    Only video pads backed by NVMM (NVIDIA hardware decoder) memory are
    linked; audio pads are ignored and software-decoder pads are reported
    as errors.
    """
    print("In cb_newpad\n")
    caps = decoder_src_pad.get_current_caps()
    if not caps:
        caps = decoder_src_pad.query_caps()
    pad_type = caps.get_structure(0).get_name()
    pad_features = caps.get_features(0)
    source_bin = data

    # Only video pads are of interest; decodebin also creates audio pads.
    print("gstname=", pad_type)
    if pad_type.find("video") == -1:
        return

    # Link only if decodebin picked an NVIDIA decoder (NVMM memory caps).
    print("features=", pad_features)
    if not pad_features.contains("memory:NVMM"):
        sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")
        return

    ghost_pad = source_bin.get_static_pad("src")
    if not ghost_pad.set_target(decoder_src_pad):
        sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")

def decodebin_child_added(child_proxy, Object, name, user_data):
    """'child-added' handler: recurse into nested decodebins and tune sources.

    Re-attaches itself to any nested decodebin so grandchildren are seen,
    and enables drop-on-latency on source elements that support it.
    """
    print("Decodebin child added:", name, "\n")
    # Nested decodebins spawn their own children; keep listening on them.
    if "decodebin" in name:
        Object.connect("child-added", decodebin_child_added, user_data)

    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        # Only sources such as rtspsrc expose this property.
        if source_element.find_property('drop-on-latency') is not None:
            Object.set_property("drop-on-latency", True)



def create_source_bin(index, uri):
    """Create a source GstBin wrapping an nvurisrcbin for one URI.

    The bin exposes a single "src" ghost pad with no target yet; the
    target is set in cb_newpad once the decoder pad appears.  Smart
    recording is enabled on the nvurisrcbin and the element is stored in
    list_for_nvurisrcbin so main() can emit "start-sr" on it later.

    Returns the new Gst.Bin, or None if the ghost pad could not be added.
    """
    print("Creating source bin")

    # Create a source GstBin to abstract this bin's content from the rest
    # of the pipeline.
    bin_name = "source-bin-%02d" % index
    print(bin_name)
    nbin = Gst.Bin.new(bin_name)
    if not nbin:
        sys.stderr.write(" Unable to create source bin \n")

    # nvurisrcbin figures out the container format and codec and plugs the
    # appropriate demux and decode plugins.  FIX: the original code had the
    # file_loop condition inverted against its own comment, left a debug
    # print behind, and used the same element name "uri-decode-bin" for
    # every source in one branch; both branches created the same element,
    # so they are unified here with a per-index unique name.
    uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin" + str(index))
    if not uri_decode_bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
    uri_decode_bin.set_property("cudadec-memtype", 0)
    if file_loop:
        # Restart file sources after EOS.
        uri_decode_bin.set_property("file-loop", 1)
    # select-rtp-protocol=4: RTP over TCP (avoids UDP packet loss).
    uri_decode_bin.set_property("select-rtp-protocol", 4)
    # Smart-record configuration: smart-record=2 arms recording via the
    # "start-sr" signal; container 0 is NVDSSR_CONTAINER_MP4; a 20 s
    # pre-event cache is kept (cache_que0 inside the record bin).
    uri_decode_bin.set_property("smart-record", 2)
    uri_decode_bin.set_property("smart-rec-container", 0)
    uri_decode_bin.set_property("smart-rec-dir-path", ".")
    uri_decode_bin.set_property("smart-rec-cache", 20)  # enable smart record.
    uri_decode_bin.set_property("source-id", index)
    uri_decode_bin.connect("sr-done", record_done, nbin)

    # We set the input uri to the source element.
    uri_decode_bin.set_property("uri", uri)
    global list_for_nvurisrcbin
    list_for_nvurisrcbin.append(uri_decode_bin)
    # "pad-added" fires once a new pad for raw data has been created by the
    # decodebin; "child-added" lets us tune the inner source element.
    uri_decode_bin.connect("pad-added", cb_newpad, nbin)
    uri_decode_bin.connect("child-added", decodebin_child_added, nbin)

    # Ghost pad acting as a proxy for the video decoder src pad; it has no
    # target yet.  Once the decode bin creates the video decoder and
    # cb_newpad runs, the ghost pad target is set to the decoder src pad.
    Gst.Bin.add(nbin, uri_decode_bin)
    bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write(" Failed to add ghost pad in source bin \n")
        return None
    return nbin

def start_sr_function(element):
    """GLib timer callback: start a smart-record session on *element*.

    Allocates a native session-id buffer and an SRUserContext buffer,
    emits "start-sr" on the given nvurisrcbin, then frees the session-id
    buffer.  Returns True so the GLib timeout keeps repeating.
    """
    # Native 4-byte out-buffer the element fills with the session id.
    sessionid = pyds.get_native_ptr(pyds.alloc_buffer(4))
    print(f"sessionid {sessionid}")
    sr_user_context_size = ctypes.sizeof(SRUserContext)
    # User-context buffer handed to start-sr; it travels with the session
    # and comes back in the "sr-done" callback (record_done), which is the
    # only place it is freed.
    sr_user_context_buf = pyds.get_native_ptr(pyds.alloc_buffer(sr_user_context_size))
    sr_user_context = pyds.SRUserContext.cast(sr_user_context_buf)
    sr_user_context.sessionid = 42
    sr_user_context.name = "sr-demo " + element.get_name()
    print(f"sr_user_context_buf {sr_user_context_buf} {element.get_name()}")
    # Signal args: session-id out-pointer, then 5 and 20 -- presumably
    # start offset (seconds before the trigger) and duration; confirm
    # against the nvurisrcbin "start-sr" signal signature.
    element.emit('start-sr', sessionid, 5, 20, sr_user_context_buf)
    # The session-id buffer is no longer needed once emit returns.
    pyds.free_gbuffer(sessionid)
    # NOTE(review): if the recording does not actually start (e.g. a
    # session is already in progress), "sr-done" never fires for this
    # context and sr_user_context_buf is never freed -- a likely source of
    # the memory growth reported in this thread.  Verify and add error
    # handling if confirmed.
    print('******start sr*****')
    return True

def record_done(nvurisrcbin, recordingInfo, user_ctx, user_data):
    """Handler for the nvurisrcbin "sr-done" signal.

    recordingInfo: boxed pointer to the recording info (owned by the
        element; not freed here).
    user_ctx: boxed pointer to the SRUserContext buffer allocated in
        start_sr_function; freed here to balance that allocation.
    user_data: the source bin passed at connect() time (unused).
    """
    print('******sr done*****')
    # hash() recovers the raw pointer value from the boxed GObject pointer
    # so pyds can cast it back to the native struct.
    sr = pyds.SRUserContext.cast(hash(user_ctx))
    print(f"session id {sr.sessionid} -- name {sr.name}")
    pyds.free_buffer(hash(user_ctx))

def main(args, requested_pgie=None, config=None, disable_probe=False):
    """Build and run the multi-source DeepStream smart-record pipeline.

    args: list of stream URIs; one source bin is created per URI.
    requested_pgie: "nvinfer", "nvinferserver" or "nvinferserver-grpc"
        (anything else falls back to nvinfer).
    config: config-file path for the chosen pgie; defaults to
        dstest3_pgie_config.txt when absent.
    disable_probe: use nvdslogger for FPS measurement instead of the
        pgie src-pad buffer probe.
    """
    global perf_data
    perf_data = PERF_DATA(len(args))

    number_sources = len(args)

    platform_info = PlatformInfo()
    # Standard GStreamer initialization
    Gst.init(None)

    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    is_live = False

    # FIX: GstNet was referenced below without ever being imported (the
    # module-level import only brings in GLib and Gst), so the original
    # code raised NameError here.  Import it locally to keep the file's
    # top-level imports unchanged.
    gi.require_version('GstNet', '1.0')
    from gi.repository import GstNet

    # Create the NTP clock (port 123 is NTP) and make the pipeline use it,
    # presumably to keep buffer timestamps NTP-synchronized across sources.
    clock = GstNet.NtpClock.new("ntp-clock", "time1.google.com", 123, 0)
    pipeline.use_clock(clock)

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
    print("Creating streamux \n ")

    # Create nvstreammux instance to form batches from one or more sources.
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")

    pipeline.add(streammux)
    for i in range(number_sources):
        print("Creating source_bin ", i, " \n ")
        uri_name = args[i]
        if uri_name.find("rtsp://") == 0:
            is_live = True
        source_bin = create_source_bin(i, uri_name)
        if not source_bin:
            sys.stderr.write("Unable to create source bin \n")
        pipeline.add(source_bin)
        padname = "sink_%u" % i
        sinkpad = streammux.get_request_pad(padname)
        if not sinkpad:
            sys.stderr.write("Unable to create sink pad bin \n")
        srcpad = source_bin.get_static_pad("src")
        if not srcpad:
            sys.stderr.write("Unable to create src pad bin \n")
        srcpad.link(sinkpad)

    # Queues decouple the processing stages of the pipeline.
    queue1 = Gst.ElementFactory.make("queue", "queue1")
    queue2 = Gst.ElementFactory.make("queue", "queue2")
    queue3 = Gst.ElementFactory.make("queue", "queue3")
    queue4 = Gst.ElementFactory.make("queue", "queue4")
    queue5 = Gst.ElementFactory.make("queue", "queue5")
    pipeline.add(queue1)
    pipeline.add(queue2)
    pipeline.add(queue3)
    pipeline.add(queue4)
    pipeline.add(queue5)

    nvdslogger = None

    print("Creating Pgie \n ")
    if requested_pgie != None and (requested_pgie == 'nvinferserver' or requested_pgie == 'nvinferserver-grpc'):
        pgie = Gst.ElementFactory.make("nvinferserver", "primary-inference")
    elif requested_pgie != None and requested_pgie == 'nvinfer':
        pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    else:
        # Default inference engine.
        pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")

    if not pgie:
        sys.stderr.write(" Unable to create pgie :  %s\n" % requested_pgie)

    if disable_probe:
        # Use nvdslogger for perf measurement instead of probe function
        print("Creating nvdslogger \n")
        nvdslogger = Gst.ElementFactory.make("nvdslogger", "nvdslogger")

    print("Creating tiler \n ")
    tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
    if not tiler:
        sys.stderr.write(" Unable to create tiler \n")
    print("Creating nvvidconv \n ")
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    if not nvvidconv:
        sys.stderr.write(" Unable to create nvvidconv \n")
    print("Creating nvosd \n ")
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")
    nvosd.set_property('process-mode', OSD_PROCESS_MODE)
    nvosd.set_property('display-text', OSD_DISPLAY_TEXT)

    if file_loop:
        if platform_info.is_integrated_gpu():
            # Set nvbuf-memory-type=4 for aarch64 for file-loop (nvurisrcbin case)
            streammux.set_property('nvbuf-memory-type', 4)
        else:
            # Set nvbuf-memory-type=2 for x86 for file-loop (nvurisrcbin case)
            streammux.set_property('nvbuf-memory-type', 2)

    if no_display:
        print("Creating Fakesink \n")
        sink = Gst.ElementFactory.make("fakesink", "fakesink")
        sink.set_property('enable-last-sample', 0)
        sink.set_property('sync', 0)
    else:
        if platform_info.is_integrated_gpu():
            print("Creating nv3dsink \n")
            sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
            if not sink:
                sys.stderr.write(" Unable to create nv3dsink \n")
        else:
            print("Creating EGLSink \n")
            sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
            if not sink:
                sys.stderr.write(" Unable to create egl sink \n")

    if not sink:
        sys.stderr.write(" Unable to create sink element \n")

    if is_live:
        print("At least one of the sources is live")
        streammux.set_property('live-source', 1)

    streammux.set_property('width', 1920)
    streammux.set_property('height', 1080)
    streammux.set_property('batch-size', number_sources)
    streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)
    if requested_pgie == "nvinferserver" and config != None:
        pgie.set_property('config-file-path', config)
    elif requested_pgie == "nvinferserver-grpc" and config != None:
        pgie.set_property('config-file-path', config)
    elif requested_pgie == "nvinfer" and config != None:
        pgie.set_property('config-file-path', config)
    else:
        pgie.set_property('config-file-path', "dstest3_pgie_config.txt")
    pgie_batch_size = pgie.get_property("batch-size")
    if pgie_batch_size != number_sources:
        print("WARNING: Overriding infer-config batch-size", pgie_batch_size, " with number of sources ", number_sources, " \n")
        pgie.set_property("batch-size", number_sources)
    # Square-ish tiler layout: rows*columns >= number_sources.
    tiler_rows = int(math.sqrt(number_sources))
    tiler_columns = int(math.ceil((1.0 * number_sources) / tiler_rows))
    tiler.set_property("rows", tiler_rows)
    tiler.set_property("columns", tiler_columns)
    tiler.set_property("width", TILED_OUTPUT_WIDTH)
    tiler.set_property("height", TILED_OUTPUT_HEIGHT)
    sink.set_property("qos", 0)

    print("Adding elements to Pipeline \n")
    pipeline.add(pgie)
    if nvdslogger:
        pipeline.add(nvdslogger)
    pipeline.add(tiler)
    pipeline.add(nvvidconv)
    pipeline.add(nvosd)
    pipeline.add(sink)

    print("Linking elements in the Pipeline \n")
    # streammux -> queue1 -> pgie -> queue2 -> [nvdslogger ->] tiler ->
    # queue3 -> nvvidconv -> queue4 -> nvosd -> queue5 -> sink
    streammux.link(queue1)
    queue1.link(pgie)
    pgie.link(queue2)
    if nvdslogger:
        queue2.link(nvdslogger)
        nvdslogger.link(tiler)
    else:
        queue2.link(tiler)
    tiler.link(queue3)
    queue3.link(nvvidconv)
    nvvidconv.link(queue4)
    queue4.link(nvosd)
    nvosd.link(queue5)
    queue5.link(sink)

    # create an event loop and feed gstreamer bus messages to it
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)
    pgie_src_pad = pgie.get_static_pad("src")
    if not pgie_src_pad:
        sys.stderr.write(" Unable to get src pad \n")
    else:
        if not disable_probe:
            pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, pgie_src_pad_buffer_probe, 0)
            # perf callback function to print fps every 5 sec
            GLib.timeout_add(5000, perf_data.perf_print_callback)

    # List the sources
    print("Now playing...")
    for i, source in enumerate(args):
        print(i, ": ", source)

    print("Starting pipeline \n")
    # start playback and listen to events
    pipeline.set_state(Gst.State.PLAYING)
    global list_for_nvurisrcbin
    # Kick off a smart-record session on every source every 100 seconds.
    for uribin in list_for_nvurisrcbin:
        GLib.timeout_add_seconds(100, start_sr_function, uribin)
    try:
        loop.run()
    except:
        # Deliberate catch-all so Ctrl-C (KeyboardInterrupt) falls through
        # to the cleanup below.
        pass
    # cleanup
    print("Exiting app\n")
    pipeline.set_state(Gst.State.NULL)

def parse_args():
    """Parse the command line and publish display/loop flags as globals.

    NOTE: no_display, silent and file_loop are deliberately forced to True
    regardless of the flags passed (the argparse options are kept so the
    help output stays unchanged).

    Returns (stream_paths, pgie, config, disable_probe).
    """
    parser = argparse.ArgumentParser(prog="deepstream_test_3",
                    description="deepstream-test3 multi stream, multi model inference reference app")
    parser.add_argument(
        "-i",
        "--input",
        help="Path to input streams",
        nargs="+",
        metavar="URIs",
        default=["a"],
        required=True,
    )
    parser.add_argument(
        "-c",
        "--configfile",
        metavar="config_location.txt",
        default=None,
        help="Choose the config-file to be used with specified pgie",
    )
    parser.add_argument(
        "-g",
        "--pgie",
        default=None,
        help="Choose Primary GPU Inference Engine",
        choices=["nvinfer", "nvinferserver", "nvinferserver-grpc"],
    )
    parser.add_argument(
        "--no-display",
        action="store_true",
        default=True,
        dest='no_display',
        help="Disable display of video output",
    )
    parser.add_argument(
        "--file-loop",
        action="store_true",
        default=False,
        dest='file_loop',
        help="Loop the input file sources after EOS",
    )
    parser.add_argument(
        "--disable-probe",
        action="store_true",
        default=False,
        dest='disable_probe',
        help="Disable the probe function and use nvdslogger for FPS",
    )
    parser.add_argument(
        "-s",
        "--silent",
        action="store_true",
        default=False,
        dest='silent',
        help="Disable verbose output",
    )

    # With no arguments at all, show usage and bail out.
    if len(sys.argv) == 1:
        parser.print_help(sys.stderr)
        sys.exit(1)
    parsed = parser.parse_args()

    global no_display
    global silent
    global file_loop
    # Hard-coded overrides: the parsed values of these flags are ignored.
    no_display = True
    silent = True
    file_loop = True

    pgie = parsed.pgie
    config = parsed.configfile
    # pgie and configfile must be supplied together.
    if config and not pgie or pgie and not config:
        sys.stderr.write ("\nEither pgie or configfile is missing. Please specify both! Exiting...\n\n\n\n")
        parser.print_help()
        sys.exit(1)
    if config and not Path(config).is_file():
        sys.stderr.write ("Specified config-file: %s doesn't exist. Exiting...\n\n" % config)
        sys.exit(1)

    print(vars(parsed))
    return parsed.input, pgie, config, parsed.disable_probe

if __name__ == '__main__':
    # Script entry point: parse CLI options, run the pipeline, and use
    # main()'s return value as the process exit code.
    stream_paths, pgie, config, disable_probe = parse_args()
    sys.exit(main(stream_paths, pgie, config, disable_probe))

As you suggested, I ran 20 streams, but I am getting "No video stream found" for some of them. Is this an issue with the smart recording queue?

No, this is not a smart recording queue issue.

Generally speaking, during recording the RTSP camera may not resend key decoding information such as SPS/PPS. In fact, my test steps above work properly even when start-sr is triggered once every 10 seconds. Please check your RTSP camera configuration.

  1. As you suggested, I generated RTSP streams from video files and tested with 20 RTSP streams, but I am still getting "No video stream found" for some streams. By the way, what do you mean by RTSP configuration?

  2. Can you please check the duration of the videos which you have recorded? I want to know duration of video even though you are getting sr-done signal

Please check your network status to see if there is packet loss. As I mentioned above, use rtp over tcp to avoid packet loss. I mean the I frame interval in the encoder configuration of the rtsp camera

I have checked the recording files. The above code triggers a recording every 100 seconds and records for 20 seconds. There is no problem with the duration of the recorded files — each is about 20 seconds.

1 Like

This is the structure of nvRecordingInfo in .cpp. How do I get info out of it in python.

def record_done(nvurisrcbin, recordingInfo, user_ctx,user_data, var):

void register_types(py::module &m) {
        py::enum_<NvDsSRContainerType>(m, "NvDsSRContainerType")
            .value("NVDSSR_CONTAINER_MP4", NVDSSR_CONTAINER_MP4)
            .value("NVDSSR_CONTAINER_MKV", NVDSSR_CONTAINER_MKV)
            .export_values();
        py::class_<NvDsSRRecordingInfo>(m, "NvDsSRRecordingInfo")
                .def(py::init<>())
                .def_property("filename", STRING_CHAR_ARRAY(NvDsSRRecordingInfo, filename))
                .def_property("dirpath", STRING_CHAR_ARRAY(NvDsSRRecordingInfo, dirpath))
                .def_readwrite("duration", &NvDsSRRecordingInfo::duration)
                .def_readwrite("containerType", &NvDsSRRecordingInfo::containerType)
                .def_readwrite("width", &NvDsSRRecordingInfo::width)
                .def_readwrite("height", &NvDsSRRecordingInfo::height)
                .def_readwrite("containsVideo", &NvDsSRRecordingInfo::containsVideo)
                .def_readwrite("channels", &NvDsSRRecordingInfo::channels)
                .def_readwrite("samplingRate", &NvDsSRRecordingInfo::samplingRate)
                .def_readwrite("containsAudio", &NvDsSRRecordingInfo::containsAudio)
                .def("cast", [](size_t data) {
                        return (NvDsSRRecordingInfo *) data;
                     },
                     py::return_value_policy::reference)
                .def("cast", [](void* data) {
                        return (NvDsSRRecordingInfo *) data;
                     },
                     py::return_value_policy::reference);
        struct SRUserContext {
            int sessionid;
            char name[32];
        };
def record_done(nvurisrcbin, recordingInfo, user_ctx, user_data):
    print('******sr done*****')
    print(f"record_done nvurisrcbin {nvurisrcbin} recordingInfo {recordingInfo}"
          f" user_ctx {user_ctx} user_data {user_data}")
    info = pyds.NvDsSRRecordingInfo.cast(hash(recordingInfo))
    sr = pyds.SRUserContext.cast(hash(user_ctx))
    print(f"filename {info.filename} -- dir {info.dirpath} {info.width} x {info.height}")
    print(f"session id {sr.sessionid} -- name {sr.name}")
1 Like

Can you please answer below question ASAP.

Is it possible to get the buffer queue from the nvurisrcbin and if yes, how can I do it???

Because, If any event happens, sometimes instead of video, We want to send an image which is 3 seconds ago

If you mean the smart-recording cache queue, please refer to the code below and find cache_que0. In fact, you can find any element using the following method.

I think this method may not be used to save pictures, because not all video caches can be decoded independently, they must be i-frames.

diff --git a/apps/deepstream-test3/test3-sr.py b/apps/deepstream-test3/test3-sr.py
index cc42c5b..a61d32c 100755
--- a/apps/deepstream-test3/test3-sr.py
+++ b/apps/deepstream-test3/test3-sr.py
@@ -166,7 +166,10 @@ def decodebin_child_added(child_proxy,Object,name,user_data):
         if source_element.find_property('drop-on-latency') != None:
             Object.set_property("drop-on-latency", True)
 
-
+def on_deep_element_added(bin, sub_bin, element, user_data):
+    factory = element.get_factory()
+    if factory.get_name() == "queue":
+        print(f"queue added ===>: {element.get_name()} to {sub_bin.get_name()}")
 
 def create_source_bin(index,uri):
     print("Creating source bin")
@@ -197,6 +200,7 @@ def create_source_bin(index,uri):
     uri_decode_bin.set_property("smart-rec-dir-path", ".")
     uri_decode_bin.set_property("smart-rec-cache", 20) # enable smart record.
     uri_decode_bin.set_property("source-id",index)
+    uri_decode_bin.connect("deep-element-added", on_deep_element_added, uri_decode_bin)
     uri_decode_bin.connect("sr-done", record_done, nbin)
 
     # We set the input uri to the source element

With this cache_que0, how can I get the cached video?

I am able to get this cache_que0; below is the info I retrieved about it, and this info changes when I trigger events.
The parent of ‘cache_que0’ is record_bin0.

name: cache_que0
parent: record_bin0
current-level-buffers: 129
current-level-bytes: 1099752
current-level-time: 5154671214
max-size-buffers: 0
max-size-bytes: 0
max-size-time: 25000000000
min-threshold-buffers: 0
min-threshold-bytes: 0
min-threshold-time: 20000000000
leaky: <enum Leaky on downstream (old buffers) of type main.GstQueueLeaky>
silent: True
flush-on-eos: False

How do I get the queue which is present after the Tee so that, we can get the frames irrespective of whether we are triggering an event or not?