Issue with Sending Custom Events Using Python

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU) GPU
• DeepStream Version 7.0
• Issue Type( questions, new requirements, bugs) questions
I am working on dynamically changing the inference interval of nvinfer using Python and have encountered some challenges.
I referred to the latest documentation on how to send custom events from Python to downstream elements (link). I’m trying to apply this to the function gst_nvevent_infer_interval_update located at /opt/nvidia/deepstream/deepstream-7.0/sources/libs/gstnvdscustomhelper/gst-nvdscustomevent.c.

I successfully obtained the Python bindings on my x86 machine and followed an implementation similar to what was suggested in this forum post (link), implementing it identically in bindfunctions.cpp.

However, I am unsure how to use this bound function in Python. When calling the function, the size_t gst_element in C++ corresponds to an int in Python, but I can only obtain the Python object corresponding to pgie.
What should I pass in Python? Does this int value correspond to the address of pgie in C++? The function in the documentation example seems to have the same issue. Is there an official implementation or recommended usage? I need a Python usage example, not just a binding example.

pyds-1.1.11-py3-none-linux_x86_64.zip (602.6 KB)
Can an implementation be made based on deepstream-test1?

#!/usr/bin/env python3

################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import threading
import sys
sys.path.append('../')
import os
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from common.platform_info import PlatformInfo
from common.bus_call import bus_call

import pyds

PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
MUXER_BATCH_TIMEOUT_USEC = 33000

def osd_sink_pad_buffer_probe(pad,info,u_data):
    frame_number=0
    num_rects=0

    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return

    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
            # The casting is done by pyds.NvDsFrameMeta.cast()
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
            #source_id = frame_meta.source_id
            #print("source_id: ", source_id)
        except StopIteration:
            break

        #Intiallizing object counter with 0.
        obj_counter = {
            PGIE_CLASS_ID_VEHICLE:0,
            PGIE_CLASS_ID_PERSON:0,
            PGIE_CLASS_ID_BICYCLE:0,
            PGIE_CLASS_ID_ROADSIGN:0
        }
        frame_number=frame_meta.frame_num
        num_rects = frame_meta.num_obj_meta
        l_obj=frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                # Casting l_obj.data to pyds.NvDsObjectMeta
                obj_meta=pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            obj_counter[obj_meta.class_id] += 1
            obj_meta.rect_params.border_color.set(0.0, 0.0, 1.0, 0.8) #0.8 is alpha (opacity)
            try: 
                l_obj=l_obj.next
            except StopIteration:
                break

        # Acquiring a display meta object. The memory ownership remains in
        # the C code so downstream plugins can still access it. Otherwise
        # the garbage collector will claim it when this probe function exits.
        display_meta=pyds.nvds_acquire_display_meta_from_pool(batch_meta)
        display_meta.num_labels = 1
        py_nvosd_text_params = display_meta.text_params[0]
        # Setting display text to be shown on screen
        # Note that the pyds module allocates a buffer for the string, and the
        # memory will not be claimed by the garbage collector.
        # Reading the display_text field here will return the C address of the
        # allocated string. Use pyds.get_string() to get the string content.
        py_nvosd_text_params.display_text = "Frame Number={} Number of Objects={} Vehicle_count={} Person_count={}".format(frame_number, num_rects, obj_counter[PGIE_CLASS_ID_VEHICLE], obj_counter[PGIE_CLASS_ID_PERSON])

        # Now set the offsets where the string should appear
        py_nvosd_text_params.x_offset = 10
        py_nvosd_text_params.y_offset = 12

        # Font , font-color and font-size
        py_nvosd_text_params.font_params.font_name = "Serif"
        py_nvosd_text_params.font_params.font_size = 10
        # set(red, green, blue, alpha); set to White
        py_nvosd_text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)

        # Text background color
        py_nvosd_text_params.set_bg_clr = 1
        # set(red, green, blue, alpha); set to Black
        py_nvosd_text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
        # Using pyds.get_string() to get display_text as string
        print(pyds.get_string(py_nvosd_text_params.display_text))
        pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
        try:
            l_frame=l_frame.next
        except StopIteration:
            break
			
    return Gst.PadProbeReturn.OK	


def callback():
    global pgie
    interval = 10
    # ***********************************************************************
    pyds.gst_element_send_nvevent_interval_update(pgie, '0', interval)


