Hello all, I am working on interfacing one of my own extensions with the KafkaConsumers service extension (omni.services.transport.server.kafka) included in the Extensions manager of Create.
I am following the documentation here: Kafka — Omniverse Developer documentation.
I set up extension.toml in the KafkaConsumers extension to listen to my Kafka topic running on a local machine. I can verify it is successfully connecting to my Kafka server via the “Assignment received…” message in my server stdout. What I am hoping to do is create another extension that responds to new messages from Kafka Consumers and stores the message value.
Where I am getting lost is how to connect my own extension to the outputs of this KafkaConsumers service (i.e. the messages KafkaConsumers consumes from my topic). My new extension MyExtension just consists of the following:
from omni.services.core import routers
router = routers.ServiceAPIRouter()
topic: str = pydantic.Field("", title="Kafka Topic", description="The Kafka topic the request was consumed from")
partition: int = pydantic.Field(0, title="Kafka partition")
offset: int = pydantic.Field(0, title="Offset")
key: str = pydantic.Field("", title="Request Key")
value: dict = pydantic.Field(..., title="Kafka value payload", description="Kafka value, holds the data")
timestamp: str = pydantic.Field("", title="Timestamp")
def ping(data: KafkaModel):
What more do I need to add to this extension to, for instance, print the message value every time a new message is received by Kafka Consumers? It seems like ping is meant to be a callback for KafkaConsumers to invoke, but nothing happens when I enable both extensions. I seem to be missing something conceptual here.
Hello @jetson_mason! Thank you for your interest in Kafka! I reached out to the development team for some more help in answering your questions. I will post back here when I have more information to share!
The bit that will tie the endpoint you are calling and the Kafka topic together is a few settings in the extension’s .toml config file or, if you run a larger application, you can override those values in that .kit file as well:
This will indicate that for any message sent to the topics foo and bar it’ll call test/ping, you can extend that list to have it call multiple endpoints and handle different topics.
I hope that helps but let me know if I can help in any other way.
Hi @jeenbergen , just gave that a try with the files you provided me (extracted them both to C:\Users\mcgou\Workspace\Omniverse\Extensions), but still no luck. Here are my results when running kit.exe:
Good catch. I moved them to their own folder because they were being shadowed by other extensions in that directory, but got a similar result. Here is what it returned. This is with PowerShell by the way.
>>> PS C:\Users\mcgou\AppData\Local\ov\pkg\kit-103.1.0> .\kit.exe C:\Users\mcgou\Workspace\Omniverse\Extensions\NvidiaKafkaExample\services.kafka.example.kit --ext-folder C:\Users\mcgou\Workspace\Omniverse\Extensions\NvidiaKafkaExample
Loading user config located at: 'C:/Users/mcgou/AppData/Local/ov/data/Kit/services.kafka.example/0.1/user.config.json'
[Info] [carb] Logging to file: C:/Users/mcgou/.nvidia-omniverse/logs/Kit/services.kafka.example/0.1/kit_20220516_163949.log
[0.354s] [ext: omni.usd.config-1.0.0] startup
[0.359s] [ext: omni.usd.libs-1.0.0] startup
[0.507s] [ext: omni.kit.async_engine-0.0.0] startup
[0.513s] [ext: omni.assets.plugins-0.0.0] startup
[0.519s] [ext: omni.client-0.1.0] startup
[0.536s] [ext: omni.kit.registry.nucleus-0.0.0] startup
2022-05-16 20:39:50 [498ms] [Warning] [omni.client.plugin] Tick: authentication: Could not resolve host name "kit-extensions.ov.nvidia.com"
2022-05-16 20:39:50 [499ms] [Warning] [omni.client.plugin] Tick: authentication: Could not resolve host name "kit-extensions.ov.nvidia.com"
2022-05-16 20:39:50 [501ms] [Warning] [omni.client.plugin] Tick: authentication: Could not resolve host name "kit-extensions.ov.nvidia.com"
2022-05-16 20:39:50 [504ms] [Error] [omni.ext.plugin] Failed to resolve extension dependencies. Failure hints:
[services.kafka.example-0.1.0] dependency: (name: 'omni.services.transport.server.kafka', tag: '', version: '', exact: 0, optional: 0) can't be satisfied. Available versions:
2022-05-16 20:39:50 [504ms] [Error] [omni.ext.plugin] Exiting app because of dependency solver failure...
[0.604s] [ext: omni.kit.registry.nucleus-0.0.0] shutdown
[0.607s] [ext: omni.client-0.1.0] shutdown
[0.616s] [ext: omni.assets.plugins-0.0.0] shutdown
[0.617s] [ext: omni.kit.async_engine-0.0.0] shutdown
[0.619s] [ext: omni.usd.libs-1.0.0] shutdown
[0.675s] [ext: omni.usd.config-1.0.0] shutdown
I keep seeing ‘Could not resolve host name “kit-extensions.ov.nvidia.com”’ come up. Could that be responsible somehow? I tried it again with Windows Defender disabled just to be sure it wasn’t that a firewall setting.
That is a bit weird but should be unrelated. That is the default extension repo internally and a bit surprised it shows up there but it looks like it now cannot find the omni.services.transport.server.kafka extension.
In your previous tests, did you download and enable that extension from the extension manager from inside Create for example?
But if you wanted to re-use the one you installed (which would be the same version as above) via Create you can find, in Create, where it has installed the extension and from there you could add another --ext-folder with the path to the root of the folder holding the extensions.
Unpacked that and gave it a try. .\kit.exe C:\Users\mcgou\Workspace\Omniverse\Extensions\NvidiaKafkaExample\services.kafka.example.kit --ext-folder C:\Users\mcgou\Workspace\Omniverse\Extensions\NvidiaKafkaExample --ext-folder C:\Users\mcgou\AppData\Local\ov\data\Kit\Create.Next\2021.3\exts\3\omni.services.transport.server.kafka-0.1.0+cp37
Got pretty much the same results as the previous command. The “kit-extensions.ov.nvidia.com” warnings came up repeatedly followed by the “Failed to resolve extension dependencies” error.
For the time being I will go with using the site-packages installation of kafka python. Later on I may try this again with a fresh install of Omniverse just to make sure there were no issues with the installation.