AMQP bug regarding username and topic definition

Hello,
I have defined a broker as sink

[sink1]
enable=1
type=6
msg-conv-payload-type=1
msg-broker-proto-lib=/opt/nvidia/deepstream/deepstream-4.0/lib/libnvds_amqp_proto.so
msg-broker-conn-str=172.17.0.1;5672;guest
msg-broker-config=cfg_amqp.txt

cfg_amqp.txt:
[message-broker]
password = guest
hostname = rabbitmq
username = rabbit
port = 5672
exchange = detections
topic = camera_front_left

The msg-broker-conn-str contains the address, port, and username.
I can receive the data on the exchange “detections”, but the username “guest” is used as the routing key. I would instead expect the routing key to be the topic name, in my case “camera_front_left”.
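For reference, the routing key can be verified independently of DeepStream with a few lines of pika (sketch only; it assumes RabbitMQ on localhost with the default guest/guest account and that the exchange has already been declared by the AMQP adapter):

import pika

# Print the routing key of every message arriving on the "detections"
# exchange, regardless of key ("#" matches everything).
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.exchange_declare(exchange="detections", passive=True)  # only verify it exists
queue_name = channel.queue_declare("", exclusive=True).method.queue
channel.queue_bind(queue=queue_name, exchange="detections", routing_key="#")
channel.basic_consume(queue_name,
                      lambda ch, method, props, body: print("routing_key:", method.routing_key),
                      auto_ack=True)
channel.start_consuming()

With the configuration above, this prints routing_key: guest for every message instead of the expected routing_key: camera_front_left.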

Any updates on this?
I am running into further problems when using 2 brokers at the same time, and I think it might be related to this.

I have broker 1 publishing detections with:

msg-broker-conn-str=172.17.0.1;5672;guest

cfg_amqp.txt:
[message-broker]
password = guest
hostname = rabbitmq
username = rabbit
port = 5672
exchange = detections
topic = camera_front_left

And broker 2 publishing frames with:

msg-broker-conn-str=172.17.0.1;5672;guest

cfg_amqp_frame.txt:
[message-broker]
password = guest
hostname = rabbitmq
username = rabbit
port = 5672
exchange = image
topic = image

In gstnvmsgbroker.c I extended the function gst_nvmsgbroker_render() to check which component a payload comes from and to publish only the matching payloads. This way I make sure that broker 1 only publishes detections and broker 2 only publishes images. I verified this and it works correctly.

Nevertheless, both topics (image and detections) receive both kinds of payloads.

I am still confused about the msg-broker-conn-str. The README of deepstream-test4 says it should be host;port;username for AMQP, but the API documentation says url;port;topic (https://docs.nvidia.com/metropolis/deepstream/dev-guide/DeepStream_Development_Guide/baggage/group__ee__nvmessaging__group.html#ga9300ddd79ee5d585c608d69f7999271f).

However, when I put the topic there, e.g. “image”, I can’t connect to the broker.
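To make the ambiguity explicit, the two readings differ only in the third field (illustration only; how the adapter really parses the string is exactly what is unclear):

conn_str = "172.17.0.1;5672;guest"
host, port, third = conn_str.split(";")
# test4 README reading: third = username -> authentication as "guest" works
# API-doc reading:      third = topic    -> "image" would then end up where a
#                                           username is expected, which would
#                                           explain the failed connection
print(host, port, third)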

I would appreciate an investigation regarding this.

We will have a check and update you.

The “topic” name is missing under the [sink1] group. Please set topic under the [sink1] group; with that it should work.

For more details:
$ gst-inspect-1.0 nvmsgbroker
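Applied to the [sink1] group from the first post, that would look like this (only the last line is new):

[sink1]
enable=1
type=6
msg-conv-payload-type=1
msg-broker-proto-lib=/opt/nvidia/deepstream/deepstream-4.0/lib/libnvds_amqp_proto.so
msg-broker-conn-str=172.17.0.1;5672;guest
msg-broker-config=cfg_amqp.txt
topic=camera_front_left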

Hi
Sorry for the late reply, we support just one exchange in the current design.

Hey amycao,
We figured that. That’s why I again have two broker plugins, both publishing on one and the same exchange: a detection-broker-plugin (publishing detections on exchange=“ds”, topic/routing key=“detections”) and an image-broker-plugin (publishing frames on exchange=“ds”, topic/routing key=“images”).

When I run the pipeline with the image-broker-plugin only, the receiver gets the correct messages, i.e. for each camera stream (num = 5) one image per timestamp. Here is the receiver’s log, where each line represents a received message:

cam_id 2 - ts: 1575489634225857440 - bytes: 333881
cam_id 1 - ts: 1575489634225857440 - bytes: 273329
cam_id 3 - ts: 1575489634225857440 - bytes: 333881
cam_id 4 - ts: 1575489634225857440 - bytes: 273329
cam_id 0 - ts: 1575489634225857440 - bytes: 333881
--> correct
cam_id 3 - ts: 1575489635133929824 - bytes: 330921
cam_id 0 - ts: 1575489635133929824 - bytes: 330921
cam_id 4 - ts: 1575489635133929824 - bytes: 384033
cam_id 2 - ts: 1575489635133929824 - bytes: 330921
cam_id 1 - ts: 1575489635133929824 - bytes: 384033
--> correct
cam_id 0 - ts: 1575489636008646848 - bytes: 336727
cam_id 1 - ts: 1575489636008646848 - bytes: 397679
cam_id 2 - ts: 1575489636008646848 - bytes: 336727
cam_id 3 - ts: 1575489636008646848 - bytes: 336727
cam_id 4 - ts: 1575489636008646848 - bytes: 397679
--> correct

When I run both broker plugins, the receiver gets messed-up messages: sometimes the same image several times, sometimes more than 5 per timestamp, sometimes correct as expected. Here is an example output:

cam_id 4 - ts: 1575489187948383456 - bytes: 273329
cam_id 0 - ts: 1575489187948383456 - bytes: 333881
cam_id 1 - ts: 1575489187948383456 - bytes: 273329
cam_id 3 - ts: 1575489187948383456 - bytes: 333881
cam_id 2 - ts: 1575489187948383456 - bytes: 333881
--> correct
cam_id 4 - ts: 1575489188849397760 - bytes: 384033
cam_id 4 - ts: 1575489188849397760 - bytes: 384033
cam_id 1 - ts: 1575489188849397760 - bytes: 384033
cam_id 1 - ts: 1575489188849397760 - bytes: 384033
cam_id 3 - ts: 1575489188849397760 - bytes: 330921
cam_id 3 - ts: 1575489188849397760 - bytes: 330921
--> double images and too many
cam_id 4 - ts: 1575489189776421568 - bytes: 397681
cam_id 0 - ts: 1575489189776421568 - bytes: 336729
cam_id 2 - ts: 1575489189776421568 - bytes: 336729
cam_id 3 - ts: 1575489189776421568 - bytes: 336729
cam_id 1 - ts: 1575489189776421568 - bytes: 397681
--> correct
cam_id 2 - ts: 1575489190698302976 - bytes: 295949
cam_id 3 - ts: 1575489190698302976 - bytes: 295949
cam_id 1 - ts: 1575489190698302976 - bytes: 390629
cam_id 4 - ts: 1575489190698302976 - bytes: 390629
cam_id 0 - ts: 1575489190698302976 - bytes: 295949
-->correct
cam_id 2 - ts: 1575489191650070976 - bytes: 268913
cam_id 3 - ts: 1575489191650070976 - bytes: 268913
cam_id 3 - ts: 1575489191650070976 - bytes: 268913
cam_id 1 - ts: 1575489191650070976 - bytes: 391605
cam_id 1 - ts: 1575489191650070976 - bytes: 391605
--> double images
cam_id 2 - ts: 1575489192646531360 - bytes: 295737
cam_id 3 - ts: 1575489192646531360 - bytes: 295737
cam_id 4 - ts: 1575489192646531360 - bytes: 375049
cam_id 4 - ts: 1575489192646531360 - bytes: 375049
cam_id 0 - ts: 1575489192646531360 - bytes: 295737
--> double images
cam_id 2 - ts: 1575489193658376192 - bytes: 271901
cam_id 2 - ts: 1575489193658376192 - bytes: 271901
cam_id 4 - ts: 1575489193658376192 - bytes: 343189
cam_id 4 - ts: 1575489193658376192 - bytes: 343189
cam_id 1 - ts: 1575489193658376192 - bytes: 343189
--> double images

I hope my explanations are clear.
At least there are no detections on ds.images. But still… can that be fixed?
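As a side note, publishing two routing keys on one exchange works as expected from plain Python, so the broker itself does not seem to be at fault. A minimal publisher sketch (assuming RabbitMQ on localhost with the default guest/guest account):

import pika

# Publish one message per routing key on the shared "ds" exchange.
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.exchange_declare(exchange="ds", exchange_type="topic", durable=True)
channel.basic_publish(exchange="ds", routing_key="detections", body=b'{"kind": "detection"}')
channel.basic_publish(exchange="ds", routing_key="images", body=b'{"kind": "image"}')
connection.close()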

Hi
Please use the “msg-conv-comp-id” and “msg-broker-comp-id” fields. These restrict msgconv and msgbroker to processing messages only from the components with the corresponding ids. Also set the corresponding componentId field in NvDsEventMsgMeta.
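For illustration, the two fields could be set per sink group like this (the ids are only examples; each msgconv/msgbroker pair then handles only metadata whose componentId matches):

[sink0]
# detections
msg-conv-comp-id=1
msg-broker-comp-id=1

[sink1]
# images
msg-conv-comp-id=2
msg-broker-comp-id=2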

It is not really clear what exactly you mean.

This is how I do it at the moment:
I set msg-conv-comp-id and msg-broker-comp-id to the same value.
That means all image payloads have payload->componentId=2 and all detection payloads have payload->componentId=1. They are set in the respective message converter.

The image broker only publishes payloads with payload->componentId=2 (equal to its msg-broker-comp-id), and the detection broker only publishes payloads with payload->componentId=1 (equal to its msg-broker-comp-id).

Still, I don’t see how this helps with the previously described problem, where some messages are published twice.

Thanks

Are you filling the “componentId” field correctly in NvDsEventMsgMeta?

Hello. I don’t attach the images as NvDsEventMsgMeta. Instead I wrote a custom plugin that converts the image directly into a payload and attaches it as NvDsPayload. There I set payload->componentId correctly, yes.

Hi
Sorry for the late reply, the dev team will check this.

Hi
Our internal team modified the test5 application to mimic the scenario you mentioned but didn’t find any duplicate messages being sent by the broker.
It seems the issue lies in your modifications; you can verify this first with the test5 app using the following changes.

In the function generate_event_msg_meta(), after line 475, add the following code to mimic the scenario you are trying to create.

if (class_id == 0) {
  meta->componentId = 1;
} else if (class_id == 2) {
  meta->componentId = 2;
}

Hello,
Thanks for your reply and effort.

I made an example to reproduce it with the DeepStream open-source code.

Step 1.
Replace the function bbox_generated_probe_after_analytics in deepstream_test5_app_main.c with the following one. It generates some dummy objects to make it easier to see what is happening.

guint64 global_ts = 0;

static void
bbox_generated_probe_after_analytics (AppCtx * appCtx, GstBuffer * buf,
    NvDsBatchMeta * batch_meta, guint index)
{
  int counter = 0;

  for (NvDsMetaList * l_frame = batch_meta->frame_meta_list; l_frame != NULL;
      l_frame = l_frame->next) {
    NvDsFrameMeta *frame_meta = l_frame->data;

    /* Dummy object 1: componentId = 1, objClassId = 1 */
    NvDsEventMsgMeta *msg_meta1 =
        (NvDsEventMsgMeta *) g_malloc0 (sizeof (NvDsEventMsgMeta));
    msg_meta1->sensorId = counter;
    msg_meta1->frameId = global_ts;
    msg_meta1->componentId = 1;
    msg_meta1->objClassId = 1;

    NvDsUserMeta *user_event_meta1 =
        nvds_acquire_user_meta_from_pool (batch_meta);
    if (user_event_meta1) {
      /*
       * Since generated event metadata has custom objects for
       * Vehicle / Person which are allocated dynamically, we are
       * setting copy and free function to handle those fields when
       * metadata copy happens between two components.
       */
      user_event_meta1->user_meta_data = (void *) msg_meta1;
      user_event_meta1->base_meta.batch_meta = batch_meta;
      user_event_meta1->base_meta.meta_type = NVDS_EVENT_MSG_META;
      user_event_meta1->base_meta.copy_func =
          (NvDsMetaCopyFunc) meta_copy_func;
      user_event_meta1->base_meta.release_func =
          (NvDsMetaReleaseFunc) meta_free_func;
      nvds_add_user_meta_to_frame (frame_meta, user_event_meta1);
    } else {
      g_print ("Error in attaching event meta to buffer\n");
    }

    /* Dummy object 2: componentId = 2, objClassId = 0 */
    NvDsEventMsgMeta *msg_meta2 =
        (NvDsEventMsgMeta *) g_malloc0 (sizeof (NvDsEventMsgMeta));
    msg_meta2->sensorId = counter;
    msg_meta2->frameId = global_ts;
    msg_meta2->componentId = 2;
    msg_meta2->objClassId = 0;

    NvDsUserMeta *user_event_meta2 =
        nvds_acquire_user_meta_from_pool (batch_meta);
    if (user_event_meta2) {
      /* Same copy/free handling as above. */
      user_event_meta2->user_meta_data = (void *) msg_meta2;
      user_event_meta2->base_meta.batch_meta = batch_meta;
      user_event_meta2->base_meta.meta_type = NVDS_EVENT_MSG_META;
      user_event_meta2->base_meta.copy_func =
          (NvDsMetaCopyFunc) meta_copy_func;
      user_event_meta2->base_meta.release_func =
          (NvDsMetaReleaseFunc) meta_free_func;
      nvds_add_user_meta_to_frame (frame_meta, user_event_meta2);
    } else {
      g_print ("Error in attaching event meta to buffer\n");
    }

    counter++;
  }

  global_ts++;
}

Step 2.
Replace generate_schema_message in nvmsgconv.cpp to send the appropriate data.

static gchar*
generate_schema_message (NvDsMsg2pCtx *ctx, NvDsEventMsgMeta *meta)
{
  JsonNode *rootNode;
  JsonObject *rootObj;
  gchar *message;

  // root object
  rootObj = json_object_new ();
  json_object_set_int_member (rootObj, "sensorId", meta->sensorId);
  json_object_set_int_member (rootObj, "frameId", meta->frameId);
  json_object_set_int_member (rootObj, "objClassId", meta->objClassId);
  
  rootNode = json_node_new (JSON_NODE_OBJECT);
  json_node_set_object (rootNode, rootObj);

  message = json_to_string (rootNode, TRUE);
  json_node_free (rootNode);
  json_object_unref (rootObj);

  return message;
}

Step 3.
Use the following config-files:

[application]
enable-perf-measurement=1
perf-measurement-interval-sec=5

[source0]
enable=1
type=3
uri=file:///opt/nvidia/deepstream/deepstream-4.0/samples/streams/sample_1080p_h264.mp4
num-sources=5
gpu-id=0
nvbuf-memory-type=0

[sink0]
enable=1
type=6
msg-conv-config=msgconv.txt
msg-conv-payload-type=0
msg-broker-proto-lib=/opt/nvidia/deepstream/deepstream-4.0/lib/libnvds_amqp_proto.so
#Provide your msg-broker-conn-str here
msg-broker-conn-str=rabbitmq;5672;guest
topic=detections2
msg-broker-config=cfg_amqp.txt
msg-conv-msg2p-lib=/opt/nvidia/deepstream/deepstream-4.0/lib/libnvds_msgconv.so
sync=0
msg-broker-comp-id=1

[sink1]
enable=1
type=6
msg-conv-config=robot_msgconv.txt
msg-conv-payload-type=0
msg-broker-proto-lib=/opt/nvidia/deepstream/deepstream-4.0/lib/libnvds_amqp_proto.so
#Provide your msg-broker-conn-str here
msg-broker-conn-str=rabbitmq;5672;guest
msg-broker-config=cfg_amqp.txt
topic=detections
msg-conv-msg2p-lib=/opt/nvidia/deepstream/deepstream-4.0/lib/libnvds_msgconv.so
sync=0
msg-broker-comp-id=2

[osd]
enable=1
gpu-id=0
border-width=1
text-size=15
text-color=1;1;1;1;
text-bg-color=0.3;0.3;0.3;1
font=Arial
show-clock=0
clock-x-offset=800
clock-y-offset=820
clock-text-size=12
clock-color=1;0;0;0
nvbuf-memory-type=0

[streammux]
gpu-id=0
live-source=1
batch-size=5
batched-push-timeout=40000
width=608
height=608
enable-padding=0
nvbuf-memory-type=0

[primary-gie]
enable=1
gpu-id=0
batch-size=5
bbox-border-color0=1;0;0;1
bbox-border-color1=0;1;1;1
bbox-border-color2=0;1;1;1
bbox-border-color3=0;1;0;1
nvbuf-memory-type=0
interval=0
gie-unique-id=1
config-file=../detectnet_v2_resnet_10.txt

[tests]
file-loop=1

while cfg_amqp.txt is:

[message-broker]
password = guest
#optional
hostname = rabbitmq
username = guest
port = 5672
exchange = ds
topic = detections

Objects of objClassId=0 are published on the exchange=ds and the routing_key=detections.
Objects of objClassId=1 are published on the exchange=ds and the routing_key=detections2.

For each source (5 in total), one dummy object with objClassId=0 and one dummy object with objClassId=1 are created. So for each batch, it is expected that 5 objects are published on routing_key=detections and 5 objects on routing_key=detections2.

Here is an example output for exchange=ds and the routing_key=detections.
detections: frameId: 147, sensorId: 0, objClassId: 0
detections: frameId: 147, sensorId: 1, objClassId: 0
detections: frameId: 147, sensorId: 2, objClassId: 0
detections: frameId: 147, sensorId: 3, objClassId: 0
detections: frameId: 147, sensorId: 4, objClassId: 0

detections: frameId: 148, sensorId: 0, objClassId: 0
detections: frameId: 148, sensorId: 1, objClassId: 0
detections: frameId: 148, sensorId: 2, objClassId: 0
detections: frameId: 148, sensorId: 3, objClassId: 0
detections: frameId: 148, sensorId: 4, objClassId: 0
detections: frameId: 148, sensorId: 4, objClassId: 0

detections: frameId: 149, sensorId: 0, objClassId: 0
detections: frameId: 149, sensorId: 0, objClassId: 0
detections: frameId: 149, sensorId: 2, objClassId: 0
detections: frameId: 149, sensorId: 2, objClassId: 0
detections: frameId: 149, sensorId: 4, objClassId: 0

detections: frameId: 150, sensorId: 2, objClassId: 0
detections: frameId: 150, sensorId: 3, objClassId: 0
detections: frameId: 150, sensorId: 4, objClassId: 0

detections: frameId: 151, sensorId: 0, objClassId: 0
detections: frameId: 151, sensorId: 1, objClassId: 0
detections: frameId: 151, sensorId: 2, objClassId: 0
detections: frameId: 151, sensorId: 3, objClassId: 0
detections: frameId: 151, sensorId: 4, objClassId: 0

detections: frameId: 152, sensorId: 0, objClassId: 0
detections: frameId: 152, sensorId: 1, objClassId: 0
detections: frameId: 152, sensorId: 2, objClassId: 0
detections: frameId: 152, sensorId: 3, objClassId: 0
detections: frameId: 152, sensorId: 4, objClassId: 0

detections: frameId: 153, sensorId: 0, objClassId: 0
detections: frameId: 153, sensorId: 1, objClassId: 0
detections: frameId: 153, sensorId: 2, objClassId: 0
detections: frameId: 153, sensorId: 3, objClassId: 0
detections: frameId: 153, sensorId: 4, objClassId: 0

detections: frameId: 154, sensorId: 0, objClassId: 0
detections: frameId: 154, sensorId: 1, objClassId: 0
detections: frameId: 154, sensorId: 2, objClassId: 0
detections: frameId: 154, sensorId: 3, objClassId: 0
detections: frameId: 154, sensorId: 4, objClassId: 0

detections: frameId: 155, sensorId: 0, objClassId: 0
detections: frameId: 155, sensorId: 1, objClassId: 0
detections: frameId: 155, sensorId: 2, objClassId: 0
detections: frameId: 155, sensorId: 3, objClassId: 0
detections: frameId: 155, sensorId: 4, objClassId: 0

As you can see, most of the iterations are correct, but not all (148: a duplicate, 149: duplicates and missing sensors, 150: missing sensors).
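Duplicates and gaps like these can also be spotted automatically, for example by piping the receiver output through a small helper like this sketch (it assumes the log format printed above and 5 sensors):

import collections
import re
import sys

# Collect the sensor ids seen per frameId and flag every frame that does not
# contain exactly the sensors 0..4 once each.
seen = collections.defaultdict(list)
pattern = re.compile(r"frameId: (\d+), sensorId: (\d+)")
for line in sys.stdin:
    match = pattern.search(line)
    if match:
        seen[int(match.group(1))].append(int(match.group(2)))

for frame_id in sorted(seen):
    if sorted(seen[frame_id]) != [0, 1, 2, 3, 4]:
        print("frame %d: sensors %s" % (frame_id, sorted(seen[frame_id])))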

Admittedly, it is hard to tell where the bug lies.
I also checked the published objects in gst_nvmsgbroker_render() before they are passed to self->nvds_msgapi_send_async, and they seem correct. But since I can’t look into the nvds_msgapi_send_async code, I kindly ask you to test the scenario described here.

P.S.
In case you want to use the same script to receive the data from RabbitMQ, here is my Python script:

import sys
import pika
import json
def on_detections(ch, method, properties, body):
    msg_dict = json.loads(body)
    print("%s: frameId: %d, sensorId: %d, objClassId: %d"%(method.routing_key, msg_dict["frameId"], msg_dict["sensorId"], msg_dict["objClassId"]))

if __name__ == '__main__':
    if len(sys.argv) == 2:
        exchange_name = sys.argv[1]
        routing_key = "#"
    elif len(sys.argv) == 3:
        exchange_name = sys.argv[1]
        routing_key = sys.argv[2]
    else:
        print("Usage: python3 print_message_on_exchange.py <exchange_name> [<routing_key>]")
        sys.exit(2)

    broker_connection = {"publish_on_rabbitMQ": True,
                         "host": "localhost",
                         "port": 5672,
                         "virtual_host": "/",
                         "username": "guest",
                         "password": "guest"}

    credentials = pika.PlainCredentials(broker_connection["username"], broker_connection["password"])
    parameters = pika.ConnectionParameters(broker_connection["host"],
                                        broker_connection["port"],
                                        broker_connection["virtual_host"],
                                        credentials=credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

    # Consumer
    channel.exchange_declare(exchange=exchange_name, exchange_type='topic', durable=True)
    result = channel.queue_declare('', exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=routing_key)
    channel.basic_consume(queue_name, on_detections)

    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    connection.close()
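It can be started, for example, with:

$ python3 print_message_on_exchange.py ds detections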

Hi
Sorry for the late response. I am still trying to reproduce your issue; in the meantime, here is a thought:
we cannot guarantee that sending a message succeeds every time, right? You can see this in the message broker plugin source,
nvds_msgapi_send_callback. That may be why frame 150 is missing some iterations. I will update you once I find more.

Hi Amycao,
Thanks for your feedback. I don’t understand exactly what you mean by that thought. Shouldn’t there be an error message if a message has not been sent: “error(%d) in sending data”?

static void
nvds_msgapi_send_callback (void *data, NvDsMsgApiErrorType status)
{
  GstNvMsgBroker *self = (GstNvMsgBroker *) data;

  g_mutex_lock (&self->flowLock);
  self->pendingCbCount--;
  self->lastError = status;

  if (status != NVDS_MSGAPI_OK) {
    GST_ERROR_OBJECT (self, "error(%d) in sending data", status);
  }
  g_mutex_unlock (&self->flowLock);
}

Thanks for digging into it.

Can you use GST_DEBUG=5 before your run command to capture the log?
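For example (GStreamer writes its debug output to stderr):

$ GST_DEBUG=5 ./deepstream-test5-app -c <config_file> 2> gst_debug.log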

I did that and only found the following logs regarding nvmsgbroker (one for each broker plugin):

0:01:55.104258629 19827 0x559898e2d0 DEBUG nvmsgbroker gstnvmsgbroker.c:415:gst_nvmsgbroker_render:<sink_sub_bin_sink1> render

0:01:55.066242633 19827 0x559898e720 DEBUG nvmsgbroker gstnvmsgbroker.c:415:gst_nvmsgbroker_render:<sink_sub_bin_sink2> render

Furthermore, I discovered that with two broker connections I get a random “Segmentation fault” or “double free or corruption (fasttop)”. It doesn’t happen regularly; it can occur after 3 seconds or after 5 minutes.

...
Creating LL OSD context new
** INFO: <bus_callback:175>: Pipeline running
**PERF: 16.49 (16.32)	16.49 (16.32)	16.49 (16.32)	16.49 (16.32)	16.49 (16.32)	
**PERF: 16.34 (16.32)	16.34 (16.32)	16.34 (16.32)	16.34 (16.32)	16.34 (16.32)	
**PERF: 16.19 (16.31)	16.19 (16.31)	16.19 (16.31)	16.19 (16.31)	16.19 (16.31)	
**PERF: 16.10 (16.31)	16.10 (16.31)	16.10 (16.31)	16.10 (16.31)	16.10 (16.31)	
**PERF: 16.00 (16.30)	16.00 (16.30)	16.00 (16.30)	16.00 (16.30)	16.00 (16.30)	
**PERF: 16.23 (16.29)	16.23 (16.29)	16.23 (16.29)	16.23 (16.29)	16.23 (16.29)	
**PERF: 16.04 (16.28)	16.04 (16.28)	16.04 (16.28)	16.04 (16.28)	16.04 (16.28)	
double free or corruption (fasttop)
Aborted (core dumped)
... 
Creating LL OSD context new
** INFO: <bus_callback:175>: Pipeline running
**PERF: 16.63 (16.52)	16.63 (16.54)	16.63 (16.54)	16.63 (16.52)	16.63 (16.52)	
**PERF: 16.52 (16.52)	16.52 (16.53)	16.52 (16.53)	16.52 (16.52)	16.52 (16.52)	
**PERF: 16.50 (16.52)	16.50 (16.53)	16.50 (16.53)	16.50 (16.52)	16.50 (16.52)	
Segmentation fault (core dumped)

Could you reproduce what I reported?
Thanks in advance!

Hi
Regarding your missing/duplicate messages: our internal team confirmed with your method that we cannot reproduce your issue, though not yet on 4.0.2. We will update you once we have results on version 4.0.2.

On which version did you test it? I am working with version 4.0.1.