Deepstream with AMQP message-consumer doesn't work


• Jetson Xavier AGX
• DeepStream 5.0.1
• JetPack 4.4

I am using the AMQP protocol adapter to send (via message-broker) and receive (via message-consumer0) messages, but I am unable to get the consumer to work properly.

I am using proto-lib=/opt/nvidia/deepstream/deepstream-5.0/lib/libnvds_amqp_proto.so shipped with the Deepstream 5.0 code-base.

The message-broker has been working perfectly so far, and its configuration is as follows:

[message-broker]
password = guest
hostname = localhost
username = guest
port = 5672
exchange = amq.topic
topic = skyhub
#share-connection = 1

However, I am unable to get the message-consumer0 to work. The current configuration is as follows:

[message-consumer0]
enable=1
proto-lib=/opt/nvidia/deepstream/deepstream-5.0/lib/libnvds_amqp_proto.so
conn-str=192.168.1.28;5672;guest;guest
subscribe-topic-list=topic.cascade
sensor-list-file=msgconv.txt

When I execute my application, I receive the error:

ERROR: <start_cloud_to_device_messaging:163>: Subscription to topic[s] failed

I am currently using RabbitMQ as the broker and I’ve configured an exchange called “topic.cascade” with the type “topic”. I also created a binding that routes topic.cascade to a queue called topic.cascade.queue.

Just to verify that the topic was being set properly, I checked the topicList being passed to nv_msgbroker_subscribe:

#0 0x0000007fb7ab060c in nv_msgbroker_subscribe () at /opt/nvidia/deepstream/deepstream/lib/libnvds_msgbroker.so
#1 0x000000555556cba4 in start_cloud_to_device_messaging (config=0x7fb6f09718, subscribeCb=0x0, uData=0x7fa0be4068)
at /opt/nvidia/deepstream/deepstream/sources/apps/apps-common/src/deepstream_c2d_msg.c:162
#2 0x0000005555563f98 in create_pipeline (appCtx=0x7fa0be4010, bbox_generated_post_analytics_cb=0x555555c328 <bbox_generated_probe_after_analytics>, all_bbox_generated_cb=0x0, perf_cb=0x555555c790 <perf_cb>, overlay_graphics_cb=0x555555d9b0 <overlay_graphics>) at /opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-app/deepstream_app.c:1276
#3 0x000000555555ea98 in main (argc=2, argv=0x7fffffebf8) at cascade.c:1331
(gdb) p topicList[0]
$5 = (gchar *) 0x555581a990 “topic.cascade”

The topic list looks fine. Debugging the internals of libnvds_msgbroker.so is not straightforward because the binary appears to have been stripped of debugging symbols and compiled without frame pointers.

The failing call is this one in deepstream_c2d_msg.c:start_cloud_to_device_messaging():

nv_msgbroker_subscribe(ctx->connHandle, topicList, numTopic, subscribe_cb, (gpointer)ctx);

nv_msgbroker_subscribe returns 2, which appears to map to NVDS_MSGAPI_UNKNOWN_TOPIC.

Details of my RabbitMQ configuration:

steven@skyhub-tracker:/opt/nvidia/deepstream/deepstream-5.0/sources/apps$ rabbitmqadmin list exchanges
+--------------------+---------+
| name               | type    |
+--------------------+---------+
|                    | direct  |
| amq.direct         | direct  |
| amq.fanout         | fanout  |
| amq.headers        | headers |
| amq.match          | headers |
| amq.rabbitmq.trace | topic   |
| amq.topic          | topic   |
| topic.cascade      | topic   |
+--------------------+---------+

The second entry in the connection list below is the message-consumer0, which has successfully connected to RabbitMQ. I put a gdb breakpoint after nv_msgbroker_connect to confirm that I am in fact connecting to the RabbitMQ server correctly.

steven@skyhub-tracker:/opt/nvidia/deepstream/deepstream-5.0/sources/apps$ rabbitmqadmin list connections
+------------------------------------------+-------+----------+
| name                                     | user  | channels |
+------------------------------------------+-------+----------+
| 127.0.0.1:37908 → 127.0.0.1:5672         | guest | 1        |
| 192.168.1.28:36024 → 192.168.1.28:5672   | guest | 0        |
+------------------------------------------+-------+----------+

Screenshots of the RabbitMQ configuration from the management UI:

topic.cascade exists as type “topic” (screenshot from the management UI).

RabbitMQ is set up to route messages from topic.cascade to the queue topic.cascade.queue (screenshot from the management UI).

What exactly am I missing here? What is the proper method of subscribing to a topic with the AMQP plugin for a message-consumer?

I’ve been digging through documentation and came across something that may be significant. From the documentation at Gst-nvmsgbroker — DeepStream 6.3 Release documentation, the description of the function nvds_msgapi_subscribe() states:

