DOCA Comch consumer start event lost on Bluefield-3 (but not BF-2) when using epoll and notification handles

Hello,

I’m having a strange problem with the DOCA comch fast path API (version 2.9.1) on Bluefield-3 with code that works on a Bluefield-2. The problem seems to be related to my use of doca_pe_request_notification in an epoll loop, as in the doca_common/pe_event sample. I wonder if anyone has run into this before and if there’s a workaround.

In short, if doca_pe_request_notification is called before doca_pe_progress, then the start event for DOCA Comch consumer contexts is not generated. All other contexts I’ve tried work fine, including DOCA Comch producer, client, and server. Consumers also work fine if the doca_pe_request_notification call is removed and epoll_wait is called with a timeout, but of course that turns epoll_wait into a sleep and is not really a sensible workaround.

I have a minimal example that is still unfortunately somewhat long. It sets up a Comch server on the DPU side that logs connecting consumers, and the client on the host just connects to the server, creates a consumer, and tears it down again just after it has started. I can reproduce the problem on Bluefield-3 only; on BF-2 it works as desired. On the BF-3, the client can be made to “work” by removing line 171 of the client source (the `doca_pe_request_notification` call in the main loop). Sending messages back and forth before setting up the consumer changes nothing, and setting up a producer on the server also changes nothing.

When it’s working, this minimal example will complain about a second problem, namely that the client cannot be stopped from the consumer’s state-changed callback after the consumer is destroyed (doca_ctx_stop erroneously returns DOCA_ERROR_IN_USE), but that’s not the point of this post and occurs on both BF-2 and BF-3.

Server side:

#include <doca_comch.h>
#include <doca_ctx.h>
#include <doca_dev.h>
#include <doca_pe.h>

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <unistd.h>

// Evaluates `expr` exactly once; if the resulting doca_error_t is anything other
// than DOCA_SUCCESS, prints the calling function, the line number and the DOCA
// error name to stdout, flushes stdout and terminates the process with exit(-3).
#define ASSERT_SUCCESS(expr) do { \
    doca_error_t assert_success_status = (expr); \
    if(assert_success_status != DOCA_SUCCESS) { \
        printf("Error in %s, line %d: %s\n", __func__, __LINE__, doca_error_get_name(assert_success_status)); \
        fflush(stdout); \
        exit(-3); \
    } \
} while(0)

// State-change callback for the server context.
// Logs every transition using the raw numeric values of the state enum and
// additionally prints "accepting connections" once the context is RUNNING.
// user_data and ctx are not used.
void server_state_change_callback(
    union doca_data user_data,
    struct doca_ctx *ctx,
    enum doca_ctx_states prev_state,
    enum doca_ctx_states next_state
) {
    (void) ctx;
    (void) user_data;

    printf("server state change %d -> %d\n", prev_state, next_state);

    // Only the transition into RUNNING gets the extra message.
    if(next_state != DOCA_CTX_STATE_RUNNING) {
        return;
    }

    puts("accepting connections");
}

// Connection-status callback. Intentionally a no-op: all three arguments are
// ignored, the function exists only so that something can be registered.
void connection_callback(
    struct doca_comch_event_connection_status_changed *event,
    struct doca_comch_connection *connection,
    uint8_t change_successful
) {
    // Silence unused-parameter warnings; nothing else to do.
    (void) change_successful;
    (void) connection;
    (void) event;
}

// Completion callback for Comch send tasks.
// Converts the send task to its generic task form and frees it; both user-data
// arguments are ignored.
void send_task_completed_callback(
    struct doca_comch_task_send *task,
    union doca_data task_user_data,
    union doca_data ctx_user_data
) {
    (void) ctx_user_data;
    (void) task_user_data;

    // The task is not reused, so release it right away.
    doca_task_free(
        doca_comch_task_send_as_task(task)
    );
}

