How to custom server maker python kafka data?

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU)
• DeepStream Version

when use python3 deepstream_test5.py -c test5_b16_dynamic_source.yaml -s source_list_dynamic.yaml in /opt/nvidia/deepstream/deepstream/service-maker/sources/apps/python/pipeline_api/deepstream_test5_app , the kafka can receive

class EventMessageGenerator(BatchMetadataOperator):
    """
    generate event message user metadata for downstream nvmsgconv
    to produce message payloads, which will be posted to the remote
    server by msgboker, e.g kafka
    """
    def __init__(self, sensor_map, labels):
        super().__init__()
        self._sensor_map = sensor_map
        self._labels = labels

    def handle_metadata(self,  batch_meta, frame_interval=1):
        for frame_meta in batch_meta.frame_items:
            frame_num = frame_meta.frame_number
            import pdb; pdb.set_trace()
            for object_meta in frame_meta.object_items:

                # if not (frame_num % frame_interval):
                event_msg = batch_meta.acquire_event_message_meta()
                if event_msg:
                    source_id = frame_meta.source_id
                    sensor_info = self._sensor_map[source_id] if source_id in self._sensor_map else None
                    # if sensor_info != None:
                    sensor_id = sensor_info.sensor_id if sensor_info else "N/A"
                    uri = sensor_info.uri if sensor_info else "N/A"
                    event_msg.generate(object_meta, frame_meta, sensor_id, uri, self._labels)
                    frame_meta.append(event_msg)

is this code is ok?

why change another model

also get

{
  "messageid": "eb444474-6cab-4542-a676-b239366ba2c3",
  "mdsversion": "1.0",
  "@timestamp": "2024-11-20T10:52:18.365Z",
  "place": null,
  "sensor": null,
  "analyticsModule": null,
  "object": {
    "id": "18446744073709551615",
    "speed": 0,
    "direction": 0,
    "orientation": 0,
    "vehicle": {
      "type": "",
      "make": "",
      "model": "",
      "color": "",
      "licenseState": "",
      "license": "",
      "confidence": 1
    },
    "bbox": {
      "topleftx": 748,
      "toplefty": 450,
      "bottomrightx": 935,
      "bottomrighty": 722
    },
    "location": {
      "lat": 0,
      "lon": 0,
      "alt": 0
    },
    "coordinate": {
      "x": 0,
      "y": 0,
      "z": 0
    },
    "pose": {

    }
  },
  "event": {
    "id": "bc31120a-6a30-44c0-bcae-d9fc1967026a",
    "type": "entry"
  },
  "videoPath": ""
}

no label, source url items, but has a vehicleitem, this is not in model lables

or can i add a python requests post a json to kafka function in a sink probe?

not use msgconv and msgbroker

what are the model labels? could you add log to print sensor_id? wondering why sensor_id is null.

this question is not related to the original issue. could you open a new topic to focus on the new question? Thanks!

when i use another yolo model, in the model s label.txt, only has custom object label , but msg from broker has a vehicle object, it not in my custom object label class

How to custom the structure of the message,

so that all objects in one frame are included in a message.

like this

{
  "image_id": "000000000001",
  "results": [
    {
      "label": "person",
      "confidence": 0.95,
      "bounding_box": [50, 60, 200, 250]
    },
    {
      "label": "bicycle",
      "confidence": 0.85,
      "bounding_box": [220, 180, 300, 350]
    },
    {
      "label": "car",
      "confidence": 0.90,
      "bounding_box": [320, 200, 500, 450]
    }
  ]
}

please set payload-type’s nvmsgconv to 1. then one message will include multiple objects.

1 Like

i know this

pipeline.txt (17.2 KB)
could you help me

why my custom payload do not effect

in log file has

Nov 22 02:55:11 ps python3: DSLOG:NVDS_KAFKA_PROTO: no matching json field found                 based on kafka key config; using default partition#012
Nov 22 02:55:11 ps python3: DSLOG:NVDS_KAFKA_PROTO: no matching json field found                 based on kafka key config; using default partition#012
Nov 22 02:55:11 ps python3: DSLOG:NVDS_KAFKA_PROTO: no matching json field found                 based on kafka key config; using default partition#012

