How to use Kafka in Omniverse Kit

Hello, I am currently trying to implement a method to receive data via Kafka in Omniverse, but it seems that the method is incorrect as I am unable to receive messages.

Could you please take a look at the following code and point out any mistakes?

I referred to the documentation below:
Kafka — Omniverse Services latest documentation (nvidia.com)

[toml config file]

# Consumer settings for the "omni.services.transport.server.kafka" extension.
# NOTE(review): "{ip}" looks like a placeholder for the broker host - replace it before use.
# NOTE(review): presumably each "service_calls" entry must match a registered route
# (router prefix + path, i.e. "/test" + "/ping" in the extension code) - confirm against the Kafka transport docs.
[settings.exts."omni.services.transport.server.kafka".consumers]
topics = ["test"]
bootstrap_servers = ["{ip}:9092"]
group_id = "mygroup"
service_calls = ["test/ping"]

[Extension Code]

import omni.ext
import omni.ui as ui

import pydantic
from omni.services.core import main
from omni.services.core import routers


# Router holding this extension's service endpoints; it is registered with the
# services core in on_startup and deregistered in on_shutdown.
router = routers.ServiceAPIRouter()

class KafkaModel(pydantic.BaseModel):
    """Request body describing a single consumed Kafka record.

    Only ``value`` is required; every other field falls back to its default.
    """

    topic: str = pydantic.Field(
        "",
        title="Kafka Topic",
        description="The Kafka topic the request was consumed from",
    )
    partition: int = pydantic.Field(0, title="Kafka partition")
    offset: int = pydantic.Field(0, title="Offset")
    key: str = pydantic.Field("", title="Request Key")
    # Required field: must be a dict for validation to succeed.
    value: dict = pydantic.Field(
        ...,
        title="Kafka value payload",
        description="Kafka value, holds the data",
    )
    timestamp: str = pydantic.Field("", title="Timestamp")


@router.post("/ping")
def ping(data: KafkaModel):
    """Print the ``value`` payload of the record posted to ``/ping``."""
    payload = data.value
    print(payload)


class CompanyHelloWorldExtension(omni.ext.IExt):
    """Kit extension that mounts the service router under ``/test`` and shows a small test window."""

    # Single source of truth for the mount prefix so register and deregister
    # always address the same routes.
    _ROUTER_PREFIX = '/test'

    def __init__(self):
        # The IExt base initializer must run when __init__ is overridden;
        # skipping it can prevent the extension from being instantiated.
        super().__init__()
        self._count = 0
        self._test = 0
        # Created in on_startup; kept as None so on_shutdown is safe even if
        # startup never got as far as building the window.
        self._window = None

    def on_startup(self, ext_id):
        """Register the service router and build the test window.

        Args:
            ext_id: The current extension id. It can be used with the extension
                manager to query additional information, like where this
                extension is located on the filesystem.
        """
        print("[Kafka test] startup")
        main.register_router(router=router, prefix=self._ROUTER_PREFIX)

        self._window = ui.Window("Kafka test", width=500, height=300)
        with self._window.frame:
            with ui.VStack():
                label = ui.Label("")

                def on_reset():
                    # Reset both counters and refresh the label text.
                    self._test = 0
                    self._count = 0
                    label.text = f"count: {self._test}"

                with ui.HStack():
                    ui.Button("test", clicked_fn=on_reset)

    def on_shutdown(self):
        """Deregister the service router and release the window."""
        # Use the same prefix as at registration so the mounted routes are the ones removed.
        main.deregister_router(router=router, prefix=self._ROUTER_PREFIX)
        if self._window is not None:
            # Explicitly destroy the window so it does not outlive the extension.
            self._window.destroy()
            self._window = None
        print("[Kafka test] company hello world shutdown")

Below is the Python code I used beforehand to verify that sending and receiving messages via Kafka works on its own:

[producer]

from confluent_kafka import Producer

# Producer configuration; '{ip}' stands in for the broker host and must be
# replaced with a real address before running.
conf = {'bootstrap.servers': '{ip}:9092'}

# Producer used by the send loop further down in this script.
producer = Producer(conf)

def delivery_report(err, msg):
    """Print the outcome of one produced message.

    Args:
        err: ``None`` on success, otherwise the delivery error to report.
        msg: The message; its ``topic()`` and ``partition()`` are printed on success.
    """
    if err is None:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
        return
    print(f'Message delivery failed: {err}')

# Queue five messages on the 'test' topic; each one reports its outcome
# through delivery_report.
# NOTE(review): the values are plain strings, while the extension's KafkaModel
# declares ``value: dict`` - confirm the Kit consumer can accept this payload.
for i in range(5):
    producer.produce(
        'test',
        key=str(i),
        value=f'value_{i}',
        callback=delivery_report,
    )

# Wait for the queued messages to be sent before the script exits.
producer.flush()

[consumer]

from confluent_kafka import Consumer, KafkaError

# Consumer configuration; '{ip}' stands in for the broker host.
# NOTE(review): 'group.id' is the same "mygroup" as group_id in the Kit TOML
# config. Consumers in one Kafka group share partitions and committed offsets,
# so running this script may keep the Kit consumer from seeing these messages -
# consider a separate group id for this check.
conf = {
    'bootstrap.servers': '{ip}:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest',
}

consumer = Consumer(conf)
consumer.subscribe(['test'])

try:
    # Poll until interrupted with Ctrl+C.
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            # Nothing arrived within the one-second timeout.
            continue

        err = msg.error()
        if not err:
            print(f'Received message (key={msg.key()}, value={msg.value()}) at offset {msg.offset()}')
            continue

        if err.code() == KafkaError._PARTITION_EOF:
            print(f'Reached end of topic {msg.topic()} partition {msg.partition()} at offset {msg.offset()}')
        else:
            print(f'Error while consuming message: {err}')
except KeyboardInterrupt:
    pass
finally:
    # Always close the consumer, however the loop ends.
    consumer.close()