Gstreamer Pipeline to insert metadata using DeepStream

Hello,
Regarding SEI Metadata Injection using nvdsmetainsert in DeepStream 7.1, I have some questions:

What is the correct way for me to attach user_meta_data for SEI injection?
I’m currently wrapping the payload (UUID + KLV bytes) using ctypes and PyCapsule, but I’m unsure if this is the proper approach for compatibility with nvdsmetainsert.

Should I set the meta_type to NVDS_USER_META?
I want to ensure the SEI serialization library (libnvds_sei_serialization.so) correctly identifies and processes the metadata I’m attaching.

Do I need to explicitly set copy_func and release_func for the user_meta?
I’ve noticed some examples include these functions, but I’m unclear whether they’re required for SEI injection to work properly or just for general memory management.

Does nvdsmetainsert expect a specific structure or memory layout for the metadata?
Is there a particular format I should follow when constructing the SEI payload (e.g., alignment, UUID positioning, capsule structure)?

Is there a minimal working Python example that shows how to inject KLV metadata into SEI using nvdsmetainsert?
I’ve looked through the C-based samples, but I would appreciate a Python version using pyds, especially one that demonstrates correct usage of user_meta for this purpose.

Thks.
pipeline

    src       = Gst.ElementFactory.make("udpsrc",   "src")
    queue_net = Gst.ElementFactory.make("queue",    "queue_net")
    tpar      = Gst.ElementFactory.make("tsparse",  "tpar")
    dem       = Gst.ElementFactory.make("tsdemux",  "dem")
    dem_q     = Gst.ElementFactory.make("queue",    "demux_queue")
    hp1       = Gst.ElementFactory.make("h264parse","hp1")
    dec       = Gst.ElementFactory.make("nvh264dec","dec")
    nvconv     = Gst.ElementFactory.make("nvvideoconvert","vconv")
    nvmm_capsfilter = Gst.ElementFactory.make("capsfilter", "nvmm_caps")
    nvstreammux = Gst.ElementFactory.make("nvstreammux", "streammux")
    nvdsmetainsert = Gst.ElementFactory.make("nvdsmetainsert", "nvdsmetainsert")
    cudaconv = Gst.ElementFactory.make("nvvideoconvert", "videoconvert")
    cuda_capsfilter_enc = Gst.ElementFactory.make("capsfilter", "cuda_caps_filter")
    queue_enc = Gst.ElementFactory.make("queue",    "queue_enc")
    enc       = Gst.ElementFactory.make("nvh264enc","enc")
    hp2       = Gst.ElementFactory.make("h264parse","hp2")
    tsmux     = Gst.ElementFactory.make("mpegtsmux","tsmux")
    sink      = Gst.ElementFactory.make("filesink", "sink")

    klv_queue = Gst.ElementFactory.make("queue",    "klv_queue")
    klv_identity = Gst.ElementFactory.make("identity", "klv_identity")
    klv_fakesink = Gst.ElementFactory.make("fakesink", "klv_fakesink")

sei injection