// Message-received callback. Intentionally a no-op: the event, the received
// buffer, its length and the connection are all ignored.
void msg_recv_callback(
    struct doca_comch_event_msg_recv *event,
    uint8_t *recv_buffer,
    uint32_t msg_len,
    struct doca_comch_connection *connection
) {
    // Silence unused-parameter warnings; incoming messages are dropped.
    (void) connection;
    (void) msg_len;
    (void) recv_buffer;
    (void) event;
}

// Consumer event callback for a newly announced consumer.
// Prints the consumer id to stdout; event and connection are ignored.
void new_consumer_callback(
    struct doca_comch_event_consumer *event,
    struct doca_comch_connection *connection,
    uint32_t id
) {
    (void) connection;
    (void) event;

    // PRIu32 keeps the format specifier in sync with the uint32_t id.
    printf("new consumer %" PRIu32 "\n", id);
}

// Consumer event callback for a consumer that has gone away.
// Prints the consumer id to stdout; event and connection are ignored.
void expired_consumer_callback(
    struct doca_comch_event_consumer *event,
    struct doca_comch_connection *connection,
    uint32_t id
) {
    (void) connection;
    (void) event;

    // PRIu32 keeps the format specifier in sync with the uint32_t id.
    printf("expired consumer %" PRIu32 "\n", id);
}

// Opens the local DOCA device whose PCI address equals pci_addr and that
// reports Comch server support.
//
// pci_addr: PCI address string to match (e.g. "03:00.0"); must not be NULL.
// Returns:  the opened device; the caller closes it with doca_dev_close().
//
// Never returns NULL: if no matching device exists a message is printed and
// the process exits with code -1. Any DOCA call failure exits via
// ASSERT_SUCCESS.
struct doca_dev *open_server_device(char const *pci_addr) {
    struct doca_dev *result = NULL;
    struct doca_devinfo **dev_list = NULL;
    uint32_t nb_devs = 0;

    ASSERT_SUCCESS(doca_devinfo_create_list(&dev_list, &nb_devs));

    // Stop scanning as soon as a device has been opened.
    for(uint32_t i = 0; i < nb_devs && result == NULL; ++i) {
        uint8_t is_addr_equal = 0;

        ASSERT_SUCCESS(doca_devinfo_is_equal_pci_addr(dev_list[i], pci_addr, &is_addr_equal));

        if(is_addr_equal && doca_comch_cap_server_is_supported(dev_list[i]) == DOCA_SUCCESS) {
            ASSERT_SUCCESS(doca_dev_open(dev_list[i], &result));
        }
    }

    // The list is only needed for discovery; release it on every path instead
    // of leaking it once the device has been opened.
    ASSERT_SUCCESS(doca_devinfo_destroy_list(dev_list));

    if(result == NULL) {
        printf("No Comch-server-capable device found at PCI address %s\n", pci_addr);
        fflush(stdout);
        exit(-1);
    }

    return result;
}

// Opens the network representor of server_dev whose PCI address equals pci_addr.
//
// server_dev: already opened local device whose representors are enumerated.
// pci_addr:   PCI address string of the representor to match; must not be NULL.
// Returns:    the opened representor; the caller closes it with
//             doca_dev_rep_close().
//
// Never returns NULL: if no matching representor exists a message is printed
// and the process exits with code -2. Any DOCA call failure exits via
// ASSERT_SUCCESS.
struct doca_dev_rep *open_server_device_representor(struct doca_dev *server_dev, char const *pci_addr) {
    struct doca_dev_rep *result = NULL;
    struct doca_devinfo_rep **rep_list = NULL;
    uint32_t nb_reps = 0;

    ASSERT_SUCCESS(doca_devinfo_rep_create_list(server_dev, DOCA_DEVINFO_REP_FILTER_NET, &rep_list, &nb_reps));

    // Stop scanning as soon as a representor has been opened.
    for(uint32_t i = 0; i < nb_reps && result == NULL; ++i) {
        // Initialised so the comparison below never reads an indeterminate value.
        uint8_t is_addr_equal = 0;

        ASSERT_SUCCESS(doca_devinfo_rep_is_equal_pci_addr(rep_list[i], pci_addr, &is_addr_equal));

        if(is_addr_equal) {
            ASSERT_SUCCESS(doca_dev_rep_open(rep_list[i], &result));
        }
    }