This API is used to subscribe to topic(s) and consume messages from the external entity. The API is asynchronous and must be called with an already created valid Kafka connection handle as parameter. The caller must also provide a pointer to the callback function to receive the consumed messages from the connection endpoint and an optional user_ctx pointer for specifying user context.

I’m assuming that the nvds_msgapi_subscribe/nv_msgbroker_subscribe routines are what the message consumer uses internally to subscribe to topics. Is it really the case that topic subscription in the message-consumer is only supported for Kafka?

The documentation is unclear about which of the amqp, kafka, and azure plugins nvmsgbroker supports for consuming. Deploying a Kafka infrastructure isn’t feasible in our current architecture; we definitely don’t want to deploy a Java runtime, Kafka, and ZooKeeper.

I’ve continued to dig and this basically answers my question:

readelf -Ws libnvds_amqp_proto.so | grep nvds
12: 00000000000036d0 12 FUNC GLOBAL DEFAULT 9 nvds_msgapi_get_protocol_name
20: 0000000000002d18 552 FUNC GLOBAL DEFAULT 9 nvds_msgapi_connect
23: 0000000000000000 0 FUNC GLOBAL DEFAULT UND nvds_log
32: 0000000000002f40 372 FUNC GLOBAL DEFAULT 9 nvds_msgapi_send
38: 00000000000030b8 800 FUNC GLOBAL DEFAULT 9 nvds_msgapi_send_async
40: 0000000000002748 12 FUNC GLOBAL DEFAULT 9 nvds_msgapi_getversion
47: 0000000000000000 0 FUNC GLOBAL DEFAULT UND nvds_log_close
71: 00000000000033d8 712 FUNC GLOBAL DEFAULT 9 nvds_msgapi_do_work
72: 0000000000000000 0 FUNC GLOBAL DEFAULT UND nvds_log_open
78: 00000000000036a0 48 FUNC GLOBAL DEFAULT 9 nvds_msgapi_disconnect
80: 00000000000036e0 716 FUNC GLOBAL DEFAULT 9 nvds_msgapi_connection_signature

readelf -Ws libnvds_kafka_proto.so | grep nvds
16: 0000000000004068 12 FUNC GLOBAL DEFAULT 9 nvds_msgapi_get_protocol_name
24: 00000000000044d8 1404 FUNC GLOBAL DEFAULT 9 nvds_msgapi_connect
27: 0000000000000000 0 FUNC GLOBAL DEFAULT UND nvds_log
37: 0000000000003cb0 372 FUNC GLOBAL DEFAULT 9 nvds_msgapi_send
38: 0000000000003988 804 FUNC GLOBAL DEFAULT 9 nvds_msgapi_subscribe
45: 0000000000003e28 388 FUNC GLOBAL DEFAULT 9 nvds_msgapi_send_async
46: 0000000000004058 12 FUNC GLOBAL DEFAULT 9 nvds_msgapi_getversion
55: 0000000000000000 0 FUNC GLOBAL DEFAULT UND nvds_log_close
89: 0000000000003fb0 72 FUNC GLOBAL DEFAULT 9 nvds_msgapi_do_work
90: 0000000000000000 0 FUNC GLOBAL DEFAULT UND nvds_log_open
98: 0000000000003ff8 96 FUNC GLOBAL DEFAULT 9 nvds_msgapi_disconnect
102: 0000000000004a58 1372 FUNC GLOBAL DEFAULT 9 nvds_msgapi_connection_signature

readelf -Ws libnvds_azure_proto.so | grep nvds
14: 0000000000003bc8 12 FUNC GLOBAL DEFAULT 9 nvds_msgapi_get_protocol_name
21: 0000000000003398 424 FUNC GLOBAL DEFAULT 9 nvds_msgapi_connect
23: 0000000000000000 0 FUNC GLOBAL DEFAULT UND nvds_log
33: 0000000000003540 820 FUNC GLOBAL DEFAULT 9 nvds_msgapi_send
40: 0000000000003878 792 FUNC GLOBAL DEFAULT 9 nvds_msgapi_send_async
41: 00000000000027c8 12 FUNC GLOBAL DEFAULT 9 nvds_msgapi_getversion
47: 0000000000000000 0 FUNC GLOBAL DEFAULT UND nvds_log_close
69: 0000000000003b90 4 FUNC GLOBAL DEFAULT 9 nvds_msgapi_do_work
70: 0000000000000000 0 FUNC GLOBAL DEFAULT UND nvds_log_open
79: 0000000000003b98 48 FUNC GLOBAL DEFAULT 9 nvds_msgapi_disconnect
81: 0000000000003bd8 636 FUNC GLOBAL DEFAULT 9 nvds_msgapi_connection_signature

