Using Kafka Consumers in Omniverse Kit

Hello all, I am working on interfacing one of my own extensions with the KafkaConsumers service extension (omni.services.transport.server.kafka) included in the Extensions manager of Create.

I am following the documentation here: Kafka — Omniverse Developer documentation.
I set up extension.toml in the KafkaConsumers extension to listen to my Kafka topic running on a local machine. I can verify it is successfully connecting to my Kafka server via the “Assignment received…” message in my server stdout. What I am hoping to do is create another extension that responds to new messages from Kafka Consumers and stores the message value.

Where I am getting lost is how to connect my own extension to the outputs of this KafkaConsumers service (i.e. the messages KafkaConsumers consumes from my topic). My new extension MyExtension just consists of the following:

import pydantic

from omni.services.core import routers

router = routers.ServiceAPIRouter()

class KafkaModel(pydantic.BaseModel):
    topic: str = pydantic.Field("", title="Kafka Topic", description="The Kafka topic the request was consumed from")
    partition: int = pydantic.Field(0, title="Kafka partition")
    offset: int = pydantic.Field(0, title="Offset")
    key: str = pydantic.Field("", title="Request Key")
    value: dict = pydantic.Field(..., title="Kafka value payload", description="Kafka value, holds the data")
    timestamp: str = pydantic.Field("", title="Timestamp")

@router.post("/ping")
def ping(data: KafkaModel):
    print(data.value)

What more do I need to add to this extension to, for instance, print the message value every time a new message is received by Kafka Consumers? It seems like ping is meant to be a callback for KafkaConsumers to invoke, but nothing happens when I enable both extensions. I seem to be missing something conceptual here.

Hello @jetson_mason! Thank you for your interest in Kafka! I reached out to the development team for some more help in answering your questions. I will post back here when I have more information to share!

Hi @jetson_mason , Apologies for the very late reply to this.

What ties the endpoint you are calling to the Kafka topic is a few settings in the extension’s .toml config file. If you run a larger application, you can also override those values in its .kit file:

[settings.exts."omni.services.transport.server.kafka".consumers]
topics = [ "foo", "bar" ]
bootstrap_servers = ["localhost:9092"]
group_id = "my_group"
service_calls = ["test/ping"]

This tells the consumer to call test/ping for every message sent to the topics foo and bar. You can extend that list to call multiple endpoints and handle different topics.
I hope that helps, but let me know if I can help in any other way.
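
To make the mapping concrete, here is a toy model of the fan-out described above. It is only an illustration of the idea, not the Kafka extension's actual code: `TOPICS`, `SERVICE_CALLS`, `ENDPOINTS` and `dispatch` are hypothetical names that mirror the config values.

```python
from typing import Callable, Dict, List

# Hypothetical stand-ins for the values in the config block above.
TOPICS = ["foo", "bar"]
SERVICE_CALLS = ["test/ping"]


def ping(message: dict) -> str:
    """Stand-in for a registered service endpoint."""
    return f"ping got {message['value']} from {message['topic']}"


# Hypothetical registry of endpoint paths to handlers.
ENDPOINTS: Dict[str, Callable[[dict], str]] = {"test/ping": ping}


def dispatch(message: dict) -> List[str]:
    """Call every configured endpoint for a message on a subscribed topic."""
    if message["topic"] not in TOPICS:
        return []  # not a topic this consumer listens to
    return [ENDPOINTS[path](message) for path in SERVICE_CALLS]


print(dispatch({"topic": "foo", "value": {"id": 1}}))
```

Adding a second path to `SERVICE_CALLS` would make the same message reach two endpoints, which is the behaviour described for extending the list.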

Thanks,
Jozef

Hi @mason.mcgough, the test bit would come from the registered endpoint, but it depends a bit on how you registered the router with omni.services.core itself.

Do you happen to have the bit of code where you register the endpoint in the extension, so I can have a look?

It would pretty much be something like this bit here:

The way you register the router dictates the final full URI of the endpoint, which is then what you’d use in the mapping for service_calls.

For example:

from omni.services.core import main

main.register_router(router, prefix="/foo")

That would make the final URI of your service /foo/ping.
In the toml config you’d then have:

[settings.exts."omni.services.transport.server.kafka".consumers]
topics = ["store-aggregated-tracking"]
bootstrap_servers = ["192.168.1.19:9092"]
group_id = "omniverse_mason"
service_calls = ["foo/ping"]
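
As a quick way to check which string belongs in service_calls, the rule above can be written as a small helper. This is a sketch only: `service_call_for` is a hypothetical name, and dropping the leading slash simply follows the examples in this thread ("foo/ping", "test/ping") rather than any documented requirement.

```python
def service_call_for(prefix: str, route: str) -> str:
    """Join a router prefix and a route path into a service_calls entry.

    Assumption: the entry is the full URI without its leading slash,
    as in the config examples in this thread.
    """
    parts = [part.strip("/") for part in (prefix, route)]
    full_uri = "/" + "/".join(part for part in parts if part)
    return full_uri.lstrip("/")


print(service_call_for("/foo", "/ping"))
print(service_call_for("/kafka", "/ping"))
```

So a router registered with prefix="/kafka" and a route declared as "/ping" would pair with the entry "kafka/ping".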

Let me know if that helps.
Thanks,
Jozef

Okay, my extension now registers/deregisters the router in its on_startup and on_shutdown methods. Here is everything I have now.

C:\Users\mcgou\Workspace\Omniverse\Extensions\KafkaPing\exts\omni.mason.kafka\config\extension.toml:

[settings.exts."omni.services.transport.server.kafka".consumers]
topics = ["my-topic"]
bootstrap_servers = ["localhost:9092"]
group_id = "my_group"
service_calls = ["kafka/ping"]

C:\Users\mcgou\Workspace\Omniverse\Extensions\KafkaPing\exts\omni.mason.kafka\omni\mason\kafka\extension.py:

from pydantic import BaseModel, Field

import omni.ext
from omni.services.core import main
from omni.services.core import routers

router = routers.ServiceAPIRouter()

class KafkaModel(BaseModel):
    topic: str = Field("", title="Kafka Topic", description="The Kafka topic the request was consumed from")
    partition: int = Field(0, title="Kafka partition")
    offset: int = Field(0, title="Offset")
    key: str = Field("", title="Request Key")
    value: dict = Field(..., title="Kafka value payload", description="Kafka value, holds the data")
    timestamp: str = Field("", title="Timestamp")


@router.post("/ping")
def ping(data: KafkaModel):
    print(data.value)


class MyExtension(omni.ext.IExt):
    def on_startup(self, ext_id):
        main.register_router(router=router, prefix='/kafka')

    def on_shutdown(self):
        main.deregister_router(router=router)

Do you see anything wrong with this? My Kafka producer is sending messages to the Kafka server, but I do not see any responses in the Omniverse console.

That all looks ok from what I can tell. I’ll try and reproduce what you are seeing locally with the code.

Hi @jetson_mason,

I attached a working example along with an app file that should configure Kafka as well as the service itself.

You can invoke the app file with something along the lines of:
kit.sh (or .exe when on Windows) services.kafka.example.kit --ext-folder <path to root of omni.services.kafka.example >

omni.services.kafka.example.zip (243.5 KB)
services.kafka.example.kit (287 Bytes)

Let me know if anything comes up.

Thanks,
Jozef

Hi @jeenbergen , just gave that a try with the files you provided me (extracted them both to C:\Users\mcgou\Workspace\Omniverse\Extensions), but still no luck. Here are my results when running kit.exe:

>>> PS C:\Users\mcgou\AppData\Local\ov\pkg\kit-103.1.0> .\kit.exe C:\Users\mcgou\Workspace\Omniverse\Extensions\services.kafka.example.kit --ext-folder C:\Users\mcgou\Workspace\Omniverse\Extensions\omni.services.kafka.example
Loading user config located at: 'C:/Users/mcgou/AppData/Local/ov/data/Kit/services.kafka.example/0.1/user.config.json'
[Info] [carb] Logging to file: C:/Users/mcgou/.nvidia-omniverse/logs/Kit/services.kafka.example/0.1/kit_20220516_155111.log
2022-05-16 19:51:11 [142ms] [Warning] [omni.ext.plugin] [ext: data] Extensions config 'extension.toml' doesn't exist 'c:/users/mcgou/workspace/omniverse/extensions/omni.services.kafka.example/data' or 'c:/users/mcgou/workspace/omniverse/extensions/omni.services.kafka.example/data/config'
2022-05-16 19:51:11 [143ms] [Warning] [omni.ext.plugin] [ext: docs] Extensions config 'extension.toml' doesn't exist 'c:/users/mcgou/workspace/omniverse/extensions/omni.services.kafka.example/docs' or 'c:/users/mcgou/workspace/omniverse/extensions/omni.services.kafka.example/docs/config'
2022-05-16 19:51:11 [144ms] [Warning] [omni.ext.plugin] [ext: omni] Extensions config 'extension.toml' doesn't exist 'c:/users/mcgou/workspace/omniverse/extensions/omni.services.kafka.example/omni' or 'c:/users/mcgou/workspace/omniverse/extensions/omni.services.kafka.example/omni/config'
2022-05-16 19:51:12 [278ms] [Warning] [omni.ext.plugin] [ext: data] Extensions config 'extension.toml' doesn't exist 'c:/users/mcgou/workspace/omniverse/extensions/omni.services.kafka.example/data' or 'c:/users/mcgou/workspace/omniverse/extensions/omni.services.kafka.example/data/config'
2022-05-16 19:51:12 [279ms] [Warning] [omni.ext.plugin] [ext: docs] Extensions config 'extension.toml' doesn't exist 'c:/users/mcgou/workspace/omniverse/extensions/omni.services.kafka.example/docs' or 'c:/users/mcgou/workspace/omniverse/extensions/omni.services.kafka.example/docs/config'
2022-05-16 19:51:12 [280ms] [Warning] [omni.ext.plugin] [ext: omni] Extensions config 'extension.toml' doesn't exist 'c:/users/mcgou/workspace/omniverse/extensions/omni.services.kafka.example/omni' or 'c:/users/mcgou/workspace/omniverse/extensions/omni.services.kafka.example/omni/config'
[0.426s] [ext: omni.usd.config-1.0.0] startup
[0.431s] [ext: omni.usd.libs-1.0.0] startup
[0.594s] [ext: omni.kit.async_engine-0.0.0] startup
[0.601s] [ext: omni.assets.plugins-0.0.0] startup
[0.608s] [ext: omni.client-0.1.0] startup
[0.626s] [ext: omni.kit.registry.nucleus-0.0.0] startup
2022-05-16 19:51:12 [588ms] [Warning] [omni.client.plugin]  Tick: authentication: Could not resolve host name "kit-extensions.ov.nvidia.com"
2022-05-16 19:51:12 [590ms] [Warning] [omni.client.plugin]  Tick: authentication: Could not resolve host name "kit-extensions.ov.nvidia.com"
2022-05-16 19:51:12 [590ms] [Warning] [omni.client.plugin]  Tick: authentication: Could not resolve host name "kit-extensions.ov.nvidia.com"
2022-05-16 19:51:12 [594ms] [Error] [omni.ext.plugin] Failed to resolve extension dependencies. Failure hints:
        [services.kafka.example-0.1.0] dependency: (name: 'omni.services.kafka.example', tag: '', version: '', exact: 0, optional: 0) can't be satisfied. Available versions:
         (none found)

2022-05-16 19:51:12 [594ms] [Error] [omni.ext.plugin] Exiting app because of dependency solver failure...
[0.711s] [ext: omni.kit.registry.nucleus-0.0.0] shutdown
[0.715s] [ext: omni.client-0.1.0] shutdown
[0.724s] [ext: omni.assets.plugins-0.0.0] shutdown
[0.724s] [ext: omni.kit.async_engine-0.0.0] shutdown
[0.727s] [ext: omni.usd.libs-1.0.0] shutdown
[0.784s] [ext: omni.usd.config-1.0.0] shutdown

Aaah, for the --ext-folder, go one folder up so it can find the omni.services.kafka.example extension.

.\kit.exe C:\Users\mcgou\Workspace\Omniverse\Extensions\services.kafka.example.kit --ext-folder C:\Users\mcgou\Workspace\Omniverse\Extensions
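
To check a folder before passing it to --ext-folder, a rough approximation of the lookup can be scripted. This sketch is inferred only from the warnings in the log above (each subfolder is checked for extension.toml directly or under config); `find_extensions` is a hypothetical helper, not part of Kit, and the real resolver may apply further rules.

```python
from pathlib import Path
from typing import List


def find_extensions(ext_folder: str) -> List[str]:
    """List subfolders of ext_folder that look like extensions.

    Assumption (taken from the log warnings): a subfolder counts if it has
    'extension.toml' either directly inside it or inside its 'config' folder.
    """
    found = []
    for child in sorted(Path(ext_folder).iterdir()):
        if not child.is_dir():
            continue
        direct = child / "extension.toml"
        nested = child / "config" / "extension.toml"
        if direct.is_file() or nested.is_file():
            found.append(child.name)
    return found


# Example call (path from this thread):
# find_extensions(r"C:\Users\mcgou\Workspace\Omniverse\Extensions")
```

If the extension name you expect is missing from the result, the folder is probably one level too deep or too shallow.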

Good catch. I moved them to their own folder because they were being shadowed by other extensions in that directory, but got a similar result. Here is what it returned. This is with PowerShell by the way.

>>> PS C:\Users\mcgou\AppData\Local\ov\pkg\kit-103.1.0> .\kit.exe C:\Users\mcgou\Workspace\Omniverse\Extensions\NvidiaKafkaExample\services.kafka.example.kit --ext-folder C:\Users\mcgou\Workspace\Omniverse\Extensions\NvidiaKafkaExample
Loading user config located at: 'C:/Users/mcgou/AppData/Local/ov/data/Kit/services.kafka.example/0.1/user.config.json'
[Info] [carb] Logging to file: C:/Users/mcgou/.nvidia-omniverse/logs/Kit/services.kafka.example/0.1/kit_20220516_163949.log
[0.354s] [ext: omni.usd.config-1.0.0] startup
[0.359s] [ext: omni.usd.libs-1.0.0] startup
[0.507s] [ext: omni.kit.async_engine-0.0.0] startup
[0.513s] [ext: omni.assets.plugins-0.0.0] startup
[0.519s] [ext: omni.client-0.1.0] startup
[0.536s] [ext: omni.kit.registry.nucleus-0.0.0] startup
2022-05-16 20:39:50 [498ms] [Warning] [omni.client.plugin]  Tick: authentication: Could not resolve host name "kit-extensions.ov.nvidia.com"
2022-05-16 20:39:50 [499ms] [Warning] [omni.client.plugin]  Tick: authentication: Could not resolve host name "kit-extensions.ov.nvidia.com"
2022-05-16 20:39:50 [501ms] [Warning] [omni.client.plugin]  Tick: authentication: Could not resolve host name "kit-extensions.ov.nvidia.com"
2022-05-16 20:39:50 [504ms] [Error] [omni.ext.plugin] Failed to resolve extension dependencies. Failure hints:
        [services.kafka.example-0.1.0] dependency: (name: 'omni.services.transport.server.kafka', tag: '', version: '', exact: 0, optional: 0) can't be satisfied. Available versions:
         (none found)

2022-05-16 20:39:50 [504ms] [Error] [omni.ext.plugin] Exiting app because of dependency solver failure...
[0.604s] [ext: omni.kit.registry.nucleus-0.0.0] shutdown
[0.607s] [ext: omni.client-0.1.0] shutdown
[0.616s] [ext: omni.assets.plugins-0.0.0] shutdown
[0.617s] [ext: omni.kit.async_engine-0.0.0] shutdown
[0.619s] [ext: omni.usd.libs-1.0.0] shutdown
[0.675s] [ext: omni.usd.config-1.0.0] shutdown

I keep seeing ‘Could not resolve host name “kit-extensions.ov.nvidia.com”’ come up. Could that be responsible somehow? I tried it again with Windows Defender disabled just to be sure it wasn’t a firewall setting.

That is a bit weird but should be unrelated. That is the default internal extension repo, and I’m a bit surprised it shows up there. It looks like it now cannot find the omni.services.transport.server.kafka extension.

In your previous tests, did you download and enable that extension from the extension manager inside Create, for example?

I did actually. Previously I was using Create to develop these extensions. Now I am using Code or running kit.exe explicitly like you suggested. What can I do to fix that?

There are a few ways. The easiest is probably grabbing this one and unpacking it alongside the omni.services.kafka.example:
https://kit-102-1-public-extensions.s3.amazonaws.com/public/3/archives/omni.services.transport.server.kafka-0.1.0%2Bcp37.zip

But if you want to reuse the one you installed via Create (which would be the same version as above), you can find where Create installed the extension and add another --ext-folder with the path to the root of the folder holding the extensions.
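
Since each --ext-folder should point at the folder that holds the extensions rather than at an extension itself, the arguments can be derived from the extension directories. The sketch below assumes Windows-style paths like the ones in this thread; `ext_folder_args` and the example paths are hypothetical.

```python
from pathlib import PureWindowsPath
from typing import List


def ext_folder_args(extension_dirs: List[str]) -> List[str]:
    """Build repeated --ext-folder arguments from extension directories.

    Each extension directory contributes its parent folder once, because the
    flag is meant to name the folder holding the extensions.
    """
    roots: List[str] = []
    for ext_dir in extension_dirs:
        root = str(PureWindowsPath(ext_dir).parent)
        if root not in roots:
            roots.append(root)
    args: List[str] = []
    for root in roots:
        args += ["--ext-folder", root]
    return args


# Hypothetical paths, for illustration only.
print(ext_folder_args([
    r"C:\exts\examples\omni.services.kafka.example",
    r"C:\exts\installed\omni.services.transport.server.kafka-0.1.0+cp37",
]))
```

The resulting list can be appended to the kit.exe command line after the .kit app file.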

Unpacked that and gave it a try.
.\kit.exe C:\Users\mcgou\Workspace\Omniverse\Extensions\NvidiaKafkaExample\services.kafka.example.kit --ext-folder C:\Users\mcgou\Workspace\Omniverse\Extensions\NvidiaKafkaExample --ext-folder C:\Users\mcgou\AppData\Local\ov\data\Kit\Create.Next\2021.3\exts\3\omni.services.transport.server.kafka-0.1.0+cp37
Got pretty much the same results as the previous command. The “kit-extensions.ov.nvidia.com” warnings came up repeatedly followed by the “Failed to resolve extension dependencies” error.

For the time being I will go with using the site-packages installation of kafka python. Later on I may try this again with a fresh install of Omniverse just to make sure there were no issues with the installation.
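
For anyone taking the same fallback, here is a minimal sketch of consuming the topic directly, assuming "kafka python" means the kafka-python package and that producers send UTF-8 JSON objects. The topic, server and group values are the ones from the config earlier in the thread; `decode_value` and `consume` are hypothetical names.

```python
import json


def decode_value(raw: bytes) -> dict:
    """Deserialize a message value, assuming it is a UTF-8 JSON object."""
    value = json.loads(raw.decode("utf-8"))
    if not isinstance(value, dict):
        raise ValueError("expected a JSON object")
    return value


def consume(topic: str, servers: list, group_id: str) -> None:
    """Print each message value as it arrives."""
    # Imported lazily so decode_value works without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        group_id=group_id,
        value_deserializer=decode_value,
    )
    for message in consumer:  # blocks, yielding records as they arrive
        print(message.topic, message.offset, message.value)


# consume("my-topic", ["localhost:9092"], "my_group")
```

The loop blocks while waiting for messages, so inside a Kit extension it would likely need to run off the main thread.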