Hello, I've recently been working on a project where I need to pass messages to USD Composer, and I've been trying to integrate it with Kafka. However, USD Composer never seems to receive the messages from Kafka.
I'd appreciate it if you could check whether I've done anything wrong. I followed the instructions on the Kafka page of the latest services documentation (nvidia.com).
- Kafka setup using a toml file
I wrote the following TOML file:
[settings.exts."omni.services.transport.server.kafka".consumers]
topics = ["test"]
bootstrap_servers = ["10.34.181.156:9092"]
group_id = "mygroup"
service_calls = ["/test/ping"]
Then I launched kit.exe with that file as the config:
PS C:\Users\nuc> C:\Users\nuc\AppData\Local\ov\pkg\create-2022.3.3\kit\kit.exe --config-path=C:\Users\nuc\test.toml
[Info] [carb] Logging to file: C:/Users/nuc/.nvidia-omniverse/logs/Kit/kit/104.2/kit_20230827_180946.log
- Receiving messages in USD Composer through the Script editor
I wrote the following code and ran it in USD Composer's Script Editor, but no messages came through. When I tested a producer and consumer locally, outside of USD Composer, they were able to send and receive messages normally.
import omni.ext
from pydantic import BaseModel, Field
from omni.services.core import main, routers

router = routers.ServiceAPIRouter()


class KafkaModel(BaseModel):
    # Fields of a consumed Kafka message.
    topic: str = Field("", title="Kafka Topic", description="The Kafka topic the request was consumed from")
    partition: int = Field(0, title="Kafka partition")
    offset: int = Field(0, title="Offset")
    key: str = Field("", title="Request key")
    value: dict = Field(..., title="Kafka value payload", description="Kafka value, holds the data")
    timestamp: str = Field("", title="Timestamp")


@router.post("/ping")
def ping(data: KafkaModel):
    # Print the payload of each message routed to this endpoint.
    print(data.value)


class MyExtension(omni.ext.IExt):
    def on_startup(self, ext_id):
        # With the /test prefix, the endpoint becomes /test/ping.
        main.register_router(router=router, prefix="/test")

    def on_shutdown(self):
        main.deregister_router(router=router)
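For anyone trying to reproduce this, a test message can be published with something like the sketch below. It is not part of the setup above. It assumes the third-party kafka-python package, and it assumes the message value should be UTF-8 JSON, since the value field in KafkaModel is a dict. build_value and send_test_message are hypothetical helper names.

```python
import json


def build_value(payload: dict) -> bytes:
    """Serialize a dict as UTF-8 JSON (the assumed format of the message value)."""
    return json.dumps(payload).encode("utf-8")


def send_test_message(bootstrap_server: str, topic: str, payload: dict) -> None:
    """Publish a single message to the given topic (hypothetical helper)."""
    # Imported here so build_value can be used without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=[bootstrap_server])
    producer.send(topic, value=build_value(payload))
    producer.flush()  # block until the message has actually been sent
    producer.close()
```

Calling send_test_message("10.34.181.156:9092", "test", {"message": "hello"}) would publish one message to the topic named in the config above.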
Could you tell me what I might have missed?
Thank you, and have a great day!