Benchmark real FPS with appsrc

• Hardware Platform (Jetson / GPU)
Jetson Nx
• DeepStream Version
5.0.0-1
• JetPack Version (valid for Jetson only)
4.4-b144

Hello. I am trying to feed data through appsrc in the deepstream-test2 example. I made the following pipeline:
appsrc -> nvvideoconvert -> capsfilter -> streammux -> nvinfer (decoder) -> tracker -> nvinfer (car color) -> nvinfer (car manufacturer) -> nvinfer (vehicle type) -> fakesink

Here is how I create a source and capsfilters:

source = Gst.ElementFactory.make("appsrc", "source")
source.set_property('caps', 
            Gst.caps_from_string(','.join([
            'video/x-raw',
            'format=RGBA',
            'framerate=1000/1',
            F'width={IM_W}',
            F'height={IM_H}',
            ])))
if not source:
    sys.stderr.write(" Unable to create Source \n")

caps_str = Gst.Caps.from_string("video/x-raw(memory:NVMM)")
capsfilter = Gst.ElementFactory.make("capsfilter", "filter")
capsfilter.set_property("caps", caps_str)

Here is my simple test:

pipeline.set_state(Gst.State.PLAYING)
try:
    #loop.run()
    th = threading.Thread(target = loop.run, args=())
    th.start()
except:
    pass

# prefetching set of images, so we will not decode them in the test-loop
cap = cv2.VideoCapture(sys.argv[1])
imgs = []
for i in range(100):
    ret, im_orig = cap.read()
    imgs.append(im_orig)

start_time = time.time()
count = 0
while True:
    im_orig = imgs[count%len(imgs)]
    im = cv2.cvtColor(im_orig, cv2.COLOR_BGR2RGBA)
    frame = im.tobytes()
    buf = Gst.Buffer.new_wrapped_full(Gst.MemoryFlags.READONLY, frame, len(frame), 0, None)
    res = source.emit('push-buffer', buf)
    #print(res)
    curr_time = time.time()
    count += 1
    if curr_time - start_time > TEST_TIME:
        break

print(F"N pushed frames: {count}. Test time is {TEST_TIME} seconds")

It shows me that it pushed ~4000 frames per 10 seconds. So, it’s ~400 FPS. But I inserted a counter in sink_pad_buffer_probe and I see that only ~900 frames are really processed.

