Hello,
I tried to create a DOCA Comch MsgQ between the host and the BlueField-3 DPA, following the instructions here. After successful initialization, I submitted a send_task with immediate data to the CPU producer; the DPA thread received the immediate data and acknowledged the consumer completion event. However, when I then called doca_pe_progress to trigger the task completion callback, the program crashed inside doca_pe_progress.
By the way, I couldn’t find any sample code for DOCA Comch MsgQ, so I’m not sure whether I’m using it correctly.
Common header test_msgq_common.h:
#pragma once
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
/*
 * Argument block shared by the host program and the DPA thread.
 * The host fills it in and copies it to DPA memory with doca_dpa_h2d_memcpy();
 * the DPA thread receives its DPA address as the thread argument.
 * Every member is a uint64_t and the type is declared 8-byte aligned.
 */
typedef struct params {
/* Set by the host from doca_dpa_completion_get_dpa_handle(). */
uint64_t comp_handle;
/* Set by the host from doca_dpa_notification_completion_get_dpa_handle();
 * passed to the `trigger` RPC to wake the thread. */
uint64_t notify_comp_handle;
/* Set by the host from doca_comch_consumer_get_dpa_handle(); the DPA thread
 * passes it to doca_dpa_dev_comch_consumer_ack(). */
uint64_t cons_handle;
/* Set by the host from doca_comch_consumer_completion_get_dpa_handle(); the
 * DPA thread polls, acks and re-arms this completion. */
uint64_t cons_comp_handle;
/* Host initialises to 0; the DPA thread increments it after its first
 * activation to tell the first run apart from later ones. */
uint64_t trigger_count;
} params __attribute__((aligned(8)));
#ifdef __cplusplus
}
#endif
Host-side code:
#include <dbg.h>
#include "test_msgq_common.h"
#include <string>
#include <thread>
#include <sys/stat.h>
#include <unistd.h>
#include <doca_comch.h>
#include <doca_comch_consumer.h>
#include <doca_comch_msgq.h>
#include <doca_comch_producer.h>
#include <doca_ctx.h>
#include <doca_dev.h>
#include <doca_dpa.h>
#include <doca_pe.h>
using namespace std::chrono_literals;

// Symbols defined outside this translation unit: the DPA application passed to
// doca_dpa_set_app(), the DPA thread entry point and the RPC used to wake it.
extern "C" doca_dpa_app *test_msgq;
extern "C" doca_dpa_func_t test_msgq_1;
extern "C" doca_dpa_func_t trigger;

// Device selection and core handles.
std::string ibdev_name{"mlx5_1"};
doca_dev *dev = nullptr;
doca_dpa *dpa = nullptr;
doca_pe *pe = nullptr;

// Message queue and the DPA thread that consumes from it.
doca_comch_msgq *inq = nullptr;
doca_dpa_thread *t = nullptr;

// Host (CPU) producer side.
doca_comch_producer *cpu_prod = nullptr;
doca_ctx *cpu_prod_ctx = nullptr;
uint32_t cpu_prod_id{};
doca_data cpu_prod_ctx_d{};

// DPA consumer side.
doca_comch_consumer *dpa_cons = nullptr;
doca_ctx *dpa_cons_ctx = nullptr;
uint32_t dpa_cons_id{};

// Completion objects attached to the DPA thread.
doca_dpa_completion *dpa_comp = nullptr;
doca_dpa_notification_completion *dpa_notify_comp = nullptr;
doca_comch_consumer_completion *dpa_cons_comp = nullptr;

// Thread argument: DPA-side allocation and the host-side staging copy.
doca_dpa_dev_uintptr_t dpa_params_ptr{};
params p{};

