How to integrate Kafka with USD Composer

Hello, I’ve recently been working on a project where I need to pass messages to USD Composer, and I’ve been trying to integrate it with Kafka. However, I can’t seem to receive any messages from Kafka.

I’d appreciate it if you could check and let me know if there’s anything wrong with what I’ve done. I followed the instructions on the Kafka page of the services documentation.

  1. Kafka setup using a toml file

I wrote the following TOML file:

topics = ["test"]
bootstrap_servers = [""]
group_id = "mygroup"
service_calls = ["/test/ping"]

and then ran kit.exe with it:

PS C:\Users\nuc> C:\Users\nuc\AppData\Local\ov\pkg\create-2022.3.3\kit\kit.exe --config-path=C:\Users\nuc\test.toml
[Info] [carb] Logging to file: C:/Users/nuc/.nvidia-omniverse/logs/Kit/kit/104.2/kit_20230827_180946.log
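
One thing that may be worth ruling out first is an empty or malformed value in the settings, such as the empty string in bootstrap_servers as written above. Below is a minimal sketch of such a check in plain Python. `find_config_problems` is a hypothetical helper, not part of Kit or the Kafka extension, and the rules it applies (non-empty lists of non-empty strings, plus a non-empty group_id) are only an assumption about what a usable consumer configuration needs:

```python
from typing import List


def find_config_problems(config: dict) -> List[str]:
    """Return human-readable problems found in a consumer config dict."""
    problems = []
    # Each of these keys is expected to hold a non-empty list of strings.
    for key in ("topics", "bootstrap_servers", "service_calls"):
        entries = config.get(key)
        if not isinstance(entries, list) or not entries:
            problems.append(f"{key} should be a non-empty list")
            continue
        for entry in entries:
            if not isinstance(entry, str) or not entry.strip():
                problems.append(f"{key} contains an empty entry")
    if not config.get("group_id"):
        problems.append("group_id should be a non-empty string")
    return problems


# The values from the TOML file above, written out as a Python dict.
posted = {
    "topics": ["test"],
    "bootstrap_servers": [""],
    "group_id": "mygroup",
    "service_calls": ["/test/ping"],
}
print(find_config_problems(posted))
# prints ['bootstrap_servers contains an empty entry']
```

If the broker address really is empty in the file, the consumer would have nothing to connect to, which could explain why no messages arrive.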

  2. Receiving messages in USD Composer through the Script Editor

I wrote the following code and tested it in the USD Composer’s script editor, but no messages came through. When I tested the producer and consumer locally, they were able to send and receive messages normally.

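import omni.ext
from pydantic import BaseModel, Field
from omni.services.core import main, routers

router = routers.ServiceAPIRouter()

class KafkaModel(BaseModel):
    topic: str = Field("", title="Kafka Topic", description="The Kafka topic the request was consumed from")
    partition: int = Field(0, title="Kafka partition")
    offset: int = Field(0, title="Offset")
    key: str = Field("", title="Request key")
    value: dict = Field(..., title="Kafka value payload", description="Kafka value, holds the data")
    timestamp: str = Field("", title="Timestamp")

# Endpoint the Kafka consumer should call for each message (listed in service_calls)
@router.post("/ping")
def ping(data: KafkaModel):
    # Assumed handler body (not shown in this post): print the received payload
    print(data.value)

class MyExtension(omni.ext.IExt):
    def on_startup(self, ext_id):
        # Expose the router under /test, so the endpoint becomes /test/ping
        main.register_router(router=router, prefix="/test")

    def on_shutdown(self):
        # Remove the router again when the extension is unloaded
        main.deregister_router(router=router, prefix="/test")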
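
To separate the Kafka side from the service side, it may help to call the `/test/ping` endpoint directly with a payload shaped like `KafkaModel` and see whether the handler runs at all. The sketch below only uses the Python standard library. The base URL `http://localhost:8011` is an assumption about where Kit's HTTP service transport is listening, and `build_ping_request` and `send` are hypothetical helper names, so adjust them to the actual setup:

```python
import json
import urllib.request

# Assumption: Kit's HTTP service transport is reachable at this address.
BASE_URL = "http://localhost:8011"


def build_ping_request(value: dict, base_url: str = BASE_URL) -> urllib.request.Request:
    """Build a POST request whose JSON body has the same fields as KafkaModel."""
    payload = {
        "topic": "test",
        "partition": 0,
        "offset": 0,
        "key": "",
        "value": value,  # must be a JSON object, because KafkaModel.value is a dict
        "timestamp": "",
    }
    return urllib.request.Request(
        url=f"{base_url}/test/ping",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request, timeout: float = 5.0) -> int:
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

Calling `send(build_ping_request({"msg": "hello"}))` from a separate Python process while USD Composer is running would show whether the route is registered. If that call fails, the problem is probably in the router registration rather than in Kafka. If it succeeds, the consumer configuration is the more likely place to look.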

Could you tell me what I might have missed?

Thank you, and have a great day!