Hello, I want to use MQTT to push the detection results, but when I open multiple ports and start different pipeline applications, I find that the previously started pipeline will report an error due to the MQTT uniqueID, causing the pipeline program to exit, as shown in the error below.
Then when I modify the DeepStream6.3 MQTT1.16.5 test code, compile and run it, although two MQTT connections are established, it seems that only one connection is effective.
ds error
[mosq_mqtt_log_callback] Client uniqueID sending PUBLISH (d0, q0, r0, m1, 'Test5', ... (298 bytes))
Publish callback with reason code: Success.
[mosq_mqtt_log_callback] Client uniqueID sending PUBLISH (d0, q0, r0, m44, 'Test7', ... (227 bytes))
Error sending repeat publish: The client is not currently connected.ERROR from sink_sub_bin_sink1: GStreamer encountered a general supporting library error.
Debug info: gstnvmsgbroker.cpp(564): legacy_gst_nvmsgbroker_render (): /GstPipeline:pipeline/GstBin:sink_sub_bin1/GstNvMsgBroker:sink_sub_bin_sink1:
failed to send the message. err(1)
0:00:48.797704925 1522 0x556281d2fd80 WARN nvinfer gstnvinfer.cpp:2397:gst_nvinfer_output_loop:<primary_gie> error: Internal data stream error.
0:00:48.797732062 1522 0x556281d2fd80 WARN nvinfer gstnvinfer.cpp:2397:gst_nvinfer_output_loop:<primary_gie> error: streaming stopped, reason error (-5)
ERROR from primary_gie: Internal data stream error.
Debug info: gstnvinfer.cpp(2397): gst_nvinfer_output_loop (): /GstPipeline:pipeline/GstBin:primary_gie_bin/GstNvInfer:primary_gie:
streaming stopped, reason error (-5)
Quitting
nvstreammux: Successfully handled EOS for source_id=0
nvstreammux: Successfully handled EOS for source_id=1
nvstreammux: Successfully handled EOS for source_id=2
nvstreammux: Successfully handled EOS for source_id=3
nvstreammux: Successfully handled EOS for source_id=4
nvstreammux: Successfully handled EOS for source_id=5
nvstreammux: Successfully handled EOS for source_id=6
nvstreammux: Successfully handled EOS for source_id=7
nvstreammux: Successfully handled EOS for source_id=8
ERROR from primary_gie_queue: Internal data stream error.
Debug info: gstqueue.c(988): gst_queue_handle_sink_event (): /GstPipeline:pipeline/GstBin:primary_gie_bin/GstQueue:primary_gie_queue:
streaming stopped, reason error (-5)
Error disconnecting: (The client is not currently connected.).
Active sources : 7
Wed May 22 09:24:25 2024
test_mqtt
#include<stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <dlfcn.h>
#include "nvds_msgapi.h"
#define NUM_THREADS 5
//Modify to reflect your own path
#define SO_PATH "/opt/nvidia/deepstream/deepstream/lib/"
#define MQTT_PROTO_SO "libnvds_mqtt_proto.so"
#define MQTT_PROTO_PATH SO_PATH MQTT_PROTO_SO
#define MQTT_CFG_FILE "./cfg_mqtt.txt"
#define MQTT_CFG_FILE2 "./cfg_mqtt2.txt"
//connection string format: host;port;username;password
#define MQTT_CONNECT_STR "192.168.99.18;1883"
#define MAX_LEN 256
NvDsMsgApiHandle (*nvds_msgapi_connect_ptr)(char *connection_str, nvds_msgapi_connect_cb_t connect_cb, char *config_path);
NvDsMsgApiErrorType (*nvds_msgapi_send_async_ptr)(NvDsMsgApiHandle conn, char *topic, const uint8_t *payload, size_t nbuf, nvds_msgapi_send_cb_t send_callback, void *user_ptr);
NvDsMsgApiErrorType (*nvds_msgapi_disconnect_ptr)(NvDsMsgApiHandle h_ptr);
void (*nvds_msgapi_do_work_ptr)(NvDsMsgApiHandle h_ptr);
NvDsMsgApiErrorType (*nvds_msgapi_subscribe_ptr)(NvDsMsgApiHandle conn, char **topics, int num_topics, nvds_msgapi_subscribe_request_cb_t cb, void *user_ctx);
char* (*nvds_msgapi_getversion_ptr)(void);
char* (*nvds_msgapi_get_protocol_name_ptr)(void);
NvDsMsgApiErrorType (*nvds_msgapi_connection_signature_ptr)(char *connection_str, char *config_path, char *output_str, int max_len);
int consumed_cnt = 0;
void connect_cb(NvDsMsgApiHandle h_ptr, NvDsMsgApiEventType evt) {
if(evt == NVDS_MSGAPI_EVT_SUCCESS)
printf("In sample prog: connect success \n");
else
printf("In sample prog: connect failed \n");
}
void send_callback (void *user_ptr, NvDsMsgApiErrorType completion_flag) {
if(completion_flag == NVDS_MSGAPI_OK)
printf("Message num %d : send success\n", *((int *) user_ptr));
else
printf("Message num %d : send failed\n", *((int *) user_ptr));
}
void subscribe_cb(NvDsMsgApiErrorType flag, void *msg, int len, char *topic, void *user_ptr) {
int *ptr = (int *) user_ptr;
if(flag == NVDS_MSGAPI_ERR) {
printf("Error in consuming message[%d] from MQTT broker\n", *ptr);
}
else {
printf("Consuming message[%d], on topic[%s]. Payload= %.*s\n", *ptr, topic, len, (const char *) msg);
}
consumed_cnt++;
}
int main(int argc, char **argv) {
void *so_handle;
if(argc < 2)
so_handle = dlopen(MQTT_PROTO_PATH, RTLD_LAZY);
else if(argc == 2)
so_handle = dlopen(argv[1], RTLD_LAZY);
else {
printf("Invalid arguments to sample applicaiton\n");
printf("Usage: \n\t./test_async [optional path_to_so_lib] \n\n");
exit(1);
}
char *error;
if (!so_handle) {
printf("unable to open shared library\n");
exit(-1);
}
*(void **) (&nvds_msgapi_connect_ptr) = dlsym(so_handle, "nvds_msgapi_connect");
*(void **) (&nvds_msgapi_send_async_ptr) = dlsym(so_handle, "nvds_msgapi_send_async");
*(void **) (&nvds_msgapi_disconnect_ptr) = dlsym(so_handle, "nvds_msgapi_disconnect");
*(void **) (&nvds_msgapi_do_work_ptr) = dlsym(so_handle, "nvds_msgapi_do_work");
*(void **) (&nvds_msgapi_subscribe_ptr) = dlsym(so_handle, "nvds_msgapi_subscribe");
*(void **) (&nvds_msgapi_getversion_ptr) = dlsym(so_handle, "nvds_msgapi_getversion");
*(void **) (&nvds_msgapi_get_protocol_name_ptr) = dlsym(so_handle, "nvds_msgapi_get_protocol_name");
*(void **) (&nvds_msgapi_connection_signature_ptr) = dlsym(so_handle, "nvds_msgapi_connection_signature");
if ((error = dlerror()) != NULL) {
fprintf(stderr, "%s\n", error);
exit(-1);
}
printf("Adapter protocol=%s , version=%s\n", nvds_msgapi_get_protocol_name_ptr(), nvds_msgapi_getversion_ptr());
char query_conn_signature[MAX_LEN];
char query_conn_signature2[MAX_LEN];
if(nvds_msgapi_connection_signature_ptr((char *) MQTT_CONNECT_STR, (char *) MQTT_CFG_FILE, query_conn_signature, MAX_LEN) != NVDS_MSGAPI_OK) {
printf("Error querying connection signature string. Exiting\n");
}
printf("connection string queried= %s\n", query_conn_signature);
printf("MQTT_CONNECT_STR= %s\n", MQTT_CONNECT_STR);
if(nvds_msgapi_connection_signature_ptr((char *) MQTT_CONNECT_STR, (char *) MQTT_CFG_FILE2, query_conn_signature2, MAX_LEN) != NVDS_MSGAPI_OK) {
printf("Error querying connection signature string. Exiting\n");
}
printf("connection string queried= %s\n", query_conn_signature2);
printf("ah2 MQTT_CONNECT_STR= %s\n", MQTT_CONNECT_STR);
//There are 2 options to provide connection string
//option 1: provide connection string as param to nvds_msgapi_connect()
//option 2: The full connection details in config file and connection params provided in nvds_msgapi_connect() as NULL
NvDsMsgApiHandle ah = nvds_msgapi_connect_ptr((char *) MQTT_CONNECT_STR, connect_cb, (char *) MQTT_CFG_FILE);
if(ah == NULL) {
printf("Connect to MQTT broker failed\n");
exit(0);
}
printf("Connect Success\n");
NvDsMsgApiHandle ah2 = nvds_msgapi_connect_ptr((char *) MQTT_CONNECT_STR, connect_cb, (char *) MQTT_CFG_FILE2);
if(ah2 == NULL) {
printf("ah2 Connect to MQTT broker failed\n");
exit(0);
}
printf("ah2 Connect Success\n");
if(ah == NULL) {
printf("AH1 Connect to MQTT broker failed\n");
exit(0);
}
printf("AH1 Connect Success\n");
//Subscribe to topics
const char *topics[] = {"topic1", "topic2", "topic3","topic4","topic5"};
int num_topics=5;
if(nvds_msgapi_subscribe_ptr(ah, (char **)topics, num_topics, subscribe_cb, &consumed_cnt) != NVDS_MSGAPI_OK) {
printf("MQTT subscription to topic[s] failed. Exiting \n");
exit(-1);
}
if(nvds_msgapi_subscribe_ptr(ah2, (char **)topics, num_topics, subscribe_cb, &consumed_cnt) != NVDS_MSGAPI_OK) {
printf("ah2 MQTT subscription to topic[s] failed. Exiting \n");
exit(-1);
}
for(int i=0; i < 10; i++) {
char msg[100];
char msg2[100];
sprintf(msg, "Hello%d\n", i);
sprintf(msg2, "topic2 Hello%d\n", i);
if(nvds_msgapi_send_async_ptr(ah, (char *) "topic4", (const uint8_t *)msg, strlen(msg), send_callback, &i) != NVDS_MSGAPI_OK)
printf("Message send failed\n");
nvds_msgapi_do_work_ptr(ah);
i++;
if(nvds_msgapi_send_async_ptr(ah2, (char *) "topic4", (const uint8_t *)msg2, strlen(msg2), send_callback, &i) != NVDS_MSGAPI_OK)
printf("Message2 send failed\n");
sleep(3.13);
nvds_msgapi_do_work_ptr(ah2);
}
// while(consumed_cnt < 10) {
// sleep(1);
// nvds_msgapi_do_work_ptr(ah); // need to continuously call do_work to process callbacks
// nvds_msgapi_do_work_ptr(ah2);
// }
printf("---------------------------\n");
sleep(1);
nvds_msgapi_disconnect_ptr(ah);
nvds_msgapi_disconnect_ptr(ah2);
}
root@8c12bf0f4d4f:/opt/nvidia/deepstream/deepstream-6.3/sources/libs/mqtt_protocol_adaptor# ./test_mqtt_proto_async
Adapter protocol=MQTT , version=4.0
connection string queried= 0392a25d243cab7f38d926cc8cd4891f9604a8eeba99e1b55a6935adda27547c
MQTT_CONNECT_STR= 192.168.99.18;1883
connection string queried= 569dd3a19d26f93bf4552d4280345b82c378ec714179d067e160bda668f6c24c
ah2 MQTT_CONNECT_STR= 192.168.99.18;1883
[mosq_mqtt_log_callback] Client uniqueID sending CONNECT
Connect Success
[mosq_mqtt_log_callback] Client uniqueID sending CONNECT
ah2 Connect Success
AH1 Connect Success
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 1, Topic: topic1, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 2, Topic: topic2, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 3, Topic: topic3, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 4, Topic: topic4, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 5, Topic: topic5, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 1, Topic: topic1, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 2, Topic: topic2, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 3, Topic: topic3, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 4, Topic: topic4, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending SUBSCRIBE (Mid: 5, Topic: topic5, QoS: 0, Options: 0x00)
[mosq_mqtt_log_callback] Client uniqueID sending PUBLISH (d0, q0, r0, m6, 'topic4', ... (7 bytes))
Publish callback with reason code: Success.
Message num 0 : send success
[mosq_mqtt_log_callback] Client uniqueID sending PUBLISH (d0, q0, r0, m6, 'topic4', ... (14 bytes))
[mosq_mqtt_log_callback] Client uniqueID received CONNACK (2)
Connection error: Granted QoS 2
In sample prog: connect failed
[mosq_mqtt_log_callback] Client uniqueID sending PUBLISH (d0, q0, r0, m7, 'topic4', ... (7 bytes))
[mosq_mqtt_log_callback] Client uniqueID received CONNACK (0)
mqtt connection success; ready to send data
In sample prog: connect success
Publish callback with reason code: Success.
Message num 2 : send success
Error sending repeat publish: The client is not currently connected.Message2 send failed
[mosq_mqtt_log_callback] Client uniqueID sending PUBLISH (d0, q0, r0, m8, 'topic4', ... (7 bytes))
[mosq_mqtt_log_callback] Client uniqueID received SUBACK
Publish callback with reason code: Success.
Message num 4 : send success
Error sending repeat publish: The client is not currently connected.Message2 send failed
[mosq_mqtt_log_callback] Client uniqueID sending PUBLISH (d0, q0, r0, m9, 'topic4', ... (7 bytes))
[mosq_mqtt_log_callback] Client uniqueID received SUBACK
Publish callback with reason code: Success.
Message num 6 : send success
Error sending repeat publish: The client is not currently connected.Message2 send failed
[mosq_mqtt_log_callback] Client uniqueID sending PUBLISH (d0, q0, r0, m10, 'topic4', ... (7 bytes))
[mosq_mqtt_log_callback] Client uniqueID received SUBACK
Publish callback with reason code: Success.
Message num 8 : send success
Error sending repeat publish: The client is not currently connected.Message2 send failed
---------------------------
[mosq_mqtt_log_callback] Client uniqueID sending DISCONNECT
mqtt disconnected
Error disconnecting: (The client is not currently connected.).