Am I doing something wrong or deepstream/gstreamer can drop some buffers?
Is there a way to ensure that a frame was really processed?

  1. With the GST_DEBUG=2 environment variable you should be able to see what's going on (see GST_DEBUG numbers and meaning). You'll see in the console if/when frames are being dropped.
  2. The buffer probe you inserted was attached to which element? Maybe check for elements which batch several frames. For example, check batch-size, batched-push-timeout, etc, in nvstreammux.
  3. For example, see the sample Python code, where several frames are processed from a single probe call (I assume it's because frames were batched by a previous element upstream).
  4. To process frames “as fast as possible, but without frame dropping”, I’ve had good results adding queue elements and setting the sink element’s property sync=false.

Thanks for answer,

  1. It shows no warning messages

  2. I attach the buffer probe function to the final fakesink
    streammux properties are

    streammux.set_property('width', 1920)
    streammux.set_property('height', 1080)
    streammux.set_property('batch-size', 1)
    streammux.set_property('batched-push-timeout', 4000000)

I tried to add probes on different pipeline elements (streammux, capsfilter, nvvideoconvert, appsrc) and every time the counter was approximately the same.

I made a simple test:

import sys
sys.path.append('../')
import platform
import configparser

import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
from common.is_aarch_64 import is_aarch64
from common.bus_call import bus_call

import pyds


import time
import threading
import cv2
import numpy as np

# Frame geometry of the RGBA buffers pushed into appsrc (4 bytes per pixel).
IM_H = 1080
IM_W = 1920
TEST_TIME = 10  # benchmark duration, seconds

# The probe fires on a GStreamer streaming thread while main() reads the
# counter from the main thread, so protect the increment with a lock.
_counter_lock = threading.Lock()
thread_unsafe_counter = 0  # name kept for compatibility; now lock-protected
def sink_sink_pad_buffer_probe(pad, info, u_data):
    """Count every buffer reaching the probed sink pad.

    Args:
        pad: the probed Gst.Pad (unused).
        info: Gst.PadProbeInfo describing the passing buffer (unused).
        u_data: user data supplied to add_probe (unused).

    Returns:
        Gst.PadProbeReturn.OK so the buffer continues downstream.
    """
    global thread_unsafe_counter
    with _counter_lock:
        thread_unsafe_counter += 1
    return Gst.PadProbeReturn.OK


def main(args):
    """Benchmark raw push throughput of an appsrc-fed pipeline.

    Builds appsrc -> nvvideoconvert -> capsfilter(NVMM) -> fakesink, pushes
    pre-decoded RGBA frames for TEST_TIME seconds, then prints both the
    number of pushed frames and the number counted by the sink-pad probe.

    Args:
        args: command-line arguments; args[1] is a media file or URI that
            OpenCV can decode to obtain sample frames.
    """
    # Check input arguments
    if len(args) != 2:
        sys.stderr.write("usage: %s <media file or uri>\n" % args[0])
        sys.exit(1)

    # Standard GStreamer initialization
    GObject.threads_init()
    Gst.init(None)

    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")

    print("Creating Source \n ")
    source = Gst.ElementFactory.make("appsrc", "file-source")
    if not source:
        sys.stderr.write(" Unable to create Source \n")
    # Advertise the raw RGBA format the benchmark loop pushes.
    source.set_property('caps',
                Gst.caps_from_string(','.join([
                'video/x-raw',
                'format=RGBA',
                'framerate=3000/1',
                F'width={IM_W}',
                F'height={IM_H}',
                ])))

    # Force NVMM (device) memory on the converter's output.
    capsfilter = Gst.ElementFactory.make("capsfilter", "filter")
    capsfilter.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM)"))

    print("Creating Converter \n")
    converter = Gst.ElementFactory.make("nvvideoconvert", "nvv4l2-decoder")
    if not converter:
        sys.stderr.write(" Unable to create nvvideoconvert \n")

    sink = Gst.ElementFactory.make("fakesink", "nvvideo-renderer")
    if not sink:
        sys.stderr.write(" Unable to create fakesink \n")

    for element in (source, converter, capsfilter, sink):
        pipeline.add(element)

    # Count buffers arriving at the sink so we can compare against the
    # number of buffers pushed into appsrc.
    sink_pad = sink.get_static_pad("sink")
    sink_pad.add_probe(Gst.PadProbeType.BUFFER, sink_sink_pad_buffer_probe, 0)

    source.link(converter)
    converter.link(capsfilter)
    capsfilter.link(sink)

    loop = GObject.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    pipeline.set_state(Gst.State.PLAYING)
    # loop.run() blocks forever, so run it on a daemon thread: the process
    # can then exit when main() returns.  The former bare `except: pass`
    # silently swallowed any startup error and is removed.
    loop_thread = threading.Thread(target=loop.run, daemon=True)
    loop_thread.start()

    # Prefetch decoded frames so decoding cost is excluded from the
    # timed loop.
    cap = cv2.VideoCapture(args[1])
    imgs = []
    for _ in range(100):
        ret, im_orig = cap.read()
        if not ret:  # stop early rather than storing None frames
            break
        imgs.append(im_orig)
    cap.release()
    if not imgs:
        sys.stderr.write(" Unable to read any frame from %s \n" % args[1])
        sys.exit(1)

    # Feed the data to the pipeline for TEST_TIME seconds.
    start_time = time.time()
    count = 0
    while time.time() - start_time <= TEST_TIME:
        im = cv2.cvtColor(imgs[count % len(imgs)], cv2.COLOR_BGR2RGBA)
        frame = im.tobytes()
        buf = Gst.Buffer.new_wrapped_full(Gst.MemoryFlags.READONLY, frame, len(frame), 0, None)
        source.emit('push-buffer', buf)
        count += 1

    print(F"N pushed frames: {count}. Test time is {TEST_TIME} seconds")
    print("counter:", thread_unsafe_counter)

    # cleanup: stop the pipeline and terminate the GLib loop thread.
    pipeline.set_state(Gst.State.NULL)
    loop.quit()


if __name__ == '__main__':
    # Entry point: propagate main()'s result as the process exit status.
    raise SystemExit(main(sys.argv))

And the counters are really close in this example (for example, 3701 and 3640). But they are still not the same. Maybe I need to use locks, but I'm not sure which one to use (threading, async, or multiprocessing?) because I don't understand how probes are processed in python-gstreamer.

