Deepstream 6.3 MQTT clientID

Hello, I want to use MQTT to push the detection results, but when I open multiple ports and start different pipeline applications, I find that the previously started pipeline will report an error due to the MQTT uniqueID, causing the pipeline program to exit, as shown in the error below.
Then when I modify the DeepStream6.3 MQTT1.16.5 test code, compile and run it, although two MQTT connections are established, it seems that only one connection is effective.

ds error

[mosq_mqtt_log_callback] Client uniqueID sending PUBLISH (d0, q0, r0, m1, 'Test5', ... (298 bytes))
Publish callback with reason code: Success.
[mosq_mqtt_log_callback] Client uniqueID sending PUBLISH (d0, q0, r0, m44, 'Test7', ... (227 bytes))
Error sending repeat publish: The client is not currently connected.ERROR from sink_sub_bin_sink1: GStreamer encountered a general supporting library error.
Debug info: gstnvmsgbroker.cpp(564): legacy_gst_nvmsgbroker_render (): /GstPipeline:pipeline/GstBin:sink_sub_bin1/GstNvMsgBroker:sink_sub_bin_sink1:
failed to send the message. err(1)
0:00:48.797704925  1522 0x556281d2fd80 WARN                 nvinfer gstnvinfer.cpp:2397:gst_nvinfer_output_loop:<primary_gie> error: Internal data stream error.
0:00:48.797732062  1522 0x556281d2fd80 WARN                 nvinfer gstnvinfer.cpp:2397:gst_nvinfer_output_loop:<primary_gie> error: streaming stopped, reason error (-5)
ERROR from primary_gie: Internal data stream error.
Debug info: gstnvinfer.cpp(2397): gst_nvinfer_output_loop (): /GstPipeline:pipeline/GstBin:primary_gie_bin/GstNvInfer:primary_gie:
streaming stopped, reason error (-5)
Quitting
nvstreammux: Successfully handled EOS for source_id=0
nvstreammux: Successfully handled EOS for source_id=1
nvstreammux: Successfully handled EOS for source_id=2
nvstreammux: Successfully handled EOS for source_id=3
nvstreammux: Successfully handled EOS for source_id=4
nvstreammux: Successfully handled EOS for source_id=5
nvstreammux: Successfully handled EOS for source_id=6
nvstreammux: Successfully handled EOS for source_id=7
nvstreammux: Successfully handled EOS for source_id=8
ERROR from primary_gie_queue: Internal data stream error.
Debug info: gstqueue.c(988): gst_queue_handle_sink_event (): /GstPipeline:pipeline/GstBin:primary_gie_bin/GstQueue:primary_gie_queue:
streaming stopped, reason error (-5)
Error disconnecting: (The client is not currently connected.).
Active sources : 7
Wed May 22 09:24:25 2024

test_mqtt

#include<stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <dlfcn.h>
#include "nvds_msgapi.h"

#define NUM_THREADS 5
//Modify to reflect your own path
#define SO_PATH "/opt/nvidia/deepstream/deepstream/lib/"
#define MQTT_PROTO_SO "libnvds_mqtt_proto.so"
#define MQTT_PROTO_PATH SO_PATH MQTT_PROTO_SO
#define MQTT_CFG_FILE "./cfg_mqtt.txt"
#define MQTT_CFG_FILE2 "./cfg_mqtt2.txt"
//connection string format: host;port;username;password
#define MQTT_CONNECT_STR "192.168.99.18;1883"
#define MAX_LEN 256

NvDsMsgApiHandle (*nvds_msgapi_connect_ptr)(char *connection_str, nvds_msgapi_connect_cb_t connect_cb, char *config_path);
NvDsMsgApiErrorType (*nvds_msgapi_send_async_ptr)(NvDsMsgApiHandle conn, char *topic, const uint8_t *payload, size_t nbuf, nvds_msgapi_send_cb_t send_callback, void *user_ptr);
NvDsMsgApiErrorType (*nvds_msgapi_disconnect_ptr)(NvDsMsgApiHandle h_ptr);
void (*nvds_msgapi_do_work_ptr)(NvDsMsgApiHandle h_ptr);
NvDsMsgApiErrorType (*nvds_msgapi_subscribe_ptr)(NvDsMsgApiHandle conn, char **topics, int num_topics, nvds_msgapi_subscribe_request_cb_t  cb, void *user_ctx);
char* (*nvds_msgapi_getversion_ptr)(void);
char* (*nvds_msgapi_get_protocol_name_ptr)(void);
NvDsMsgApiErrorType (*nvds_msgapi_connection_signature_ptr)(char *connection_str, char *config_path, char *output_str, int max_len);