def main(args):
    global pgie
    # Check input arguments
    if len(args) != 2:
        sys.stderr.write("usage: %s <media file or uri>\n" % args[0])
        sys.exit(1)

    platform_info = PlatformInfo()
    # Standard GStreamer initialization
    Gst.init(None)

    # Create gstreamer elements
    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")

    # Source element for reading from the file
    print("Creating Source \n ")
    source = Gst.ElementFactory.make("filesrc", "file-source")
    if not source:
        sys.stderr.write(" Unable to create Source \n")

    # Since the data format in the input file is elementary h264 stream,
    # we need a h264parser
    print("Creating H264Parser \n")
    h264parser = Gst.ElementFactory.make("h264parse", "h264-parser")
    if not h264parser:
        sys.stderr.write(" Unable to create h264 parser \n")

    # Use nvdec_h264 for hardware accelerated decode on GPU
    print("Creating Decoder \n")
    decoder = Gst.ElementFactory.make("nvv4l2decoder", "nvv4l2-decoder")
    if not decoder:
        sys.stderr.write(" Unable to create Nvv4l2 Decoder \n")

    # Create nvstreammux instance to form batches from one or more sources.
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")

    # Use nvinfer to run inferencing on decoder's output,
    # behaviour of inferencing is set through config file
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie \n")

    # Use convertor to convert from NV12 to RGBA as required by nvosd
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    if not nvvidconv:
        sys.stderr.write(" Unable to create nvvidconv \n")

    # Create OSD to draw on the converted RGBA buffer
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")

    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")

    # Finally render the osd output
    if platform_info.is_integrated_gpu():
        print("Creating nv3dsink \n")
        sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
        if not sink:
            sys.stderr.write(" Unable to create nv3dsink \n")
    else:
        if platform_info.is_platform_aarch64():
            print("Creating nv3dsink \n")
            sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
        else:
            print("Creating EGLSink \n")
            sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
        if not sink:
            sys.stderr.write(" Unable to create egl sink \n")

    print("Playing file %s " %args[1])
    source.set_property('location', args[1])
    if os.environ.get('USE_NEW_NVSTREAMMUX') != 'yes': # Only set these properties if not using new gst-nvstreammux
        streammux.set_property('width', 1920)
        streammux.set_property('height', 1080)
        streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)
    
    streammux.set_property('batch-size', 1)
    pgie.set_property('config-file-path', "dstest1_pgie_config.txt")

    print("Adding elements to Pipeline \n")
    pipeline.add(source)
    pipeline.add(h264parser)
    pipeline.add(decoder)
    pipeline.add(streammux)
    pipeline.add(pgie)
    pipeline.add(nvvidconv)
    pipeline.add(nvosd)
    pipeline.add(sink)

    # we link the elements together
    # file-source -> h264-parser -> nvh264-decoder ->
    # nvinfer -> nvvidconv -> nvosd -> video-renderer
    print("Linking elements in the Pipeline \n")
    source.link(h264parser)
    h264parser.link(decoder)

    sinkpad = streammux.request_pad_simple("sink_0")
    if not sinkpad:
        sys.stderr.write(" Unable to get the sink pad of streammux \n")
    srcpad = decoder.get_static_pad("src")
    if not srcpad:
        sys.stderr.write(" Unable to get source pad of decoder \n")
    srcpad.link(sinkpad)
    streammux.link(pgie)
    pgie.link(nvvidconv)
    nvvidconv.link(nvosd)
    nvosd.link(sink)

    # create an event loop and feed gstreamer bus mesages to it
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect ("message", bus_call, loop)

    # Lets add probe to get informed of the meta data generated, we add probe to
    # the sink pad of the osd element, since by that time, the buffer would have
    # had got all the metadata.
    osdsinkpad = nvosd.get_static_pad("sink")
    if not osdsinkpad:
        sys.stderr.write(" Unable to get sink pad of nvosd \n")

    osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)

    # start play back and listen to events
    print("Starting pipeline \n")
    pipeline.set_state(Gst.State.PLAYING)

    t = threading.Timer(10, callback)
    t.start()

    try:
        loop.run()
    except:
        pass
    # cleanup
    pipeline.set_state(Gst.State.NULL)

if __name__ == '__main__':
    sys.exit(main(sys.argv))

cmd:

/opt/nvidia/deepstream/deepstream-7.0/sources/deepstream_python_apps/apps/deepstream-test1$ python3 deepstream_test_1.py /opt/nvidia/deepstream/deepstream-7.0/samples/streams/sample_720p.h264

I am getting an error:

Frame Number=284 Number of Objects=29 Vehicle_count=24 Person_count=5
Frame Number=285 Number of Objects=28 Vehicle_count=22 Person_count=6
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 1378, in run
    self.function(*self.args, **self.kwargs)
  File "/opt/nvidia/deepstream/deepstream-7.0/sources/deepstream_python_apps/apps/deepstream-test1/deepstream_test_1.py", line 134, in callback
    pyds.gst_element_send_nvevent_interval_update(pgie, '0', interval)
TypeError: gst_element_send_nvevent_interval_update(): incompatible function arguments. The following argument types are supported:
    1. (arg0: int, arg1: str, arg2: int) -> int

Invoked with: <__gi__.GstNvInfer object at 0x7f59f818e5c0 (GstNvInfer at 0x55e9edc47ad0)>, '0', 10
Frame Number=286 Number of Objects=28 Vehicle_count=23 Person_count=5
Frame Number=287 Number of Objects=30 Vehicle_count=24 Person_count=6

This is a complex problem, involving pygobject and GIL.

For more information about Python’s meta function hash, please refer to the following link

Details of GIL