def on_nvdsmetainsert_probe(pad, info, user_data):
    global klv_data_buffer
    print(">>> klv_data_buffer", klv_data_buffer)
    buf = info.get_buffer()
    print(">>> buf", buf)

    if not buf or klv_data_buffer is None:
        print(">>> no buffer", buf)
        return Gst.PadProbeReturn.OK

    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(buf))
    print(">>> batch_meta", batch_meta)

    if not batch_meta:
        print(">>> no batch_meta", batch_meta)
        return Gst.PadProbeReturn.OK

    # Create user meta
    user_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)
    print(">>> user_meta", user_meta)

    # Prepare SEI payload with UUID + KLV
    uuid_bytes = uuid.UUID("550e8400-e29b-41d4-a716-446655440000").bytes
    sei_payload = uuid_bytes + klv_data_buffer
    print(">>> sei_payload", sei_payload)

    # Allocate memory for payload
    klv_len = len(sei_payload)
    print(">>> klv_len", klv_len)

    # FIX: Create capsule compatible with PyDS
    try:
        # Create persistent ctypes buffer
        klv_buffer = (ctypes.c_ubyte * klv_len)()
        ctypes.memmove(klv_buffer, sei_payload, klv_len)
        
        # Create capsule using PyDS-compatible approach
        pythonapi.PyCapsule_New.argtypes = [c_void_p, c_char_p, c_void_p]
        pythonapi.PyCapsule_New.restype = py_object
        
        ptr = ctypes.cast(klv_buffer, c_void_p)
        capsule = pythonapi.PyCapsule_New(ptr, None, None)
        
        user_meta.user_meta_data = capsule
        print(">>> user_meta_data created with PyCapsule")
        
        # Keep reference to avoid GC
        # user_meta._klv_buffer_ref = klv_buffer
        
    except Exception as e:
        print(f">>> Error creating user_meta_data: {e}")
        return Gst.PadProbeReturn.OK

    # Set meta type
    user_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_USER_META
    print(">>> meta_type set")

    # Set callback functions
    # user_meta.copy_func = pyds.meta_copy_func
    # user_meta.release_func = pyds.meta_release_func
    print(">>> callback functions set")

    # Attach user meta to batch
    pyds.nvds_add_user_meta_to_batch(batch_meta, user_meta)
    print(">>> user meta added to batch")

    # Clear buffer for next KLV
    klv_data_buffer = None
    return Gst.PadProbeReturn.OK```

This function cannot be fully implemented through Python, which requires modifying native bindings.

Please refer to the patch below. Then recompile and install pyds.

diff --git a/apps/deepstream-custom-binding-test/deepstream_custom_binding_test.py b/apps/deepstream-custom-binding-test/deepstream_custom_binding_test.py
index 5e41846c..c4d8b28a 100644
--- a/apps/deepstream-custom-binding-test/deepstream_custom_binding_test.py
+++ b/apps/deepstream-custom-binding-test/deepstream_custom_binding_test.py
@@ -20,6 +20,8 @@
 import sys
 import os
 import gi
+import uuid
+import ctypes
 
 gi.require_version('Gst', '1.0')
 from gi.repository import Gst, GLib
@@ -40,6 +42,9 @@ def bus_call(bus, message, loop):
         loop.quit()
     return True
 
+KLV_PAYLOAD_TYPE = 0x01  # Example payload type for KLV data

 def streammux_src_pad_buffer_probe(pad, info, u_data):
     gst_buffer = info.get_buffer()
     if not gst_buffer:
@@ -64,15 +69,21 @@ def streammux_src_pad_buffer_probe(pad, info, u_data):
 
         if user_meta:
             print('adding user meta')
-            test_string = 'test message ' + str(frame_number)
+            uuid_bytes = uuid.UUID("550e8400-e29b-41d4-a716-446655440000").bytes
+            klv_data_buffer = " this is a test KLV data buffer for frame " + str(frame_number)
+            sei_payload = uuid_bytes + klv_data_buffer.encode('utf-8')
+
+            print(f"type of sei_payload: {type(sei_payload)} len: {len(sei_payload)}")
             data = pyds.alloc_custom_struct(user_meta)
-            data.message = test_string
-            data.message = pyds.get_string(data.message)
-            data.structId = frame_number
-            data.sampleInt = frame_number + 1
+            data.payload = pyds.alloc_buffer(len(sei_payload))
+
+            ctypes.memmove(data.payload, sei_payload, len(sei_payload))
+
+            data.payloadSize = len(sei_payload)
+            data.payloadType = KLV_PAYLOAD_TYPE
 
             user_meta.user_meta_data = data
-            user_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_USER_META
+            user_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_DUMMY_BBOX_META
 
             pyds.nvds_add_user_meta_to_frame(frame_meta, user_meta)
         else:
@@ -87,52 +98,6 @@ def streammux_src_pad_buffer_probe(pad, info, u_data):
     return Gst.PadProbeReturn.OK
 
 
-def fakesink_sink_pad_buffer_probe(pad, info, u_data):
-    gst_buffer = info.get_buffer()
-    if not gst_buffer:
-        print("Unable to get GstBuffer ")
-        return
-    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
-
-    if not batch_meta:
-        return Gst.PadProbeReturn.OK
-
-    pyds.nvds_acquire_meta_lock(batch_meta)
-
-    l_frame = batch_meta.frame_meta_list
-    while l_frame is not None:
-        try:
-            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
-        except StopIteration:
-            continue
-
-        l_usr = frame_meta.frame_user_meta_list
-        while l_usr is not None:
-            try:
-                user_meta = pyds.NvDsUserMeta.cast(l_usr.data)
-            except StopIteration:
-                continue
-
-            if user_meta.base_meta.meta_type == pyds.NvDsMetaType.NVDS_USER_META:
-                custom_msg_meta = pyds.CustomDataStruct.cast(user_meta.user_meta_data)
-                print(f'event msg meta, otherAttrs = {pyds.get_string(custom_msg_meta.message)}')
-                print('custom meta structId:: ', custom_msg_meta.structId)
-                print('custom meta msg:: ', pyds.get_string(custom_msg_meta.message))
-                print('custom meta sampleInt:: ', custom_msg_meta.sampleInt)
-            try:
-                l_usr = l_usr.next
-            except StopIteration:
-                break
-
-        try:
-            l_frame = l_frame.next
-        except StopIteration:
-            break
-
-    pyds.nvds_release_meta_lock(batch_meta)
-    return Gst.PadProbeReturn.OK
-
-
 def main(args):
     # Check input arguments
     if len(args) != 2:
@@ -164,12 +129,22 @@ def main(args):
     queue = Gst.ElementFactory.make("queue", "queue")
     if not queue:
         sys.stderr.write(" Unable to create queue")
+
+    nvdsmetainsert = Gst.ElementFactory.make("nvdsmetainsert", "nvds-meta-insert")
+    if not nvdsmetainsert:
+        sys.stderr.write(" Unable to create NvDsMetaInsert")
+
     queue1 = Gst.ElementFactory.make("queue", "queue1")
     if not queue1:
         sys.stderr.write(" Unable to create queue")
-    sink = Gst.ElementFactory.make("fakesink", "fakesink")
+
+    nvv4l2h264enc = Gst.ElementFactory.make("nvv4l2h264enc", "nvv4l2-h264-encoder")
+    if not nvv4l2h264enc:
+        sys.stderr.write(" Unable to create Nvv4l2 H264 Encoder")
+
+    sink = Gst.ElementFactory.make("filesink", "filesink")
     if not sink:
-        sys.stderr.write(" Unable to create fake sink \n")
+        sys.stderr.write(" Unable to create filesink \n")
     print("reading input")
     print("Playing file %s " %args[1])
     source.set_property('location', args[1])
@@ -184,9 +159,16 @@ def main(args):
     pipeline.add(decoder)
     pipeline.add(streammux)
     pipeline.add(queue)
+    pipeline.add(nvdsmetainsert)
     pipeline.add(queue1)
+    pipeline.add(nvv4l2h264enc)
     pipeline.add(sink)
 
+    print("Setting properties of elements")
+    nvdsmetainsert.set_property('serialize-lib', '/opt/nvidia/deepstream/deepstream/lib/libnvds_sei_serialization.so')
+    nvv4l2h264enc.set_property('bitrate', 4000000)
+    sink.set_property('location', 'output.h264')
+
     print("Linking elements in the Pipeline")
     source.link(h264parser)
     h264parser.link(decoder)
@@ -201,8 +183,10 @@ def main(args):
     srcpad.link(sinkpad)
 
     streammux.link(queue)
-    queue.link(queue1)
-    queue1.link(sink)
+    queue.link(nvdsmetainsert)
+    nvdsmetainsert.link(queue1)
+    queue1.link(nvv4l2h264enc)
+    nvv4l2h264enc.link(sink)
 
     loop = GLib.MainLoop()
     bus = pipeline.get_bus()
@@ -214,10 +198,6 @@ def main(args):
         sys.stderr.write(" Unable to get src pad of streammux")
     streammux_src_pad.add_probe(Gst.PadProbeType.BUFFER, streammux_src_pad_buffer_probe, 0)
 
-    fakesink_sink_pad = sink.get_static_pad('sink')
-    if not fakesink_sink_pad:
-        sys.stderr.write(" Unable to get sink pad of fakesink")
-    fakesink_sink_pad.add_probe(Gst.PadProbeType.BUFFER, fakesink_sink_pad_buffer_probe, 0)
     Gst.debug_bin_to_dot_file(pipeline, Gst.DebugGraphDetails.ALL, 'graph')
     print("Starting pipeline")
 
diff --git a/bindings/src/bindnvdsmeta.cpp b/bindings/src/bindnvdsmeta.cpp
index a644665a..9621f940 100644
--- a/bindings/src/bindnvdsmeta.cpp
+++ b/bindings/src/bindnvdsmeta.cpp
@@ -85,6 +85,7 @@ namespace pydeepstream {
                        pydsdoc::nvmeta::MetaTypeDoc::NVDS_FORCE32_META)
                 .value("NVDS_PREPROCESS_BATCH_META", NVDS_PREPROCESS_BATCH_META,
                        pydsdoc::nvmeta::MetaTypeDoc::NVDS_PREPROCESS_BATCH_META)
+                .value("NVDS_DUMMY_BBOX_META", nvds_get_user_meta_type("NVIDIA.DUMMY.BBOX.META"))
                 .export_values();
 
 
diff --git a/bindings/src/custom_binding/bindcustom.cpp b/bindings/src/custom_binding/bindcustom.cpp
index 5e0a253b..d0665611 100644
--- a/bindings/src/custom_binding/bindcustom.cpp
+++ b/bindings/src/custom_binding/bindcustom.cpp
@@ -24,18 +24,16 @@ namespace py = pybind11;
 
 namespace pydeepstream {
 
-    CustomDataStruct * copy_custom_struct(void* data, void* user_data) {
+    NVDS_CUSTOM_PAYLOAD * copy_custom_struct(void* data, void* user_data) {
         NvDsUserMeta * srcMeta = (NvDsUserMeta*) data;
-        CustomDataStruct * srcData = (CustomDataStruct *) srcMeta->user_meta_data;
-        CustomDataStruct *destData = (CustomDataStruct *) g_malloc0(
-                        sizeof(CustomDataStruct));
-        destData->structId = srcData->structId;
-        destData->sampleInt = srcData->sampleInt;
-        if (srcData->message != nullptr) {
-            char* srcString = (char *) srcData->message;
-            int strSize = strlen(srcString);
-            destData->message = (char*)calloc(strSize + 1, sizeof(char));
-            strcpy(destData->message, srcData->message);
+        NVDS_CUSTOM_PAYLOAD * srcData = (NVDS_CUSTOM_PAYLOAD *) srcMeta->user_meta_data;
+        NVDS_CUSTOM_PAYLOAD *destData = (NVDS_CUSTOM_PAYLOAD *) g_malloc0(
+                        sizeof(NVDS_CUSTOM_PAYLOAD));
+        destData->payloadType = srcData->payloadType;
+        destData->payloadSize = srcData->payloadSize;
+        if (srcData->payload != nullptr) {
+            destData->payload = (uint8_t*)calloc(srcData->payloadSize, sizeof(uint8_t));
+            memcpy(destData->payload, srcData->payload, srcData->payloadSize);
         }
         return destData;
     }
@@ -43,14 +41,10 @@ namespace pydeepstream {
     void release_custom_struct(void * data, void * user_data) {
         NvDsUserMeta * srcMeta = (NvDsUserMeta*) data;
         if (srcMeta != nullptr) {
-            CustomDataStruct * srcData = (CustomDataStruct *) srcMeta->user_meta_data;
+            NVDS_CUSTOM_PAYLOAD * srcData = (NVDS_CUSTOM_PAYLOAD *) srcMeta->user_meta_data;
             if (srcData != nullptr) {
-                auto * message = srcData->message;
-                srcData->structId = 0;
-                srcData->sampleInt = 0;
-                if (srcData->message != nullptr)
-                {
-                    free(srcData->message);
+                if (srcData->payload != nullptr) {
+                    free(srcData->payload);
                 }
                 g_free(srcData);
             }
@@ -59,30 +53,33 @@ namespace pydeepstream {
 
     void bindcustom(py::module &m) {
                 /* CustomDataStruct bindings to be used with NvDsUserMeta */
-                py::class_<CustomDataStruct>(m, "CustomDataStruct",
-                                pydsdoc::custom::CustomDataStructDoc::descr)
+                py::class_<NVDS_CUSTOM_PAYLOAD>(m, "NVDS_CUSTOM_PAYLOAD")
                 .def(py::init<>())
-                .def_readwrite("structId", &CustomDataStruct::structId)
-                .def_property("message", STRING_FREE_EXISTING(CustomDataStruct, message))
-                .def_readwrite("sampleInt", &CustomDataStruct::sampleInt)
+                .def_readwrite("payloadType", &NVDS_CUSTOM_PAYLOAD::payloadType)
+                .def_readwrite("payloadSize", &NVDS_CUSTOM_PAYLOAD::payloadSize)
+                .def_property("payload", 
+                    [](const NVDS_CUSTOM_PAYLOAD &self) -> size_t {
+                        return (size_t)self.payload;
+                    },
+                    [](NVDS_CUSTOM_PAYLOAD& self, size_t ptr) {
+                        self.payload = (uint8_t *) ptr;
+                    }, py::return_value_policy::reference)
 
                 .def("cast",
                      [](void *data) {
-                         return (CustomDataStruct *) data;
+                         return (NVDS_CUSTOM_PAYLOAD *) data;
                      },
-                     py::return_value_policy::reference,
-                     pydsdoc::custom::CustomDataStructDoc::cast);
+                     py::return_value_policy::reference);
 
         m.def("alloc_custom_struct",
               [](NvDsUserMeta *meta) {
-                  auto *mem = (CustomDataStruct *) g_malloc0(
-                          sizeof(CustomDataStruct));
+                  auto *mem = (NVDS_CUSTOM_PAYLOAD *) g_malloc0(
+                          sizeof(NVDS_CUSTOM_PAYLOAD));
                   meta->base_meta.copy_func = (NvDsMetaCopyFunc) pydeepstream::copy_custom_struct;
                   meta->base_meta.release_func = (NvDsMetaReleaseFunc) pydeepstream::release_custom_struct;
                   return mem;
               },
-              py::return_value_policy::reference,
-              pydsdoc::methodsDoc::alloc_custom_struct);
+              py::return_value_policy::reference);
 
     }
 
diff --git a/bindings/src/custom_binding/include/custom_data.hpp b/bindings/src/custom_binding/include/custom_data.hpp
index a9abe44e..aeb271fb 100644
--- a/bindings/src/custom_binding/include/custom_data.hpp
+++ b/bindings/src/custom_binding/include/custom_data.hpp
@@ -18,8 +18,9 @@
 
 using namespace std;
 
-struct CustomDataStruct {
-  int structId;
-  char* message;
-  int sampleInt;
-};
\ No newline at end of file
+typedef struct _NVDS_CUSTOM_PAYLOAD
+{
+   uint32_t payloadType;
+   uint32_t payloadSize;
+   uint8_t  *payload;
+} NVDS_CUSTOM_PAYLOAD;
\ No newline at end of file
1 Like

Then run the program, and you can find SEI in the output output.h264, as shown in the following figure

python3 deepstream_custom_binding_test.py /opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.h264