Using the Kafka plugin to stream to a remote Confluent Kafka broker from DeepStream

• Hardware Platform (Jetson / GPU)
• DeepStream Version 6.0

Hi,
I have been able to run the deepstream-test5 app successfully and have it stream to a local Kafka broker.

Here’s the config for the broker:

[sink1]
enable=1
type=6
msg-conv-config=dstest5_msgconv_sample_config.txt
msg-conv-payload-type=0
msg-broker-proto-lib=/opt/nvidia/deepstream/deepstream-6.0/lib/libnvds_kafka_proto.so
msg-broker-conn-str=localhost;9092;test-events
topic=test-events

I am able to connect to a standard managed cluster on Confluent Cloud with a simple Python app using:

kafka_producer = KafkaProducer(
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username=KAFKA_SASL_USERNAME,
    sasl_plain_password=KAFKA_SASL_PASSWORD,
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS
)

I have tried running the deepstream-test5 app with the config:

[sink1]
enable=1
type=6
msg-conv-config=dstest5_msgconv_sample_config.txt
msg-conv-payload-type=0
msg-broker-proto-lib=/opt/nvidia/deepstream/deepstream-6.0/lib/libnvds_kafka_proto.so
msg-broker-conn-str=KAFKA_BOOTSTRAP_SERVERS;9092;test-KAFKA_TOPIC
topic=KAFKA_TOPIC
msg-broker-config=cfg_kafka.txt

The cfg_kafka.txt file:

[message-broker]
proto-cfg="security.protocol=sasl_ssl;sasl.mechanism=PLAIN;sasl.username=KAFKA_SASL_USERNAME;sasl.password=KAFKA_SASL_PASSWORD;debug=broker,security"

However, I end up with this error:

%7|1647520276.752|SASL|rdkafka#producer-1| [thrd:app]: Selected provider Cyrus for SASL mechanism GSSAPI
** ERROR: <main:1561>: Failed to set pipeline to PAUSED
Quitting
ERROR from sink_sub_bin_sink2: Could not configure supporting library.
Debug info: /dvs/git/dirty/git-master_linux/deepstream/sdk/src/gst-plugins/gst-nvmsgbroker/gstnvmsgbroker.cpp(402): legacy_gst_nvmsgbroker_start (): /GstPipeline:pipeline/GstBin:sink_sub_bin2/GstNvMsgBroker:sink_sub_bin_sink2:
unable to connect to broker library
ERROR from sink_sub_bin_sink2: GStreamer error: state change failed and some element failed to post a proper error message with the reason for the failure.
Debug info: gstbasesink.c(5265): gst_base_sink_change_state (): /GstPipeline:pipeline/GstBin:sink_sub_bin2/GstNvMsgBroker:sink_sub_bin_sink2:
Failed to start
App run failed
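
A note on the first log line: it reports GSSAPI even though cfg_kafka.txt asks for PLAIN. GSSAPI is librdkafka's default SASL mechanism, so this may suggest that the proto-cfg settings were not in effect when the producer was created. That is only an inference from the log, not something confirmed here. As a first sanity check, the sketch below splits the proto-cfg line into key/value pairs so each property can be inspected. The helper name `parse_proto_cfg` is hypothetical, and the parsing rules (strip the quotes, split on `;`, then on the first `=`) are an assumption based on the format shown above, not the DeepStream parser itself.

```python
def parse_proto_cfg(line):
    """Split a 'proto-cfg="k1=v1;k2=v2"' line into a dict of properties.

    Hypothetical helper: mirrors the format shown in cfg_kafka.txt,
    not the actual parsing code inside libnvds_kafka_proto.so.
    """
    # Everything after the first '=' is the (possibly quoted) value.
    _, _, value = line.partition("=")
    value = value.strip().strip('"')

    props = {}
    for pair in value.split(";"):
        if not pair.strip():
            continue  # tolerate a trailing ';'
        key, sep, val = pair.partition("=")
        if not sep:
            raise ValueError(f"not a key=value pair: {pair!r}")
        props[key.strip()] = val.strip()
    return props


# The line from cfg_kafka.txt, with the same placeholders as in the post.
line = (
    'proto-cfg="security.protocol=sasl_ssl;sasl.mechanism=PLAIN;'
    'sasl.username=KAFKA_SASL_USERNAME;sasl.password=KAFKA_SASL_PASSWORD;'
    'debug=broker,security"'
)
props = parse_proto_cfg(line)

for key, val in props.items():
    # Avoid echoing the secret when pasting output into a forum post.
    shown = "****" if key == "sasl.password" else val
    print(f"{key} = {shown}")
```

If a property is missing or misspelled in the printed list, that is worth fixing before looking further at the network side.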

I’m not sure what I’m doing wrong. Please can someone point me in the right direction? Thanks.

Sorry for the late response. Is this still an issue that needs support? Thanks

Thanks for the reply.
Yes please, it’s still an issue.

msg-broker-conn-str=KAFKA_BOOTSTRAP_SERVERS;9092;test-KAFKA_TOPIC
topic=KAFKA_TOPIC

In your config the two topics differ: the connection string uses test-KAFKA_TOPIC, while the topic key is set to KAFKA_TOPIC.
Besides, can your system resolve KAFKA_BOOTSTRAP_SERVERS to the right IP address?
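
The topic mismatch can be caught mechanically. The following is a minimal sketch, assuming the connection string always has the `host;port;topic` form used in this thread; `parse_conn_str` and `topics_match` are hypothetical helper names, not part of DeepStream.

```python
def parse_conn_str(conn_str):
    """Split 'host;port;topic' into (host, port, topic).

    Assumes the three-field form used in this thread.
    """
    parts = conn_str.split(";")
    if len(parts) != 3:
        raise ValueError(f"expected host;port;topic, got {conn_str!r}")
    host, port, topic = (p.strip() for p in parts)
    return host, int(port), topic


def topics_match(sink):
    """Return True if the topic in msg-broker-conn-str equals the topic key."""
    _, _, conn_topic = parse_conn_str(sink["msg-broker-conn-str"])
    return conn_topic == sink["topic"]


# The two settings quoted above, with the same placeholders.
sink1 = {
    "msg-broker-conn-str": "KAFKA_BOOTSTRAP_SERVERS;9092;test-KAFKA_TOPIC",
    "topic": "KAFKA_TOPIC",
}
print("topics match:", topics_match(sink1))
```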

That was probably a typo when I was posting the issue here.

The config should instead be:

[sink1]
enable=1
type=6
msg-conv-config=dstest5_msgconv_sample_config.txt
msg-conv-payload-type=0
msg-broker-proto-lib=/opt/nvidia/deepstream/deepstream-6.0/lib/libnvds_kafka_proto.so
msg-broker-conn-str=KAFKA_BOOTSTRAP_SERVERS;9092;KAFKA_TOPIC
topic=KAFKA_TOPIC
msg-broker-config=cfg_kafka.txt

The error still persists, and DeepStream does not resolve the bootstrap servers to the correct IP address.

This is the error:

%7|1647520276.752|SASL|rdkafka#producer-1| [thrd:app]: Selected provider Cyrus for SASL mechanism GSSAPI
** ERROR: <main:1561>: Failed to set pipeline to PAUSED
Quitting
ERROR from sink_sub_bin_sink2: Could not configure supporting library.
Debug info: /dvs/git/dirty/git-master_linux/deepstream/sdk/src/gst-plugins/gst-nvmsgbroker/gstnvmsgbroker.cpp(402): legacy_gst_nvmsgbroker_start (): /GstPipeline:pipeline/GstBin:sink_sub_bin2/GstNvMsgBroker:sink_sub_bin_sink2:
unable to connect to broker library
ERROR from sink_sub_bin_sink2: GStreamer error: state change failed and some element failed to post a proper error message with the reason for the failure.
Debug info: gstbasesink.c(5265): gst_base_sink_change_state (): /GstPipeline:pipeline/GstBin:sink_sub_bin2/GstNvMsgBroker:sink_sub_bin_sink2:
Failed to start
App run failed

Please change KAFKA_BOOTSTRAP_SERVERS to the IP address and try again.

I did a DNS lookup on the URL to the Confluent Kafka bootstrap servers I’m trying to connect to. I tried all the IPs from the lookup and they all return the same error in my main post.
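
For anyone repeating this check from the machine that runs DeepStream, here is a minimal sketch of the same lookup using Python's standard `socket` module. The function name `resolve_broker` is hypothetical, and `localhost` in the usage lines is only a stand-in for the real bootstrap host, which is not given in this thread.

```python
import socket


def resolve_broker(host, port=9092):
    """Return the sorted, de-duplicated IP addresses the system resolver gives for host."""
    infos = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    # Each entry is (family, type, proto, canonname, sockaddr); sockaddr[0] is the IP.
    return sorted({info[4][0] for info in infos})


if __name__ == "__main__":
    # Replace "localhost" with the actual bootstrap server host name.
    try:
        print(resolve_broker("localhost"))
    except socket.gaierror as exc:
        print("resolution failed:", exc)
```

This uses the same system resolver that other programs on the host use, so it also reflects any entries in /etc/hosts.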

Sorry for the late response. Is this still an issue that needs support? Thanks

Yes please, it’s still an issue.

How about adding the remote broker address to /etc/hosts on the server running DeepStream? Like this:
<IP address> <host name>

I’m closing this topic because there has been no update from you for a while, assuming the issue was resolved.
If you still need support, please open a new topic. Thanks

This doesn’t work either. I still get the error in my main post.