could you elaborate the question?
“no matching json field found” is not an error. the related code is opensource. please refer to nvds_msgapi_send in \opt\nvidia\deepstream\deepstream\sources\libs\kafka_protocol_adaptor\nvds_kafka_proto.cpp

hello, my code is in the txt file,
i custom a msg payload,

i had searched in forums and docs . still dont know What is the procedure?

do you means just to only change nvds_kafka_proto.cppfile?

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU)

• DeepStream Version

• JetPack Version (valid for Jetson only)

• TensorRT Version

• NVIDIA GPU Driver Version (valid for GPU only)

• Issue Type( questions, new requirements, bugs)

• How to reproduce the issue ? (This is for bugs. Including which sample app is using, the configuration files content, the command line used and other details for reproducing)

• Requirement details( This is for new requirement. Including the module name-for which plugin or for which sample application, the function description)

• Hardware Platform (Jetson / GPU) 3090 GPU

• DeepStream Version 7.1 dockers nvcr.io/nvidia/deepstream:7.1-gc-triton-devel

• NVIDIA GPU Driver Version (valid for GPU only)

Mon Nov 25 03:27:21 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.183.01             Driver Version: 535.183.01 

if the model detect “car”, “vehicle” will be added. are there cars in your test video? does every message include “vehicle” even if there are other kinds of objects?

my custom yolo model no ‘car’ label
my custom msg payload is below

{
        "source_id":1111,
        "pts":2222,
        "ntp":55564,
        "width":1920,
        "height":1080,
        "objects":[
            {
                "class_id": 0,
                "class_name": "car",
                "confidence": 0.8,
                "box": [12,23,35,46]
        },
            {
                "class_id": 1,
                "class_name": "person",
                "confidence": 0.8,
                "box": [12, 23, 35, 46]
            }

        ]

    }

My main issue is how to implement my custom message payload.

The code in my program is written like this, but it is not functioning.

from you configuration file. your model can detect car. please refer to my last comment. if the model detects car in the test video, it is expected that the message includes “vehicle” string.

Currently in servicemaker sample, label and url can’t be added to message because event_msg.generate is not opensource. please use python version sample deepstream_test_4.py. please refer to this topic for how to add new data.

the author said I have modified the add_custom_meta_to_buffer function from the mentioned previous issue:

dont know where modified ,

you can add msg_meta.otherAttrs = meta_str in this code. meta_str is your custom string.

As you said, I have conducted the following tests.

in 7.1 GPU docker

cd /opt/nvidia/deepstream/deepstream/sources/deepstream_python_apps/apps/deepstream-test4
vim deepstream_test_4.py
add msg_meta.otherAttrs = "12345yyyyy" line

user_event_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)
if user_event_meta:
    # Allocating an NvDsEventMsgMeta instance and getting
    # reference to it. The underlying memory is not manged by
    # Python so that downstream plugins can access it. Otherwise
    # the garbage collector will free it when this probe exits.
    msg_meta = pyds.alloc_nvds_event_msg_meta(user_event_meta)
    msg_meta.bbox.top = obj_meta.rect_params.top
    msg_meta.bbox.left = obj_meta.rect_params.left
    msg_meta.bbox.width = obj_meta.rect_params.width
    msg_meta.bbox.height = obj_meta.rect_params.height
    msg_meta.frameId = frame_number
    msg_meta.trackingId = long_to_uint64(obj_meta.object_id)
    msg_meta.confidence = obj_meta.confidence
    msg_meta = generate_event_msg_meta(msg_meta, obj_meta.class_id)
    msg_meta.otherAttrs = "12345yyyyy"
    user_event_meta.user_meta_data = msg_meta
    user_event_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_EVENT_MSG_META
    pyds.nvds_add_user_meta_to_frame(frame_meta,
                                     user_event_meta)
else:
    print("Error in attaching event meta to buffer\n")

then run python3 deepstream_test_4.py -i /opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.h264 -p /opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so --conn-str "localhost;9092" -t test5app --no-display

but in the kafka msg no a 12345yyyyy content

