Problems with sending messages via ZeroMQ

**• Hardware Platform (Jetson / GPU)** Jetson
**• DeepStream Version** 6.1.1
**• Sample used:** deepstream-app
Hi, I am trying to send messages via ZeroMQ. I first tried putting the code in the main function, but that failed, so I then put it inside the process_meta() function in deepstream_app.c:

static void
process_meta (AppCtx * appCtx, NvDsBatchMeta * batch_meta)
{
  void *context = zmq_ctx_new();                 /* context creation was missing */
  void *socket = zmq_socket(context, ZMQ_PUB);
  if (zmq_bind(socket, "tcp://*:5555") != 0) {
    perror("Failed to bind port");
    zmq_close(socket);
    zmq_ctx_destroy(context);
    return;
  }
  sleep(2);                                      /* give subscribers time to connect */

  for (NvDsMetaList * l_frame = batch_meta->frame_meta_list; l_frame != NULL;
          l_frame = l_frame->next) {
    NvDsFrameMeta *frame_meta = l_frame->data;
    for (NvDsMetaList * l_obj = frame_meta->obj_meta_list; l_obj != NULL;
        l_obj = l_obj->next) {
      NvDsObjectMeta *obj = (NvDsObjectMeta *) l_obj->data;
      float x = obj->rect_params.left;
      float y = obj->rect_params.top;
      float width = obj->rect_params.width;
      float height = obj->rect_params.height;
      float mid_x = x + width / 2.0f;
      float mid_y = y + height / 2.0f;
      float angle = obj->misc_obj_info[0];

      char message[256];
      snprintf(message, sizeof(message), "%.2f,%.2f,%.2f", x, y, angle);

      int bytes_sent = zmq_send(socket, message, strlen(message), 0);
      if (bytes_sent == -1) {
        printf("Failed to send message: %s\n", zmq_strerror(zmq_errno()));
      } else {
        printf("Sent coordinates and angle: %s\n", message);
      }
      sleep(1);
    }
  }
  zmq_close(socket);
  zmq_ctx_destroy(context);
}

The program runs fine, but it doesn't seem to send any messages: a Python program I wrote to receive them gets nothing.
I wonder whether DeepStream supports sending messages via ZeroMQ.

If yes, is something wrong with where I put my code?

If no, what other ways can I try to send messages to another program?

process_meta is called from the probe function gie_processing_done_buf_prob, which runs on every frame, so please don't add the broker-sending code in process_meta; it may block the pipeline. Here are some suggestions.

  1. nvmsgbroker is open source. You can customize it to implement ZeroMQ sending-broker functionality; please refer to the documentation.
  2. Or you can use multiple threads: in process_meta, save the information to a queue; then, in another thread, take the information from the queue and send it via ZeroMQ.

Thanks, I am trying the second approach:

static void
process_meta (AppCtx * appCtx, NvDsBatchMeta * batch_meta, Queue *q)
{
  for (NvDsMetaList * l_frame = batch_meta->frame_meta_list; l_frame != NULL;
      l_frame = l_frame->next) {
    NvDsFrameMeta *frame_meta = l_frame->data;
    for (NvDsMetaList * l_obj = frame_meta->obj_meta_list; l_obj != NULL;
        l_obj = l_obj->next) {
      NvDsObjectMeta *obj = (NvDsObjectMeta *) l_obj->data;
      gint class_index = obj->class_id;
      NvDsGieConfig *gie_config = NULL;
      gchar *str_ins_pos = NULL;
      float x = obj->rect_params.left;
      float y = obj->rect_params.top;
      float width = obj->rect_params.width;
      float height = obj->rect_params.height;
      float center_x = x + width / 2.0f;
      float center_y = y + height / 2.0f;
      float angle = obj->misc_obj_info[0];
      char message[512];
      snprintf(message, sizeof(message),
          "Object center: (%f, %f), Angle: %f", center_x, center_y, angle);
      queue_push(q, message);
      /* ... rest of the original process_meta body unchanged ... */

I added a new Queue struct (a sketch of what it might look like follows the sending function below) and defined the sending function:

void *send_data_via_zmq(void *arg) {
  Queue *q = (Queue *)arg;
  void *context = zmq_ctx_new();
  void *socket = zmq_socket(context, ZMQ_PUSH);
  zmq_connect(socket, "tcp://broker_ip:5555");   /* broker_ip stands in for the receiver's address */
  while (1) {                                    /* loops forever; the cleanup below is never reached */
    char *data = queue_pop(q);
    if (data != NULL) {
      zmq_send(socket, data, strlen(data), 0);
      free(data);
    }
  }
  zmq_close(socket);
  zmq_ctx_destroy(context);
  return NULL;
}
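For reference: the Queue type and the queue_init / queue_push / queue_pop / queue_destroy helpers are the poster's own and are not shown in the thread. A minimal thread-safe sketch of what they might look like (an assumption, using a pthread mutex plus a condition variable, and copying the string on push because the callers pass a stack buffer that the consumer later frees) is:

#include <pthread.h>
#include <stdlib.h>
#include <string.h>

typedef struct QueueNode {
  char *data;
  struct QueueNode *next;
} QueueNode;

typedef struct Queue {
  QueueNode *head;
  QueueNode *tail;
  pthread_mutex_t lock;
  pthread_cond_t not_empty;
} Queue;

Queue *queue_init(void) {
  Queue *q = calloc(1, sizeof(Queue));
  pthread_mutex_init(&q->lock, NULL);
  pthread_cond_init(&q->not_empty, NULL);
  return q;
}

/* Copies the message, so callers may keep passing a stack buffer;
   the consumer owns (and frees) the returned copy. */
void queue_push(Queue *q, const char *message) {
  QueueNode *node = malloc(sizeof(QueueNode));
  node->data = strdup(message);
  node->next = NULL;
  pthread_mutex_lock(&q->lock);
  if (q->tail)
    q->tail->next = node;
  else
    q->head = node;
  q->tail = node;
  pthread_cond_signal(&q->not_empty);
  pthread_mutex_unlock(&q->lock);
}

/* Blocks until an item is available and returns it; the caller frees it. */
char *queue_pop(Queue *q) {
  pthread_mutex_lock(&q->lock);
  while (q->head == NULL)
    pthread_cond_wait(&q->not_empty, &q->lock);
  QueueNode *node = q->head;
  q->head = node->next;
  if (q->head == NULL)
    q->tail = NULL;
  pthread_mutex_unlock(&q->lock);
  char *data = node->data;
  free(node);
  return data;
}

void queue_destroy(Queue *q) {
  while (q->head) {
    QueueNode *node = q->head;
    q->head = node->next;
    free(node->data);
    free(node);
  }
  pthread_mutex_destroy(&q->lock);
  pthread_cond_destroy(&q->not_empty);
  free(q);
}

With a blocking queue_pop like this, the NULL check in send_data_via_zmq is just a safeguard; the consumer simply sleeps until the probe pushes the next message.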

Then, for the main part, I put the following at the end of create_pipeline():

gboolean
create_pipeline (AppCtx * appCtx,
    bbox_generated_callback bbox_generated_post_analytics_cb,
    bbox_generated_callback all_bbox_generated_cb, perf_callback perf_cb,
    overlay_graphics_callback overlay_graphics_cb)
{
  Queue *q = queue_init();
  pthread_t zmq_thread;
  /* ... before the call to create_processing_instance(), where process_meta is used ... */

  /* ... after the call to create_processing_instance() ... */
  pthread_create(&zmq_thread, NULL, send_data_via_zmq, (void *)q);
  pthread_join(zmq_thread, NULL);
  queue_destroy(q);

But here is the problem: when I run the program, the process seems to hang, endlessly loading with nothing appearing on the terminal.

pthread_join waits for the thread to exit. Please call pthread_join in the main thread, not in create_pipeline.

Do you mean that I should put it in the main function in deepstream_app_main.c?

  1. Yes, please put it in the main function. pthread_join waits for the thread, so calling it inside create_pipeline blocks create_pipeline.
  2. After commenting out the following code, does the app run well?
 pthread_join(zmq_thread, NULL); 
 queue_destroy(q);

Hi, I moved the code from deepstream_app.c to deepstream_app_main.c. It was originally in process_meta(); now I put it in:

static void
all_bbox_generated (AppCtx * appCtx, GstBuffer * buf,
    NvDsBatchMeta * batch_meta, guint index, Queue *q)
{
  for (NvDsMetaList * l_frame = batch_meta->frame_meta_list; l_frame != NULL;
      l_frame = l_frame->next) {
    NvDsFrameMeta *frame_meta = l_frame->data;
    for (NvDsMetaList * l_obj = frame_meta->obj_meta_list; l_obj != NULL;
        l_obj = l_obj->next) {
      NvDsObjectMeta *obj = (NvDsObjectMeta *) l_obj->data;
      int x = obj->rect_params.left;
      int y = obj->rect_params.top;
      int width = obj->rect_params.width;
      int height = obj->rect_params.height;
      int center_x = x + width / 2.0f;
      int center_y = y + height / 2.0f;
      int angle = obj->misc_obj_info[0];
      char message[512];
      /* %d matches the int variables here; the earlier version used floats with %f */
      snprintf(message, sizeof(message),
          "Object center: (%d, %d), Angle: %d", center_x, center_y, angle);
      queue_push(q, message);
    }
  }
}
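One detail the thread does not address: create_pipeline receives all_bbox_generated through the bbox_generated_callback type, whose signature has no Queue * argument, so adding that parameter also means changing the callback typedef. A simpler alternative (hypothetical, not necessarily what was done here) is to keep the original signature and share the queue through a file-scope pointer:

/* Hypothetical alternative: keep the original callback signature and hand the
   queue to the callback through a file-scope pointer set once in main(). */
static Queue *g_zmq_queue = NULL;

static void
all_bbox_generated (AppCtx * appCtx, GstBuffer * buf,
    NvDsBatchMeta * batch_meta, guint index)
{
  if (g_zmq_queue == NULL)
    return;
  /* ... iterate the frame/object meta exactly as above and call
     queue_push(g_zmq_queue, message); ... */
}

main() would then set g_zmq_queue = queue_init(); before create_pipeline is called.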

The all_bbox_generated() function is registered from the main function, so the remaining code is structured like this:

int
main (int argc, char *argv[])
{
  GOptionContext *ctx = NULL;
  GOptionGroup *group = NULL;
  GError *error = NULL;
  guint i;

  Queue *q = queue_init(); 
  pthread_t zmq_thread; 
  pthread_create(&zmq_thread, NULL, send_data_via_zmq, (void *)q);
  /* ... other code ... */

  for (i = 0; i < num_instances; i++) {
    if (!create_pipeline (appCtx[i], NULL,
            all_bbox_generated, perf_cb, overlay_graphics)) {
      NVGSTDS_ERR_MSG_V ("Failed to create pipeline");
      return_value = -1;
      goto done;
    }
  }

  pthread_join(zmq_thread, NULL); 
  queue_destroy(q);

  main_loop = g_main_loop_new (NULL, FALSE);

  _intr_setup ();
  g_timeout_add (400, check_for_interrupt, NULL);
  /* ... other code ... */

The all_bbox_generated function is passed as a parameter to create_pipeline, so I added:

  pthread_join(zmq_thread, NULL); 
  queue_destroy(q);

after create_pipeline.
But the problem is still the same: the process seems to hang, endlessly loading with nothing appearing on the terminal.

Please add “pthread_join(zmq_thread, NULL); queue_destroy(q);” after g_main_loop_run, which is what blocks the main thread.
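That is, the intended ordering in main() is roughly this sketch (note that send_data_via_zmq as posted never leaves its while (1) loop, so for the join to ever return the thread would also need a stop flag or a sentinel queue item):

  g_main_loop_run(main_loop);        /* blocks here until the pipeline stops    */
  /* ... */
  pthread_join(zmq_thread, NULL);    /* reached only after the main loop exits  */
  queue_destroy(q);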

Now it runs, but I get an error when objects are detected:

deepstream-apptest: tpp.c:82: __pthread_tpp_change_priority: Assertion `new_prio == -1 || (new_prio >= fifo_min_prio && new_prio <= fifo_max_prio)’ failed.

Sorry for the late reply! Could you simplify the code to narrow down this issue? For example, which code causes it? Is it a DeepStream bug? Thanks!

Exactly; the code shown above is all the code I added.
It seems that the added thread is incompatible with the original ones.

Sorry for the late reply. Is this still a DeepStream issue to support?

Thanks. I have solved this another way, using shared memory.
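The shared-memory code itself was not posted. For readers who end up here, a minimal sketch of that kind of approach on the producer side (an assumption: POSIX shm_open/mmap with a process-shared mutex and an example segment name, which may differ from what was actually used) could be:

/* Hypothetical writer side only, not the poster's actual code.
   Build with: gcc ... -lrt -lpthread */
#include <fcntl.h>
#include <pthread.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

typedef struct {
  pthread_mutex_t lock;   /* process-shared mutex guards the fields below */
  float center_x;
  float center_y;
  float angle;
} SharedDetection;

/* Create and map the segment; call once on the producer side (e.g. in main). */
static SharedDetection *
shared_detection_init (void)
{
  int fd = shm_open("/ds_detection", O_CREAT | O_RDWR, 0666);  /* example name */
  if (fd < 0)
    return NULL;
  ftruncate(fd, sizeof(SharedDetection));
  SharedDetection *shm = mmap(NULL, sizeof(SharedDetection),
      PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  close(fd);
  if (shm == MAP_FAILED)
    return NULL;

  pthread_mutexattr_t attr;
  pthread_mutexattr_init(&attr);
  pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
  pthread_mutex_init(&shm->lock, &attr);
  pthread_mutexattr_destroy(&attr);
  return shm;
}

/* Called from the meta callback in place of queue_push(). */
static void
shared_detection_write (SharedDetection * shm, float cx, float cy, float angle)
{
  pthread_mutex_lock(&shm->lock);
  shm->center_x = cx;
  shm->center_y = cy;
  shm->angle = angle;
  pthread_mutex_unlock(&shm->lock);
}

The consumer process opens the same segment name with shm_open/mmap and takes the same mutex before reading the fields.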


Thanks for the update. Is this still a DeepStream issue to support? Please open a new topic if you have any other DeepStream problems. Thanks!
