ibv_poll_cq() never returns a completion and waits indefinitely

Hello forum members,

I hope this message finds you well. I am currently working on a project that involves receiving signals from an FPGA over Ethernet at 10Gbps, and I am specifically interested in implementing RoCE (RDMA over Converged Ethernet) for this purpose. To achieve this, I am using the ConnectX®-4 Lx EN Card.

This is the code I wrote to receive UDP data packets over Ethernet using RDMA in Unreliable Datagram (UD) mode, but every time I run it, the code waits indefinitely in ibv_poll_cq() and never receives a data packet.

Please help; I am stuck.

#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <stdint.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netdb.h>
#include <byteswap.h>
#include <unistd.h>
#include <getopt.h>
#include <fcntl.h>
#include <rdma/rdma_cma.h>

/*
 * Per-connection state: one RDMA CM id plus the verbs resources that
 * init_node() creates for it and destroy_node() releases.
 */
struct cmatest_node {
    int id;                      /* index of this node in test.nodes */
    struct rdma_cm_id *cma_id;   /* CM id; its context points back at this node */
    int connected;               /* set once addr_handler() finished setup */
    struct ibv_pd *pd;           /* protection domain */
    struct ibv_cq *cq;           /* CQ used for both send and receive completions */
    struct ibv_mr *mr;           /* memory registration covering mem */
    struct ibv_ah *ah;           /* address handle; released if set (no visible code creates one) */
    uint32_t remote_qpn;         /* not referenced in the visible code */
    uint32_t remote_qkey;        /* not referenced in the visible code */
    void *mem;                   /* receive buffer: GRH area followed by the payload */
};

/* Process-wide test state; the file keeps a single instance named `test`. */
struct cmatest {
    struct rdma_event_channel *channel;  /* CM event channel shared by every node's id */
    struct cmatest_node *nodes;          /* array of `connections` nodes */
    int conn_index;                      /* not referenced in the visible code */
    int connects_left;                   /* nodes whose setup has neither finished nor failed */
    struct sockaddr_in6 dst_in;          /* storage for the resolved destination address */
    struct sockaddr *dst_addr;           /* points at dst_in */
    struct sockaddr_in6 src_in;          /* storage for the resolved bind address */
    struct sockaddr *src_addr;           /* points at src_in when -b was given, else NULL */
};

/* Global test state shared by every function in this file. */
static struct cmatest test;
/* Number of RDMA CM ids / nodes created by alloc_nodes(). */
static int connections = 1;
/* Payload bytes per receive buffer; create_message() adds room for a struct ibv_grh. */
static int message_size = 1456;
/* Receive WRs posted per node, and completions poll_cqs() waits for. */
static int message_count = 10;
/* Destination (-m) and optional bind (-b) address strings from the command line. */
static char *dst_addr;
static char *src_addr;
/* RDMA CM port space for the ids; init_node() pairs them with UD QPs (IBV_QPT_UD). */
static enum rdma_port_space port_space = RDMA_PS_UDP;

/* (1 MB) Size of the chunks the output buffer is split into while writing to a file. */
#define CHUNK_SIZE (1024 * 1024)

/*
 * Allocate and register the receive buffer for @node.
 *
 * The buffer holds message_size payload bytes plus a struct ibv_grh area,
 * because every UD receive is preceded by a GRH in the receive buffer.
 * A zero message_size disables messaging by forcing message_count to 0.
 *
 * Returns 0 on success (or when messaging is disabled), -1 on failure.
 * On failure node->mem and node->mr are left NULL, so destroy_node()
 * neither frees nor deregisters them a second time.
 */
static int create_message(struct cmatest_node *node)
{
    size_t buf_len;

    if (!message_size)
        message_count = 0;

    if (!message_count)
        return 0;

    buf_len = (size_t) message_size + sizeof(struct ibv_grh);

    node->mem = malloc(buf_len);
    if (!node->mem) {
        printf("failed message allocation\n");
        return -1;
    }

    node->mr = ibv_reg_mr(node->pd, node->mem, buf_len, IBV_ACCESS_LOCAL_WRITE);
    if (!node->mr) {
        printf("failed to reg MR\n");
        goto err;
    }

    return 0;

err:
    free(node->mem);
    /* destroy_node() frees node->mem again if it is non-NULL. */
    node->mem = NULL;
    return -1;
}

/*
 * Check that one message fits in the port's active MTU.
 *
 * A UD message must fit in a single packet, so a payload larger than the
 * active MTU can never be received. The ibv_mtu enum (IBV_MTU_256 = 1 ...
 * IBV_MTU_4096 = 5) maps to 1 << (value + 7) bytes.
 *
 * Returns 0 if the message fits, the ibv_query_port() error if the query
 * fails, or -EINVAL if message_size exceeds the active MTU.
 */
static int verify_test_params(struct cmatest_node *node)
{
    struct ibv_port_attr port_attr;
    int mtu_bytes;
    int ret;

    ret = ibv_query_port(node->cma_id->verbs, node->cma_id->port_num, &port_attr);
    if (ret)
        return ret;

    /*
     * Use the MTU the port actually reports. Forcing IBV_MTU_4096 here would
     * let oversized messages pass this check even on links that cannot carry
     * them (RoCE over a standard 1500-byte Ethernet MTU typically reports
     * IBV_MTU_1024; larger messages need jumbo frames).
     */
    mtu_bytes = 1 << (port_attr.active_mtu + 7);

    if (message_count && message_size > mtu_bytes) {
        printf("mckey: message_size %d is larger than active mtu %d\n",
               message_size, mtu_bytes);
        return -EINVAL;
    }

    return 0;
}

/*
 * Create the verbs resources for @node on the device its CM id resolved to:
 * a PD, one CQ shared by send and receive, a UD QP (via rdma_create_qp())
 * and the registered receive buffer (via create_message()).
 *
 * Partially created resources are left in @node for destroy_node().
 * Returns 0 on success, -ENOMEM if the PD or CQ cannot be created, or the
 * error returned by rdma_create_qp() / create_message().
 */
static int init_node(struct cmatest_node *node)
{
    struct ibv_qp_init_attr init_qp_attr;
    int cqe, ret;

    node->pd = ibv_alloc_pd(node->cma_id->verbs);
    if (!node->pd) {
        printf("mckey: unable to allocate PD\n");
        return -ENOMEM;
    }

    /* Sized for message_count receives plus message_count sends (see the QP caps below). */
    cqe = message_count ? message_count * 2 : 2;

    node->cq = ibv_create_cq(node->cma_id->verbs, cqe, node, NULL, 0);
    if (!node->cq) {
        printf("mckey: unable to create CQ\n");
        return -ENOMEM;
    }

    memset(&init_qp_attr, 0, sizeof init_qp_attr);
    init_qp_attr.cap.max_send_wr = message_count ? message_count : 1;
    init_qp_attr.cap.max_recv_wr = message_count ? message_count : 1;
    init_qp_attr.cap.max_send_sge = 1;
    init_qp_attr.cap.max_recv_sge = 1;
    init_qp_attr.qp_context = node;
    init_qp_attr.sq_sig_all = 0;
    init_qp_attr.qp_type = IBV_QPT_UD;
    init_qp_attr.send_cq = node->cq;
    init_qp_attr.recv_cq = node->cq;

    ret = rdma_create_qp(node->cma_id, node->pd, &init_qp_attr);
    if (ret) {
        printf("mckey: unable to create QP: %d\n", ret);
        return ret;
    }

    ret = create_message(node);
    if (ret)
        printf("mckey: failed to create messages: %d\n", ret);

    return ret;
}

/*
 * Post message_count receive work requests on @node's UD QP.
 *
 * Each receive buffer holds message_size payload bytes plus the struct
 * ibv_grh area that precedes the payload of every UD receive.
 *
 * NOTE(review): every WR points at the same buffer (node->mem), so each
 * completed receive overwrites the previous one and only the last datagram
 * survives. Keeping every message needs one buffer slot per WR, allocated
 * in create_message().
 *
 * Returns 0 on success or the first ibv_post_recv() error.
 */
static int post_recvs(struct cmatest_node *node)
{
    struct ibv_recv_wr recv_wr, *recv_failure;
    struct ibv_sge sge;
    int i, ret;

    if (!message_count)
        return 0;

    sge.addr = (uintptr_t) node->mem;
    sge.length = (uint32_t) (message_size + sizeof(struct ibv_grh));
    sge.lkey = node->mr->lkey;

    recv_wr.wr_id = (uintptr_t) node;
    recv_wr.next = NULL;
    recv_wr.sg_list = &sge;
    recv_wr.num_sge = 1;

    for (i = 0; i < message_count; i++) {
        ret = ibv_post_recv(node->cma_id->qp, &recv_wr, &recv_failure);
        if (ret) {
            printf("failed to post receives: %d\n", ret);
            return ret;
        }
    }

    return 0;
}

/*
 * Count a node whose setup failed as finished, so that connect_events()
 * (which loops while test.connects_left is non-zero) stops waiting for it.
 */
static void connect_error(void)
{
    test.connects_left--;
}

/*
 * Handle RDMA_CM_EVENT_ADDR_RESOLVED for @node.
 *
 * Checks the MTU, creates the PD/CQ/QP/buffer, posts the receives, then
 * marks the node connected and decrements test.connects_left.
 * On any failure the node is counted via connect_error() instead.
 *
 * @param is part of the handler signature but not used here.
 * Returns 0 on success or the first error from the setup steps.
 */
static int addr_handler(struct cmatest_node *node, struct rdma_ud_param *param)
{
    int ret;

    (void) param;

    ret = verify_test_params(node);
    if (ret)
        goto err;

    ret = init_node(node);
    if (ret)
        goto err;

    ret = post_recvs(node);
    if (ret)
        goto err;

    node->connected = 1;
    test.connects_left--;
    return 0;

err:
    connect_error();
    return ret;
}

/*
 * Dispatch one RDMA CM event for the node stored in cma_id->context
 * (alloc_nodes() registers &test.nodes[i] as the id's context).
 *
 * ADDR_RESOLVED triggers node setup via addr_handler(). Address and route
 * errors are reported and counted as a failed node. Every other event is
 * ignored.
 *
 * Returns 0, or a non-zero error that makes connect_events() stop.
 */
static int cma_handler(struct rdma_cm_id *cma_id, struct rdma_cm_event *event)
{
    int ret = 0;

    switch (event->event) {
    case RDMA_CM_EVENT_ADDR_RESOLVED:
        ret = addr_handler(cma_id->context, &event->param.ud);
        break;

    case RDMA_CM_EVENT_ADDR_ERROR:
    case RDMA_CM_EVENT_ROUTE_ERROR:
        printf("mckey: event: %s, error: %d\n",
               rdma_event_str(event->event), event->status);
        connect_error();
        ret = event->status;
        break;

    case RDMA_CM_EVENT_DEVICE_REMOVAL:
        /* Cleanup will occur after test completes. */
        break;

    default:
        break;
    }

    return ret;
}

/*
 * Release every resource owned by @node.
 *
 * The QP is destroyed before the CQ it uses, the MR before its PD, and the
 * CM id last. Each resource is checked individually, so partially
 * initialised nodes are handled. Because node->cma_id is cleared at the
 * end, a second call is a no-op.
 */
static void destroy_node(struct cmatest_node *node)
{
    if (!node->cma_id)
        return;

    if (node->ah) {
        ibv_destroy_ah(node->ah);
        node->ah = NULL;
    }

    if (node->cma_id->qp)
        rdma_destroy_qp(node->cma_id);

    if (node->cq) {
        ibv_destroy_cq(node->cq);
        node->cq = NULL;
    }

    /* Check the MR on its own: a buffer may exist without a registration. */
    if (node->mr) {
        ibv_dereg_mr(node->mr);
        node->mr = NULL;
    }
    free(node->mem);
    node->mem = NULL;

    if (node->pd) {
        ibv_dealloc_pd(node->pd);
        node->pd = NULL;
    }

    /* Destroy the RDMA ID after all device resources */
    rdma_destroy_id(node->cma_id);
    node->cma_id = NULL;
}

/*
 * Allocate test.nodes (zeroed) and create one RDMA CM id per node on
 * test.channel, using the node itself as the id's context.
 *
 * Returns 0 on success, -ENOMEM if the array cannot be allocated, or the
 * rdma_create_id() error. On failure, ids created so far are destroyed,
 * the array is freed and test.nodes is reset to NULL.
 */
static int alloc_nodes(void)
{
    int ret, i;

    test.nodes = calloc((size_t) connections, sizeof *test.nodes);
    if (!test.nodes) {
        printf("mckey: unable to allocate memory for test nodes\n");
        return -ENOMEM;
    }

    for (i = 0; i < connections; i++) {
        test.nodes[i].id = i;
        ret = rdma_create_id(test.channel, &test.nodes[i].cma_id,
                             &test.nodes[i], port_space);
        if (ret)
            goto err;
    }

    return 0;

err:
    while (--i >= 0)
        rdma_destroy_id(test.nodes[i].cma_id);
    free(test.nodes);
    test.nodes = NULL;
    return ret;
}

/* Tear down each of the `connections` nodes in order, then free the node array. */
static void destroy_nodes(void)
{
    int n = 0;

    while (n < connections) {
        destroy_node(test.nodes + n);
        n++;
    }

    free(test.nodes);
}

void read_data_from_memory(struct cmatest_node *node)
{

// Add your code here to read data from 'node->mem'
int file_descriptor;
ssize_t total_bytes_received = 0;
// Save the buffer to a binary file
file_descriptor = open("data2.bin", O_CREAT | O_WRONLY | O_TRUNC, 0644);
if (file_descriptor < 0)
{
    perror("file creation failed");
    exit(EXIT_FAILURE);
}

ssize_t bytes_written;
ssize_t remaining_bytes = total_bytes_received;
off_t offset = 0;

// Write the buffer to the file in chunks
while (remaining_bytes > 0)
{
    ssize_t chunk_size = (remaining_bytes < CHUNK_SIZE) ? remaining_bytes : CHUNK_SIZE;
    bytes_written = pwrite(file_descriptor, node->mem + offset, chunk_size, offset);

    if (bytes_written < 0)
    {
        perror("file write failed");
        exit(EXIT_FAILURE);
    }

    remaining_bytes -= bytes_written;
    offset += bytes_written;
}

// Close the file after writing
close(file_descriptor);

}

static int poll_cqs(void)
{
struct ibv_wc wc[8];
int done, i, ret;

for (i = 0; i < connections; i++)
{

if (!test.nodes[i].connected)
    continue;

for (done = 0; done < message_count; done += ret)
{

    ret = ibv_poll_cq(test.nodes[i].cq, 8, wc);
    if (ret < 0)
    {
        printf("mckey: failed polling CQ: %d\n", ret);
        return ret;
    }

}

}

// Read data from memory after data transfer is complete
for (i = 0; i < connections; i++)
{
// Assuming each node has its own data buffer (node->mem)
read_data_from_memory(&test.nodes[i]);
}

return 0;
}

/*
 * Drain the RDMA CM event channel until no node is left pending
 * (test.connects_left == 0), or until fetching or handling an event fails.
 *
 * Each fetched event is passed to cma_handler() and then acknowledged.
 * Returns 0, or the first non-zero result of rdma_get_cm_event() or
 * cma_handler().
 */
static int connect_events(void)
{
    struct rdma_cm_event *ev;
    int rc = 0;

    while (!rc && test.connects_left) {
        rc = rdma_get_cm_event(test.channel, &ev);
        if (rc)
            break;

        rc = cma_handler(ev->id, ev);
        rdma_ack_cm_event(ev);
    }

    return rc;
}

/*
 * Resolve the IPv4 host name or address string @dst into @addr.
 *
 * The port is fixed: "4660" when @local is non-zero (bind address) and
 * "22136" otherwise (destination).
 *
 * NOTE(review): with RDMA_PS_UDP these ports identify RDMA CM endpoints.
 * They are not the UDP destination port of RoCE data packets on the wire
 * (RoCEv2 uses UDP port 4791). Confirm this matches what the sender emits.
 *
 * @addr must point to storage large enough for a struct sockaddr_in; callers
 * pass a struct sockaddr_in6.
 * Returns 0 on success or the getaddrinfo() EAI_* error code.
 */
static int get_addr(char *dst, struct sockaddr *addr, int local)
{
    struct addrinfo hints = {
        .ai_family = AF_INET,
    };
    struct addrinfo *res;
    int ret;

    ret = getaddrinfo(dst, local ? "4660" : "22136", &hints, &res);
    if (ret) {
        printf("getaddrinfo failed - invalid hostname or IP address: %s\n",
               gai_strerror(ret));
        return ret;
    }

    memcpy(addr, res->ai_addr, res->ai_addrlen);
    freeaddrinfo(res);
    return 0;
}

/*
 * Drive the receive test:
 *   1. resolve the optional bind address and the destination address;
 *   2. bind each node's CM id (if -b was given) and start address resolution;
 *   3. process CM events until every node is set up or has failed;
 *   4. wait for message_count receive completions and dump the data.
 *
 * Returns 0 on success or the first error encountered.
 */
static int run(void)
{
    int i, ret;

    printf("mckey: starting server\n");

    if (src_addr) {
        ret = get_addr(src_addr, (struct sockaddr *) &test.src_in, 1);
        if (ret)
            return ret;
    }

    ret = get_addr(dst_addr, (struct sockaddr *) &test.dst_in, 0);
    if (ret)
        return ret;

    printf("mckey: joining\n");
    for (i = 0; i < connections; i++) {
        if (src_addr) {
            ret = rdma_bind_addr(test.nodes[i].cma_id, test.src_addr);
            if (ret) {
                printf("mckey: addr bind failure: %d\n", ret);
                connect_error();
                return ret;
            }
        }

        ret = rdma_resolve_addr(test.nodes[i].cma_id, test.src_addr,
                                test.dst_addr, 2000);
        if (ret) {
            printf("mckey: resolve addr failure: %d\n", ret);
            connect_error();
            return ret;
        }
    }

    ret = connect_events();
    if (ret)
        return ret;

    /*
     * Pause to give the subnet manager (SM) a chance to configure switches;
     * we don't want to handle reliability issues in this simple test program.
     * NOTE(review): receives are already posted at this point, and this delay
     * likely has no effect for a RoCE unicast receiver. Confirm before removing.
     */
    sleep(3);

    if (message_count) {
        printf("receiving data transfers\n");
        ret = poll_cqs();
        if (ret)
            return ret;

        printf("data transfers complete\n");
    }

    return 0;
}

/* Print command-line help for @prog to stdout. */
static void usage(const char *prog)
{
    printf("usage: %s\n", prog);
    printf("\t-m dest_address\n");
    printf("\t-b bind_address\n");
}

/*
 * Parse -m (destination address, required) and -b (local bind address),
 * create the CM event channel and node ids, run the receive test, then
 * release everything. run()'s status becomes the process exit status.
 */
int main(int argc, char **argv)
{
    int op, ret;

    while ((op = getopt(argc, argv, "m:b:")) != -1) {
        switch (op) {
        case 'm':
            dst_addr = optarg;
            break;

        case 'b':
            src_addr = optarg;
            test.src_addr = (struct sockaddr *) &test.src_in;
            break;

        default:
            usage(argv[0]);
            exit(1);
        }
    }

    /* get_addr() would otherwise be called with a NULL host string. */
    if (!dst_addr) {
        usage(argv[0]);
        exit(1);
    }

    test.dst_addr = (struct sockaddr *) &test.dst_in;
    test.connects_left = connections;

    test.channel = rdma_create_event_channel();
    if (!test.channel) {
        printf("failed to create event channel\n");
        exit(1);
    }

    if (alloc_nodes()) {
        rdma_destroy_event_channel(test.channel);
        exit(1);
    }

    ret = run();

    printf("test complete\n");
    destroy_nodes();
    rdma_destroy_event_channel(test.channel);

    printf("return status %d\n", ret);
    return ret;
}

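You should first test ConnectX HCA to ConnectX HCA. A UD QP only completes receives for RoCE packets addressed to its QP number and Q_Key, so an HCA-to-HCA test shows whether the receiver works before the FPGA is involved. By the way, ConnectX-4 has reached end of life (EOL).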

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.