Customize Gst-nvmsgbroker adaptor for MQTT using paho.mqtt.c

• Jetson Xavier AGX
• Deepstream 5.0.1
• Jetpack 4.4
• Issue Type: question

I need to customize a Gst-nvmsgbroker adaptor for MQTT using paho.mqtt.c

Compile the custom adaptor using

sudo g++ nvds_mqtt_proto.cpp -I/opt/nvidia/deepstream/deepstream/sources/includes -I/usr/include/glib-2.0 -I/usr/lib/aarch64-linux-gnu/glib-2.0/include -I/home/nvidia/Downloads/cowtransfer/paho.mqtt.c-master/src /opt/nvidia/deepstream/deepstream-5.0/lib/ /home/nvidia/Downloads/cowtransfer/paho.mqtt.c-master/build/output/ -std=c++11 -fPIC -shared -o /opt/nvidia/deepstream/deepstream-5.0/lib/

and set configuration in test5_config_file_src_infer.txt


Run deepstream-test5-app using

sudo ./deepstream-test5-app -c ./configs/test5_config_file_src_infer.txt

Then error occurs like below.

** ERROR: <main:1451>: Failed to set pipeline to PAUSED
ERROR from sink_sub_bin_sink2: Could not initialize supporting library.
Debug info: /dvs/git/dirty/git-master_linux/deepstream/sdk/src/gst-plugins/gst-nvmsgbroker/gstnvmsgbroker.c(359): legacy_gst_nvmsgbroker_start (): /GstPipeline:pipeline/GstBin:sink_sub_bin2/GstNvMsgBroker:sink_sub_bin_sink2:
unable to open shared library
ERROR from sink_sub_bin_sink2: GStreamer error: state change failed and some element failed to post a proper error message with the reason for the failure.
Debug info: gstbasesink.c(5265): gst_base_sink_change_state (): /GstPipeline:pipeline/GstBin:sink_sub_bin2/GstNvMsgBroker:sink_sub_bin_sink2:
Failed to start
App run failed

What did I miss? How can I solve it? If more information is needed, please let me know. Thanks.

The gst-nvmsgbroker plugin is open source. You can see that this error occurs in the legacy_gst_nvmsgbroker_start() function in /opt/nvidia/deepstream/deepstream-5.0/sources/gst-plugins/gst-nvmsgbroker/gstnvmsgbroker.c
The error is caused by dlopen() failure of your customized library. Please check whether you have set correct “msg-broker-proto-lib” in your test5_config_file_src_infer.txt file.

I have checked “msg-broker-proto-lib” in the configuration file; its path is the same as the output path specified in the compile command for the customized library. Am I compiling the library in the wrong way, or did I miss something to include?

Compile the customized library using

sudo g++ nvds_mqtt_proto.cpp -I/opt/nvidia/deepstream/deepstream/sources/includes  /opt/nvidia/deepstream/deepstream-5.0/lib/ /home/nvidia/Downloads/cowtransfer/paho.mqtt.c-master/build/output/ -std=c++11 -fPIC -shared -o /opt/nvidia/deepstream/deepstream-5.0/lib/

Below is the content of nvds_mqtt_proto.cpp, which is not finished yet.

#include <string.h>
#include <iostream>
#include "nvds_logger.h"
#include "nvds_msgapi.h"
#include "MQTTClient.h"
using namespace std;

#define NVDS_MSGAPI_VERSION     "5.0"
#define QOS     1
#define TIMEOUT 10000L

bool is_valid_mqtt_connection_str(char* connection_str, string& burl, string& bport);

