Low FPS and random frame order with RTSP streams

I’m developing a DeepStream pipeline for a simple face recognition app using multiprocessing + PyCUDA + TensorRT, following the deepstream-test-3 sample. Below is the modified code:

#!/usr/bin/env python3
import os
import cv2
import sys
import gi
from copy import deepcopy
import configparser
gi.require_version('Gst', '1.0')
import multiprocessing as mp

try:
    mp.set_start_method('spawn', force=True)
except RuntimeError:
    pass

from gi.repository import GObject, Gst
from gi.repository import GLib
import random
from ctypes import *
import time
import math
import platform
import json
import datetime
import numpy as np
from common.is_aarch_64 import is_aarch64
from common.FPS import GETFPS
import pyds
from parsers.parser_scrfd import nvds_infer_parse_custom_scrfd
from face_ds import face_recog_mp

from multiprocessing import Queue, Process

MUXER_BATCH_TIMEOUT_USEC=-1
MAX_NUM_SOURCES = 4

def init_pipeline():
    global g_num_sources, g_source_id_list, g_eos_list, g_source_enabled, g_source_bin_list, g_source_base_meta_list, g_source_camera_id_list, fps_streams
    g_num_sources = 0
    g_source_id_list = [0] * MAX_NUM_SOURCES
    g_eos_list = [False] * MAX_NUM_SOURCES
    g_source_enabled = [False] * MAX_NUM_SOURCES
    g_source_bin_list = [None] * MAX_NUM_SOURCES
    g_source_base_meta_list = dict()
    g_source_camera_id_list = [None] * MAX_NUM_SOURCES
    fps_streams = dict()


def pgie_src_pad_buffer_probe(pad,info,u_data):
    global g_source_base_meta_list, main_queue

    frame_number=0
    num_rects=0
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return

    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list

    lst_meta = []
    lst_frame = {}
    while l_frame is not None:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
            # The casting is done by pyds.NvDsFrameMeta.cast()
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_number = frame_meta.frame_num
        source_id = frame_meta.source_id
        fps_streams[g_source_camera_id_list[source_id]].get_fps()

        n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
        frame_copy = np.array(n_frame, copy=True, order='C')
        frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_RGBA2RGB)
        l_user = frame_meta.frame_user_meta_list

        while l_user is not None:
            try:
                user_meta = pyds.NvDsUserMeta.cast(l_user.data)
            except StopIteration:
                break

            if (
                    user_meta.base_meta.meta_type
                    != pyds.NvDsMetaType.NVDSINFER_TENSOR_OUTPUT_META
            ):
                # Advance to the next user meta before skipping, otherwise
                # this loop never moves past non-tensor user meta.
                try:
                    l_user = l_user.next
                except StopIteration:
                    break
                continue

            tensor_meta = pyds.NvDsInferTensorMeta.cast(user_meta.user_meta_data)
            layers_info = []

            for i in range(tensor_meta.num_output_layers):
                layer = pyds.get_nvds_LayerInfo(tensor_meta, i)
                layers_info.append(layer)

            #################
            main_queue.put(deepcopy(layers_info))
            #################
            
            try:
                l_user = l_user.next
            except StopIteration:
                break

        try:
            # indicate inference is performed on the frame
            frame_meta.bInferDone = True
            l_frame = l_frame.next
        except StopIteration:
            break

        
    return Gst.PadProbeReturn.OK



def cb_newpad(decodebin,pad,data):
    global streammux
    caps=pad.get_current_caps()
    gststruct=caps.get_structure(0)
    gstname=gststruct.get_name()

    # Need to check if the pad created by the decodebin is for video and not
    # audio.
    if(gstname.find("video")!=-1):
        source_id = data
        pad_name = "sink_%u" % source_id
        # Get a sink pad from the streammux, link to decodebin
        sinkpad = streammux.get_request_pad(pad_name)
        if pad.link(sinkpad) == Gst.PadLinkReturn.OK:
            print(" --> Decodebin linked to pipeline")
        else:
            sys.stderr.write("Failed to link decodebin to pipeline\n")

def decodebin_child_added(child_proxy,Object,name,user_data):
    print("Decodebin child added:", name, "\n")
    if(name.find("decodebin") != -1):
        Object.connect("child-added",decodebin_child_added,user_data)   
    if(name.find("nvv4l2decoder") != -1):
        if (is_aarch64()):
            Object.set_property("enable-max-performance", True)
            Object.set_property("drop-frame-interval", 0)
            Object.set_property("num-extra-surfaces", 0)
        else:
            Object.set_property("gpu_id", 0)

def create_uridecode_bin(index,filename):
    global g_source_id_list, g_source_enabled
    print(" --- Creating uridecodebin for [%s]" % filename)

    # Create a source GstBin to abstract this bin's content from the rest of the
    # pipeline
    g_source_id_list[index] = index
    bin_name="source-bin-%02d" % index
    print(" --- " + bin_name)

    # Source element for reading from the uri.
    # We will use decodebin and let it figure out the container format of the
    # stream and the codec and plug the appropriate demux and decode plugins.
    bin=Gst.ElementFactory.make("uridecodebin", bin_name)
    if not bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
    # We set the input uri to the source element
    bin.set_property("uri",filename)
    # Connect to the "pad-added" signal of the decodebin which generates a
    # callback once a new pad for raw data has been created by the decodebin
    bin.connect("pad-added",cb_newpad,g_source_id_list[index])
    bin.connect("child-added",decodebin_child_added,g_source_id_list[index])

    #Set status of the source to enabled
    g_source_enabled[index] = True

    return bin

def bus_call(bus, message, loop):
    global g_eos_list
    t = message.type
    if t == Gst.MessageType.EOS:
        print("End-of-stream\n")
        loop.quit()
    elif t==Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        print("Warning: %s: %s\n" % (err, debug))
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        print("Error: %s: %s\n" % (err, debug))
        loop.quit()
    return True


def run(lst_video_info, infer_size = (1920, 1080)):
    global streammux, loop, pipeline, face_processor, main_queue
    # Check input arguments
    g_num_sources=len(lst_video_info)
    assert g_num_sources <= MAX_NUM_SOURCES, "Currently supports only {} parallel streams".format(MAX_NUM_SOURCES)

    # Standard GStreamer initialization
    GObject.threads_init()
    Gst.init(None)

    # Create gstreamer elements */
    # Create Pipeline element that will form a connection of other elements
    print('='*30 + " Creating Pipeline " + '='*30)

    pipeline = Gst.Pipeline()
    is_live = False

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline")

    print("[INFO] Creating streamux")
    # Create nvstreammux instance to form batches from one or more sources.
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")
    if infer_size[0] is not None and infer_size[1] is not None:
        print(' --> Optimized streammux size: {}'.format(infer_size))
        MUXED_OUTPUT_WIDTH = infer_size[0]
        MUXED_OUTPUT_HEIGHT = infer_size[1]
        streammux.set_property('width', infer_size[0])
        streammux.set_property('height', infer_size[1])
        time.sleep(1)
    else:
        MUXED_OUTPUT_WIDTH = 1280
        MUXED_OUTPUT_HEIGHT = 720
        streammux.set_property('width', MUXED_OUTPUT_WIDTH)
        streammux.set_property('height', MUXED_OUTPUT_HEIGHT)

    streammux.set_property('batch-size', g_num_sources)
    streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)
    

    print("[INFO] Creating source_bin")
    for i in range(g_num_sources):
        print(" --> Creating source_bin ",i)
        uri_name=lst_video_info[i]["urlLocal"]
        if uri_name.find("rtsp://") == 0:
            is_live = True
        #Create first source bin and add to pipeline
        source_bin=create_uridecode_bin(i, uri_name)
        if not source_bin:
            sys.stderr.write("Failed to create source bin. Exiting. \n")
            sys.exit(1)
        g_source_bin_list[i] = source_bin
        camera_id = lst_video_info[i]["cameraID"]
        g_source_camera_id_list[i] = camera_id
        g_source_base_meta_list[camera_id] = lst_video_info[i]
        fps_streams[camera_id] = GETFPS(camera_id)
        pipeline.add(source_bin)


    print("[INFO] Creating Pgie")
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie")
    pgie.set_property('config-file-path', "configs/config_scrfd.txt")
    pgie_batch_size=pgie.get_property("batch-size")
    if(pgie_batch_size != g_num_sources):
        print("WARNING: Overriding infer-config batch-size", pgie_batch_size," with number of sources ", g_num_sources," \n")
        pgie.set_property("batch-size", g_num_sources)


    print("[INFO] Creating nvvidconv")
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    if not nvvidconv:
        sys.stderr.write(" Unable to create nvvidconv")

    print("[INFO] Creating filter")
    caps1 = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
    filter1 = Gst.ElementFactory.make("capsfilter", "filter1")
    if not filter1:
        sys.stderr.write(" Unable to get the caps filter1 \n")
    filter1.set_property("caps", caps1)

    print("[INFO] Creating FakeLSink")
    sink = Gst.ElementFactory.make("fakesink", "fakesink")
    if not sink:
        sys.stderr.write(" Unable to create fakesink \n")
    
    queue1 = Gst.ElementFactory.make("queue","queue1")
    queue2 = Gst.ElementFactory.make("queue","queue2")
    queue3 = Gst.ElementFactory.make("queue","queue3")
    queue4 = Gst.ElementFactory.make("queue","queue4")

    if is_live:
        print("[WARNING] Atleast one of the sources is live")
        streammux.set_property('live-source', 1)

    sink.set_property("sync", 0)
    sink.set_property("qos", 0)

    if not is_aarch64():
        # Use CUDA unified memory in the pipeline so frames
        # can be easily accessed on CPU in Python.
        mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)

        streammux.set_property("nvbuf-memory-type", mem_type)
        nvvidconv.set_property("nvbuf-memory-type", mem_type)


    print("Adding elements to Pipeline \n")
    pipeline.add(streammux)
    pipeline.add(pgie)
    pipeline.add(nvvidconv)
    pipeline.add(filter1)
    pipeline.add(sink)
    pipeline.add(queue1)
    pipeline.add(queue2)
    pipeline.add(queue3)
    pipeline.add(queue4)

    print("Linking elements in the Pipeline \n")
    streammux.link(queue1)
    queue1.link(nvvidconv)
    nvvidconv.link(queue2)
    queue2.link(filter1)
    filter1.link(queue3)
    queue3.link(pgie)
    pgie.link(queue4)
    queue4.link(sink)


    # create an event loop and feed gstreamer bus messages to it
    loop = GObject.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect ("message", bus_call, loop)
    pgie_src_pad = pgie.get_static_pad("src")
    if not pgie_src_pad:
        sys.stderr.write(" Unable to get pgie src pad \n")
    else:
        pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, pgie_src_pad_buffer_probe, 0)

    # Init face recognition model in another process
    main_queue = Queue()
    face_processor = Process(target = face_recog_mp, args = (main_queue, lst_video_info))
    face_processor.start()
    

    print("Starting pipeline \n")
    # start playback and listen to events
    pipeline.set_state(Gst.State.PLAYING)

    loop.run()

    print("Exiting app\n")
    pipeline.set_state(Gst.State.NULL)

    main_queue.put(None)    # Stop face recognition
    face_processor.join()   # Wait for the child process to exit

 

if __name__ == '__main__':
    meta_video_1 = {
                    'urlLocal': '<path-to-stream-1>',
                    'cameraID': '1'
    }
    meta_video_2 = {
                    'urlLocal': '<path-to-stream-2>',
                    'cameraID': '2'
    }
    init_pipeline()
    run([meta_video_1, meta_video_2], (1920, 1080))

With 2 x MP4 inputs it works well at 45+ FPS each, but with 2 x RTSP inputs it only shows 13-14 FPS each and sometimes lags; sometimes I get frames from an earlier time in the probe function (e.g. frames from 11:45 AM showing up at 11:47 AM).

Thanks

• Hardware Platform (Jetson / GPU): A100
• DeepStream Version: 6.1-Triton
• NVIDIA GPU Driver Version (valid for GPU only): 470.57.02
• Issue Type (questions, new requirements, bugs): bugs

Is the RTSP stream from LAN or from WAN? What’s the frame rate if you play it on this station?

It’s a stream from the LAN; I can read it with OpenCV at 25+ FPS.
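For reference, the check I did is roughly the following; a minimal sketch, with the RTSP URL as a placeholder for my camera:

import time
import cv2

# Rough FPS check of the raw RTSP stream with OpenCV, independent of DeepStream.
cap = cv2.VideoCapture("rtsp://<camera-url>")  # placeholder URL
frames, start = 0, time.time()
while frames < 500:
    ok, _ = cap.read()
    if not ok:
        break
    frames += 1
elapsed = time.time() - start
print("Read %d frames in %.1f s -> %.1f FPS" % (frames, elapsed, frames / elapsed))
cap.release()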

Can you remove these two lines and test again?

Yes, I tried. And even when I disable the TensorRT model loaded in the child process, it still gets around 13 FPS.

What is the FPS if you disable the whole probe function?

Currently I’m using

fps_streams[g_source_camera_id_list[source_id]].get_fps()

to get the FPS each time the probe function is called. Is there another way to calculate the FPS of a stream if I disable the probe function?
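For example, would a lightweight counting probe plus a periodic GLib timer be a fair way to measure it? A sketch only (counting_probe and report_fps are names I made up; it assumes the pgie_src_pad from the code above, and note that each buffer on that pad is one batch from nvstreammux, not one frame per stream):

from collections import defaultdict

counts = defaultdict(int)

def counting_probe(pad, info, u_data):
    # Only count buffers; no metadata access, no frame copies.
    counts["pgie_src"] += 1
    return Gst.PadProbeReturn.OK

def report_fps():
    # Called once per second from the GLib main loop.
    print("pgie src pad: %d batches/s" % counts["pgie_src"])
    counts["pgie_src"] = 0
    return True  # keep the timer running

pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, counting_probe, 0)
GLib.timeout_add_seconds(1, report_fps)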

You have enabled tensor meta output, right? You can remove the tensor meta handling part and disable tensor meta output in the config file to check the FPS.
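For reference, output-tensor-meta is a key in the [property] group of the nvinfer config (configs/config_scrfd.txt in the code above); a minimal sketch of just the relevant lines, with all other keys omitted:

[property]
# 0 = do not attach the raw tensor output as user meta;
# set back to 1 when the SCRFD outputs are needed in the probe again
output-tensor-meta=0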

I’ve disabled output-tensor-meta and changed the probe function to:

def pgie_src_pad_buffer_probe(pad,info,u_data):
    fps_streams['1'].get_fps()
    fps_streams['2'].get_fps()
    return Gst.PadProbeReturn.OK

It’s still around 13 FPS. But that’s not the main problem. The PGIE is the face detection model, and I initialize the face feature extraction model in another process (tried both a different GPU and the same GPU). On every call of pgie_src_pad_buffer_probe I put (frame_image, output-tensor-meta) into the queue, and the face feature extraction process consumes it (I checked that it keeps up with almost every message put into the queue). I’m just wondering why messages are sometimes put into the queue out of chronological order (I put and get (frame_image, output-tensor-meta) from an earlier time interleaved with the current time).
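One thing I plan to try is tagging every queue item with its source ID, frame number, and wall-clock time, so the consumer can flag out-of-order items; a sketch only (the tuple layout and last_seen are my own names, not part of the pipeline above):

# Producer side, inside pgie_src_pad_buffer_probe: tag each item before putting it.
main_queue.put((frame_meta.source_id, frame_meta.frame_num, time.time(),
                deepcopy(layers_info)))

# Consumer side, inside face_recog_mp: flag items that arrive out of order.
last_seen = {}  # source_id -> highest frame_num seen so far
while True:
    item = main_queue.get()
    if item is None:
        break
    source_id, frame_num, put_time, layers_info = item
    if frame_num < last_seen.get(source_id, -1):
        print("Out-of-order item: source %s frame %d (queued %.2f s ago)"
              % (source_id, frame_num, time.time() - put_time))
    last_seen[source_id] = max(frame_num, last_seen.get(source_id, -1))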

Can you monitor the GPU usage while the pipeline is running with the command “nvidia-smi dmon”?

What does this mean?

Looks like the problem is the same as

The bug you posted was fixed a long time ago.
