Using nvmsgbroker to subscribe to topics and run custom handling logic when using Python bindings

• Hardware Platform: GPU
• DeepStream Version: 7.0
• NVIDIA GPU Driver Version: 535.129.03
• Issue Type: questions

Hello

I am trying to subscribe to a Kafka topic and handle messages as they are received within my DeepStream application.

I have read this documentation: Gst-nvmsgbroker.

It says that there is a property on nvmsgbroker called “subscribe-topic-list”. When I do the following in my Python application:

        msgbroker = self._make_element("nvmsgbroker", f"nvmsg-broker_{stream_index}")

        self.pipeline.add(msgbroker)

        msgbroker.set_property(
            "subscribe-topic-list", "camera_status"
        )

I get:

TypeError: object of type `GstNvMsgBroker' does not have property `subscribe-topic-list'

In addition to this error, I am not sure how to define the callback that is passed to the underlying nv_msgbroker_subscribe() when using the Python bindings.

I have also looked at deepstream-test5, whose README says:

By default, event based recording has been implemented to demonstrate the
usage of message consumer.
User need to implement the custom logic to work on other types of received
messages.

But I can find no examples of how to implement custom logic for messages in C++ or Python.

Is this something that is possible and if so could you please provide some direction?
Thanks

Hi @michaells

If you run gst-inspect-1.0 nvmsgbroker you’ll see that subscribe-topic-list is not a property of the element; try just topic instead. I think subscribe-topic-list belongs in the config file, which you can also use.

Regarding the callbacks and messages, I’m not familiar with Kafka, but I can do some research.

Regards,
Allan Navarro

Embedded SW Engineer at RidgeRun

Contact us: support@ridgerun.com
Developers wiki: https://developer.ridgerun.com/
Website: www.ridgerun.com

subscribe-topic-list is a configuration option for nvmsgbroker, not a property.
Please refer to the DeepStream sample deepstream-test5, which already supports sending messages to Kafka and receiving messages from Kafka. For example, see /opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-test5/configs/test5_config_file_src_infer.txt; subscribe-topic-list can be found in this file.
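
To illustrate how such a config group carries the topic list, here is a minimal Python sketch that parses a [message-consumer0] group and splits the topics. The group contents are placeholders loosely modeled on the sample config (the connection string and topic names are hypothetical), the semicolon separator is an assumption, and read_subscribed_topics is an illustrative helper, not part of DeepStream.

```python
import configparser

# Placeholder config text; values are hypothetical, not copied from the sample file.
SAMPLE = """
[message-consumer0]
enable=1
proto-lib=/opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so
conn-str=localhost;9092
subscribe-topic-list=camera_status;camera_control
"""


def read_subscribed_topics(text, group="message-consumer0"):
    """Return the topics listed in an enabled message-consumer group."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    # A missing or disabled group means there is nothing to subscribe to.
    if not parser.has_section(group):
        return []
    if not parser.getboolean(group, "enable", fallback=False):
        return []
    raw = parser.get(group, "subscribe-topic-list", fallback="")
    # Assumption: topics are separated by semicolons.
    return [topic.strip() for topic in raw.split(";") if topic.strip()]


topics = read_subscribed_topics(SAMPLE)
print(topics)
```

This only shows where the topic names come from; it does not subscribe to anything by itself.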

That’s a good point about using gst-inspect, Allan, thanks. I think the implementation would be the same regardless of the broker you use, as long as it conforms to the nvds_msgapi protocol adapter interface. This interface defines a function, nvds_msgapi_subscribe(), that subscribes to a topic, and one of its parameters is a callback. But I can’t find how this is exposed in the Kafka implementation (or any other implementation).

Hi fanzh

I saw this when I was looking at deepstream-test5, but I can’t find any examples of how to then define callbacks for when these messages are received.
So the config file allows me to specify which topics to subscribe to, but how do I then handle those messages?

Thanks

deepstream-app is open source. If [message-consumer0] is set, deepstream-app will call start_cloud_to_device_messaging to do the subscription. Please see this function in create_pipeline of /opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-app/deepstream_app.c. subscribe_cb is in /opt/nvidia/deepstream/deepstream/sources/apps/apps-common/src/deepstream_c2d_msg.c.

Thanks again for your help fanzh. A couple of questions:

Firstly, can this callback be set when using the Python bindings?

Secondly, please forgive me as my C isn’t very good. In deepstream_app.c we have:

 start_cloud_to_device_messaging (&config->message_consumer_config[i],
          NULL, &appCtx->pipeline.multi_src_bin);

The second argument is the callback and it looks to be hardcoded as NULL here? How does that allow for a callback to be set?

The DeepStream Python code is open source, so you can modify it to customize it. Please refer to this link for how to build it.
Yes, please refer to this code for a callback sample.

Here the callback is set to NULL, but in the low-level code a real callback function, subscribe_cb, is passed to nv_msgbroker_subscribe. In subscribe_cb, the custom callback function subscribeCb will be triggered if it is set:

        if (ctx->subscribeCb) {
          return ctx->subscribeCb (flag, msg, msg_len, topic, ctx->uData);
        }
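
The dispatch pattern described here can be sketched in Python as follows. This is only an illustration of the control flow, not the DeepStream code: C2DContext, internal_subscribe_cb, default_handler and on_camera_status are hypothetical names, and the fallback to a default handler when no custom callback is set is an assumption about what the sample does.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class C2DContext:
    """Hypothetical stand-in for the context kept per subscription."""
    subscribe_cb: Optional[Callable[..., Any]] = None  # custom callback, may be unset
    user_data: Any = None


def default_handler(topic: str, msg: bytes) -> str:
    # Placeholder for the built-in behaviour (event based recording in the sample).
    return f"default handling for {topic}"


def internal_subscribe_cb(ctx: C2DContext, flag: int, msg: bytes, topic: str) -> Any:
    # This function is what always gets registered with the broker.
    # It forwards to the custom callback only if one was supplied.
    if ctx.subscribe_cb is not None:
        return ctx.subscribe_cb(flag, msg, len(msg), topic, ctx.user_data)
    return default_handler(topic, msg)


def on_camera_status(flag, msg, msg_len, topic, user_data):
    # Example custom logic: decode the payload and report it.
    return f"custom handling for {topic}: {msg[:msg_len].decode()}"


with_custom = internal_subscribe_cb(
    C2DContext(subscribe_cb=on_camera_status), 0, b"online", "camera_status")
without_custom = internal_subscribe_cb(C2DContext(), 0, b"online", "camera_status")
print(with_custom)
print(without_custom)
```

Passing NULL as the custom callback therefore does not disable the subscription; it only means the internal callback handles the message on its own.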

So just to be clear, the only way to use this functionality is to change the source code? I cannot register a callback function for a topic I’ve subscribed to without changing the source code?

Yes, DeepStream is a C++ SDK. You need to change the Python code because it has no bindings for the related nv_msgbroker_connect and nv_msgbroker_subscribe functions. There is an FAQ on how to add a binding for a new function.
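
For anyone wondering what a Python-side handler would have to look like once it is wired up, here is a rough sketch using ctypes instead of the official bindings (a swapped-in technique, not the approach the FAQ describes). The five-argument shape follows the subscribeCb call quoted earlier in this thread; the concrete C types are assumptions, and on_message is a hypothetical handler. The callback is exercised with a fake message here, so no broker library is loaded.

```python
import ctypes

# Assumed C shape, inferred from the call quoted earlier in the thread:
#   void cb(int flag, void *msg, int msg_len, char *topic, void *user_data)
SubscribeCb = ctypes.CFUNCTYPE(
    None,
    ctypes.c_int,     # flag (assumed to be an enum passed as int)
    ctypes.c_void_p,  # msg
    ctypes.c_int,     # msg_len
    ctypes.c_char_p,  # topic
    ctypes.c_void_p,  # user_data
)

received = []


def on_message(flag, msg, msg_len, topic, user_data):
    # msg arrives as a raw address; copy msg_len bytes out of it.
    payload = ctypes.string_at(msg, msg_len) if msg else b""
    received.append((flag, topic.decode() if topic else None, payload))


# Keep a reference to the wrapped callback for as long as C code may call it.
c_on_message = SubscribeCb(on_message)

# Simulate the broker delivering one message to the callback.
buf = ctypes.create_string_buffer(b'{"status": "online"}')
c_on_message(0, ctypes.cast(buf, ctypes.c_void_p), len(buf.value), b"camera_status", None)
print(received)
```

Actually registering c_on_message would still require calling the subscribe function from the native library with the correct signature, which is exactly the part that has no binding today.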