Segmentation Fault in doca_pe_progress

Hello,
I tried to create a DOCA Comch MsgQ between host and BlueField 3 DPA, according to the instructions mentioned at here. After successful initialization, I submitted a send_task with immediate data to the cpu producer, the DPA thread received the immediate data and acknowledged the consumer completion event. However, when I tried to call doca_pe_progress to trigger the task completion callback, it crashed inside.

By the way, I don’t find any sample code for DOCA Comch MsgQ. I’m not sure I’m using it correctly.

Common header test_msgq_common.h:

#pragma once

#include <stdint.h>

#ifdef __cplusplus
extern "C" {
#endif

typedef struct params {
  uint64_t comp_handle;
  uint64_t notify_comp_handle;
  uint64_t cons_handle;
  uint64_t cons_comp_handle;
  uint64_t trigger_count;
} params __attribute__((aligned(8)));

#ifdef __cplusplus
}
#endif

Host-side code:

#include <dbg.h>

#include "test_msgq_common.h"

#include <string>
#include <thread>

#include <sys/stat.h>
#include <unistd.h>

#include <doca_comch.h>
#include <doca_comch_consumer.h>
#include <doca_comch_msgq.h>
#include <doca_comch_producer.h>
#include <doca_ctx.h>
#include <doca_dev.h>
#include <doca_dpa.h>
#include <doca_pe.h>

using namespace std::chrono_literals;

extern "C" doca_dpa_app *test_msgq;

std::string ibdev_name = "mlx5_1";

doca_dev *dev;
doca_dpa *dpa;
doca_pe *pe;
doca_comch_msgq *inq;
doca_dpa_thread *t;
doca_comch_producer *cpu_prod;
doca_ctx *cpu_prod_ctx;
uint32_t cpu_prod_id;
doca_data cpu_prod_ctx_d;
doca_comch_consumer *dpa_cons;
doca_ctx *dpa_cons_ctx;
uint32_t dpa_cons_id;
doca_dpa_completion *dpa_comp;
doca_dpa_notification_completion *dpa_notify_comp;
doca_comch_consumer_completion *dpa_cons_comp;
doca_dpa_dev_uintptr_t dpa_params_ptr;
params p;
doca_comch_producer_task_send *task_send;
doca_task *task;
doca_data task_d;

extern "C" doca_dpa_func_t test_msgq_1;
extern "C" doca_dpa_func_t trigger;

#ifndef NDEBUG
#include <cstdio>
#include <cstdlib>
#include <doca_error.h>
#define doca_check(expr)                                                       \
  do {                                                                         \
    doca_error_t __doca_check_result__ = expr;                                 \
    if (__doca_check_result__ != DOCA_SUCCESS) {                               \
      fprintf(stderr, "%s:%d:%s:" #expr ": %s\n", __FILE__, __LINE__,          \
              __func__, doca_error_get_descr(__doca_check_result__));          \
      if (dpa != nullptr) {                                                    \
        doca_error_t __doca_dpa_last_error__ =                                 \
            doca_dpa_peek_at_last_error(dpa);                                  \
        fprintf(stderr, "DPA ERROR: %s\n",                                     \
                doca_error_get_descr(__doca_dpa_last_error__));                \
      }                                                                        \
      std::abort();                                                            \
    }                                                                          \
  } while (0);
#else
#define doca_check(expr) (void)(expr)
#endif

