Hi, any update on how to use a custom payload (Python)? I’m generating a custom JSON that consists of some fields I need, and I want to send that to Kafka.
Hi. I ended up using kafka-python and created a producer with that in my app. I didn’t figure out how to use the DeepStream message broker with a custom payload.
Thanks for your response. I am also using kafka-python, but is that the right approach? It didn’t cause any FPS drop, so I think it won’t be an issue. What do you think?
Hey customer, currently you can use kafka-python as a workaround (WAR).
And we will have a demo of a custom payload for the message broker in Python in a later DeepStream release.
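For anyone landing here, a minimal sketch of the kafka-python workaround with a custom payload. The field names (`sensor_id`, `frame`, `label`, `bbox`), the helper names, the topic `custom-events`, and the broker address are hypothetical placeholders and do not come from DeepStream. `KafkaProducer.send()` queues the message and returns a future instead of waiting for delivery, which may be why no FPS drop was seen.

```python
import json
import time


def build_payload(sensor_id, frame, label, bbox):
    """Build a custom JSON payload as UTF-8 bytes (field names are hypothetical)."""
    left, top, width, height = bbox
    payload = {
        "sensor_id": sensor_id,
        "frame": frame,
        "label": label,
        "bbox": {"left": left, "top": top, "width": width, "height": height},
        "timestamp": time.time(),
    }
    return json.dumps(payload).encode("utf-8")


def send_payload(producer, topic, payload_bytes):
    """Queue the bytes on an existing producer; send() returns a future."""
    return producer.send(topic, value=payload_bytes)


def example_run(bootstrap_servers="localhost:9092"):
    """Not called automatically: needs kafka-python and a reachable broker."""
    from kafka import KafkaProducer  # imported lazily so the helpers work without it

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    send_payload(producer, "custom-events",
                 build_payload("cam0", 1, "person", (10, 20, 30, 40)))
    producer.flush()  # block until queued messages are delivered
```

In an app, the producer would be created once at startup and `send_payload` called wherever the fields become available, for example from a pad probe.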
How do I use kafka-python to send a customized payload?
I have two Ubuntu machines on the same Wi-Fi network (one address is 172.20.10.2, the other is 172.20.10.7). Using those IPs, I can successfully transmit the detected bounding box info through Kafka with the DeepStream test4 Python script. But I want a customized payload…
So I tried a kafka-python script.
For producer:
from time import sleep
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['172.20.10.7:9092'])
for e in range(100):
    data = {'number' : e}
    producer.send('demo01', value=data)
    sleep(1)
For consumer:
from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'demo01',
    bootstrap_servers=['172.20.10.7:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
    print(message.value)
This did not work… So I need some suggestions, or executable code if possible!
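One possible cause, offered as a guess and not a diagnosis: the producer passes a dict to `send()` without a `value_serializer`, and kafka-python expects the value to already be bytes in that case. A minimal sketch of a serializer that mirrors the consumer's `value_deserializer` follows; the helper names `to_json_bytes`, `from_json_bytes`, and `make_producer` are hypothetical.

```python
from json import dumps, loads


def to_json_bytes(value):
    """Serialize a Python object to UTF-8 encoded JSON bytes."""
    return dumps(value).encode("utf-8")


def from_json_bytes(raw):
    """Inverse of to_json_bytes; same logic as the consumer's value_deserializer."""
    return loads(raw.decode("utf-8"))


def make_producer(bootstrap_servers):
    """Create a producer that accepts dicts in send().

    Not called here: needs kafka-python and a reachable broker.
    """
    from kafka import KafkaProducer  # lazy import so the helpers work without it

    return KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=to_json_bytes,
    )
```

With such a producer, `producer.send('demo01', value={'number': e})` would hand the dict to `to_json_bytes` before it goes on the wire. Calling `producer.flush()` before the script exits makes sure queued messages are actually sent.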
Hi, a0975003518
You can try the following solution:
change the line group_id='my-group' to group_id=str(uuid.uuid1())
from kafka import KafkaConsumer
from json import loads
import uuid

consumer = KafkaConsumer(
    'test',
    bootstrap_servers='192.168.1.6:9092',
    auto_offset_reset='latest',
    enable_auto_commit=True,
    # a fresh group id on every run, so no previously committed offsets apply
    group_id=str(uuid.uuid1()),
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)
# do a dummy poll to retrieve some messages
consumer.poll()
# go to the end of the stream
consumer.seek_to_end()
for event in consumer:
    event_data = event.value
    print(event_data)
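Why this may help, as far as I understand kafka-python: `auto_offset_reset` only takes effect when the consumer group has no committed offset. A fixed `group_id` such as 'my-group' resumes from wherever that group last committed, while a new group id starts clean. A small sketch of building the consumer settings with a per-run group id; the helper name `consumer_config` and its `prefix` parameter are hypothetical.

```python
import uuid
from json import loads


def consumer_config(bootstrap_servers, prefix="demo"):
    """Return KafkaConsumer keyword arguments with a unique group id per call."""
    return {
        "bootstrap_servers": bootstrap_servers,
        "auto_offset_reset": "latest",
        "enable_auto_commit": True,
        # uuid4 is random, so each call belongs to a brand-new consumer group
        "group_id": f"{prefix}-{uuid.uuid4()}",
        "value_deserializer": lambda x: loads(x.decode("utf-8")),
    }
```

It could then be used as `KafkaConsumer('test', **consumer_config('192.168.1.6:9092'))`. Note that a new group per run also means offsets are never resumed across restarts, which suits a demo more than a long-running service.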