// The single send task submitted by main().
doca_comch_producer_task_send *task_send = nullptr;
doca_task *task = nullptr;
doca_data task_d{};
#ifndef NDEBUG
#include <cstdio>
#include <cstdlib>
#include <doca_error.h>
/**
 * doca_check(expr): evaluate a DOCA call and abort on failure.
 *
 * Debug builds: evaluates `expr` once; if the result is not DOCA_SUCCESS,
 * prints file, line, function, the expression text and the DOCA error
 * description to stderr, additionally prints the last DPA error when the
 * global `dpa` handle is non-null, then calls std::abort().
 *
 * Release builds (NDEBUG): evaluates `expr` and discards the result.
 *
 * The macro expands to a single statement WITHOUT a trailing semicolon, so
 * `if (c) doca_check(x); else ...` parses as intended; call sites supply the
 * semicolon themselves.
 */
#define doca_check(expr)                                                       \
  do {                                                                         \
    /* Parenthesised so operators inside `expr` cannot bind to the `=`. */     \
    const doca_error_t doca_check_result_ = (expr);                            \
    if (doca_check_result_ != DOCA_SUCCESS) {                                  \
      /* Expression text goes through %s so a '%' in it is printed as-is. */   \
      fprintf(stderr, "%s:%d:%s:%s: %s\n", __FILE__, __LINE__, __func__,       \
              #expr, doca_error_get_descr(doca_check_result_));                \
      if (dpa != nullptr) {                                                    \
        const doca_error_t doca_check_dpa_error_ =                             \
            doca_dpa_peek_at_last_error(dpa);                                  \
        fprintf(stderr, "DPA ERROR: %s\n",                                     \
                doca_error_get_descr(doca_check_dpa_error_));                  \
      }                                                                        \
      std::abort();                                                            \
    }                                                                          \
  } while (0)
#else
#define doca_check(expr) (void)(expr)
#endif
auto task_send_comp_ok_cb(struct doca_comch_producer_task_send *task,
union doca_data d1, union doca_data d2) -> void {
std::cerr << "ok done" << std::endl;
std::cerr << d1.u64 << d2.u64 << std::endl;
doca_task_free(doca_comch_producer_task_send_as_task(task));
};
auto task_send_comp_err_cb(struct doca_comch_producer_task_send *task,
union doca_data, union doca_data) -> void {
std::cerr << "err done" << std::endl;
doca_task_free(doca_comch_producer_task_send_as_task(task));
};
/// Context state-change callback: writes "from <prev> to <next>" to stderr.
/// A state that is not one of idle/starting/running/stopping contributes no
/// text at all for its half of the message (and, for the new state, no
/// newline either).
void state_change_cb(const union doca_data, struct doca_ctx *,
                     enum doca_ctx_states ps, enum doca_ctx_states ns) {
  struct StateText {
    enum doca_ctx_states state;
    const char *from; // printed for the previous state
    const char *to;   // printed for the new state, followed by std::endl
  };
  static constexpr StateText kStateText[] = {
      {DOCA_CTX_STATE_IDLE, "from idle ", "to idle"},
      {DOCA_CTX_STATE_STARTING, "from starting ", "to starting"},
      {DOCA_CTX_STATE_RUNNING, "from running ", "to running"},
      {DOCA_CTX_STATE_STOPPING, "from stopping ", "to stopping"},
  };

  for (const StateText &entry : kStateText) {
    if (entry.state == ps) {
      std::cerr << entry.from;
      break;
    }
  }
  for (const StateText &entry : kStateText) {
    if (entry.state == ns) {
      std::cerr << entry.to << std::endl;
      break;
    }
  }
}
int main() {
dbg("find device");
doca_devinfo **dev_list;
doca_devinfo *dev_info = NULL;
uint32_t nb_devs;
doca_check(doca_devinfo_create_list(&dev_list, &nb_devs));
char ibdev_name_buf[DOCA_DEVINFO_IBDEV_NAME_SIZE];
doca_error_t result = DOCA_SUCCESS;
for (uint32_t i = 0; i < nb_devs; i++) {
result = doca_devinfo_get_ibdev_name(dev_list[i], ibdev_name_buf,
DOCA_DEVINFO_IBDEV_NAME_SIZE);
dbg(ibdev_name_buf);
if (result == DOCA_SUCCESS && ibdev_name == ibdev_name_buf) {
dev_info = dev_list[i];
break;
}
}
if (!dev_info) {
std::abort();
}
dbg(doca_comch_producer_cap_is_supported(dev_info));
dbg("open device");
doca_check(doca_dev_open(dev_info, &dev));
doca_check(doca_devinfo_destroy_list(dev_list));
dbg("create dpa");
doca_check(doca_dpa_create(dev, &dpa));
doca_check(doca_dpa_set_app(dpa, test_msgq));
#ifndef NDEBUG
doca_check(doca_dpa_set_log_level(dpa, DOCA_DPA_DEV_LOG_LEVEL_DEBUG));
#else
doca_check(doca_dpa_set_log_level(dpa, DOCA_DPA_DEV_LOG_LEVEL_INFO));
#endif
// if (access(".log", F_OK) != 0) {
// int result = mkdir(".log", 0777);
// if (result != 0) {
// fprintf(stderr, "ERROR: create log file? %s\n", strerror(errno));
// std::abort();
// }
// }
// doca_check(doca_dpa_log_file_set_path(dpa, ".log/dpa.log"));
doca_check(doca_dpa_start(dpa));
dbg("create host pe");
doca_check(doca_pe_create(&pe));
dbg("create params");
doca_check(doca_dpa_mem_alloc(dpa, sizeof(params), &dpa_params_ptr));
dbg("create thread");
doca_check(doca_dpa_thread_create(dpa, &t));
doca_check(doca_dpa_thread_set_func_arg(t, test_msgq_1, dpa_params_ptr));
doca_check(doca_dpa_thread_start(t));
dbg("create completion");
doca_check(doca_dpa_completion_create(dpa, 16, &dpa_comp));
doca_check(doca_dpa_completion_set_thread(dpa_comp, t));
doca_check(doca_dpa_completion_start(dpa_comp));
doca_check(doca_dpa_completion_get_dpa_handle(dpa_comp, &p.comp_handle));
dbg("create notificaion completion");
doca_check(doca_dpa_notification_completion_create(dpa, t, &dpa_notify_comp));
doca_check(doca_dpa_notification_completion_start(dpa_notify_comp));
doca_check(doca_dpa_notification_completion_get_dpa_handle(
dpa_notify_comp, &p.notify_comp_handle));
dbg("create msgq");
doca_check(doca_comch_msgq_create(dev, &inq));
doca_check(doca_comch_msgq_set_max_num_consumers(inq, 1));
doca_check(doca_comch_msgq_set_max_num_producers(inq, 1));
doca_check(doca_comch_msgq_set_dpa_consumer(inq, dpa));
doca_check(doca_comch_msgq_start(inq));
dbg("create consumer completion");
doca_check(doca_comch_consumer_completion_create(&dpa_cons_comp));
doca_check(
doca_comch_consumer_completion_set_max_num_consumers(dpa_cons_comp, 1));
doca_check(doca_comch_consumer_completion_set_max_num_recv(dpa_cons_comp, 16));
doca_check(doca_comch_consumer_completion_set_dpa_thread(dpa_cons_comp, t));
doca_check(doca_comch_consumer_completion_start(dpa_cons_comp));
doca_check(doca_comch_consumer_completion_get_dpa_handle(
dpa_cons_comp, &p.cons_comp_handle));
dbg("create msgq consumer");
doca_check(doca_comch_msgq_consumer_create(inq, &dpa_cons));
doca_check(doca_comch_consumer_set_dev_num_recv(dpa_cons, 16));
doca_check(doca_comch_consumer_set_imm_data_len(dpa_cons, 4));
doca_check(doca_comch_consumer_get_id(dpa_cons, &dpa_cons_id));
doca_check(doca_comch_consumer_set_completion(dpa_cons, dpa_cons_comp, 114));
dbg(dpa_cons_id);
dpa_cons_ctx = doca_comch_consumer_as_ctx(dpa_cons);
doca_check(doca_ctx_set_datapath_on_dpa(dpa_cons_ctx, dpa));
doca_check(doca_ctx_start(dpa_cons_ctx));
doca_check(doca_comch_consumer_get_dpa_handle(dpa_cons, &p.cons_handle));
dbg("create msgq producer");
doca_check(doca_comch_msgq_producer_create(inq, &cpu_prod));
doca_check(doca_comch_producer_get_id(cpu_prod, &cpu_prod_id));
dbg(cpu_prod_id);
cpu_prod_ctx = doca_comch_producer_as_ctx(cpu_prod);
doca_check(doca_ctx_set_state_changed_cb(cpu_prod_ctx, state_change_cb));
doca_check(doca_pe_connect_ctx(pe, cpu_prod_ctx));
cpu_prod_ctx_d.u64 = 114;
doca_check(doca_ctx_set_user_data(cpu_prod_ctx, cpu_prod_ctx_d));
doca_check(doca_comch_producer_task_send_set_conf(
cpu_prod, task_send_comp_ok_cb, task_send_comp_err_cb, 16));
doca_check(doca_ctx_start(cpu_prod_ctx));
dbg("pass params");
p.trigger_count = 0;
doca_check(doca_dpa_h2d_memcpy(dpa, dpa_params_ptr, &p, sizeof(params)));
dbg("run thread");
doca_check(doca_dpa_thread_run(t));
dbg("trigger rpc to nofity the thread post recv");
uint64_t retval = 0;
doca_check(doca_dpa_rpc(dpa, trigger, &retval, p.notify_comp_handle));
std::this_thread::sleep_for(1s);
dbg("alloc send task");
doca_check(doca_comch_producer_task_send_alloc_init(
cpu_prod, nullptr, (uint8_t *)"AAAA", 4, dpa_cons_id, &task_send));
task = doca_comch_producer_task_send_as_task(task_send);
task_d.u64 = 514;
doca_task_set_user_data(task, task_d);
dbg("submit task");
do {
result = doca_task_try_submit(task);
if (result == DOCA_SUCCESS) {
break;
} else if (result == DOCA_ERROR_INVALID_VALUE) {
dbg("invalid");
abort();
}
} while (result == DOCA_ERROR_AGAIN);
dbg("submitted");
std::this_thread::sleep_for(1s);
dbg(doca_error_get_descr(doca_task_get_status(task)));
dbg("progress");
uint8_t ok = 0;
do {
ok = doca_pe_progress(pe);
dbg(ok);
} while (ok != 0);
dbg("committed");
dbg(doca_error_get_descr(doca_task_get_status(task)));
std::this_thread::sleep_for(1s);
doca_check(doca_ctx_stop(dpa_cons_ctx));
doca_check(doca_ctx_stop(cpu_prod_ctx));
doca_check(doca_comch_consumer_completion_stop(dpa_cons_comp));
doca_check(doca_dpa_notification_completion_stop(dpa_notify_comp));
doca_check(doca_dpa_completion_stop(dpa_comp));
doca_check(doca_dpa_thread_stop(t));
doca_check(doca_comch_consumer_destroy(dpa_cons));
doca_check(doca_comch_producer_destroy(cpu_prod));
doca_check(doca_comch_msgq_stop(inq));
doca_check(doca_comch_consumer_completion_destroy(dpa_cons_comp));
doca_check(doca_comch_msgq_destroy(inq));
doca_check(doca_dpa_mem_free(dpa, dpa_params_ptr));
doca_check(doca_dpa_thread_destroy(t));
doca_check(doca_dpa_notification_completion_destroy(dpa_notify_comp));
doca_check(doca_dpa_completion_destroy(dpa_comp));
doca_check(doca_pe_destroy(pe));
doca_check(doca_dpa_destroy(dpa));
doca_check(doca_dev_close(dev));
dbg("done");
return 0;
}
DPA-side code:
#include <doca_dpa_dev.h>
#include <doca_dpa_dev_buf.h>
#include <doca_dpa_dev_comch_msgq.h>
#include <doca_dpa_dev_rdma.h>
#include <doca_dpa_dev_sync_event.h>
#include "test_msgq_common.h"
/*
 * DPA thread entry point.
 *
 * params_ptr is the DPA address of a `params` block.
 *
 * First activation (trigger_count == 0): posts one receive on the consumer,
 * requests a notification on the consumer completion and bumps
 * trigger_count.
 *
 * Later activations: polls one consumer completion; if one is present, logs
 * the producer id and the immediate data, acks the completion, posts another
 * receive and requests the next notification.
 *
 * Every activation ends with doca_dpa_dev_thread_reschedule().
 */
__dpa_global__ static void test_msgq_1(doca_dpa_dev_uintptr_t params_ptr) {
  params *p = (params *)params_ptr;
  DOCA_DPA_DEV_LOG_CRIT("ok trigger %lu\n", p->trigger_count);
  if (p->trigger_count == 0) {
    DOCA_DPA_DEV_LOG_CRIT("first trigger\n");
    doca_dpa_dev_comch_consumer_ack(p->cons_handle, 1);
    doca_dpa_dev_comch_consumer_completion_request_notification(
        p->cons_comp_handle);
    DOCA_DPA_DEV_LOG_CRIT("request notification\n");
    p->trigger_count++;
  } else {
    doca_dpa_dev_comch_consumer_completion_element_t e;
    int got =
        doca_dpa_dev_comch_consumer_get_completion(p->cons_comp_handle, &e);
    if (got) {
      uint32_t imm_len = 0;
      const uint8_t *imm =
          doca_dpa_dev_comch_consumer_get_completion_imm(e, &imm_len);
      uint32_t prod_id =
          doca_dpa_dev_comch_consumer_get_completion_producer_id(e);

      /*
       * The immediate data is a raw byte range of imm_len bytes with no NUL
       * terminator, so it must not be handed to %s directly. Copy at most
       * sizeof(imm_text) - 1 bytes into a local buffer and terminate it.
       */
      char imm_text[32 + 1];
      const uint32_t imm_text_cap = (uint32_t)(sizeof(imm_text) - 1);
      uint32_t copy_len = 0;
      if (imm) {
        copy_len = imm_len < imm_text_cap ? imm_len : imm_text_cap;
      }
      for (uint32_t i = 0; i < copy_len; i++) {
        imm_text[i] = (char)imm[i];
      }
      imm_text[copy_len] = '\0';

      DOCA_DPA_DEV_LOG_CRIT("%u %s %u\n", prod_id, imm_text, imm_len);
      doca_dpa_dev_comch_consumer_completion_ack(p->cons_comp_handle, 1);
      doca_dpa_dev_comch_consumer_ack(p->cons_handle, 1);
      doca_dpa_dev_comch_consumer_completion_request_notification(
          p->cons_comp_handle);
    }
    /*
     * NOTE(review): when no completion is found, the notification is not
     * re-requested here; confirm whether the thread can still be woken by the
     * consumer completion in that case.
     */
  }
  DOCA_DPA_DEV_LOG_CRIT("recheduled\n");
  doca_dpa_dev_thread_reschedule();
}
/*
 * RPC invoked from the host: notifies the DPA thread attached to
 * notify_handle, logging before and after the call. Always returns 0.
 */
__dpa_rpc__ uint64_t trigger(doca_dpa_dev_notification_completion_t notify_handle)
{
  DOCA_DPA_DEV_LOG_CRIT("do notify\n");
  doca_dpa_dev_thread_notify(notify_handle);
  DOCA_DPA_DEV_LOG_CRIT("done notify\n");

  return 0;
}
Please help me.
Thanks.