    // The list is only needed for discovery; release it on every path instead
    // of leaking it once the representor has been opened.
    ASSERT_SUCCESS(doca_devinfo_rep_destroy_list(rep_list));

    if(result == NULL) {
        printf("No device representor found at PCI address %s\n", pci_addr);
        fflush(stdout);
        exit(-2);
    }

    return result;
}

// Server entry point.
//
// Opens the device and representor (PCI addresses taken from the DOCA_DEV and
// DOCA_REP environment variables, with built-in defaults), creates a progress
// engine whose notification handle is watched through epoll, configures and
// starts a Comch server named "consumer-start-bug", and then drives the
// progress engine until the server context is IDLE again.
//
// Exit codes: -1/-2 device/representor not found, -3 DOCA call failed
// (ASSERT_SUCCESS), -4 epoll setup failed.
int main(void) {
    char const *dev_pci = getenv("DOCA_DEV");
    char const *rep_pci = getenv("DOCA_REP");

    struct doca_dev *dev = open_server_device(dev_pci ? dev_pci : "03:00.0");
    struct doca_dev_rep *rep = open_server_device_representor(dev, rep_pci ? rep_pci: "e1:00.0");

    struct doca_pe *pe;
    ASSERT_SUCCESS(doca_pe_create(&pe));

    doca_event_handle_t event_handle;
    ASSERT_SUCCESS(doca_pe_get_notification_handle(pe, &event_handle));

    // Without a working epoll instance the blocking epoll_wait() below would
    // either fail on every iteration or never wake up, so fail loudly here.
    int epoll_fd = epoll_create1(EPOLL_CLOEXEC);
    if(epoll_fd < 0) {
        perror("epoll_create1");
        exit(-4);
    }

    struct epoll_event events_in = { EPOLLIN, { .fd = event_handle }};
    if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, event_handle, &events_in) != 0) {
        perror("epoll_ctl");
        exit(-4);
    }

    struct doca_comch_server *server;
    ASSERT_SUCCESS(doca_comch_server_create(dev, rep, "consumer-start-bug", &server));
    ASSERT_SUCCESS(doca_comch_server_set_max_msg_size(server, 4080));
    ASSERT_SUCCESS(doca_comch_server_set_recv_queue_size(server, 16));
    // The same callback serves as completion and error handler: it only frees the task.
    ASSERT_SUCCESS(doca_comch_server_task_send_set_conf(server, send_task_completed_callback, send_task_completed_callback, 16));
    ASSERT_SUCCESS(doca_comch_server_event_msg_recv_register(server, msg_recv_callback));
    ASSERT_SUCCESS(doca_comch_server_event_connection_status_changed_register(server, connection_callback, connection_callback));
    ASSERT_SUCCESS(doca_comch_server_event_consumer_register(server, new_consumer_callback, expired_consumer_callback));

    struct doca_ctx *server_ctx = doca_comch_server_as_ctx(server);
    ASSERT_SUCCESS(doca_ctx_set_state_changed_cb(server_ctx, server_state_change_callback));
    ASSERT_SUCCESS(doca_pe_connect_ctx(pe, server_ctx));
    ASSERT_SUCCESS(doca_ctx_start(server_ctx));

    for(;;) {
        // Arm the notification handle, then sleep until the engine signals work.
        ASSERT_SUCCESS(doca_pe_request_notification(pe));
        struct epoll_event ep_event = { 0, { 0 } };
        // The return value is deliberately ignored: an interrupted or spurious
        // wake-up only costs one extra (empty) progress pass.
        epoll_wait(epoll_fd, &ep_event, 1, -1);
        ASSERT_SUCCESS(doca_pe_clear_notification(pe, 0));

        // Drain everything that is currently pending.
        while(doca_pe_progress(pe) > 0) {
            // do nothing
        }

        enum doca_ctx_states server_state;
        ASSERT_SUCCESS(doca_ctx_get_state(server_ctx, &server_state));

        if(server_state == DOCA_CTX_STATE_IDLE) {
            break;
        }
    }

    doca_comch_server_destroy(server);
    doca_dev_rep_close(rep);
    doca_dev_close(dev);
    doca_pe_destroy(pe);
    close(epoll_fd);
}