So, is there a way to ensure that the frame was really processed and wasn’t dropped?

  1. There is batch_size=1 in the detection network and batch_size=16 in classifiers. I inserted a counter inside the while loop, so it should take into account frames, not batches. Or it’s not your point?

  2. Still haven’t tried synk=false. Thank you for this, I will check it a little bit later

inside your sink_sink_pad_buffer_probe:

gst_buffer = info.get_buffer()
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
frame_iter = batch_meta.frame_meta_list
while frame_iter is not None:
    try:
        # FIXME update your counter here
        frame_metadata = pyds.NvDsFrameMeta.cast(frame_iter.data)
        frame_num = frame_metadata.frame_num
        print("do sth with gst_buffer, frame_iter")
    except StopIteration:
        break
    try:  # Check if this is the last frame
        frame_iter = frame_iter.next
    except StopIteration:
        break

See reference

Are you sure that batches are created when there is no nvstreammux in the pipeline? I tried this before and got this:

  File "little_test.py", line 29, in sink_sink_pad_buffer_probe
    frame_iter = batch_meta.frame_meta_list
AttributeError: 'NoneType' object has no attribute 'frame_meta_list'

Everything is fine in tests.
Seems like frames are passed asynchronously and I need to wait until they are processed.

When I wait after the loop all counters match perfectly.

My bad. That code was referring to the previous pipeline.

  1. Your buffers are not leaving the appsrc until they can be consumed, so they are being queued inside it. To check this, modify your code with this just before nulling the pipe:

    print(f"srcbytes={source.get_property('current-level-bytes')}")
    

    As you said, if you wait long enough before printing, it would show zero instead, because it actually emitted the last buffer instead of queuing.

  2. To make the difference more evident, you could actually build the buffers beforehand (unrealistic example):

    ...
    for i in range(100):
         ret, im_orig = cap.read()
         imgs.append(cv2.cvtColor(im_orig, cv2.COLOR_BGR2RGBA).tobytes())
    
     #feeding the data to the pipeline
     start_time = time.time()
     count = 0
     while True:
         frame = imgs[count%len(imgs)]
         buf = Gst.Buffer.new_wrapped_full(Gst.MemoryFlags.READONLY, frame, len(frame), 0, None)
         res = source.emit('push-buffer', buf)
         #print(res)
         curr_time = time.time()
         count += 1
         if curr_time - start_time > TEST_TIME:
             break
    ...
    
  3. If you block the appsrc, you’ll get more realistic values than you would be reading otherwise. Using the previous “unrealistic” frame caching:

    • non-blocked appsrc (use source.set_property('block', False), which is the default, what you’re doing now):

      N pushed frames: 550510. Test time is 10 seconds
      counter: 8360
      srcbytes=4496800665600
      
    • blocked appsrc (use source.set_property('block', True)):

      N pushed frames: 9138. Test time is 10 seconds
      counter: 9138
      srcbytes=0
      