{
  "messageid": "6e8c5d61-3d7f-42fa-8d91-8d9ab12c4707",
  "mdsversion": "1.0",
  "@timestamp": "2024-11-26T06:51:11.231Z",
  "place": {
    "id": "1",
    "name": "XYZ",
    "type": "garage",
    "location": {
      "lat": 30.32,
      "lon": -40.55,
      "alt": 100
    },
    "entrance": {
      "name": "walsh",
      "lane": "lane1",
      "level": "P2",
      "coordinate": {
        "x": 1,
        "y": 2,
        "z": 3
      }
    }
  },
  "sensor": {
    "id": "CAMERA_ID",
    "type": "Camera",
    "description": "\"Entrance of Garage Right Lane\"",
    "location": {
      "lat": 45.293701447,
      "lon": -75.8303914499,
      "alt": 48.1557479338
    },
    "coordinate": {
      "x": 5.2,
      "y": 10.1,
      "z": 11.2
    }
  },
  "analyticsModule": {
    "id": "XYZ",
    "description": "\"Vehicle Detection and License Plate Recognition\"",
    "source": "OpenALR",
    "version": "1.0"
  },
  "object": {
    "id": "18446744073709551615",
    "speed": 0,
    "direction": 0,
    "orientation": 0,
    "person": {
      "age": 45,
      "gender": "male",
      "hair": "black",
      "cap": "none",
      "apparel": "formal",
      "confidence": 0.45377635955810547
    },
    "bbox": {
      "topleftx": 465,
      "toplefty": 480,
      "bottomrightx": 487,
      "bottomrighty": 516
    },
    "location": {
      "lat": 0,
      "lon": 0,
      "alt": 0
    },
    "coordinate": {
      "x": 0,
      "y": 0,
      "z": 0
    },
    "pose": {

    }
  },
  "event": {
    "id": "378502fe-8861-4eea-bc8c-6e9ecd385337",
    "type": "entry"
  },
  "videoPath": ""
}

please refer to " My solution for the second question:" part in this topic. you need to modify generate_event_message function of the eventmsg_payload.cpp, then rebuild, then replace the /opt/nvidia/deepstream/deepstream/lib/libnvds_msgconv.so with the new so.

when run deepstream_test_4.py, I made some changes to the code for testing purposes.

            user_event_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)
            if user_event_meta:
                msg_meta = pyds.alloc_nvds_event_msg_meta(user_event_meta)
                msg_meta.bbox.top = 111
                msg_meta.bbox.left = 222
                msg_meta.bbox.width = 333
                msg_meta.bbox.height = 444
                msg_meta.frameId = 9999999
                msg_meta.confidence = obj_meta.confidence
                msg_meta = generate_event_msg_meta(msg_meta)
                # msg_meta.otherAttrs = json_str
                # msg_meta.otherAttrs = "11ttttt"
                user_event_meta.user_meta_data = msg_meta
                user_event_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_EVENT_MSG_META
                pyds.nvds_add_user_meta_to_frame(frame_meta,user_event_meta)
            else:
                print("Error in attaching event meta to buffer\n")

why in the msg, no a frameId key?

{
  "messageid": "a53e6017-3940-41e2-8e5a-fca21c3949cd",
  "mdsversion": "1.0",
  "@timestamp": "2024-11-27T03:18:57.727Z",
  "place": null,
  "sensor": null,
  "analyticsModule": null,
  "object": {
    "id": "0",
    "speed": 0,
    "direction": 0,
    "orientation": 0,
    "vehicle": {
      "type": "",
      "make": "",
      "model": "",
      "color": "",
      "licenseState": "",
      "license": "",
      "confidence": 1
    },
    "bbox": {
      "topleftx": 222,
      "toplefty": 111,
      "bottomrightx": 555,
      "bottomrighty": 555
    },
    "location": {
      "lat": 0,
      "lon": 0,
      "alt": 0
    },
    "coordinate": {
      "x": 0,
      "y": 0,
      "z": 0
    },
    "pose": {

    }
  },
  "event": {
    "id": "f51a110b-678e-4b0a-822f-86c1659c7f29",
    "type": "entry"
  },
  "videoPath": ""
}

frameId is not added in low-level code generate_object_object of opt\nvidia\deepstream\deepstream\sources\libs\nvmsgconv\deepstream_schema\eventmsg_payload.cpp. the low-level code is opensource. please refer to my last comment. you can add the following code in generate_object_object after adding tracking_id, then rebuild and replace the so file.