auto task_send_comp_ok_cb(struct doca_comch_producer_task_send *task,
                          union doca_data d1, union doca_data d2) -> void {
  std::cerr << "ok done" << std::endl;
  std::cerr << d1.u64 << d2.u64 << std::endl;
  doca_task_free(doca_comch_producer_task_send_as_task(task));
};
auto task_send_comp_err_cb(struct doca_comch_producer_task_send *task,
                           union doca_data, union doca_data) -> void {
  std::cerr << "err done" << std::endl;
  doca_task_free(doca_comch_producer_task_send_as_task(task));
};
auto state_change_cb(const union doca_data, struct doca_ctx *,
                     enum doca_ctx_states ps, enum doca_ctx_states ns) -> void {
  switch (ps) {
  case DOCA_CTX_STATE_IDLE:
    std::cerr << "from idle ";
    break;
  case DOCA_CTX_STATE_STARTING:
    std::cerr << "from starting ";
    break;
  case DOCA_CTX_STATE_RUNNING:
    std::cerr << "from running ";
    break;
  case DOCA_CTX_STATE_STOPPING:
    std::cerr << "from stopping ";
    break;
  default:
    break;
  }
  switch (ns) {
  case DOCA_CTX_STATE_IDLE:
    std::cerr << "to idle" << std::endl;
    break;
  case DOCA_CTX_STATE_STARTING:
    std::cerr << "to starting" << std::endl;
    break;
  case DOCA_CTX_STATE_RUNNING:
    std::cerr << "to running" << std::endl;
    break;
  case DOCA_CTX_STATE_STOPPING:
    std::cerr << "to stopping" << std::endl;
    break;
  default:
    break;
  }
};
int main() {
  dbg("find device");
  doca_devinfo **dev_list;
  doca_devinfo *dev_info = NULL;
  uint32_t nb_devs;
  doca_check(doca_devinfo_create_list(&dev_list, &nb_devs));
  char ibdev_name_buf[DOCA_DEVINFO_IBDEV_NAME_SIZE];
  doca_error_t result = DOCA_SUCCESS;
  for (uint32_t i = 0; i < nb_devs; i++) {
    result = doca_devinfo_get_ibdev_name(dev_list[i], ibdev_name_buf,
                                         DOCA_DEVINFO_IBDEV_NAME_SIZE);
    dbg(ibdev_name_buf);
    if (result == DOCA_SUCCESS && ibdev_name == ibdev_name_buf) {
      dev_info = dev_list[i];
      break;
    }
  }
  if (!dev_info) {
    std::abort();
  }
  dbg(doca_comch_producer_cap_is_supported(dev_info));
  dbg("open device");
  doca_check(doca_dev_open(dev_info, &dev));
  doca_check(doca_devinfo_destroy_list(dev_list));
  dbg("create dpa");
  doca_check(doca_dpa_create(dev, &dpa));
  doca_check(doca_dpa_set_app(dpa, test_msgq));
#ifndef NDEBUG
  doca_check(doca_dpa_set_log_level(dpa, DOCA_DPA_DEV_LOG_LEVEL_DEBUG));
#else
  doca_check(doca_dpa_set_log_level(dpa, DOCA_DPA_DEV_LOG_LEVEL_INFO));
#endif
  //   if (access(".log", F_OK) != 0) {
  //     int result = mkdir(".log", 0777);
  //     if (result != 0) {
  //       fprintf(stderr, "ERROR: create log file? %s\n", strerror(errno));
  //       std::abort();
  //     }
  //   }
  //   doca_check(doca_dpa_log_file_set_path(dpa, ".log/dpa.log"));
  doca_check(doca_dpa_start(dpa));

  dbg("create host pe");
  doca_check(doca_pe_create(&pe));
  dbg("create params");
  doca_check(doca_dpa_mem_alloc(dpa, sizeof(params), &dpa_params_ptr));
  dbg("create thread");
  doca_check(doca_dpa_thread_create(dpa, &t));
  doca_check(doca_dpa_thread_set_func_arg(t, test_msgq_1, dpa_params_ptr));
  doca_check(doca_dpa_thread_start(t));
  dbg("create completion");
  doca_check(doca_dpa_completion_create(dpa, 16, &dpa_comp));
  doca_check(doca_dpa_completion_set_thread(dpa_comp, t));
  doca_check(doca_dpa_completion_start(dpa_comp));
  doca_check(doca_dpa_completion_get_dpa_handle(dpa_comp, &p.comp_handle));
  dbg("create notificaion completion");
  doca_check(doca_dpa_notification_completion_create(dpa, t, &dpa_notify_comp));
  doca_check(doca_dpa_notification_completion_start(dpa_notify_comp));
  doca_check(doca_dpa_notification_completion_get_dpa_handle(
      dpa_notify_comp, &p.notify_comp_handle));
  dbg("create msgq");
  doca_check(doca_comch_msgq_create(dev, &inq));
  doca_check(doca_comch_msgq_set_max_num_consumers(inq, 1));
  doca_check(doca_comch_msgq_set_max_num_producers(inq, 1));
  doca_check(doca_comch_msgq_set_dpa_consumer(inq, dpa));
  doca_check(doca_comch_msgq_start(inq));
  dbg("create consumer completion");
  doca_check(doca_comch_consumer_completion_create(&dpa_cons_comp));
  doca_check(
      doca_comch_consumer_completion_set_max_num_consumers(dpa_cons_comp, 1));
  doca_check(doca_comch_consumer_completion_set_max_num_recv(dpa_cons_comp, 16));
  doca_check(doca_comch_consumer_completion_set_dpa_thread(dpa_cons_comp, t));
  doca_check(doca_comch_consumer_completion_start(dpa_cons_comp));
  doca_check(doca_comch_consumer_completion_get_dpa_handle(
      dpa_cons_comp, &p.cons_comp_handle));
  dbg("create msgq consumer");
  doca_check(doca_comch_msgq_consumer_create(inq, &dpa_cons));
  doca_check(doca_comch_consumer_set_dev_num_recv(dpa_cons, 16));
  doca_check(doca_comch_consumer_set_imm_data_len(dpa_cons, 4));
  doca_check(doca_comch_consumer_get_id(dpa_cons, &dpa_cons_id));
  doca_check(doca_comch_consumer_set_completion(dpa_cons, dpa_cons_comp, 114));
  dbg(dpa_cons_id);
  dpa_cons_ctx = doca_comch_consumer_as_ctx(dpa_cons);
  doca_check(doca_ctx_set_datapath_on_dpa(dpa_cons_ctx, dpa));
  doca_check(doca_ctx_start(dpa_cons_ctx));
  doca_check(doca_comch_consumer_get_dpa_handle(dpa_cons, &p.cons_handle));
  dbg("create msgq producer");
  doca_check(doca_comch_msgq_producer_create(inq, &cpu_prod));
  doca_check(doca_comch_producer_get_id(cpu_prod, &cpu_prod_id));
  dbg(cpu_prod_id);
  cpu_prod_ctx = doca_comch_producer_as_ctx(cpu_prod);
  doca_check(doca_ctx_set_state_changed_cb(cpu_prod_ctx, state_change_cb));
  doca_check(doca_pe_connect_ctx(pe, cpu_prod_ctx));
  cpu_prod_ctx_d.u64 = 114;
  doca_check(doca_ctx_set_user_data(cpu_prod_ctx, cpu_prod_ctx_d));
  doca_check(doca_comch_producer_task_send_set_conf(
      cpu_prod, task_send_comp_ok_cb, task_send_comp_err_cb, 16));
  doca_check(doca_ctx_start(cpu_prod_ctx));
  dbg("pass params");
  p.trigger_count = 0;
  doca_check(doca_dpa_h2d_memcpy(dpa, dpa_params_ptr, &p, sizeof(params)));
  dbg("run thread");
  doca_check(doca_dpa_thread_run(t));
  dbg("trigger rpc to nofity the thread post recv");
  uint64_t retval = 0;
  doca_check(doca_dpa_rpc(dpa, trigger, &retval, p.notify_comp_handle));
  std::this_thread::sleep_for(1s);
  dbg("alloc send task");
  doca_check(doca_comch_producer_task_send_alloc_init(
      cpu_prod, nullptr, (uint8_t *)"AAAA", 4, dpa_cons_id, &task_send));
  task = doca_comch_producer_task_send_as_task(task_send);
  task_d.u64 = 514;
  doca_task_set_user_data(task, task_d);
  dbg("submit task");
  do {
    result = doca_task_try_submit(task);
    if (result == DOCA_SUCCESS) {
      break;
    } else if (result == DOCA_ERROR_INVALID_VALUE) {
      dbg("invalid");
      abort();
    }
  } while (result == DOCA_ERROR_AGAIN);
  dbg("submitted");
  std::this_thread::sleep_for(1s);
  dbg(doca_error_get_descr(doca_task_get_status(task)));
  dbg("progress");
  uint8_t ok = 0;
  do {
    ok = doca_pe_progress(pe);
    dbg(ok);
  } while (ok != 0);
  dbg("committed");
  dbg(doca_error_get_descr(doca_task_get_status(task)));
  std::this_thread::sleep_for(1s);
  doca_check(doca_ctx_stop(dpa_cons_ctx));
  doca_check(doca_ctx_stop(cpu_prod_ctx));
  doca_check(doca_comch_consumer_completion_stop(dpa_cons_comp));
  doca_check(doca_dpa_notification_completion_stop(dpa_notify_comp));
  doca_check(doca_dpa_completion_stop(dpa_comp));
  doca_check(doca_dpa_thread_stop(t));
  doca_check(doca_comch_consumer_destroy(dpa_cons));
  doca_check(doca_comch_producer_destroy(cpu_prod));
  doca_check(doca_comch_msgq_stop(inq));
  doca_check(doca_comch_consumer_completion_destroy(dpa_cons_comp));
  doca_check(doca_comch_msgq_destroy(inq));
  doca_check(doca_dpa_mem_free(dpa, dpa_params_ptr));
  doca_check(doca_dpa_thread_destroy(t));
  doca_check(doca_dpa_notification_completion_destroy(dpa_notify_comp));
  doca_check(doca_dpa_completion_destroy(dpa_comp));
  doca_check(doca_pe_destroy(pe));
  doca_check(doca_dpa_destroy(dpa));
  doca_check(doca_dev_close(dev));
  dbg("done");
  return 0;
}