int consumed_cnt = 0;

void connect_cb(NvDsMsgApiHandle h_ptr, NvDsMsgApiEventType evt) {
    if(evt == NVDS_MSGAPI_EVT_SUCCESS)
        printf("In sample prog: connect success \n");
    else
        printf("In sample prog: connect failed \n");
}

void send_callback (void *user_ptr, NvDsMsgApiErrorType completion_flag) {
    if(completion_flag == NVDS_MSGAPI_OK)
        printf("Message num %d : send success\n", *((int *) user_ptr));
    else
        printf("Message num %d : send failed\n", *((int *) user_ptr));
}

void subscribe_cb(NvDsMsgApiErrorType flag, void *msg, int len, char *topic, void *user_ptr) {
  int *ptr = (int *) user_ptr;
  if(flag == NVDS_MSGAPI_ERR) {
    printf("Error in consuming message[%d] from MQTT broker\n", *ptr);
  }
  else {
    printf("Consuming message[%d], on topic[%s]. Payload= %.*s\n", *ptr, topic, len, (const char *) msg);
  }
  consumed_cnt++;
}


int main(int argc, char **argv) {
    void *so_handle;
    if(argc < 2)
        so_handle = dlopen(MQTT_PROTO_PATH, RTLD_LAZY);
    else if(argc == 2)
        so_handle = dlopen(argv[1], RTLD_LAZY);
    else {
        printf("Invalid arguments to sample applicaiton\n");
        printf("Usage: \n\t./test_async [optional path_to_so_lib] \n\n");
        exit(1);
    }
    char *error;
    if (!so_handle) {
        printf("unable to open shared library\n");
        exit(-1);
    }
    *(void **) (&nvds_msgapi_connect_ptr) = dlsym(so_handle, "nvds_msgapi_connect");
    *(void **) (&nvds_msgapi_send_async_ptr) = dlsym(so_handle, "nvds_msgapi_send_async");
    *(void **) (&nvds_msgapi_disconnect_ptr) = dlsym(so_handle, "nvds_msgapi_disconnect");
    *(void **) (&nvds_msgapi_do_work_ptr) = dlsym(so_handle, "nvds_msgapi_do_work");
    *(void **) (&nvds_msgapi_subscribe_ptr) = dlsym(so_handle, "nvds_msgapi_subscribe");
    *(void **) (&nvds_msgapi_getversion_ptr) = dlsym(so_handle, "nvds_msgapi_getversion");
    *(void **) (&nvds_msgapi_get_protocol_name_ptr) = dlsym(so_handle, "nvds_msgapi_get_protocol_name");
    *(void **) (&nvds_msgapi_connection_signature_ptr) = dlsym(so_handle, "nvds_msgapi_connection_signature");

    if ((error = dlerror()) != NULL)  {
        fprintf(stderr, "%s\n", error);
        exit(-1);
    }

    printf("Adapter protocol=%s , version=%s\n", nvds_msgapi_get_protocol_name_ptr(), nvds_msgapi_getversion_ptr());

    char query_conn_signature[MAX_LEN];
    char query_conn_signature2[MAX_LEN];
    if(nvds_msgapi_connection_signature_ptr((char *) MQTT_CONNECT_STR, (char *) MQTT_CFG_FILE, query_conn_signature, MAX_LEN) != NVDS_MSGAPI_OK) {
        printf("Error querying connection signature string. Exiting\n");
    }
    printf("connection string queried= %s\n", query_conn_signature);
    printf("MQTT_CONNECT_STR= %s\n", MQTT_CONNECT_STR);

    if(nvds_msgapi_connection_signature_ptr((char *) MQTT_CONNECT_STR, (char *) MQTT_CFG_FILE2, query_conn_signature2, MAX_LEN) != NVDS_MSGAPI_OK) {
        printf("Error querying connection signature string. Exiting\n");
    }
    printf("connection string queried= %s\n", query_conn_signature2);
    printf("ah2 MQTT_CONNECT_STR= %s\n", MQTT_CONNECT_STR);


    //There are 2 options to provide connection string
    //option 1: provide connection string as param to nvds_msgapi_connect()
    //option 2: The full connection details in config file and connection params provided in nvds_msgapi_connect() as NULL

    NvDsMsgApiHandle ah = nvds_msgapi_connect_ptr((char *) MQTT_CONNECT_STR, connect_cb, (char *) MQTT_CFG_FILE);
    if(ah == NULL) {
        printf("Connect to MQTT broker failed\n");
        exit(0);
    }
    printf("Connect Success\n");

    NvDsMsgApiHandle ah2 = nvds_msgapi_connect_ptr((char *) MQTT_CONNECT_STR, connect_cb, (char *) MQTT_CFG_FILE2);
    if(ah2 == NULL) {
        printf("ah2 Connect to MQTT broker failed\n");
        exit(0);
    }
    printf("ah2 Connect Success\n");

    if(ah == NULL) {
        printf("AH1 Connect to MQTT broker failed\n");
        exit(0);
    }
    printf("AH1 Connect Success\n");

    //Subscribe to topics
    const char *topics[] = {"topic1", "topic2", "topic3","topic4","topic5"};
    int num_topics=5;
    if(nvds_msgapi_subscribe_ptr(ah, (char **)topics, num_topics, subscribe_cb, &consumed_cnt) != NVDS_MSGAPI_OK) {
      printf("MQTT subscription to topic[s] failed. Exiting \n");
      exit(-1);
    }

    if(nvds_msgapi_subscribe_ptr(ah2, (char **)topics, num_topics, subscribe_cb, &consumed_cnt) != NVDS_MSGAPI_OK) {
      printf("ah2 MQTT subscription to topic[s] failed. Exiting \n");
      exit(-1);
    }

    for(int i=0; i < 10; i++) {
        char msg[100];
        char msg2[100];
        sprintf(msg, "Hello%d\n", i);
        sprintf(msg2, "topic2 Hello%d\n", i);
        if(nvds_msgapi_send_async_ptr(ah, (char *) "topic4", (const uint8_t *)msg, strlen(msg), send_callback, &i) != NVDS_MSGAPI_OK)
            printf("Message send failed\n");
        nvds_msgapi_do_work_ptr(ah);
        
        i++;

        if(nvds_msgapi_send_async_ptr(ah2, (char *) "topic4", (const uint8_t *)msg2, strlen(msg2), send_callback, &i) != NVDS_MSGAPI_OK)
            printf("Message2 send failed\n");
        sleep(3.13);
        nvds_msgapi_do_work_ptr(ah2);
    }

    // while(consumed_cnt < 10) {
    //   sleep(1);
    //   nvds_msgapi_do_work_ptr(ah); // need to continuously call do_work to process callbacks
    //   nvds_msgapi_do_work_ptr(ah2);
    // }
    printf("---------------------------\n");
    sleep(1);
    nvds_msgapi_disconnect_ptr(ah);
    nvds_msgapi_disconnect_ptr(ah2);
}
root@8c12bf0f4d4f:/opt/nvidia/deepstream/deepstream-6.3/sources/libs/mqtt_protocol_adaptor# ./test_mqtt_proto_async
Adapter protocol=MQTT , version=4.0
connection string queried= 0392a25d243cab7f38d926cc8cd4891f9604a8eeba99e1b55a6935adda27547c
MQTT_CONNECT_STR= 192.168.99.18;1883
connection string queried= 569dd3a19d26f93bf4552d4280345b82c378ec714179d067e160bda668f6c24c
ah2 MQTT_CONNECT_STR= 192.168.99.18;1883
[mosq_mqtt_log_callback] Client uniqueID sending CONNECT
Connect Success
[mosq_mqtt_log_callback] Client uniqueID sending CONNECT
ah2 Connect Success
AH1 Connect Success
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 1, Topic: topic1, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 2, Topic: topic2, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 3, Topic: topic3, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 4, Topic: topic4, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 5, Topic: topic5, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 1, Topic: topic1, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 2, Topic: topic2, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 3, Topic: topic3, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 4, Topic: topic4, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 5, Topic: topic5, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending PUBLISH (d0, q0, r0, m6, 'topic4', ... (7 bytes))
Publish callback with reason code: Success.
Message num 0 : send success
[mosq_mqtt_log_callback] Client uniqueID sending PUBLISH (d0, q0, r0, m6, 'topic4', ... (14 bytes))
[mosq_mqtt_log_callback] Client uniqueID received CONNACK (2)
Connection error: Granted QoS 2
In sample prog: connect failed 
[mosq_mqtt_log_callback] Client uniqueID sending PUBLISH (d0, q0, r0, m7, 'topic4', ... (7 bytes))
[mosq_mqtt_log_callback] Client uniqueID received CONNACK (0)
mqtt connection success; ready to send data
In sample prog: connect success 
Publish callback with reason code: Success.
Message num 2 : send success
Error sending repeat publish: The client is not currently connected.Message2 send failed
[mosq_mqtt_log_callback] Client uniqueID sending PUBLISH (d0, q0, r0, m8, 'topic4', ... (7 bytes))
[mosq_mqtt_log_callback] Client uniqueID received SUBACK
Publish callback with reason code: Success.
Message num 4 : send success
Error sending repeat publish: The client is not currently connected.Message2 send failed
[mosq_mqtt_log_callback] Client uniqueID sending PUBLISH (d0, q0, r0, m9, 'topic4', ... (7 bytes))
[mosq_mqtt_log_callback] Client uniqueID received SUBACK
Publish callback with reason code: Success.
Message num 6 : send success
Error sending repeat publish: The client is not currently connected.Message2 send failed
[mosq_mqtt_log_callback] Client uniqueID sending PUBLISH (d0, q0, r0, m10, 'topic4', ... (7 bytes))
[mosq_mqtt_log_callback] Client uniqueID received SUBACK
Publish callback with reason code: Success.
Message num 8 : send success
Error sending repeat publish: The client is not currently connected.Message2 send failed
---------------------------
[mosq_mqtt_log_callback] Client uniqueID sending DISCONNECT
mqtt disconnected
Error disconnecting: (The client is not currently connected.).

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU)