I will give some sample patchs, I have tried them in DS-7.0, they can work.

for deepstream_test_1.py

diff --git a/apps/deepstream-test1/deepstream_test_1.py b/apps/deepstream-test1/deepstream_test_1.py
index 1367fb4..8ff4f6f 100755
--- a/apps/deepstream-test1/deepstream_test_1.py
+++ b/apps/deepstream-test1/deepstream_test_1.py
@@ -18,6 +18,7 @@
 ################################################################################
 
 import sys
+import threading
 sys.path.append('../')
 import os
 import gi
@@ -68,6 +69,8 @@ def osd_sink_pad_buffer_probe(pad,info,u_data):
         }
         frame_number=frame_meta.frame_num
         num_rects = frame_meta.num_obj_meta
+        source_id = frame_meta.source_id
+        print("source_id: ", source_id)
         l_obj=frame_meta.obj_meta_list
         while l_obj is not None:
             try:
@@ -119,8 +122,15 @@ def osd_sink_pad_buffer_probe(pad,info,u_data):
 			
     return Gst.PadProbeReturn.OK	
 
+def callback():
+    global pgie
+    interval = 13
+    print(f"Send NvDsEventIntervalUpdate type {pgie.__class__} {hash(pgie)}")
+    pyds.gst_element_send_nvevent_interval_update(hash(pgie), '0', interval)
+    return True
 
 def main(args):
+    global pgie
     # Check input arguments
     if len(args) != 2:
         sys.stderr.write("usage: %s <media file or uri>\n" % args[0])
@@ -191,7 +201,8 @@ def main(args):
             sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
         else:
             print("Creating EGLSink \n")
-            sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
+            sink = Gst.ElementFactory.make("fakesink", "nvvideo-renderer")
+            sink.set_property('sync', True)
         if not sink:
             sys.stderr.write(" Unable to create egl sink \n")
 
@@ -252,6 +263,11 @@ def main(args):
     # start play back and listen to events
     print("Starting pipeline \n")
     pipeline.set_state(Gst.State.PLAYING)
+
+    GLib.timeout_add(5000, callback)
+    # t = threading.Timer(5, callback)
+    # t.start()
+
     try:
         loop.run()
     except:

for the python bindings.

diff --git a/bindings/CMakeLists.txt b/bindings/CMakeLists.txt
index 7b01fb2..abe5042 100644
--- a/bindings/CMakeLists.txt
+++ b/bindings/CMakeLists.txt
@@ -98,7 +98,7 @@ function(add_ds_lib libname)
         target_link_libraries(pyds ${libname})
 endfunction()
 
-foreach(nvds_lib nvds_osd nvds_meta nvds_infer nvdsgst_meta nvbufsurface nvbufsurftransform nvdsgst_helper)
+foreach(nvds_lib nvds_osd nvds_meta nvds_infer nvdsgst_meta nvbufsurface nvbufsurftransform nvdsgst_helper nvdsgst_customhelper)
         add_ds_lib(${nvds_lib})
 endforeach()
 
diff --git a/bindings/src/bindfunctions.cpp b/bindings/src/bindfunctions.cpp
index cc2c9cc..6879e04 100644
--- a/bindings/src/bindfunctions.cpp
+++ b/bindings/src/bindfunctions.cpp
@@ -17,6 +17,7 @@
 
 #include "bind_string_property_definitions.h"
 #include "bindfunctions.hpp"
+#include "gst-nvdscustomevent.h"
 
 namespace py = pybind11;
 using namespace std;
@@ -797,6 +798,20 @@ namespace pydeepstream {
               },
               pydsdoc::methodsDoc::gst_element_send_nvevent_new_stream_reset);
 
+        m.def("gst_element_send_nvevent_interval_update",
+            [](size_t gst_element, char* stream_id, int interval) {
+                  bool ret = false;
+                  auto *element = reinterpret_cast<GstElement *>(gst_element);
+                  auto *event = gst_nvevent_infer_interval_update(stream_id, interval);
+                  std::cout << "Reached gst_nvevent_infer_interval_update " << element << std::endl;
+                  Py_BEGIN_ALLOW_THREADS;
+                  ret = gst_element_send_event(element, event);
+                  Py_END_ALLOW_THREADS;
+                  std::cout << "Reached gst_element_send_nvevent_interval_update " << ret << std::endl;
+                  return ret;
+            },
+            "pydsdoc::methodsDoc::gst_element_send_nvevent_interval_update");
+
1 Like

Great job! That was a perfect answer. I believe your response will be helpful to many others as well. Thank you very much for your insightful contribution!

I hope I can ask an additional question: Why does the documentation not consider the GIL issue for the binding of gst_nvevent_new_stream_reset , whereas gst_nvevent_infer_interval_update must take it into account? The difference in handling here is confusing to me.

In fact, this is a case by case bindings. Depending on the specific needs, gst_nvevent_infer_interval_update may cause blocking so that the GIL needs to be explicitly released.

Got it, thank you once again for your detailed answer.

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.