DPA-side code:

#include <doca_dpa_dev.h>
#include <doca_dpa_dev_buf.h>
#include <doca_dpa_dev_comch_msgq.h>
#include <doca_dpa_dev_rdma.h>
#include <doca_dpa_dev_sync_event.h>

#include "test_msgq_common.h"

__dpa_global__ static void test_msgq_1(doca_dpa_dev_uintptr_t params_ptr) {
  params *p = (params *)params_ptr;
  DOCA_DPA_DEV_LOG_CRIT("ok trigger %lu\n", p->trigger_count);
  if (p->trigger_count == 0) {
    DOCA_DPA_DEV_LOG_CRIT("first trigger\n");
    doca_dpa_dev_comch_consumer_ack(p->cons_handle, 1);
    doca_dpa_dev_comch_consumer_completion_request_notification(
        p->cons_comp_handle);
    DOCA_DPA_DEV_LOG_CRIT("request notification\n");
    p->trigger_count++;
  } else {
    doca_dpa_dev_comch_consumer_completion_element_t e;
    int got =
        doca_dpa_dev_comch_consumer_get_completion(p->cons_comp_handle, &e);
    if (got) {
      uint32_t imm_len = 0;
      const uint8_t *imm =
          doca_dpa_dev_comch_consumer_get_completion_imm(e, &imm_len);
      uint32_t prod_id =
          doca_dpa_dev_comch_consumer_get_completion_producer_id(e);
      DOCA_DPA_DEV_LOG_CRIT("%u %s %u\n", prod_id, (const char *)imm, imm_len);
      doca_dpa_dev_comch_consumer_completion_ack(p->cons_comp_handle, 1);
      doca_dpa_dev_comch_consumer_ack(p->cons_handle, 1);
      doca_dpa_dev_comch_consumer_completion_request_notification(
          p->cons_comp_handle);
    }
  }
  DOCA_DPA_DEV_LOG_CRIT("recheduled\n");
  doca_dpa_dev_thread_reschedule();
}

__dpa_rpc__ uint64_t
trigger(doca_dpa_dev_notification_completion_t notify_handle) {
  DOCA_DPA_DEV_LOG_CRIT("do notify\n");
  doca_dpa_dev_thread_notify(notify_handle);
  DOCA_DPA_DEV_LOG_CRIT("done notify\n");
  return 0;
}

Please help me.

Thanks.