snprintf (tracking_id, sizeof(tracking_id), "%u", meta->frameId);
json_object_set_string_member (objectObj, "frameId", tracking_id);

will Segmentation fault (core dumped)

cd /opt/nvidia/deepstream/deepstream/sources/libs/nvmsgconv/deepstream_schema/
vim eventmsg_payload.cpp

in generate_event_message func add json_object_set_string_member (rootObj, "otherAttrs", meta->otherAttrs);

  message = json_to_string (rootNode, TRUE);
  json_node_free (rootNode);
  json_object_unref (rootObj);
  json_object_set_string_member (rootObj, "otherAttrs", meta->otherAttrs);

  return message;
cd ..
make && make install
deepstream_schema/dsmeta_payload.cpp: In function 'gchar* generate_dsmeta_message_protobuf(void*, void*, size_t&)':
deepstream_schema/dsmeta_payload.cpp:584:38: warning: 'void* g_memdup(gconstpointer, guint)' is deprecated: Use 'g_memdup2' instead [-Wdeprecated-declarations]
  584 |     gchar* message = (gchar*)g_memdup(msg_str.c_str(), message_len);
      |                              ~~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In file included from /usr/include/glib-2.0/glib.h:82,
                 from /usr/include/glib-2.0/gobject/gbinding.h:28,
                 from /usr/include/glib-2.0/glib-object.h:22,
                 from /usr/include/json-glib-1.0/json-glib/json-types.h:31,
                 from /usr/include/json-glib-1.0/json-glib/json-glib.h:29,
                 from deepstream_schema/dsmeta_payload.cpp:13:
/usr/include/glib-2.0/glib/gstrfuncs.h:257:23: note: declared here
  257 | gpointer              g_memdup         (gconstpointer mem,
      |                       ^~~~~~~~
cp -rv libnvds_msgconv.so /opt/nvidia/deepstream/deepstream-7.1/lib/
'libnvds_msgconv.so' -> '/opt/nvidia/deepstream/deepstream-7.1/lib/libnvds_msgconv.so'

cd /opt/nvidia/deepstream/deepstream/sources/deepstream_python_apps/apps/deepstream-test4

GST_DEBUG=3 python3 deepstream_test_4.py -i /opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.h264 -p /opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so --conn-str "localhost;9092" -t test5app --no-display

0:00:00.218803907  8357 0x59e8ad4dff60 INFO                 nvinfer gstnvinfer.cpp:684:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::generateBackendContext() <nvdsinfer_context_impl.cpp:2195> [UID = 1]: Use deserialized engine model: /opt/nvidia/deepstream/deepstream-7.1/samples/models/Primary_Detector/resnet18_trafficcamnet_pruned.onnx_b1_gpu0_int8.engine
0:00:00.222858196  8357 0x59e8ad4dff60 INFO                 nvinfer gstnvinfer_impl.cpp:343:notifyLoadModelStatus:<primary-inference> [UID 1]: Load new model:dstest4_pgie_config.txt sucessfully
0:00:00.223242800  8357 0x59e8ad4dff60 WARN                 basesrc gstbasesrc.c:3688:gst_base_src_start_complete:<file-source> pad not activated yet
mimetype is video/x-raw
0:00:00.351970944  8357 0x78d3080013c0 WARN            v4l2videodec gstv4l2videodec.c:2297:gst_v4l2_video_dec_decide_allocation:<nvv4l2-decoder> Duration invalid, not setting latency
0:00:00.352037132  8357 0x78d3080013c0 WARN          v4l2bufferpool gstv4l2bufferpool.c:1130:gst_v4l2_buffer_pool_start:<nvv4l2-decoder:pool:src> Uncertain or not enough buffers, enabling copy threshold
0:00:00.353123118  8357 0x78d308001a00 WARN          v4l2bufferpool gstv4l2bufferpool.c:1607:gst_v4l2_buffer_pool_dqbuf:<nvv4l2-decoder:pool:src> Driver should never set v4l2_buffer.field to ANY
Frame Number = 0 Vehicle Count = 9 Person Count = 6
Segmentation fault (core dumped)