Hello,
We are trying to integrate DeepStream into our project and are currently working on connecting our application to Azure Event Hubs. For initial testing we are using deepstream-test4 from the deepstream_python_apps repository (deepstream_python_apps/tree/master/apps/deepstream-test4) inside a Docker container (deepstream:7.1-gc-triton-devel). When we start the container, we run/install the following:
- ./user_additional_install.sh
- ffmpeg
- ./samples/prepare_classification_test_video.sh
- ./user_deepstream_python_apps_install.sh --build-bindings
- ./update_rtpmanager.sh
- nvdsinfer from the marcoslucianops/DeepStream-Yolo repository
- gst-nvtracker from /opt/nvidia/deepstream/deepstream-7.1/sources/gst-plugins/gst-nvtracker/
- librdkafka from the confluentinc/librdkafka repository, tag v2.2.0
We are running on x86 with an NVIDIA GPU.
Our goal is to send data from the DeepStream pipeline to our Azure Event Hub (via its Kafka endpoint) following the Secure Edge-to-Cloud Messaging approach. A standalone test program written in C with librdkafka (included below) sends sample data to the Event Hub flawlessly, but the deepstream-test4 pipeline runs into connection issues (and so does our own pipeline). <> are placeholders replaced with real values. The SAS policy in Azure is configured with the claims Manage, Listen, and Send.
When running the pipeline, we receive the following error message:
%3|1740161802.558|ERROR|rdkafka#producer-1| [thrd:app]: rdkafka#producer-1: <namespace>.servicebus.windows.net:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 22ms in state APIVERSION_QUERY, 5 identical error(s) suppressed)
Below is an example of our current command and configuration file:
Command:
python3 deepstream_test_4.py -i videooo.mp4 -p /opt/nvidia/deepstream/deepstream-7.1/lib/libnvds_kafka_proto.so --conn-str="<namespace>.servicebus.windows.net:9093" -s 0 -c cfg_kafka.txt
cfg_kafka.txt:
[message-broker]
# Consumer and partitioning settings:
consumer-group-id = mygrp
partition-key = sensor.id
# Azure Event Hubs (Kafka endpoint) settings:
bootstrap.servers = <namespace>.servicebus.windows.net:9093
security.protocol = sasl_ssl
sasl.mechanism = PLAIN
sasl.username = $ConnectionString
sasl.password = Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<shared_access_key_name>;SharedAccessKey=<shared_access_key>
Despite using these settings, which work in our standalone C test (included below), the DeepStream pipeline still fails with the ApiVersion error. Since that error typically indicates the client is speaking plaintext to a TLS listener, we suspect that DeepStream's Kafka integration is not picking up the SASL_SSL configuration parameters. Should we change the configuration or the command-line parameters for DeepStream to ensure SASL_SSL is actually used when connecting to Azure Event Hubs? Are there known issues or changes in DeepStream 7.1 (especially in the deepstream:7.1-gc-triton-devel Docker container) that might affect the connection to Azure Event Hubs? Is there anything else we might be missing?
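One thing we are unsure about: as far as we understand the nvmsgbroker Kafka adapter documentation, librdkafka options are forwarded through a proto-cfg string inside the [message-broker] group rather than read as top-level keys. Below is a minimal sketch of what we believe that form would look like (untested on our side; in particular, we do not know how the adapter handles the semicolons inside the connection string, or whether ssl.ca.location must also be set inside the container):

[message-broker]
consumer-group-id = mygrp
partition-key = sensor.id
# Sketch only: all librdkafka options packed into proto-cfg (assumption based on the adapter docs)
proto-cfg = "security.protocol=sasl_ssl;sasl.mechanism=PLAIN;sasl.username=$ConnectionString;sasl.password=Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<shared_access_key_name>;SharedAccessKey=<shared_access_key>"

Is this the intended way to pass SASL_SSL settings to the adapter?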
C test code:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include <librdkafka/rdkafka.h>

// Delivery report callback: called once for each produced message
static void dr_msg_cb(rd_kafka_t *rk,
                      const rd_kafka_message_t *rkmessage,
                      void *opaque) {
    if (rkmessage->err)
        fprintf(stderr, "%% Delivery failed: %s\n",
                rd_kafka_err2str(rkmessage->err));
    else
        fprintf(stdout,
                "%% Message delivered to topic %s [partition %d] at offset %" PRId64 "\n",
                rd_kafka_topic_name(rkmessage->rkt),
                rkmessage->partition,
                rkmessage->offset);
}

int main(void) {
    char errstr[512];
    rd_kafka_t *rk;        /* Producer instance handle */
    rd_kafka_conf_t *conf; /* Configuration object */

    /* Create Kafka configuration object */
    conf = rd_kafka_conf_new();

    /* Set the broker address for your Event Hubs namespace */
    if (rd_kafka_conf_set(conf, "bootstrap.servers",
                          "<namespace>.servicebus.windows.net:9093",
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% Error setting bootstrap.servers: %s\n", errstr);
        exit(1);
    }

    /* Configure to use SASL_SSL */
    if (rd_kafka_conf_set(conf, "security.protocol",
                          "sasl_ssl", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% Error setting security.protocol: %s\n", errstr);
        exit(1);
    }

    /* Set SASL mechanism to PLAIN */
    if (rd_kafka_conf_set(conf, "sasl.mechanism",
                          "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% Error setting sasl.mechanism: %s\n", errstr);
        exit(1);
    }

    /* For Azure Event Hubs, the SASL username is always "$ConnectionString" */
    if (rd_kafka_conf_set(conf, "sasl.username",
                          "$ConnectionString", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% Error setting sasl.username: %s\n", errstr);
        exit(1);
    }

    /* Set the SASL password to your Event Hubs connection string */
    if (rd_kafka_conf_set(conf, "sasl.password",
                          "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<shared_access_key_name>;SharedAccessKey=<shared_access_key>",
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% Error setting sasl.password: %s\n", errstr);
        exit(1);
    }

    /* Enable full debug logging */
    if (rd_kafka_conf_set(conf, "debug", "all", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%% Error setting debug: %s\n", errstr);
        exit(1);
    }

    /* Set delivery report callback */
    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

    /* Create producer instance (takes ownership of conf on success) */
    rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "%% Failed to create producer: %s\n", errstr);
        exit(1);
    }

    /* Create a topic object for your Event Hub (it acts as the Kafka topic) */
    rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "<topic>", NULL);
    if (!rkt) {
        fprintf(stderr, "%% Failed to create topic object\n");
        rd_kafka_destroy(rk);
        exit(1);
    }

    /* The message payload to send */
    const char *payload = "Hello Event Hubs from librdkafka!";
    size_t payload_len = strlen(payload);

    /* Produce a message on the topic */
    if (rd_kafka_produce(
            rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
            (void *)payload, payload_len,
            NULL, 0, NULL) == -1) {
        fprintf(stderr, "%% Failed to produce message: %s\n",
                rd_kafka_err2str(rd_kafka_last_error()));
    } else {
        fprintf(stdout, "%% Message queued successfully\n");
    }

    /* Wait for outstanding message deliveries (up to 10 s) */
    fprintf(stdout, "%% Flushing messages...\n");
    rd_kafka_flush(rk, 10000);

    /* Clean up */
    rd_kafka_topic_destroy(rkt);
    rd_kafka_destroy(rk);
    return 0;
}
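For completeness, we build and run the test roughly like this (the source file name is just an example; we link against the librdkafka we built from the v2.2.0 tag):

gcc -o eventhub_test eventhub_test.c -lrdkafka
./eventhub_test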