Nvmsgbroker message consumer in Python

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU) dGPU, NVIDIA GeForce RTX 4090
• DeepStream Version Docker container nvcr.io/nvidia/deepstream:7.1-triton-multiarch
• JetPack Version (valid for Jetson only)
• TensorRT Version
• NVIDIA GPU Driver Version (valid for GPU only) 560.70
• Issue Type( questions, new requirements, bugs) Question
• How to reproduce the issue ? (This is for bugs. Including which sample app is using, the configuration files content, the command line used and other details for reproducing)
• Requirement details( This is for new requirement. Including the module name-for which plugin or for which sample application, the function description)

I would like to set up message consuming logic using nvmsgbroker. According to this post:

you need to change the Python code because there is no binding code of the related nv_msgbroker_connect and nv_msgbroker_subscribe.

Ok, I’ve written the following binding, taking nvmsgbroker.h as a reference (haven’t written bindings to some typedefs because haven’t found examples for that):

// NvMsgBroker

#include "bindnvmsgbroker.hpp"

namespace py = pybind11;

namespace pydeepstream {

    void pydeepstream::bindnvmsgbroker(py::module &m) {
        /* Start of bindings for nvmsgbroker.h */
        py::enum_<NvMsgBrokerErrorType>(m, "NvMsgBrokerErrorType",
                                        pydsdoc::NvMsgBrokerDoc::NvMsgBrokerErrorTypeDoc::descr) // TO DO: ADD IT
            .value("NV_MSGBROKER_API_OK", NV_MSGBROKER_API_OK,
                   pydsdoc::NvMsgBrokerDoc::NvMsgBrokerErrorTypeDoc::NV_MSGBROKER_API_OK)
            .value("NV_MSGBROKER_API_ERR", NV_MSGBROKER_API_ERR,
                   pydsdoc::NvMsgBrokerDoc::NvMsgBrokerErrorTypeDoc::NV_MSGBROKER_API_ERR)
            .value("NV_MSGBROKER_API_RECONNECTING", NV_MSGBROKER_API_RECONNECTING,
                   pydsdoc::NvMsgBrokerDoc::NvMsgBrokerErrorTypeDoc::NV_MSGBROKER_API_RECONNECTING)
            .value("NV_MSGBROKER_API_NOT_SUPPORTED", NV_MSGBROKER_API_NOT_SUPPORTED,
                   pydsdoc::NvMsgBrokerDoc::NvMsgBrokerErrorTypeDoc::NV_MSGBROKER_API_NOT_SUPPORTED)
            .export_values();
    
        py::class_<NvMsgBrokerClientMsg>(m, "NvMsgBrokerClientMsg",
                                         pydsdoc::NvMsgBrokerDoc::NvMsgBrokerClientMsgDoc::descr)
            .def(py::init<>())
            .def_property("topic",
                          STRING_CHAR_ARRAY(NvMsgBrokerClientMsg, topic))
            .def_readwrite("payload", &NvMsgBrokerClientMsg::payload)
            .def_readwrite("payload_len", &NvMsgBrokerClientMsg::payload_len);

        m.def("nv_msgbroker_connect",
              &nv_msgbroker_connect,
              "broker_conn_str"_a,
              "broker_proto_lib"_a,
              "connect_cb"_a,
              "cfg"_a,
              pydsdoc::NvMsgBrokerDoc::nv_msgbroker_connect);

        m.def("nv_msgbroker_send_async",
              &nv_msgbroker_send_async,
              "h_ptr"_a,
              "message"_a,
              "cb"_a,
              "user_ctx"_a,
              pydsdoc::NvMsgBrokerDoc::nv_msgbroker_send_async);

        m.def("nv_msgbroker_subscribe",
              &nv_msgbroker_subscribe,
              "h_ptr"_a,
              "topics"_a,
              "num_topics"_a,
              "cb"_a,
              "user_ctx"_a,
              pydsdoc::NvMsgBrokerDoc::nv_msgbroker_subscribe);

        m.def("nv_msgbroker_disconnect",
              &nv_msgbroker_disconnect,
              "h_ptr"_a,
              pydsdoc::NvMsgBrokerDoc::nv_msgbroker_disconnect);

        m.def("nv_msgbroker_version",
              &nv_msgbroker_version,
              pydsdoc::NvMsgBrokerDoc::nv_msgbroker_version);
    }

}

I’ve also generated header for it and docstrings and recompiled the bindings. But what do I do next? How do I set up msgbroker? In deepstream-test4, msgbroker is created and configured in the following way:

...
    msgbroker = Gst.ElementFactory.make("nvmsgbroker", "nvmsg-broker")
    if not msgbroker:
        sys.stderr.write(" Unable to create msgbroker \n")
...
    msgbroker.set_property('proto-lib', proto_lib)
    msgbroker.set_property('conn-str', conn_str)
    if cfg_file is not None:
        msgbroker.set_property('config', cfg_file)
    if topic is not None:
        msgbroker.set_property('topic', topic)
    msgbroker.set_property('sync', False)

    print("Adding elements to Pipeline \n")
...
    pipeline.add(msgbroker)
...
    print("Linking elements in the Pipeline \n")
...
    msgconv.link(msgbroker)
...

What has to be changed now with new bindings to set message consumption? And is there anything I’ve missed?

please refer to the following cmd for how to sending messages to broker.

python3 deepstream_test_4.py -i /opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.h264 -p /opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so   --conn-str="localhost;9092" -t deepstream -s 0 --no-display

if you need to receive message from broker. please port the logics in start_cloud_to_device_messaging of \opt\nvidia\deepstream\deepstream\sources\apps\apps-common\src\deepstream_c2d_msg.c to python code. subscribe_cb is subscribe callback function.

There is no update from you for a period, assuming this is not an issue anymore. Hence we are closing this topic. If need further support, please open a new one. Thanks

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.