Hello, I am trying to receive data via Kafka in Omniverse, but my implementation seems to be incorrect because no messages are ever received.
Could you please take a look at the following configuration and code and point out any mistakes?
I referred to the documentation below:
Kafka — Omniverse Services latest documentation (nvidia.com)
[toml config file]
# Consumer settings for the omni.services.transport.server.kafka extension.
# NOTE(review): `consumers` is plural; if the extension expects a list of
# consumer tables, this header must be an array-of-tables (`[[...]]`) rather
# than a single table — confirm against the Kafka transport documentation.
[settings.exts."omni.services.transport.server.kafka".consumers]
# Topic name; matches the topic used by the producer/consumer test scripts in this file.
topics = ["test"]
# "{ip}" is presumably a placeholder for the broker host — replace before use.
bootstrap_servers = ["{ip}:9092"]
# NOTE(review): same group id as the standalone test consumer in this file; Kafka
# shares committed offsets (and partition assignment) across a group, so
# records already read by that script will not be delivered again here — verify.
group_id = "mygroup"
# Matches the router prefix ('/test') plus the '/ping' route registered by the extension code.
service_calls = ["test/ping"]
[Extension Code]
import omni.ext
import omni.ui as ui
import pydantic
from omni.services.core import main
from omni.services.core import routers
# Module-level service router: the /ping endpoint is declared on it and the
# extension registers/deregisters it in on_startup/on_shutdown.
router = routers.ServiceAPIRouter()
class KafkaModel(pydantic.BaseModel):
    """Request body accepted by the ``/ping`` endpoint.

    Every field except ``value`` has a default; ``value`` is required and
    must validate as a ``dict``.

    NOTE(review): the producer test script in this file sends plain strings
    ("value_0", ...), which are not dict/JSON-object payloads — confirm how
    the Kafka transport decodes record values before relying on this model.
    """

    topic: str = pydantic.Field("", title="Kafka Topic", description="The Kafka topic the request was consumed from")
    partition: int = pydantic.Field(0, title="Kafka partition")
    offset: int = pydantic.Field(0, title="Offset")
    key: str = pydantic.Field("", title="Request Key")
    # `...` marks the field as required: validation fails when it is missing.
    value: dict = pydantic.Field(..., title="Kafka value payload", description="Kafka value, holds the data")
    timestamp: str = pydantic.Field("", title="Timestamp")
#print(dir(UsdPhysics.CollisionAPI))
@router.post("/ping")
def ping(data: KafkaModel):
    """Print the ``value`` payload of the received request.

    Args:
        data: Validated request body; only its ``value`` field is used.
    """
    print(data.value)
class CompanyHelloWorldExtension(omni.ext.IExt):
    """Kit extension that exposes the ``/test`` service router and a small test window.

    On startup it registers the module-level ``router`` under the ``/test``
    prefix and builds a window with a label and a "test" button; on shutdown
    it deregisters the router and releases the window.
    """

    def __init__(self):
        # The base initializer must run when __init__ is overridden; skipping
        # it leaves the underlying extension object uninitialized.
        super().__init__()
        self._count = 0
        self._test = 0
        # Created in on_startup; kept as None so on_shutdown is safe even if
        # startup failed before the window was built.
        self._window = None

    # ext_id is current extension id. It can be used with extension manager to query additional information, like where
    # this extension is located on filesystem.
    def on_startup(self, ext_id):
        """Register the service router and build the test window."""
        print("[Kafka test] startup")
        main.register_router(router=router, prefix='/test')

        self._window = ui.Window("Kafka test", width=500, height=300)
        with self._window.frame:
            with ui.VStack():
                label = ui.Label("")

                def on_reset():
                    """Reset both counters and show the reset value in the label."""
                    self._test = 0
                    label.text = f"count: {self._test}"
                    self._count = 0

                with ui.HStack():
                    ui.Button("test", clicked_fn=on_reset)

    def on_shutdown(self):
        """Deregister the service router and release the window."""
        # NOTE(review): registration used prefix='/test'; confirm whether
        # deregister_router needs the same prefix to fully remove the routes.
        main.deregister_router(router=router)
        if self._window is not None:
            # Release the UI explicitly so a reloaded extension does not leave
            # a stale window behind.
            self._window.destroy()
            self._window = None
        print("[Kafka test] company hello world shutdown")
Below is the Python code I used beforehand to check that sending and receiving messages via Kafka works at all:
[producer]
from confluent_kafka import Producer
# Client configuration handed to the confluent_kafka Producer.
# '{ip}' is presumably a placeholder for the broker host — replace before running.
conf = {
'bootstrap.servers': '{ip}:9092',
}
# Producer instance used by the send loop in this script.
producer = Producer(conf)
def delivery_report(err, msg):
    """Print the outcome of one produced message.

    Passed as the ``callback`` argument of ``producer.produce``.

    Args:
        err: ``None`` on success, otherwise the delivery error.
        msg: The message object; ``topic()`` and ``partition()`` are read
            only when ``err`` is ``None``.
    """
    if err is not None:
        print(f'Message delivery failed: {err}')
        return
    print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
# Queue five keyed test records on the 'test' topic.
for i in range(5):
    producer.produce('test', key=str(i), value='value_{}'.format(i), callback=delivery_report)
    # Serve delivery callbacks for records that have already been acknowledged,
    # instead of deferring all of them to flush().
    producer.poll(0)
# Block until every queued record has been delivered (or has failed) so the
# script does not exit with messages still in the local queue.
producer.flush()
[consumer]
from confluent_kafka import Consumer, KafkaError
# Client configuration handed to the confluent_kafka Consumer.
# '{ip}' is presumably a placeholder for the broker host — replace before running.
# NOTE(review): 'mygroup' is also the group_id in the Kit consumer config in
# this file; consumers in one group share committed offsets, so records read
# here may not be delivered again to the Kit consumer — verify.
conf = {
'bootstrap.servers': '{ip}:9092',
'group.id': 'mygroup',
'auto.offset.reset': 'earliest'
}
consumer = Consumer(conf)
# Subscribe to the same topic the producer script writes to.
consumer.subscribe(['test'])
# Poll until interrupted with Ctrl+C, printing every record or error event;
# the consumer is always closed on the way out.
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            # Nothing arrived within the 1.0 s poll timeout; poll again.
            continue
        err = msg.error()
        if err:
            if err.code() == KafkaError._PARTITION_EOF:
                # End of partition reached: informational, not a failure.
                print('Reached end of topic {} partition {} at offset {}'.format(
                    msg.topic(), msg.partition(), msg.offset()))
            else:
                print('Error while consuming message: {}'.format(err))
        else:
            print('Received message (key={}, value={}) at offset {}'.format(
                msg.key(), msg.value(), msg.offset()))
except KeyboardInterrupt:
    # Ctrl+C is the intended way to stop this script.
    pass
finally:
    consumer.close()