• DeepStream Version

• JetPack Version (valid for Jetson only)

• TensorRT Version

• NVIDIA GPU Driver Version (valid for GPU only)

• Issue Type( questions, new requirements, bugs)

• How to reproduce the issue ? (This is for bugs. Including which sample app is using, the configuration files content, the command line used and other details for reproducing)

• Requirement details( This is for new requirement. Including the module name-for which plugin or for which sample application, the function description)

  • gpu: nvidia1080ti
  • deeepsream6.3
  • deepstream docker
  • ubuntu 22.04

I found a problem similar to the one described above. When I run the deepstream-test5 sample and enable 2 MQTT sinks, only one of them is effective.

[sink0]
enable=1
#Type - 1=FakeSink 2=EglSink 3=File 4=UDPSink 5=nvdrmvideosink 6=MsgConvBroker
type=6
msg-conv-payload-type=257

msg-conv-msg2p-new-api=0
msg-conv-frame-interval=16
msg-broker-proto-lib= /opt/nvidia/deepstream/deepstream/lib/libnvds_mqtt_proto.so
#  Provide your msg-broker-conn-str here
msg-broker-conn-str=192.168.99.18;1883;PaDet
topic=PaDet
msg-broker-config=/opt/nvidia/deepstream/deepstream-6.3/sources/libs/mqtt_protocol_adaptor/cfg_mqtt.txt

[sink1]
enable=1
#Type - 1=FakeSink 2=EglSink 3=File 4=UDPSink 5=nvdrmvideosink 6=MsgConvBroker
type=6
msg-conv-payload-type=257

msg-conv-msg2p-new-api=0
msg-conv-frame-interval=16
msg-broker-proto-lib= /opt/nvidia/deepstream/deepstream/lib/libnvds_mqtt_proto.so
#  Provide your msg-broker-conn-str here
msg-broker-conn-str=192.168.99.18;1883;PaDet1
topic=PaDet1
msg-broker-config=/opt/nvidia/deepstream/deepstream-6.3/sources/libs/mqtt_protocol_adaptor/cfg_mqtt.txt

As the doc said, each client ID must be unique to connect to the same broker. please use different clientID if using multiple clients.

Thank you for your answer, it is indeed the case. I customized the random Client_uniqueID, recompiled the libnvds_mqtt_proto.so file, and the problem has been resolved. Thank you once again, and I wish you smooth work ahead!

    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<int> dis(0, 100);
    int random_number = dis(gen);
    string Client_uniqueID = "deepstream" + to_string(random_number);
    struct mosquitto* client = mosquitto_new(Client_uniqueID.c_str(), true, NULL);
1 Like

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.