Client side:

#include <doca_comch.h>
#include <doca_comch_consumer.h>
#include <doca_ctx.h>
#include <doca_dev.h>
#include <doca_mmap.h>
#include <doca_pe.h>

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <unistd.h>

// Evaluates `expr` exactly once. DOCA_SUCCESS and DOCA_ERROR_IN_PROGRESS are
// both accepted; for any other doca_error_t the calling function, the line
// number and the DOCA error name are printed to stdout, stdout is flushed and
// the process terminates with exit(-3).
#define ASSERT_SUCCESS(expr) do { \
    doca_error_t assert_success_status = (expr); \
    if(!(assert_success_status == DOCA_SUCCESS || assert_success_status == DOCA_ERROR_IN_PROGRESS)) { \
        printf("Error in %s, line %d: %s\n", __func__, __LINE__, doca_error_get_name(assert_success_status)); \
        fflush(stdout); \
        exit(-3); \
    } \
} while(0)


// Shared state handed to the client and consumer state-change callbacks
// through the contexts' user data.
struct client_state {
    struct doca_comch_client *client;     // Comch client context owner
    struct doca_comch_consumer *consumer; // consumer destroyed when its context becomes IDLE
    struct doca_pe *engine;               // progress engine new contexts are connected to
    struct doca_mmap *memmap;             // memory map passed to consumer creation
    void *buffer;                         // backing memory registered with memmap
    uint32_t buflen;                      // size of buffer in bytes
};

// State-change callback for the consumer context.
//
// Logs every transition using the raw numeric values of the state enum.
// On RUNNING the consumer context is stopped again immediately. On IDLE the
// consumer held in the shared client_state (user_data.ptr) is destroyed and
// the client context is asked to stop.
//
// Relies on state->consumer having been populated by the code that created
// the consumer.
void consumer_state_change_callback(
    union doca_data user_data,
    struct doca_ctx *ctx,
    enum doca_ctx_states prev_state,
    enum doca_ctx_states next_state
) {
    printf("consumer state change %d -> %d\n", prev_state, next_state);

    if(next_state == DOCA_CTX_STATE_RUNNING) {
        // The sample only wants to observe the start event: tear down at once.
        ASSERT_SUCCESS(doca_ctx_stop(ctx));
        return;
    }

    if(next_state != DOCA_CTX_STATE_IDLE) {
        return;
    }

    struct client_state *state = user_data.ptr;
    ASSERT_SUCCESS(doca_comch_consumer_destroy(state->consumer));

    // Reported by the author to fail as well although the consumer is already
    // gone; not the main problem the sample demonstrates.
    struct doca_ctx *client_ctx = doca_comch_client_as_ctx(state->client);
    ASSERT_SUCCESS(doca_ctx_stop(client_ctx));
}

// State-change callback for the client context.
//
// Logs every transition using the raw numeric values of the state enum. Once
// the client is RUNNING, a consumer is created on the client's connection,
// recorded in the shared client_state (user_data.ptr), wired to the same
// progress engine and started.
void client_state_change_callback(
    union doca_data user_data,
    struct doca_ctx *ctx,
    enum doca_ctx_states prev_state,
    enum doca_ctx_states next_state
) {
    (void) ctx;

    printf("client state change %d -> %d\n", prev_state, next_state);

    if(next_state == DOCA_CTX_STATE_RUNNING) {
        struct client_state *state = user_data.ptr;
        struct doca_comch_connection *connection;
        ASSERT_SUCCESS(doca_comch_client_get_connection(state->client, &connection));

        struct doca_comch_consumer *consumer;
        ASSERT_SUCCESS(doca_comch_consumer_create(connection, state->memmap, &consumer));
        // Remember the consumer so the consumer's state-change callback can
        // destroy it; without this it would call destroy on a NULL pointer.
        state->consumer = consumer;

        struct doca_ctx *consumer_ctx = doca_comch_consumer_as_ctx(consumer);
        ASSERT_SUCCESS(doca_ctx_set_state_changed_cb(consumer_ctx, consumer_state_change_callback));
        union doca_data consumer_user_data = { .ptr = state };
        ASSERT_SUCCESS(doca_ctx_set_user_data(consumer_ctx, consumer_user_data));
        ASSERT_SUCCESS(doca_pe_connect_ctx(state->engine, consumer_ctx));
        ASSERT_SUCCESS(doca_ctx_start(consumer_ctx));
    }
}