bool is_valid_mqtt_connection_str(char* connection_str, string& burl, string& bport) {
    if (connection_str == NULL) {
        nvds_log(NVDS_MQTT_LOG_CAT, LOG_ERR, "mqtt connection string cant be NULL");
        return false;

    string str(connection_str);
    size_t n=0;
    for(int i=0;i<str.length();i++)
        if (str[i]==';')

    if (n < 1) {
        nvds_log(NVDS_MQTT_LOG_CAT, LOG_ERR, "MQTT connection string format is invalid.");
        return false;
	char cp[25];
	const char sep[2] = ";";
	burl = strtok(cp, sep);
	bport = strtok(NULL,sep);

    if (burl == "" || bport == "") {
        nvds_log(NVDS_MQTT_LOG_CAT, LOG_ERR, "Kafka connection string is invalid. hostname or port is empty\n");
        return false;
    return true;

/**
 * Create an MQTT client and connect it to the broker named in connection_str.
 *
 * @param connection_str  Connection string, validated and split by
 *                        is_valid_mqtt_connection_str().
 * @param connect_cb      Not used by this implementation.
 * @param config_path     Not used by this implementation.
 * @return The MQTTClient handle cast to NvDsMsgApiHandle on success, or NULL
 *         if the connection string is invalid or client creation/connection
 *         fails.
 */
NvDsMsgApiHandle nvds_msgapi_connect(char* connection_str, nvds_msgapi_connect_cb_t connect_cb, char* config_path) {
    (void)connect_cb;
    (void)config_path;

    string burl = "", bport = "";
    if (!is_valid_mqtt_connection_str(connection_str, burl, bport)) {
        return NULL;
    }

    const string address = "tcp://" + burl + ":" + bport;

    MQTTClient client = NULL;
    // NOTE(review): the client id is hard-coded; brokers normally require it
    // to be unique per connection — confirm this is acceptable.
    int rc = MQTTClient_create(&client, address.c_str(), "qwertyuiop1234567890",
                               MQTTCLIENT_PERSISTENCE_NONE, NULL);
    if (rc != MQTTCLIENT_SUCCESS) {
        nvds_log(NVDS_MQTT_LOG_CAT, LOG_ERR, "Failed to create client, return code %d\n", rc);
        return NULL;
    }

    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;

    rc = MQTTClient_connect(client, &conn_opts);
    if (rc != MQTTCLIENT_SUCCESS) {
        nvds_log(NVDS_MQTT_LOG_CAT, LOG_ERR, "Failed to connect, return code %d\n", rc);
        // Release the client created above so a failed connect does not leak it.
        MQTTClient_destroy(&client);
        return NULL;
    }

    nvds_log(NVDS_MQTT_LOG_CAT, LOG_INFO, "MQTT connection successful\n");
    // The opaque MQTTClient handle itself is what callers get back.
    return (NvDsMsgApiHandle)(client);
}

/**
 * Subscribe to topics. Stub: no subscription is performed and none of the
 * arguments are used.
 *
 * @return Always NVDS_MSGAPI_OK.
 */
NvDsMsgApiErrorType nvds_msgapi_subscribe(NvDsMsgApiHandle h_ptr, char** topics, int num_topics, nvds_msgapi_subscribe_request_cb_t cb, void* user_ctx) {
    (void)h_ptr;
    (void)topics;
    (void)num_topics;
    (void)cb;
    (void)user_ctx;
    return NVDS_MSGAPI_OK;
}

/**
 * Publish a payload to a topic and wait (up to TIMEOUT ms) for delivery.
 *
 * @param h_ptr    Handle returned by nvds_msgapi_connect().
 * @param topic    Non-empty, NUL-terminated topic name.
 * @param payload  Bytes to publish.
 * @param nbuf     Number of bytes in payload; must be non-zero.
 * @return NVDS_MSGAPI_OK once delivery completed, NVDS_MSGAPI_ERR on invalid
 *         arguments, publish failure, or when delivery is not confirmed.
 */
NvDsMsgApiErrorType nvds_msgapi_send(NvDsMsgApiHandle h_ptr, char* topic, const uint8_t* payload, size_t nbuf) {
    if (h_ptr == NULL) {
        nvds_log(NVDS_MQTT_LOG_CAT, LOG_ERR, "MQTT connection handle passed for send() = NULL. Send failed\n");
        return NVDS_MSGAPI_ERR;
    }
    if (topic == NULL || !strcmp(topic, "")) {
        nvds_log(NVDS_MQTT_LOG_CAT, LOG_ERR, "MQTT topic not specified.Send failed\n");
        return NVDS_MSGAPI_ERR;
    }
    if (payload == NULL || nbuf == 0) {
        nvds_log(NVDS_MQTT_LOG_CAT, LOG_ERR, "MQTT: Either send payload is NULL or payload length <=0. Send failed\n");
        return NVDS_MSGAPI_ERR;
    }

    // payloadlen is an int: reject sizes that do not survive the narrowing.
    const int payload_len = (int)nbuf;
    if (payload_len < 0 || (size_t)payload_len != nbuf) {
        nvds_log(NVDS_MQTT_LOG_CAT, LOG_ERR, "MQTT: payload too large. Send failed\n");
        return NVDS_MSGAPI_ERR;
    }

    // nvds_msgapi_connect() returns the MQTTClient handle itself, not a
    // pointer to one, so cast straight back to MQTTClient.
    MQTTClient client = (MQTTClient)h_ptr;

    MQTTClient_message pubmsg = MQTTClient_message_initializer;
    pubmsg.payload = (void*)payload;
    pubmsg.payloadlen = payload_len;
    pubmsg.qos = QOS;
    pubmsg.retained = 0;

    MQTTClient_deliveryToken token;
    int rc = MQTTClient_publishMessage(client, topic, &pubmsg, &token);
    if (rc != MQTTCLIENT_SUCCESS) {
        nvds_log(NVDS_MQTT_LOG_CAT, LOG_ERR, "Failed to publish message, return code %d\n", rc);
        return NVDS_MSGAPI_ERR;
    }

    rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
    if (rc != MQTTCLIENT_SUCCESS) {
        nvds_log(NVDS_MQTT_LOG_CAT, LOG_ERR, "Message delivery not confirmed, return code %d\n", rc);
        return NVDS_MSGAPI_ERR;
    }
    return NVDS_MSGAPI_OK;
}

/**
 * Asynchronous send. Stub: nothing is sent and none of the arguments are
 * used.
 *
 * @return Always NVDS_MSGAPI_ERR.
 */
NvDsMsgApiErrorType nvds_msgapi_send_async(NvDsMsgApiHandle h_ptr, char* topic, const uint8_t* payload, size_t nbuf, nvds_msgapi_send_cb_t send_callback, void* user_ptr) {
    (void)h_ptr;
    (void)topic;
    (void)payload;
    (void)nbuf;
    (void)send_callback;
    (void)user_ptr;
    return NVDS_MSGAPI_ERR;
}

/**
 * Periodic work hook. This implementation only logs: an error when the handle
 * is NULL, a debug message otherwise.
 *
 * @param h_ptr  Handle returned by nvds_msgapi_connect(); may be NULL.
 */
void nvds_msgapi_do_work(NvDsMsgApiHandle h_ptr) {
    if (h_ptr == NULL) {
        nvds_log(NVDS_MQTT_LOG_CAT, LOG_ERR, "MQTT connection handle passed for dowork() = NULL. No actions taken\n");
        return;
    }
    nvds_log(NVDS_MQTT_LOG_CAT, LOG_DEBUG, "nvds_msgapi_do_work\n");
}

/**
 * Disconnect from the broker and release the client.
 *
 * @param h_ptr  Handle returned by nvds_msgapi_connect(). It must not be used
 *               again after this call, because the client is destroyed.
 * @return NVDS_MSGAPI_ERR when h_ptr is NULL, otherwise NVDS_MSGAPI_OK. A
 *         failed disconnect is only logged (best effort).
 */
NvDsMsgApiErrorType nvds_msgapi_disconnect(NvDsMsgApiHandle h_ptr) {
    if (!h_ptr) {
        nvds_log(NVDS_MQTT_LOG_CAT, LOG_DEBUG, "nvds_msgapi_disconnect called with null handle\n");
        return NVDS_MSGAPI_ERR;
    }

    // The handle is the MQTTClient itself (see nvds_msgapi_connect()).
    MQTTClient client = (MQTTClient)h_ptr;

    const int rc = MQTTClient_disconnect(client, 10000);
    if (rc != MQTTCLIENT_SUCCESS) {
        nvds_log(NVDS_MQTT_LOG_CAT, LOG_DEBUG, "Failed to disconnect, return code %d\n", rc);
    }

    // Free the resources allocated by MQTTClient_create().
    MQTTClient_destroy(&client);
    return NVDS_MSGAPI_OK;
}

/**
 * Report the message API version implemented by this adaptor.
 *
 * @return Pointer to the NVDS_MSGAPI_VERSION string literal; callers must not
 *         modify or free it.
 */
char* nvds_msgapi_getversion() {
    return (char*)NVDS_MSGAPI_VERSION;
}

/**
 * Report the protocol name of this adaptor.
 *
 * NOTE(review): NVDS_MSGAPI_PROTOCOL is not defined in the visible part of
 * this file — confirm it is defined (for example next to
 * NVDS_MSGAPI_VERSION), otherwise this will not compile.
 *
 * @return NVDS_MSGAPI_PROTOCOL cast to char*; callers must not modify or free
 *         it.
 */
char* nvds_msgapi_get_protocol_name() {
    return (char*)NVDS_MSGAPI_PROTOCOL;
}

/**
 * Produce a connection signature. Stub: no signature is computed; the output
 * buffer is set to an empty string so callers never read uninitialized data.
 *
 * @param broker_str  Not used.
 * @param cfg         Not used.
 * @param output_str  Buffer that receives the (empty) signature; may be NULL.
 * @param max_len     Size of output_str in bytes.
 * @return Always NVDS_MSGAPI_OK.
 */
NvDsMsgApiErrorType nvds_msgapi_connection_signature(char* broker_str, char* cfg, char* output_str, int max_len) {
    (void)broker_str;
    (void)cfg;
    // NOTE(review): presumably an empty signature means "no connection
    // sharing" to the caller — confirm against the nvds_msgapi documentation.
    if (output_str != NULL && max_len > 0) {
        output_str[0] = '\0';
    }
    return NVDS_MSGAPI_OK;
}

The log has shown you the error. What I can tell is just that it is the dlopen() failure in legacy_gst_nvmsgbroker_start() function in /opt/nvidia/deepstream/deepstream-5.0/sources/gst-plugins/gst-nvmsgbroker/gstnvmsgbroker.c.