readelf -Ws libnvds_azure_edge_proto.so | grep nvds
10: 0000000000002c90 12 FUNC GLOBAL DEFAULT 9 nvds_msgapi_get_protocol_name
16: 0000000000002830 240 FUNC GLOBAL DEFAULT 9 nvds_msgapi_connect
25: 0000000000002920 396 FUNC GLOBAL DEFAULT 9 nvds_msgapi_send
27: 0000000000002ab0 412 FUNC GLOBAL DEFAULT 9 nvds_msgapi_send_async
28: 0000000000001df0 12 FUNC GLOBAL DEFAULT 9 nvds_msgapi_getversion
52: 0000000000002c50 4 FUNC GLOBAL DEFAULT 9 nvds_msgapi_do_work
59: 0000000000002c58 52 FUNC GLOBAL DEFAULT 9 nvds_msgapi_disconnect
61: 0000000000002ca0 20 FUNC GLOBAL DEFAULT 9 nvds_msgapi_connection_signature

I see that Kafka is the only plugin that currently implements nvds_msgapi_subscribe, which would be required for topic registration.

If you review https://docs.nvidia.com/metropolis/deepstream/dev-guide/text/DS_IoT.html:

The Edge-to-Cloud documentation is correct and all of the protocol plugins seem to support the behavior described.

The Cloud-to-Edge documentation is NOT correct. It states, “You can choose between Kafka, AMQP, Azure IoT, or you can even create a custom adapter,” but only Kafka will work. Either the documentation should be corrected to reflect the state of the libraries or, better, the libraries should be updated to work as documented.

Is it intentional that only Kafka is supported in the current code-base? Is there any way to get the source code for the AMQP proto adapter so I can add the necessary topic-subscription support myself? It would be great to expand message-consumer support to AMQP and Azure.

I hope this is helpful. I’ve tried to be as thorough and detailed as possible, but I did leave out the disassembly of the libnvds_amqp_proto.so plugin (which is what made me realize the subscribe API isn’t implemented in the plugin). Perhaps in the next release of the proto plugins you can leave the symbols in :-)

Thanks,

Steve


I have the same issue and need to get confirmation.


By running test_msgbroker, you can get the following info: “Test 1: Subscription not supported for amqp(amqp). Skipping subscription.”


Hi,
We will check internally whether we will support amqp and azure for message consuming.


Thank you for responding.

Our architecture makes heavy use of rabbitmq (amqp) and we don’t currently have a mechanism to consume messages with deepstream.

I’ve reviewed the code and libraries in Deepstream 5.0.1 thoroughly and with the current code-base it looks like the only option is to use Kafka with the message-consumer. However, it’s not feasible to deploy Kafka/Zookeeper in our infrastructure.

I look forward to your response.

Thanks,

Steve


that would be awesome!


Thanks, that would be great - watching this thread with interest!


It’s on the roadmap for a future release.


Is the dev team open to publishing the source for the amqp protocol plugin? If they publish it, then we can send pull requests to add support to the plugin.


This is mandatory for embedded systems operating autonomously with RESTful interfaces.


Fantastic news. Any timeline for this would be much appreciated!


The dev team will discuss it; once there is progress, we will get back to you.


We’ll add this support in a future release. We are not open-sourcing the adapter lib right now.
Thanks.


that’s pretty disappointing tbh - hope this can be settled soon


That is disappointing because it limits how Deepstream can be leveraged in the cloud. We are currently writing a protocol plugin using librabbitmq-c to fill the gap in Deepstream support. Are there any details that can be shared about the implementation of NVIDIA’s proto plugin that are not currently available in the public documentation? Is there an ETA on message-consumer support for AMQP?


@Amycao this is extremely discouraging to people trying to build on your platform. Nvidia needs to be more open and support the devs who are building products using their software and hardware. Please do what you can to get this moved up the chain and prioritized appropriately.


I’m having the same issue. I need AMQP and RabbitMQ support for a project I’m working on at the edge where I can’t install Kafka. Hoping this is prioritized in a future release.


I also need [message-consumer0] for AMQP support

A workaround is to deploy Kafka on the Jetson:

zookeeper:
  image: owlab/zookeeper-arm64
  ports:
    - 2181:2181
  restart: always

kafka:
  image: owlab/kafka-arm64
  hostname: kafka
  links:
    - zookeeper
  ports:
    - 9092:9092
    - 7203:7203
  environment:
    - KAFKA_ADVERTISED_HOST_NAME=192.168.1.104
    - ZOOKEEPER_IP=zookeeper
  restart: always

We have an internal bug, 3170030, to track this feature support. Thanks for your patience.