// Completion callback for Comch send tasks.
// Converts the send task to its generic task form and frees it; both user-data
// arguments are ignored.
void send_task_completed_callback(
    struct doca_comch_task_send *task,
    union doca_data task_user_data,
    union doca_data ctx_user_data
) {
    (void) ctx_user_data;
    (void) task_user_data;

    // The task is not reused, so release it right away.
    doca_task_free(
        doca_comch_task_send_as_task(task)
    );
}

// Message-received callback. Intentionally a no-op: the event, the received
// buffer, its length and the connection are all ignored.
void msg_recv_callback(
    struct doca_comch_event_msg_recv *event,
    uint8_t *recv_buffer,
    uint32_t msg_len,
    struct doca_comch_connection *connection
) {
    // Silence unused-parameter warnings; incoming messages are dropped.
    (void) connection;
    (void) msg_len;
    (void) recv_buffer;
    (void) event;
}

// Opens the local DOCA device whose PCI address equals pci_addr and that
// reports Comch client support.
//
// pci_addr: PCI address string to match (e.g. "e1:00.0"); must not be NULL.
// Returns:  the opened device; the caller closes it with doca_dev_close().
//
// Never returns NULL: if no matching device exists a message is printed and
// the process exits with code -1. Any DOCA call failure exits via
// ASSERT_SUCCESS.
struct doca_dev *open_client_device(char const *pci_addr) {
    struct doca_dev *result = NULL;
    struct doca_devinfo **dev_list = NULL;
    uint32_t nb_devs = 0;

    ASSERT_SUCCESS(doca_devinfo_create_list(&dev_list, &nb_devs));

    // Stop scanning as soon as a device has been opened.
    for(uint32_t i = 0; i < nb_devs && result == NULL; ++i) {
        uint8_t is_addr_equal = 0;

        ASSERT_SUCCESS(doca_devinfo_is_equal_pci_addr(dev_list[i], pci_addr, &is_addr_equal));

        if(is_addr_equal && doca_comch_cap_client_is_supported(dev_list[i]) == DOCA_SUCCESS) {
            ASSERT_SUCCESS(doca_dev_open(dev_list[i], &result));
        }
    }

    // The list is only needed for discovery; release it on every path instead
    // of leaking it once the device has been opened.
    ASSERT_SUCCESS(doca_devinfo_destroy_list(dev_list));

    if(result == NULL) {
        printf("No Comch-client-capable device found at PCI address %s\n", pci_addr);
        fflush(stdout);
        exit(-1);
    }

    return result;
}

