How to build a custom object to use on payloads for message broker with Python Bindings

• Hardware Platform (Jetson / GPU) Jetson Xavier NX
• DeepStream Version 5.1
• JetPack Version (valid for Jetson only) 4.5.1
• TensorRT Version 7.1.3
• Issue Type (questions, new requirements, bugs) questions

Hi. I’m building an app with the DeepStream Python bindings, using the face mask detection model (GitHub: NVIDIA-AI-IOT/face-mask-detection, Face Mask Detection using NVIDIA Transfer Learning Toolkit (TLT) and DeepStream for COVID-19). When a face with no mask is detected, I want to send a message through the message broker (Kafka) with a custom payload. I’m following the deepstream_test_4 example (deepstream_python_apps/deepstream_test_4.py at v1.0, NVIDIA-AI-IOT/deepstream_python_apps on GitHub), but I want to customize the payload. Instead of using the available objects (e.g. pyds.NvDsPersonObject), I want to create a custom payload object to use in the extMsg field of pyds.NvDsEventMsgMeta.

In the documentation I noticed an NvDsPayload object https://docs.nvidia.com/metropolis/deepstream/python-api/PYTHON_API/NvDsMeta_Schema/NvDsPayload.html but no information is given on how to use this object or its attributes.

How can I send a custom payload in my Python app?

Thank you

Yeah, we will check internally.

Hi, is there any update on how to use a custom payload in Python? I’m generating a custom JSON object that contains the fields I need, and I want to send it to Kafka.

Hi. I ended up using kafka-python and created a producer with it in my app. I didn’t figure out how to use the DeepStream message broker with a custom payload.

Thanks for your response. I am also using kafka-python, but is that the right approach? It didn’t cause any FPS drop, so I think it won’t be an issue. What do you think?

Hey customer, for now you can use kafka-python as a workaround (WAR).
We will provide a demo of a custom payload for the message broker in Python in a later DS release.
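
For readers looking for the shape of this workaround, here is a minimal sketch, assuming the custom payload is a plain dict built inside the app (for example in a pad probe) and sent with kafka-python. The field names, the `build_payload`, `send_payload` and `make_producer` helpers, the topic name and the broker address are all illustrative assumptions, not part of the DeepStream API.

```python
import json
import time


def build_payload(source_id, frame_num, label, confidence, bbox):
    """Build a custom event as a plain dict (all field names are hypothetical)."""
    left, top, width, height = bbox
    return {
        "timestamp": time.time(),
        "source_id": source_id,
        "frame": frame_num,
        "label": label,
        "confidence": round(float(confidence), 3),
        "bbox": {"left": left, "top": top, "width": width, "height": height},
    }


def send_payload(producer, topic, payload):
    """Encode the dict as UTF-8 JSON and hand it to a kafka-python style producer."""
    return producer.send(topic, value=json.dumps(payload).encode("utf-8"))


def make_producer(bootstrap_servers):
    """Create a real producer; needs `pip install kafka-python` and a reachable broker."""
    from kafka import KafkaProducer  # imported lazily so the helpers above work without it
    return KafkaProducer(bootstrap_servers=bootstrap_servers)


# Intended use inside the app (broker address and topic are assumptions):
# producer = make_producer(["localhost:9092"])
# event = build_payload(0, 42, "no_mask", 0.87, (100, 50, 64, 64))
# send_payload(producer, "demo01", event)
# producer.flush()  # send() is asynchronous; flush before shutting down
```

In a DeepStream probe the arguments would typically come from the frame and object metadata (for example frame_meta.frame_num and obj_meta.rect_params). Creating the producer once at startup, rather than per frame, keeps the probe cheap.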

@bcao @lakshay.chhabra @catia.mourao.896,

How do I use kafka-python to send a customized payload?
I have two Ubuntu machines on the same Wi-Fi network (one at 172.20.10.2, the other at 172.20.10.7). Using those addresses, the deepstream test4 Python script successfully transmits the detected bounding box info through Kafka, but I want a customized payload.
So I tried some kafka-python scripts.
For the producer:

from time import sleep
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['172.20.10.7:9092'])
for e in range(100):
    data = {'number' : e}
    producer.send('demo01', value=data)
    sleep(1)

For the consumer:

from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'demo01',
    bootstrap_servers=['172.20.10.7:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))

for message in consumer:
    print(message.value)

This did not work… I would appreciate any suggestions, or working code if possible!
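
One likely cause, offered as an assumption since no error message was posted: the producer above passes a dict to send() without configuring a value_serializer, while kafka-python expects message values to be bytes unless a serializer is set. The sketch below shows a serializer that mirrors the consumer's value_deserializer, checked as a round trip without a broker. The `serialize` and `deserialize` names are hypothetical helpers.

```python
import json


def serialize(value):
    """Turn a JSON-compatible object into the bytes that Kafka transports."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw):
    """Inverse of serialize(); same logic as the consumer's lambda."""
    return json.loads(raw.decode("utf-8"))


# Round trip without a broker: what the producer would send and the consumer decode.
data = {"number": 7}
raw = serialize(data)
print(isinstance(raw, bytes), deserialize(raw) == data)  # prints: True True

# With kafka-python this would be wired in as (broker address taken from the post):
# producer = KafkaProducer(bootstrap_servers=['172.20.10.7:9092'],
#                          value_serializer=serialize)
# producer.send('demo01', value={'number': 0})
# producer.flush()  # send() is asynchronous, so flush before the script exits
```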

Hi a0975003518,
you can try the following solution:
change the line group_id='my-group' to group_id=str(uuid.uuid1())

from kafka import KafkaConsumer
from json import loads
import uuid

consumer = KafkaConsumer(
    'test',
    bootstrap_servers='192.168.1.6:9092',
    auto_offset_reset='latest',
    enable_auto_commit=True,
    group_id=str(uuid.uuid1()),
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)

# do a dummy poll to retrieve some message
consumer.poll()

# go to end of the stream
consumer.seek_to_end()

for event in consumer:
    event_data = event.value
    print(event_data)
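
Some background on why a fresh group_id can change the behavior: Kafka stores committed offsets per consumer group, so a group that has already read the topic resumes from its last commit, and auto_offset_reset only applies when the group has no committed offset. A new group id on every run therefore makes that setting take effect each time. A minimal sketch, assuming a hypothetical `consumer_config` helper that returns keyword arguments for KafkaConsumer (it uses uuid4 rather than uuid1; either yields a unique id):

```python
import uuid


def consumer_config(bootstrap_servers, offset_reset="latest"):
    """Keyword arguments for KafkaConsumer with a one-off consumer group."""
    return {
        "bootstrap_servers": bootstrap_servers,
        "group_id": str(uuid.uuid4()),      # new group, so no committed offsets exist
        "auto_offset_reset": offset_reset,  # 'earliest' replays the topic, 'latest' skips history
        "enable_auto_commit": True,
    }


first = consumer_config("192.168.1.6:9092")
second = consumer_config("192.168.1.6:9092")
print(first["group_id"] != second["group_id"])  # prints: True

# Intended use (requires kafka-python and a reachable broker):
# consumer = KafkaConsumer('test', **consumer_config('192.168.1.6:9092'))
```

A throwaway group is convenient for debugging. For a long-running service a stable group_id is usually preferable, so the consumer can resume where it left off after a restart.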