Help customizing DeepStream Kafka messages using ROI-based object counts

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU) RTX 6000 Ada Gen
• DeepStream Version 7.1
• TensorRT Version 10.9.034
• NVIDIA GPU Driver Version (valid for GPU only) 560.35.05
• Issue Type( questions, new requirements, bugs) questions

Hello, I’m currently studying DeepStream with Python.

By referring to NVIDIA’s deepstream-test4, I learned that it’s possible to customize the Kafka message schema. I’ve set the payload-type property of the nvmsgconv element to 257 and am modifying the C plugin accordingly.

Specifically, I want to send messages based on the number of objects detected within a specific ROI using the tiler_probe function. However, I’m not sure how to properly reflect this information in the message payload.

I have modified the following C++ plugin files:

  • nvmsgconv.cpp
  • dsmeta_payload_custom.cpp
  • eventmsg_payload_custom.cpp
  • deepstream_schema_custom.cpp

Despite going through many online references, I’m struggling to find a clear example of how to pass the ROI-based data from the tiler_probe to the generate_message_custom function in dsmeta_payload_custom.cpp, so that the final Kafka message contains the data I want.

To be honest, producing messages directly with a Python Kafka client seems much simpler than modifying these plugins…
But I want to run my DeepStream pipeline with nvmsgconv and nvmsgbroker (using a libnvdsmsgconv.so that I build from the modified C++ plugin).

Any advice or reference would be greatly appreciated.
Thanks as always!

Best regards,
Daring_park

What is the complete media pipeline? How did you set msg2p-newapi for nvmsgconv? Please find the explanation in this doc, and please refer to this doc for Schema Customization.

Thank you for the quick reply to my questions.

Here are my answers and the information I have.

what is the complete media pipeline?

  • This is the complete media pipeline in my application.
A [RTSP Sources - uridecodebin] --> B[nvstreammux (Stream Muxer)]
B --> C[nvinfer (PGIE, car detection - trafficcamnet)]
C --> D[nvvideoconvert]
D --> E[capsfilter (Format Filter)]
E --> F[nvmultistreamtiler (Tiler)]
F --> G[nvvideoconvert]
G --> H[nvdsosd (On-Screen Display)]
H --> I[Tee (Divider)]
I -- Message Branch --> J[Queue1]
J --> K[nvmsgconv (Message Converter)]
K --> L[nvmsgbroker (Kafka Broker)]
I -- Movie Branch --> M[Queue2]
M --> N[nveglglessink (Display Sink)]

how did you set msg2p-newapi for nvmsgconv?

  • In my Python application, after creating the element with the element factory, I set the following properties on msgconv:

    • msgconv.set_property("config", "config/msg/dstest4_msgconv_config.txt") # as I understand it, this is the msgconv configuration (sensors, places, …)
    • msgconv.set_property("payload-type", 257) # as I understand it, this option selects the custom schema
    • msgconv.set_property("msg2p-newapi", 1) # as I understand it, this option is for using the custom schema
  • I have no questions about payload-type 0 and 1, which use the preset schemas.

  • But I want my own message schema for producing to Kafka, like this:
    {
        "source_id": 0,
        "timestamp": "2025-04-17T09:01:57.891Z",
        "object_counter": {
            "downstream": 0,
            "upstream": 2
        }
    }
    {
        "source_id": 1,
        "timestamp": "2025-04-17T09:01:57.891Z",
        "object_counter": {
            "downstream": 1,
            "upstream": 1
        }
    }
    I am trying to customize the message this way by following the doc.
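
The target schema above can be sketched in plain Python as a dictionary serialized with json. This is only an illustration of the desired output, not of how nvmsgconv produces it: build_counter_message is a hypothetical helper (not part of DeepStream or pyds), and the timestamp is assumed to be UTC with millisecond precision, as in the example.

```python
import json
from datetime import datetime, timezone


def build_counter_message(source_id, upstream, downstream, now=None):
    """Return the desired per-source message as a JSON string (hypothetical helper)."""
    now = now or datetime.now(timezone.utc)
    # Millisecond-precision timestamp with a trailing "Z" (assumed to be UTC).
    timestamp = now.strftime("%Y-%m-%dT%H:%M:%S.") + f"{now.microsecond // 1000:03d}Z"
    message = {
        "source_id": int(source_id),
        "timestamp": timestamp,
        "object_counter": {
            "downstream": int(downstream),
            "upstream": int(upstream),
        },
    }
    return json.dumps(message)


print(build_counter_message(0, upstream=2, downstream=0))
```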

About DeepStream Documentation

  • I read the Gst-nvmsgconv page and the custom schema section that you linked.
  • They helped me understand how to approach schema customization.
  • However, I still can’t produce the custom message I want.

If you use the tiler, please add “tee->nvmsgconv->nvmsgbroker” in front of the tiler, because much of the meta information becomes meaningless after the tiler composites a 2D tile from the batched buffers. You don’t need to modify eventmsg_payload_custom.cpp because it is for msg2p-newapi=0. If msg2p-newapi is 1, nvmsgconv only reads information from NvDsFrameMeta to create the payload, so if you need to add custom data to the payload, please add it to NvDsFrameMeta with nvds_add_user_meta_to_frame, which can be found in deepstream-test4.

Thank you for the response.

After modifying the pipeline as shown below, building the message works much better.

    print("Adding elements to the Pipeline")
    pipeline.add(pgie)
    pipeline.add(nvvidconv1)
    pipeline.add(filter1)
    pipeline.add(tiler)
    pipeline.add(nvvidconv)
    pipeline.add(nvosd)
    pipeline.add(tee)
    pipeline.add(queue1)
    pipeline.add(queue2)
    # message element pipeline
    pipeline.add(msgconv)
    pipeline.add(msgbroker)
    pipeline.add(sink)
    
    print("Linking elements in the Pipeline")
    streammux.link(pgie)
    pgie.link(nvvidconv1)
    nvvidconv1.link(filter1)
    filter1.link(tee) # split the pipeline

    # Message Branch
    tee_msg_pad = tee.request_pad_simple('src_%u')
    sink_pad = queue1.get_static_pad("sink")
    tee_msg_pad.link(sink_pad)
    queue1.link(msgconv)
    msgconv.link(msgbroker)

    # Movie Branch
    queue2.link(tiler)
    tiler.link(nvvidconv)
    nvvidconv.link(nvosd)
    nvosd.link(sink)
    tee_render_pad = tee.request_pad_simple("src_%u")
    sink_pad = queue2.get_static_pad("sink")
    tee_render_pad.link(sink_pad)

    tiler_sink_pad = tiler.get_static_pad("sink")
    if not tiler_sink_pad:
        sys.stderr.write(" Unable to get sink pad of tiler \n")
    else:
        tiler_sink_pad.add_probe(Gst.PadProbeType.BUFFER, tiler_sink_pad_buffer_probe, 0)

In the “tiler_sink_pad_buffer_probe” function I call “pyds.nvds_add_user_meta_to_frame(frame_meta, user_event_meta)”.

How should I change my code so that the payload looks like this?

                {
                    "source_id" : 0,
                    "frame_num" : 120,
                    "counter" : {
                        "up" : 1, "down" : 2
                    }
                }
                {
                    "source_id" : 1,
                    "frame_num" : 120,
                    "counter" : {
                        "up" : 0, "down" : 1
                    }
                }

This is part of my code in the tiler_sink_pad_buffer_probe function:

        # for test
        n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
        BOUNDARY = {0 : [[650, 880], [450, 650], [900, 1130], [450, 650]]}
        
        # down stream (ex, l t 650 600, w 350 h 200)
        cv2.rectangle(n_frame, (BOUNDARY[camera_source_id][0][0], BOUNDARY[camera_source_id][1][0]), (BOUNDARY[camera_source_id][0][1], BOUNDARY[camera_source_id][1][1]), (0, 255, 0), 2) # top-left, bottom-right
        # up stream (ex, l t 1050 600, w 500 h 200)
        cv2.rectangle(n_frame, (BOUNDARY[camera_source_id][2][0], BOUNDARY[camera_source_id][3][0]), (BOUNDARY[camera_source_id][2][1], BOUNDARY[camera_source_id][3][1]), (0, 0, 255), 2) # top-left, bottom-right

        # pgie object list
        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break  # `continue` here would retry the same node forever
            
            if obj_meta.class_id == 0: # count vehicles only

                # 2. object center calculate
                obj_center_x = int(obj_meta.rect_params.left + obj_meta.rect_params.width // 2)
                obj_center_y = int(obj_meta.rect_params.top + obj_meta.rect_params.height // 2)

                if BOUNDARY[camera_source_id][0][0] <= obj_center_x <= BOUNDARY[camera_source_id][0][1] and BOUNDARY[camera_source_id][1][0] <= obj_center_y <= BOUNDARY[camera_source_id][1][1]:
                    object_counter[camera_source_id]["object_counter_down"] += 1
                elif BOUNDARY[camera_source_id][2][0] <= obj_center_x <= BOUNDARY[camera_source_id][2][1] and BOUNDARY[camera_source_id][3][0] <= obj_center_y <= BOUNDARY[camera_source_id][3][1]:
                    object_counter[camera_source_id]["object_counter_up"] += 1
            try:
                l_obj = l_obj.next
            except StopIteration:
                break

        # produce a message every 30 frames
        if PRODUCE_A_MESSAGE and (frame_number % 30) == 0:
            user_event_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)
            if user_event_meta:
                msg_meta = pyds.alloc_nvds_event_msg_meta(user_event_meta)
                msg_meta.frameId = frame_number
                msg_meta.sensorId = camera_source_id
                meta = pyds.NvDsEventMsgMeta.cast(msg_meta)
                meta.ts = pyds.alloc_buffer(MAX_TIME_STAMP_LEN + 1)
                pyds.generate_ts_rfc3339(meta.ts, MAX_TIME_STAMP_LEN)

                # JSON making
                counter_data = {
                    "source_id": int(camera_source_id),
                    "frame_num": int(frame_meta.frame_num),
                    "counter": {
                        "up": object_counter[camera_source_id]["object_counter_up"],
                        "down": object_counter[camera_source_id]["object_counter_down"]
                    }
                }
                obj = pyds.NvDsVehicleObject.cast(pyds.alloc_nvds_vehicle_object())
                # obj.type = object_counter[camera_source_id]["object_counter_up"]
                # meta.extMsg = obj
                # meta.extMsgSize = sys.getsizeof(obj)

                user_event_meta.user_meta_data = msg_meta
                user_event_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_EVENT_MSG_META
                pyds.nvds_add_user_meta_to_frame(frame_meta, user_event_meta)
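
The ROI test used in the probe can be isolated into a small pure-Python function, which makes it easier to check independently of the pipeline. This is a minimal sketch, assuming the same BOUNDARY layout as above (x range and y range of the downstream ROI, then x range and y range of the upstream ROI). count_in_rois and the (left, top, width, height) tuples are hypothetical stand-ins for the loop over NvDsObjectMeta and its rect_params.

```python
# ROI layout copied from the probe: [down_x, down_y, up_x, up_y], each a [min, max] pair.
BOUNDARY = {0: [[650, 880], [450, 650], [900, 1130], [450, 650]]}


def count_in_rois(boxes, rois):
    """Count box centers per ROI; boxes are (left, top, width, height) tuples."""
    (dx0, dx1), (dy0, dy1), (ux0, ux1), (uy0, uy1) = rois
    counts = {"up": 0, "down": 0}
    for left, top, width, height in boxes:
        # Same center computation as in the probe.
        cx = int(left + width // 2)
        cy = int(top + height // 2)
        if dx0 <= cx <= dx1 and dy0 <= cy <= dy1:
            counts["down"] += 1
        elif ux0 <= cx <= ux1 and uy0 <= cy <= uy1:
            counts["up"] += 1
    return counts


# Made-up boxes for illustration: one in each ROI and one outside both.
boxes = [(700, 500, 100, 60), (950, 480, 80, 40), (100, 100, 50, 50)]
print(count_in_rois(boxes, BOUNDARY[0]))
```

Because the function returns a new dictionary on each call, the counts are per frame. Whether object_counter in the probe above is reset between frames is not shown in the post.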

I’m sorry to bother you with something I find difficult to understand.
However, I want to do my best to solve this problem.

Thank you for helping me again.

Regards,
Daring_park

Please don’t add tiler_sink_pad_buffer_probe on the tiler; adding it on the tee is OK. Otherwise, msgconv can’t access the custom data. Please refer to this sample for adding custom data with nvds_add_user_meta_to_frame. Then, in nvmsgconv, you can get the custom data from the frame meta and convert it to a string payload.

Thank you for helping me to solve the problem.

Here is what I tried:

  • I moved the probe to the tee, as you suggested.
  • I still don’t understand how to build the custom message for Kafka.

I think this part of the sample code is the important point:

        if user_meta:
            print('adding user meta')
            test_string = 'test message ' + str(frame_number)
            data = pyds.alloc_custom_struct(user_meta)
            data.message = test_string
            data.message = pyds.get_string(data.message)
            data.structId = frame_number
            data.sampleInt = frame_number + 1

            user_meta.user_meta_data = data
            user_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_USER_META
  • I don’t know how to apply this to the Kafka message.
  • I want to build the message from inference information (object counts per ROI).
  • My consumer can’t print the value.

This is the code in my tee probe function, which I wrote by following the sample code for adding custom data.

        # produce a message every 30 frames
        if PRODUCE_A_MESSAGE and (frame_number % 30) == 0:
            user_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)
            counter_data = {
                "source_id": int(camera_source_id),
                "frame_num": int(frame_meta.frame_num),
                "counter": {
                    "up": object_counter[camera_source_id]["object_counter_up"],
                    "down": object_counter[camera_source_id]["object_counter_down"]
                }
            }

            if user_meta:
                print('adding user meta')
                test_string = 'test message ' + str(frame_number)
                data = pyds.alloc_custom_struct(user_meta)
                data.message = test_string
                data.message = pyds.get_string(data.message)
                data.structId = frame_number
                data.sampleInt = frame_number + 1

                user_meta.user_meta_data = data
                user_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_USER_META

                pyds.nvds_add_user_meta_to_frame(frame_meta, user_meta)
            else:
                print("not able to find user_meta.")
            print(f"Frame: {frame_meta.frame_num}, Source: {frame_meta.source_id}")

            PRODUCE_A_MESSAGE = False

            """
            The consumer expects messages in this format.
            {
                "source_id" : 0,
                "frame_num" : 120,
                "counter" : {
                    "up" : 1, "down" : 2
                }
            }
            {
                "source_id" : 1,
                "frame_num" : 120,
                "counter" : {
                    "up" : 0, "down" : 1
                }
            }
            """

The consumer received the following message.value (printed without deserialization):

Received : b'CUSTOM Schema\x00'
Received : b'CUSTOM Schema\x00'
Received : b'CUSTOM Schema\x00'
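
The received bytes end with a NUL terminator (\x00), so the consumer needs to strip it before decoding. A minimal sketch of a tolerant decoder, assuming the payload is UTF-8: decode_payload is a hypothetical helper that works on raw bytes and does not depend on any particular Kafka client library.

```python
import json


def decode_payload(raw):
    """Decode a message value; return parsed JSON if valid, else the plain string."""
    text = raw.rstrip(b"\x00").decode("utf-8")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Not JSON (for example the 'CUSTOM Schema' placeholder): return it unchanged.
        return text


print(decode_payload(b"CUSTOM Schema\x00"))
print(decode_payload(b'{"source_id": 0, "frame_num": 120, "counter": {"up": 1, "down": 2}}\x00'))
```

With the payload shown above, the first call returns the plain string, which confirms that no JSON has reached the consumer yet.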

Thank you for helping me again.
I am doing my best to solve this problem.

Regards,
Daring_park

  1. First, please add the custom data with nvds_add_user_meta_to_frame. From the code you shared, you have already completed adding the custom data to the meta data.
  2. In nvmsgconv, can you access the custom data from the meta data? Please refer to “frame_usermeta->base_meta.meta_type == xxx” in dsmeta_payload.cpp for how to access user meta from NvDsFrameMeta. From “b'CUSTOM Schema\x00'”, the payload is empty. After getting the custom data, you need to convert the custom data to a string payload.

Thank you for the help with quick response, fanzh.

  1. First, please add the custom data with nvds_add_user_meta_to_frame. From the code you shared, you have already completed adding the custom data to the meta data.
  • Yes, the sample code that you mentioned helped me add the custom data with nvds_add_user_meta_to_frame.
  • It is a string, though. Once this test works, can I produce the message format that I described before?

The message format that I want consumer.py to receive:

received : {
    "source_id" : 0,
    "frame_num" : 120,
    "counter" : {
        "up" : 1, "down" : 2
    }
}
received : {
    "source_id" : 1,
    "frame_num" : 120,
    "counter" : {
        "up" : 0, "down" : 1
    }
}
  2. In nvmsgconv, can you access the custom data from the meta data?
    Please refer to “frame_usermeta->base_meta.meta_type == xxx” in dsmeta_payload.cpp for how to access user meta from NvDsFrameMeta.

From “b'CUSTOM Schema\x00'”, the payload is empty.
After getting the custom data, you need to convert the custom data to a string payload.

  • I am a bit worried, because I find the C++ code hard to understand and modifying the C plugin is difficult for me.

  • Do I need to run sudo make to build a new libnvdsmsgconv.so?

  • There are other options mentioned for “CMake”, such as “schema.pb.cc”, “lidar_schema.pb.cc”, “protobuf_build_dir”, … .

  • I can find the code with “frame_usermeta->base_meta.meta_type == xxx” in the functions “generate_dsmeta_message” and “generate_dsmeta_message_minimal”.

  • I don’t know how to modify it to get the result I want.

  • You told me I don’t need to modify eventmsg_payload.cpp for the custom message.
    (I was pointed to dsmeta_payload.cpp, though. Do I have to modify it?)

Thank you for the fast response.

Regards,
Daring_park

If you don’t want to modify the C low-level lib, please refer to the second paragraph in the doc. You can attach a custom message blob (string) to the payload using the NVDS_CUSTOM_MSG_BLOB type by creating an NvDsCustomMsgInfo object and specifying the fields message (custom message blob) and len (size of the custom message). Please refer to the sample code osd_sink_pad_buffer_image_probe in /opt/nvidia/deepstream/deepstream-7.1/sources/apps/sample_apps/deepstream-test4/deepstream_test4_app.c. Note in particular that msg2p-newapi needs to be 1.
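
The two fields named in this reply (message and len) can be prepared in Python before anything is attached to the frame. A minimal sketch, assuming the blob is a UTF-8 JSON string and that len is meant to be its size in bytes: make_custom_blob is a hypothetical helper. The step that creates the NvDsCustomMsgInfo and attaches it to the frame is deliberately not shown, since whether your pyds version exposes it is not confirmed in this thread.

```python
import json


def make_custom_blob(source_id, frame_num, up, down):
    """Return (message, length) for a custom message blob (hypothetical helper)."""
    message = json.dumps(
        {
            "source_id": int(source_id),
            "frame_num": int(frame_num),
            "counter": {"up": int(up), "down": int(down)},
        }
    )
    # Assumption: len is the size in bytes, which can differ from the
    # character count when the text contains non-ASCII characters.
    length = len(message.encode("utf-8"))
    return message, length


message, length = make_custom_blob(0, 120, up=1, down=2)
print(message, length)
```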