// Client entry point.
//
// Opens the device (PCI address taken from the DOCA_DEV environment variable,
// with a built-in default), creates a progress engine whose notification
// handle is watched through epoll, prepares a 1024-byte memory map for the
// consumer, configures and starts a Comch client for the service
// "consumer-start-bug", and then drives the progress engine until the client
// context is IDLE again. The consumer itself is created from the client's
// state-change callback.
//
// Exit codes: -1 device not found, -3 DOCA call failed (ASSERT_SUCCESS),
// -4 allocation or epoll setup failed.
int main(void) {
    char const *dev_pci = getenv("DOCA_DEV");

    struct doca_dev *dev = open_client_device(dev_pci ? dev_pci : "e1:00.0");

    struct doca_pe *pe;
    ASSERT_SUCCESS(doca_pe_create(&pe));

    doca_event_handle_t event_handle;
    ASSERT_SUCCESS(doca_pe_get_notification_handle(pe, &event_handle));

    // Without a working epoll instance the wait in the loop below degenerates
    // into an error return or a plain sleep, so fail loudly here.
    int epoll_fd = epoll_create1(EPOLL_CLOEXEC);
    if(epoll_fd < 0) {
        perror("epoll_create1");
        exit(-4);
    }

    struct epoll_event events_in = { EPOLLIN, { .fd = event_handle }};
    if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, event_handle, &events_in) != 0) {
        perror("epoll_ctl");
        exit(-4);
    }

    struct doca_comch_client *client;
    ASSERT_SUCCESS(doca_comch_client_create(dev, "consumer-start-bug", &client));
    ASSERT_SUCCESS(doca_comch_client_set_max_msg_size(client, 4080));
    ASSERT_SUCCESS(doca_comch_client_set_recv_queue_size(client, 16));
    // The same callback serves as completion and error handler: it only frees the task.
    ASSERT_SUCCESS(doca_comch_client_task_send_set_conf(client, send_task_completed_callback, send_task_completed_callback, 16));
    ASSERT_SUCCESS(doca_comch_client_event_msg_recv_register(client, msg_recv_callback));

    struct doca_ctx *client_ctx = doca_comch_client_as_ctx(client);
    ASSERT_SUCCESS(doca_ctx_set_state_changed_cb(client_ctx, client_state_change_callback));

    // Zero-initialised so that state->consumer starts out as NULL.
    struct client_state *state = calloc(1, sizeof *state);
    if(state == NULL) {
        perror("calloc");
        exit(-4);
    }

    state->client = client;
    state->engine = pe;
    state->buflen = 1024;
    state->buffer = calloc(state->buflen, 1);
    if(state->buffer == NULL) {
        perror("calloc");
        exit(-4);
    }

    ASSERT_SUCCESS(doca_mmap_create(&state->memmap));
    ASSERT_SUCCESS(doca_mmap_set_memrange(state->memmap, state->buffer, state->buflen));
    ASSERT_SUCCESS(doca_mmap_set_permissions(state->memmap, DOCA_ACCESS_FLAG_PCI_READ_WRITE));
    ASSERT_SUCCESS(doca_mmap_add_dev(state->memmap, dev));
    ASSERT_SUCCESS(doca_mmap_start(state->memmap));

    union doca_data client_user_data = { .ptr = state };
    ASSERT_SUCCESS(doca_ctx_set_user_data(client_ctx, client_user_data));
    ASSERT_SUCCESS(doca_pe_connect_ctx(pe, client_ctx));
    ASSERT_SUCCESS(doca_ctx_start(client_ctx));

    for(;;) {
        // comment out this line to make the sample work.
        // When notifications are requested, the consumer start event is lost on BF-3 (but not BF-2)
        ASSERT_SUCCESS(doca_pe_request_notification(pe));
        struct epoll_event ep_event = { 0, { 0 } };
        // 100 ms timeout; the return value is deliberately ignored because a
        // timeout or interrupted wait only costs one extra progress pass.
        epoll_wait(epoll_fd, &ep_event, 1, 100);
        ASSERT_SUCCESS(doca_pe_clear_notification(pe, 0));

        // Drain everything that is currently pending.
        while(doca_pe_progress(pe) > 0) {
            // do nothing
        }

        enum doca_ctx_states client_state;
        ASSERT_SUCCESS(doca_ctx_get_state(client_ctx, &client_state));

        if(client_state == DOCA_CTX_STATE_IDLE) {
            break;
        }
    }

    doca_comch_client_destroy(client);
    doca_mmap_destroy(state->memmap);
    free(state->buffer);
    free(state);
    doca_dev_close(dev);
    doca_pe_destroy(pe);
    close(epoll_fd);
}

The secondary problem is due to a bug in the minimal example. I forgot to set state->consumer = consumer; after creating the consumer, so doca_comch_consumer_destroy was called on a null pointer. Changing that, only the lost-event problem on BF-3 remains.