In summary, if you could limit the appsrc maxbytes with eg (source.set_property('max-bytes', IM_H*IM_W*4) and source.set_property('block', True), your measurements should match (± the appsrc internal queue size you set)

Thank you for a detailed answer and sorry for such a long delay.

Printing current-level-bytes works, but when I set the block property the whole pipeline freezes, even in the little example I provided above. Could you please help me with that?
I see that some frames are going into the pipeline but then nothing happens.

Simple pipeline ( appsrc ! nvvideoconvert ! fakesink) works with block=True. But it fails when I add capsfilter. How can I fix it?

I’m still talking about this simple case:

import sys
sys.path.append('../')
import platform
import configparser

import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
from common.is_aarch_64 import is_aarch64
from common.bus_call import bus_call

import pyds


import time
import threading
import cv2
import numpy as np

# Frame geometry of the RGBA buffers pushed into appsrc (4 bytes per pixel).
IM_H = 1080
IM_W = 1920
TEST_TIME = 10  # benchmark duration, seconds

# The probe fires on a GStreamer streaming thread while main() reads the
# counter from the main thread, so protect the increment with a lock.
_counter_lock = threading.Lock()
thread_unsafe_counter = 0  # name kept for compatibility; now lock-protected
def sink_sink_pad_buffer_probe(pad, info, u_data):
    """Count every buffer reaching the probed sink pad.

    Args:
        pad: the probed Gst.Pad (unused).
        info: Gst.PadProbeInfo describing the passing buffer (unused).
        u_data: user data supplied to add_probe (unused).

    Returns:
        Gst.PadProbeReturn.OK so the buffer continues downstream.
    """
    global thread_unsafe_counter
    with _counter_lock:
        thread_unsafe_counter += 1
    return Gst.PadProbeReturn.OK


def main(args):
    """Benchmark raw push throughput of an appsrc-fed pipeline.

    Builds appsrc -> nvvideoconvert -> capsfilter(NVMM) -> fakesink, pushes
    pre-decoded RGBA frames for TEST_TIME seconds, then prints both the
    number of pushed frames and the number counted by the sink-pad probe.

    Args:
        args: command-line arguments; args[1] is a media file or URI that
            OpenCV can decode to obtain sample frames.
    """
    # Check input arguments
    if len(args) != 2:
        sys.stderr.write("usage: %s <media file or uri>\n" % args[0])
        sys.exit(1)

    # Standard GStreamer initialization
    GObject.threads_init()
    Gst.init(None)

    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")

    print("Creating Source \n ")
    source = Gst.ElementFactory.make("appsrc", "file-source")
    if not source:
        sys.stderr.write(" Unable to create Source \n")
    # Advertise the raw RGBA format the benchmark loop pushes.
    source.set_property('caps',
                Gst.caps_from_string(','.join([
                'video/x-raw',
                'format=RGBA',
                'framerate=3000/1',
                F'width={IM_W}',
                F'height={IM_H}',
                ])))

    # Force NVMM (device) memory on the converter's output.
    capsfilter = Gst.ElementFactory.make("capsfilter", "filter")
    capsfilter.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM)"))

    print("Creating Converter \n")
    converter = Gst.ElementFactory.make("nvvideoconvert", "nvv4l2-decoder")
    if not converter:
        sys.stderr.write(" Unable to create nvvideoconvert \n")

    sink = Gst.ElementFactory.make("fakesink", "nvvideo-renderer")
    if not sink:
        sys.stderr.write(" Unable to create fakesink \n")

    for element in (source, converter, capsfilter, sink):
        pipeline.add(element)

    # Count buffers arriving at the sink so we can compare against the
    # number of buffers pushed into appsrc.
    sink_pad = sink.get_static_pad("sink")
    sink_pad.add_probe(Gst.PadProbeType.BUFFER, sink_sink_pad_buffer_probe, 0)

    source.link(converter)
    converter.link(capsfilter)
    capsfilter.link(sink)

    loop = GObject.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    pipeline.set_state(Gst.State.PLAYING)
    # loop.run() blocks forever, so run it on a daemon thread: the process
    # can then exit when main() returns.  The former bare `except: pass`
    # silently swallowed any startup error and is removed.
    loop_thread = threading.Thread(target=loop.run, daemon=True)
    loop_thread.start()

    # Prefetch decoded frames so decoding cost is excluded from the
    # timed loop.
    cap = cv2.VideoCapture(args[1])
    imgs = []
    for _ in range(100):
        ret, im_orig = cap.read()
        if not ret:  # stop early rather than storing None frames
            break
        imgs.append(im_orig)
    cap.release()
    if not imgs:
        sys.stderr.write(" Unable to read any frame from %s \n" % args[1])
        sys.exit(1)

    # Feed the data to the pipeline for TEST_TIME seconds.
    start_time = time.time()
    count = 0
    while time.time() - start_time <= TEST_TIME:
        im = cv2.cvtColor(imgs[count % len(imgs)], cv2.COLOR_BGR2RGBA)
        frame = im.tobytes()
        buf = Gst.Buffer.new_wrapped_full(Gst.MemoryFlags.READONLY, frame, len(frame), 0, None)
        source.emit('push-buffer', buf)
        count += 1

    print(F"N pushed frames: {count}. Test time is {TEST_TIME} seconds")
    print("counter:", thread_unsafe_counter)

    # cleanup: stop the pipeline and terminate the GLib loop thread.
    pipeline.set_state(Gst.State.NULL)
    loop.quit()


if __name__ == '__main__':
    # Entry point: propagate main()'s result as the process exit status.
    raise SystemExit(main(sys.argv))