Hello, I’ve recently been working on a project where I need to pass messages to USD Composer, and I’ve been trying to integrate it with Kafka. However, USD Composer does not seem to receive any messages from Kafka.
I’d appreciate it if you could check and let me know if there’s anything wrong with what I’ve done. I followed the instructions on the Kafka page of the services documentation (nvidia.com).
- Kafka setup using a toml file
I wrote the following toml file:
topics = ["test"]
bootstrap_servers = ["10.34.181.156:9092"]
group_id = "mygroup"
service_calls = ["/test/ping"]
Then I ran kit.exe with that config:
PS C:\Users\nuc> C:\Users\nuc\AppData\Local\ov\pkg\create-2022.3.3\kit\kit.exe --config-path=C:\Users\nuc\test.toml
[Info] [carb] Logging to file: C:/Users/nuc/.nvidia-omniverse/logs/Kit/kit/104.2/kit_20230827_180946.log
- Receiving messages in USD Composer through the Script editor
I wrote the following code and tested it in the USD Composer’s script editor, but no messages came through. When I tested the producer and consumer locally, they were able to send and receive messages normally.
import omni.ext
from pydantic import BaseModel, Field
from omni.services.core import main, routers

router = routers.ServiceAPIRouter()


# Shape of the request body built from each consumed Kafka message
class KafkaModel(BaseModel):
    topic: str = Field("", title="Kafka Topic", description="The Kafka topic the request was consumed from")
    partition: int = Field(0, title="Kafka partition")
    offset: int = Field(0, title="Offset")
    key: str = Field("", title="Request key")
    value: dict = Field(..., title="Kafka value payload", description="Kafka value, holds the data")
    timestamp: str = Field("", title="Timestamp")


# Endpoint named in service_calls ("/test/ping" once the prefix is applied)
@router.post("/ping")
def ping(data: KafkaModel):
    print(data.value)


class MyExtension(omni.ext.IExt):
    def on_startup(self, ext_id):
        main.register_router(router=router, prefix="/test")

    def on_shutdown(self):
        main.deregister_router(router=router)
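Since the producer and consumer were only tested locally, one thing that could be ruled out first is plain network reachability between the machine running kit.exe and the broker. Below is a minimal sketch using only the Python standard library. The function name `can_reach_broker` is hypothetical and not part of Kit or any Kafka client. It only checks that a TCP connection can be opened, not that Kafka itself is healthy.

```python
import socket


def can_reach_broker(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened.

    This only tests network reachability; it does not speak the Kafka protocol.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts
        return False


# Example call, using the address from bootstrap_servers in the toml file:
# can_reach_broker("10.34.181.156", 9092)
```

If this returns False from the machine running kit.exe, the problem is likely in the network or the broker's listener settings rather than in the router code.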
Could you tell me what I might have missed?
Thank you, and have a great day!