Using an AMD Radeon PRO W6800 and a CX6 HCA to test GPU Direct RDMA with Send/Recv operations, it was found that sending data from CPU memory into a receive buffer in GPU memory works, but sending data from GPU memory into a receive buffer in CPU memory fails. The send side's error log shows wc.status=0x4, wc.vendor_err=0x51.
The same error occurs when the buffers on both sides are in GPU memory, and it also occurs with the RDMA write operation.
The code and logs are as follows:
/*
log in server(local buf is in CPU memory):
$ ./rdma_peer_direct_test -p 19997
[INFO] rdma_peer_direct_test_commit.cpp:1004 - ------------------------------------------------
[INFO] rdma_peer_direct_test_commit.cpp:1005 - Device name : "(null)"
[INFO] rdma_peer_direct_test_commit.cpp:1006 - IB port : 1
[INFO] rdma_peer_direct_test_commit.cpp:1010 - TCP port : 19997
[INFO] rdma_peer_direct_test_commit.cpp:1014 - ------------------------------------------------
[INFO] rdma_peer_direct_test_commit.cpp: 511 - waiting on port 19997 for TCP connection
[INFO] rdma_peer_direct_test_commit.cpp: 520 - TCP connection was established
[INFO] rdma_peer_direct_test_commit.cpp: 521 - searching for IB devices in host
[INFO] rdma_peer_direct_test_commit.cpp: 537 - found 2 device(s)
[INFO] rdma_peer_direct_test_commit.cpp: 544 - device not specified, using first one found: mlx5_0
[INFO] rdma_peer_direct_test_commit.cpp: 608 - res->buf: ''
[INFO] rdma_peer_direct_test_commit.cpp: 629 - MR was registered with addr=0x1253b10, lkey=0x126e9, rkey=0x126e9, size=0x40, flags=0x7
[INFO] rdma_peer_direct_test_commit.cpp: 647 - QP was created, QP number=0x73
[INFO] rdma_peer_direct_test_commit.cpp: 844 - Local LID = 0x1
[INFO] rdma_peer_direct_test_commit.cpp: 868 - Remote address = 0x7faa13c00000
[INFO] rdma_peer_direct_test_commit.cpp: 869 - Remote rkey = 0x1820f8
[INFO] rdma_peer_direct_test_commit.cpp: 870 - Remote QP number = 0x74
[INFO] rdma_peer_direct_test_commit.cpp: 871 - Remote LID = 0x1
[INFO] rdma_peer_direct_test_commit.cpp: 454 - Receive Request was posted
[INFO] rdma_peer_direct_test_commit.cpp: 911 - QP state was change to RTS
[ERROR] rdma_peer_direct_test_commit.cpp: 330 - completion wasn't found in the CQ after timeout
[ERROR] rdma_peer_direct_test_commit.cpp:1139 - poll completion failed
[INFO] rdma_peer_direct_test_commit.cpp:1155 - ************test result is 1**************
log in client(local buf is in GPU memory):
$ ./rdma_peer_direct_test 10.31.91.80 -p 19997
[INFO] rdma_peer_direct_test_commit.cpp:1004 - ------------------------------------------------
[INFO] rdma_peer_direct_test_commit.cpp:1005 - Device name : "(null)"
[INFO] rdma_peer_direct_test_commit.cpp:1006 - IB port : 1
[INFO] rdma_peer_direct_test_commit.cpp:1008 - IP : 10.31.91.80
[INFO] rdma_peer_direct_test_commit.cpp:1010 - TCP port : 19997
[INFO] rdma_peer_direct_test_commit.cpp:1014 - ------------------------------------------------
[INFO] rdma_peer_direct_test_commit.cpp: 502 - Preparing to connect to server 10.31.91.80:19997
[INFO] rdma_peer_direct_test_commit.cpp: 520 - TCP connection was established
[INFO] rdma_peer_direct_test_commit.cpp: 521 - searching for IB devices in host
[INFO] rdma_peer_direct_test_commit.cpp: 537 - found 2 device(s)
[INFO] rdma_peer_direct_test_commit.cpp: 544 - device not specified, using first one found: mlx5_0
[INFO] rdma_peer_direct_test_commit.cpp: 615 - res->buf: 'SEND operation '
[INFO] rdma_peer_direct_test_commit.cpp: 629 - MR was registered with addr=0x7faa13c00000, lkey=0x1820f8, rkey=0x1820f8, size=0x40, flags=0x7
[INFO] rdma_peer_direct_test_commit.cpp: 647 - QP was created, QP number=0x74
[INFO] rdma_peer_direct_test_commit.cpp: 844 - Local LID = 0x1
[INFO] rdma_peer_direct_test_commit.cpp: 868 - Remote address = 0x1253b10
[INFO] rdma_peer_direct_test_commit.cpp: 869 - Remote rkey = 0x126e9
[INFO] rdma_peer_direct_test_commit.cpp: 870 - Remote QP number = 0x73
[INFO] rdma_peer_direct_test_commit.cpp: 871 - Remote LID = 0x1
[INFO] rdma_peer_direct_test_commit.cpp: 454 - Receive Request was posted
[INFO] rdma_peer_direct_test_commit.cpp: 911 - QP state was change to RTS
[INFO] rdma_peer_direct_test_commit.cpp: 407 - RDMA Write with Immediate Request was posted
[ERROR] rdma_peer_direct_test_commit.cpp: 339 - got bad completion with status: 0x4, vendor syndrome: 0x51
[ERROR] rdma_peer_direct_test_commit.cpp:1139 - poll completion failed
[INFO] rdma_peer_direct_test_commit.cpp:1155 - ************test result is 1**************
*/
/*
* BUILD COMMAND:
* hipcc -Wall -O2 -g -I/opt/rocm/include -D__HIP_PLATFORM_AMD__ -L/opt/rocm/lib -lamdhip64 -o rdma_peer_direct_test rdma_peer_direct_test_commit.cpp -libverbs
*/
#include <iostream>
#include <arpa/inet.h>
#include <byteswap.h>
#include <endian.h>
#include <getopt.h>
#include <infiniband/verbs.h>
#include <inttypes.h>
#include <netdb.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdarg.h>
#include <hip/hip_runtime.h>
#include <hip/hip_runtime_api.h>
/* Log severities, ordered from most verbose (DEBUG) to least (ERROR). */
#define LOG_LEVEL_DEBUG 0
#define LOG_LEVEL_INFO 1
#define LOG_LEVEL_WARN 2
#define LOG_LEVEL_ERROR 3
/* Current threshold: messages with a lower severity are dropped. */
int log_level = LOG_LEVEL_INFO;
/*
 * Print one formatted log line to stdout as "[LEVEL] <funcname>:<line> - <text>".
 *
 * level:    one of the LOG_LEVEL_* constants
 * funcname: origin label printed before the line number (the LOG_* macros
 *           pass __FILE__ here)
 * line:     source line number
 * format:   printf-style format, followed by its arguments
 *
 * On every call the RDMA_TESTS_LOG_LEVEL environment variable is consulted;
 * the values "DEBUG", "WARN" and "ERROR" overwrite the global threshold.
 * The formatted text is limited to the 1024-byte local buffer.
 */
void log_message(int level, const char *funcname, int line, const char *format,
                 ...) {
  const char *requested = getenv("RDMA_TESTS_LOG_LEVEL");
  const char *tag;
  char text[1024];
  va_list ap;

  if (requested != NULL) {
    if (!strcmp(requested, "DEBUG")) {
      log_level = LOG_LEVEL_DEBUG;
    } else if (!strcmp(requested, "WARN")) {
      log_level = LOG_LEVEL_WARN;
    } else if (!strcmp(requested, "ERROR")) {
      log_level = LOG_LEVEL_ERROR;
    }
  }

  /* Below the threshold: nothing to print. */
  if (level < log_level) {
    return;
  }

  va_start(ap, format);
  vsnprintf(text, sizeof(text), format, ap);
  va_end(ap);

  switch (level) {
    case LOG_LEVEL_DEBUG:
      tag = "DEBUG";
      break;
    case LOG_LEVEL_INFO:
      tag = "INFO";
      break;
    case LOG_LEVEL_WARN:
      tag = "WARN";
      break;
    case LOG_LEVEL_ERROR:
      tag = "ERROR";
      break;
    default:
      tag = "UNKNOWN";
      break;
  }

  printf("[%s] %s:%4d - %s\n", tag, funcname, line, text);
}
/*
 * Convenience wrappers around log_message(): each one fixes the severity and
 * passes the call site's __FILE__ and __LINE__. `##__VA_ARGS__` lets the
 * macros be used with a format string alone.
 */
#define LOG_DEBUG(format, ...) \
log_message(LOG_LEVEL_DEBUG, __FILE__, __LINE__, format, ##__VA_ARGS__)
#define LOG_INFO(format, ...) \
log_message(LOG_LEVEL_INFO, __FILE__, __LINE__, format, ##__VA_ARGS__)
#define LOG_WARN(format, ...) \
log_message(LOG_LEVEL_WARN, __FILE__, __LINE__, format, ##__VA_ARGS__)
#define LOG_ERROR(format, ...) \
log_message(LOG_LEVEL_ERROR, __FILE__, __LINE__, format, ##__VA_ARGS__)
/* poll CQ timeout in millisec (2 seconds) */
#define MAX_POLL_CQ_TIMEOUT 2000
/* Text placed in the client's buffer. NOTE: this literal occupies fewer
 * bytes than MSG_SIZE, so it must not be copied with a length of MSG_SIZE. */
#define MSG "SEND operation "
/* Size in bytes of the registered buffer and of every posted transfer. */
#define MSG_SIZE 64
/* Immediate value attached to *_WITH_IMM work requests. */
#define IMM_DATA 100
/*
 * 64-bit host <-> network (big-endian) byte-order conversion.
 * On little-endian hosts the bytes are swapped; on big-endian hosts the value
 * is returned unchanged. Any other byte order is rejected at compile time.
 */
#if __BYTE_ORDER == __LITTLE_ENDIAN
static inline uint64_t htonll(uint64_t x) { return bswap_64(x); }
static inline uint64_t ntohll(uint64_t x) { return bswap_64(x); }
#elif __BYTE_ORDER == __BIG_ENDIAN
static inline uint64_t htonll(uint64_t x) { return x; }
static inline uint64_t ntohll(uint64_t x) { return x; }
#else
#error __BYTE_ORDER is neither __LITTLE_ENDIAN nor __BIG_ENDIAN
#endif
/*
 * Evaluate a HIP runtime call once; if it does not return hipSuccess, log the
 * HIP error string and terminate the process with exit(-1). No cleanup of
 * other resources is performed on that path.
 */
#define HIP_CALL(cmd) \
do { \
hipError_t error = (cmd); \
if (error != hipSuccess) \
{ \
LOG_ERROR("Encountered HIP error (%s)", hipGetErrorString(error)); \
exit(-1); \
} \
} while (0)
/* Test parameters, filled from the command line (see main). */
struct config_t {
const char *dev_name; /* IB device name; NULL means "use the first device found" */
char *server_name; /* server host name; NULL means this process is the server */
uint32_t tcp_port; /* server TCP port */
int ib_port; /* local IB port to work with */
int gid_idx; /* gid index to use; negative means no GID/GRH is used */
};
/*
 * Connection data exchanged over the TCP socket so each side can connect its
 * QP to the peer. The struct is packed because it is sent as raw bytes; the
 * multi-byte fields are converted to network byte order before sending
 * (see connect_qp).
 */
struct cm_con_data_t {
uint64_t addr; /* Buffer address */
uint32_t rkey; /* Remote key */
uint32_t qp_num; /* QP number */
uint16_t lid; /* LID of the IB port */
uint8_t gid[16]; /* gid */
} __attribute__((packed));
/*
 * All system resources owned by one run of the test. resources_init() zeroes
 * the structure (sock = -1) so that a zero/NULL member means "not created".
 */
struct resources {
struct ibv_device_attr device_attr; /* Device attributes */
struct ibv_port_attr port_attr; /* IB port attributes */
struct cm_con_data_t remote_props; /* values to connect to remote side */
struct ibv_context *ib_ctx; /* device handle */
struct ibv_pd *pd; /* PD handle */
struct ibv_cq *cq; /* CQ handle */
struct ibv_qp *qp; /* QP handle */
struct ibv_mr *mr; /* MR handle for buf */
char *buf; /* memory buffer pointer, used for RDMA and send ops; allocated with
malloc() on the server and with hipMalloc() on the client */
int sock; /* TCP socket file descriptor */
};
/* Global test configuration with its defaults; main() overrides the fields
 * from the command line. */
struct config_t config = {
NULL, /* dev_name */
NULL, /* server_name */
19875, /* tcp_port */
1, /* ib_port */
-1 /* gid_idx */
};
/******************************************************************************
Socket operations:
For simplicity, the example program uses TCP sockets to exchange control
information. If a TCP/IP stack/connection is not available, connection manager
(CM) may be used to pass this information. Use of CM is beyond the scope of
this example
******************************************************************************/
/******************************************************************************
 * Function: sock_connect
 * Input:
 * servername: host name or address of the server (NULL for server mode)
 * port: TCP port of the service
 *
 * Output: none
 *
 * Returns: connected socket (fd) on success, negative value on failure
 *
 * Description:
 * Client mode (servername != NULL): try each resolved address in turn and
 * return the first socket that connects.
 * Server mode (servername == NULL): bind and listen on the first resolved
 * address, accept exactly one connection and return the accepted socket.
 * The listening socket is always closed before returning.
 ******************************************************************************/
static int sock_connect(const char *servername, int port) {
  struct addrinfo *resolved_addr = NULL;
  struct addrinfo *iterator;
  char service[16]; /* large enough for any int rendered in decimal */
  int sockfd = -1;
  int listenfd = -1; /* -1 == "no listening socket"; 0 is a valid fd */
  int gai_rc;
  int len;
  struct addrinfo hints = {
      .ai_flags = AI_PASSIVE, .ai_family = AF_INET, .ai_socktype = SOCK_STREAM};

  len = snprintf(service, sizeof(service), "%d", port);
  if (len < 0 || (size_t)len >= sizeof(service)) {
    goto sock_connect_exit;
  }

  /* getaddrinfo() reports failure with any non-zero value. */
  gai_rc = getaddrinfo(servername, service, &hints, &resolved_addr);
  if (gai_rc != 0) {
    LOG_ERROR("%s for %s:%d", gai_strerror(gai_rc),
              servername ? servername : "(any)", port);
    resolved_addr = NULL;
    goto sock_connect_exit;
  }

  /* Walk the candidates and stop as soon as one yields a connected socket. */
  for (iterator = resolved_addr; iterator && sockfd < 0;
       iterator = iterator->ai_next) {
    int fd = socket(iterator->ai_family, iterator->ai_socktype,
                    iterator->ai_protocol);
    if (fd < 0) {
      continue;
    }
    if (servername) {
      /* Client mode. Initiate connection to remote */
      if (connect(fd, iterator->ai_addr, iterator->ai_addrlen) != 0) {
        LOG_ERROR("failed connect ");
        close(fd);
        continue;
      }
      sockfd = fd;
    } else {
      /* Server mode. Set up a listening socket and accept one connection */
      listenfd = fd;
      if (bind(listenfd, iterator->ai_addr, iterator->ai_addrlen) != 0) {
        perror("server bind");
        break;
      }
      if (listen(listenfd, 1) != 0) {
        perror("server listen");
        break;
      }
      sockfd = accept(listenfd, NULL, NULL);
      if (sockfd < 0) {
        perror("server accept");
      }
      break;
    }
  }

sock_connect_exit:
  if (listenfd >= 0) {
    close(listenfd);
  }
  if (resolved_addr) {
    freeaddrinfo(resolved_addr);
  }
  if (sockfd < 0) {
    if (servername) {
      LOG_ERROR("Couldn't connect to %s:%d", servername, port);
    } else {
      LOG_ERROR("Couldn't accept a connection on port %d", port);
    }
  }
  return sockfd;
}
/******************************************************************************
 * Function: sock_sync_data
 * Input:
 * sock: socket to transfer data on
 * xfer_size: size of data to transfer in each direction
 * local_data: pointer to data to be sent to remote (not modified)
 *
 * Output: remote_data pointer to buffer that receives xfer_size bytes
 *
 * Returns: 0 on success, -1 on failure
 *
 * Description:
 * Send xfer_size bytes of local_data to the peer, then block until exactly
 * xfer_size bytes have been received into remote_data. Both sides are
 * expected to call this function in the same order.
 *
 * Short writes and short reads are continued from where they stopped. A
 * write/read error, or the peer closing the connection before all bytes
 * arrived, is reported as a failure.
 ******************************************************************************/
int sock_sync_data(int sock, int xfer_size, const char *local_data,
                   char *remote_data) {
  int total_written = 0;
  int total_read = 0;

  while (total_written < xfer_size) {
    ssize_t n = write(sock, local_data + total_written,
                      (size_t)(xfer_size - total_written));
    if (n <= 0) {
      LOG_ERROR("Failed writing data during sock_sync_data");
      return -1;
    }
    total_written += (int)n;
  }

  while (total_read < xfer_size) {
    /* Append after the bytes already received instead of overwriting them. */
    ssize_t n = read(sock, remote_data + total_read,
                     (size_t)(xfer_size - total_read));
    if (n <= 0) {
      /* n == 0 is end-of-file: the peer went away, so stop waiting. */
      LOG_ERROR("Failed reading data during sock_sync_data");
      return -1;
    }
    total_read += (int)n;
  }

  return 0;
}
/******************************************************************************
End of socket operations
******************************************************************************/
/******************************************************************************
 * Function: poll_completion
 *
 * Input:
 * res: pointer to resources structure (its CQ is polled)
 *
 * Output: none
 *
 * Returns: 0 when a successful completion was found, 1 otherwise
 *
 * Description:
 * Busy-poll the completion queue for a single entry, giving up once
 * MAX_POLL_CQ_TIMEOUT milliseconds have elapsed. Fails when polling itself
 * fails, when the timeout expires, or when the completion carries a status
 * other than IBV_WC_SUCCESS. The completion opcode is not examined.
 ******************************************************************************/
static int poll_completion(struct resources *res) {
  struct ibv_wc wc;
  struct timeval now;
  unsigned long began_ms;
  unsigned long now_ms;
  int found;

  gettimeofday(&now, NULL);
  began_ms = (now.tv_sec * 1000) + (now.tv_usec / 1000);

  for (;;) {
    found = ibv_poll_cq(res->cq, 1, &wc);
    gettimeofday(&now, NULL);
    now_ms = (now.tv_sec * 1000) + (now.tv_usec / 1000);
    if (found != 0) {
      break; /* either a completion or a polling error */
    }
    if ((now_ms - began_ms) >= MAX_POLL_CQ_TIMEOUT) {
      break; /* waited long enough */
    }
  }

  if (found < 0) {
    /* poll CQ failed */
    LOG_ERROR("poll CQ failed");
    return 1;
  }
  if (found == 0) {
    /* the CQ is still empty */
    LOG_ERROR("completion wasn't found in the CQ after timeout");
    return 1;
  }

  /* CQE found */
  LOG_DEBUG("completion was found in CQ with status 0x%x", wc.status);
  if (wc.status != IBV_WC_SUCCESS) {
    LOG_ERROR("got bad completion with status: 0x%x, vendor syndrome: 0x%x",
              wc.status, wc.vendor_err);
    return 1;
  }
  return 0;
}
/******************************************************************************
 * Function: post_send
 *
 * Input:
 * res: pointer to resources structure
 * opcode: IBV_WR_SEND, IBV_WR_SEND_WITH_IMM, IBV_WR_RDMA_READ,
 *         IBV_WR_RDMA_WRITE or IBV_WR_RDMA_WRITE_WITH_IMM
 *
 * Output: none
 *
 * Returns: 0 on success, error code from ibv_post_send on failure
 *
 * Description:
 * Build one signaled send work request covering MSG_SIZE bytes of res->buf
 * and post it on res->qp. For RDMA opcodes the remote address and rkey come
 * from res->remote_props; for *_WITH_IMM opcodes IMM_DATA is attached.
 ******************************************************************************/
static int post_send(struct resources *res, enum ibv_wr_opcode opcode) {
  struct ibv_send_wr sr;
  struct ibv_sge sge;
  struct ibv_send_wr *bad_wr = NULL;
  int rc;

  /* prepare the scatter/gather entry */
  memset(&sge, 0, sizeof(sge));
  sge.addr = (uintptr_t)res->buf;
  sge.length = MSG_SIZE;
  sge.lkey = res->mr->lkey;

  /* prepare the send work request */
  memset(&sr, 0, sizeof(sr));
  sr.next = NULL;
  sr.wr_id = 0;
  sr.sg_list = &sge;
  sr.num_sge = 1;
  sr.opcode = opcode;
  sr.send_flags = IBV_SEND_SIGNALED;

  /* Only the RDMA opcodes address remote memory directly. */
  if (opcode != IBV_WR_SEND && opcode != IBV_WR_SEND_WITH_IMM) {
    sr.wr.rdma.remote_addr = res->remote_props.addr;
    sr.wr.rdma.rkey = res->remote_props.rkey;
  }
  if (opcode == IBV_WR_SEND_WITH_IMM || opcode == IBV_WR_RDMA_WRITE_WITH_IMM) {
    /* imm_data travels in network byte order */
    sr.imm_data = htonl(IMM_DATA);
  }

  /* A Receive Request is already posted on the responder side, so the RNR
   * flow is not expected here. */
  rc = ibv_post_send(res->qp, &sr, &bad_wr);
  if (rc) {
    LOG_ERROR("failed to post SR");
    return rc;
  }

  switch (opcode) {
    case IBV_WR_SEND:
      LOG_INFO("Send Request was posted");
      break;
    case IBV_WR_SEND_WITH_IMM:
      LOG_INFO("Send with Immediate Request was posted");
      break;
    case IBV_WR_RDMA_READ:
      LOG_INFO("RDMA Read Request was posted");
      break;
    case IBV_WR_RDMA_WRITE:
      LOG_INFO("RDMA Write Request was posted");
      break;
    case IBV_WR_RDMA_WRITE_WITH_IMM:
      LOG_INFO("RDMA Write with Immediate Request was posted");
      break;
    default:
      LOG_ERROR("Unknown Request was posted");
      break;
  }
  return rc;
}
/******************************************************************************
* Function: post_receive
*
* Input:
* res: pointer to resources structure
*
* Output: none
*
* Returns: 0 on success, error code on failure
*
* Description: post RR to be prepared for incoming messages
*
******************************************************************************/
static int post_receive(struct resources *res) {
struct ibv_recv_wr rr;
struct ibv_sge sge;
struct ibv_recv_wr *bad_wr;
int rc;
/* prepare the scatter/gather entry */
memset(&sge, 0, sizeof(sge));
sge.addr = (uintptr_t)res->buf;
sge.length = MSG_SIZE;
sge.lkey = res->mr->lkey;
/* prepare the receive work request */
memset(&rr, 0, sizeof(rr));
rr.next = NULL;
rr.wr_id = 0;
rr.sg_list = &sge;
rr.num_sge = 1;
/* post the Receive Request to the RQ */
rc = ibv_post_recv(res->qp, &rr, &bad_wr);
if (rc) {
LOG_ERROR("failed to post RR");
} else {
LOG_INFO("Receive Request was posted");
}
return rc;
}
/******************************************************************************
 * Function: resources_init
 *
 * Input:
 * res: pointer to resources structure
 *
 * Output: res holds its "nothing created yet" defaults
 *
 * Returns: none
 *
 * Description:
 * Clear every member of res and mark the socket as not open.
 ******************************************************************************/
static void resources_init(struct resources *res) {
  /* All handles and pointers start out as zero/NULL. */
  memset(res, 0, sizeof(struct resources));
  /* 0 is a valid file descriptor, so "no socket" is represented by -1. */
  res->sock = -1;
}
/******************************************************************************
 * Function: resources_create
 *
 * Input: res pointer to resources structure to be filled in
 *
 * Output: res filled in with resources
 *
 * Returns: 0 on success, non-zero on failure
 *
 * Description:
 * Establish the TCP control connection, open the IB device, and create the
 * PD, CQ, data buffer, MR and QP, storing each in res.
 * The server (config.server_name == NULL) uses a zeroed host buffer from
 * malloc(); the client uses a GPU buffer from hipMalloc() pre-filled with MSG.
 * On failure everything created so far is released and the matching members
 * of res are reset, so a later resources_destroy() call is harmless.
 * A failing HIP call during buffer setup terminates the process (HIP_CALL).
 *****************************************************************************/
static int resources_create(struct resources *res) {
  struct ibv_device **dev_list = NULL;
  struct ibv_qp_init_attr qp_init_attr;
  struct ibv_device *ib_dev = NULL;
  char *host_buf = NULL;
  size_t size;
  int i;
  int mr_flags = 0;
  int cq_size = 0;
  int num_devices;
  int rc = 0;

  if (config.server_name) {
    /* client side: server_name is the server's host name / IP address */
    LOG_INFO("Preparing to connect to server %s:%d", config.server_name,
             config.tcp_port);
    res->sock = sock_connect(config.server_name, config.tcp_port);
    if (res->sock < 0) {
      LOG_ERROR("failed to establish TCP connection to server %s, port %d",
                config.server_name, config.tcp_port);
      rc = -1;
      goto resources_create_exit;
    }
  } else {
    LOG_INFO("waiting on port %d for TCP connection", config.tcp_port);
    res->sock = sock_connect(NULL, config.tcp_port);
    if (res->sock < 0) {
      LOG_ERROR("failed to establish TCP connection with client on port %d",
                config.tcp_port);
      rc = -1;
      goto resources_create_exit;
    }
  }
  LOG_INFO("TCP connection was established");
  LOG_INFO("searching for IB devices in host");

  /* get device names in the system */
  dev_list = ibv_get_device_list(&num_devices);
  if (!dev_list) {
    LOG_ERROR("failed to get IB devices list");
    rc = 1;
    goto resources_create_exit;
  }
  /* if there isn't any IB device in host */
  if (!num_devices) {
    LOG_ERROR("found %d device(s)", num_devices);
    rc = 1;
    goto resources_create_exit;
  }
  LOG_INFO("found %d device(s)", num_devices);

  /* search for the specific device we want to work with */
  for (i = 0; i < num_devices; i++) {
    if (!config.dev_name) {
      config.dev_name = strdup(ibv_get_device_name(dev_list[i]));
      LOG_INFO("device not specified, using first one found: %s",
               config.dev_name);
    }
    if (!strcmp(ibv_get_device_name(dev_list[i]), config.dev_name)) {
      ib_dev = dev_list[i];
      break;
    }
  }
  /* if the device wasn't found in host */
  if (!ib_dev) {
    LOG_ERROR("IB device %s wasn't found", config.dev_name);
    rc = 1;
    goto resources_create_exit;
  }

  /* get device handle */
  res->ib_ctx = ibv_open_device(ib_dev);
  if (!res->ib_ctx) {
    LOG_ERROR("failed to open device %s", config.dev_name);
    rc = 1;
    goto resources_create_exit;
  }
  /* We are now done with device list, free it */
  ibv_free_device_list(dev_list);
  dev_list = NULL;
  ib_dev = NULL;

  /* query port properties */
  if (ibv_query_port(res->ib_ctx, config.ib_port, &res->port_attr)) {
    LOG_ERROR("ibv_query_port on port %u failed", config.ib_port);
    rc = 1;
    goto resources_create_exit;
  }

  /* allocate Protection Domain */
  res->pd = ibv_alloc_pd(res->ib_ctx);
  if (!res->pd) {
    LOG_ERROR("ibv_alloc_pd failed");
    rc = 1;
    goto resources_create_exit;
  }

  /* each side will send only one WR, so Completion Queue with 1 entry is
   * enough */
  cq_size = 1;
  res->cq = ibv_create_cq(res->ib_ctx, cq_size, NULL, NULL, 0);
  if (!res->cq) {
    LOG_ERROR("failed to create CQ with %u entries", cq_size);
    rc = 1;
    goto resources_create_exit;
  }

  /* allocate the memory buffer that will hold the data */
  size = MSG_SIZE;
  if (!config.server_name) {
    res->buf = (char *)malloc(size);
    if (!res->buf) {
      LOG_ERROR("failed to malloc %zu bytes to memory buffer", size);
      rc = 1;
      goto resources_create_exit;
    }
    memset(res->buf, 0, size);
    LOG_INFO("res->buf: '%s'", res->buf);
  } else {
    /* Stage MSG in a zero-padded host buffer of the full transfer size: the
     * MSG literal itself is shorter than `size`, so copying `size` bytes
     * straight from it would read past its end. */
    host_buf = (char *)calloc(1, size);
    if (!host_buf) {
      LOG_ERROR("failed to malloc %zu bytes to staging buffer", size);
      rc = 1;
      goto resources_create_exit;
    }
    snprintf(host_buf, size, "%s", MSG);
    HIP_CALL(hipSetDevice(0));
    HIP_CALL(hipMalloc((void **)&res->buf, size));
    HIP_CALL(hipMemcpy(res->buf, host_buf, size, hipMemcpyHostToDevice));
    /* Read the GPU buffer back so the log shows what is really on device. */
    memset(host_buf, 0, size);
    HIP_CALL(hipMemcpy(host_buf, res->buf, size, hipMemcpyDeviceToHost));
    host_buf[size - 1] = '\0';
    LOG_INFO("res->buf: '%s'", host_buf);
    free(host_buf);
    host_buf = NULL;
  }

  /* register the memory buffer */
  /* NOTE(review): registering a hipMalloc() pointer through plain
   * ibv_reg_mr() presumably relies on kernel peer-memory support for the GPU
   * being active on this host - confirm on the target system. */
  mr_flags =
      IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
  res->mr = ibv_reg_mr(res->pd, res->buf, size, mr_flags);
  if (!res->mr) {
    LOG_ERROR("ibv_reg_mr failed with mr_flags=0x%x", mr_flags);
    rc = 1;
    goto resources_create_exit;
  }
  LOG_INFO("MR was registered with addr=%p, lkey=0x%x, rkey=0x%x, size=0x%zx, flags=0x%x",
           res->buf, res->mr->lkey, res->mr->rkey, size, mr_flags);

  /* create the Queue Pair */
  memset(&qp_init_attr, 0, sizeof(qp_init_attr));
  qp_init_attr.qp_type = IBV_QPT_RC;
  qp_init_attr.sq_sig_all = 1;    /* every send request generates a CQE */
  qp_init_attr.send_cq = res->cq; /* SQ and RQ share a single CQ */
  qp_init_attr.recv_cq = res->cq;
  qp_init_attr.cap.max_send_wr = 1;
  qp_init_attr.cap.max_recv_wr = 1;
  qp_init_attr.cap.max_send_sge = 1;
  qp_init_attr.cap.max_recv_sge = 1;
  res->qp = ibv_create_qp(res->pd, &qp_init_attr);
  if (!res->qp) {
    LOG_ERROR("failed to create QP");
    rc = 1;
    goto resources_create_exit;
  }
  LOG_INFO("QP was created, QP number=0x%x", res->qp->qp_num);

resources_create_exit:
  if (rc) {
    /* Error encountered, cleanup. Every released member is reset so the
     * caller's later resources_destroy() does not release it again. */
    free(host_buf);
    if (res->qp) {
      ibv_destroy_qp(res->qp);
      res->qp = NULL;
    }
    if (res->mr) {
      ibv_dereg_mr(res->mr);
      res->mr = NULL;
    }
    if (res->buf) {
      if (!config.server_name) {
        free(res->buf);
      } else if (hipFree(res->buf) != hipSuccess) {
        LOG_ERROR("failed to free GPU buffer");
      }
      res->buf = NULL;
    }
    if (res->cq) {
      ibv_destroy_cq(res->cq);
      res->cq = NULL;
    }
    if (res->pd) {
      ibv_dealloc_pd(res->pd);
      res->pd = NULL;
    }
    if (res->ib_ctx) {
      ibv_close_device(res->ib_ctx);
      res->ib_ctx = NULL;
    }
    if (dev_list) {
      ibv_free_device_list(dev_list);
      dev_list = NULL;
    }
    if (res->sock >= 0) {
      if (close(res->sock)) {
        LOG_ERROR("failed to close socket");
      }
      res->sock = -1;
    }
  }
  return rc;
}
/******************************************************************************
* Function: modify_qp_to_init
*
* Input:
* qp: QP to transition
*
* Output: none
*
* Returns: 0 on success, ibv_modify_qp failure code on failure
*
* Description: Transition a QP from the RESET to INIT state
******************************************************************************/
static int modify_qp_to_init(struct ibv_qp *qp) {
struct ibv_qp_attr attr;
int flags;
int rc;
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_INIT;
attr.port_num = config.ib_port;
attr.pkey_index = 0;
attr.qp_access_flags =
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS;
rc = ibv_modify_qp(qp, &attr, flags);
if (rc) {
LOG_ERROR("failed to modify QP state to INIT");
}
return rc;
}
/******************************************************************************
 * Function: modify_qp_to_rtr
 *
 * Input:
 * qp: QP to transition
 * remote_qpn: remote QP number
 * dlid: destination LID
 * dgid: destination GID (16 bytes; read only when config.gid_idx >= 0)
 *
 * Output: none
 *
 * Returns: 0 on success, ibv_modify_qp failure code on failure
 *
 * Description:
 * Move a QP from INIT to RTR, targeting the given remote QP. The address
 * vector always uses config.ib_port; when a GID index is configured a global
 * route header is added with the peer's GID.
 ******************************************************************************/
static int modify_qp_to_rtr(struct ibv_qp *qp, uint32_t remote_qpn,
                            uint16_t dlid, uint8_t *dgid) {
  struct ibv_qp_attr attr;
  int flags;
  int rc;

  memset(&attr, 0, sizeof(attr));
  attr.qp_state = IBV_QPS_RTR;
  attr.path_mtu = IBV_MTU_256;
  attr.dest_qp_num = remote_qpn;
  attr.rq_psn = 0;
  attr.max_dest_rd_atomic = 1;
  attr.min_rnr_timer = 0x12;
  attr.ah_attr.is_global = 0;
  attr.ah_attr.dlid = dlid;
  attr.ah_attr.sl = 0;
  attr.ah_attr.src_path_bits = 0;
  attr.ah_attr.port_num = config.ib_port;
  if (config.gid_idx >= 0) {
    /* The port stays config.ib_port here as well; forcing port 1 would send
     * the QP through the wrong port whenever another port was selected. */
    attr.ah_attr.is_global = 1;
    memcpy(&attr.ah_attr.grh.dgid, dgid, 16);
    attr.ah_attr.grh.flow_label = 0;
    attr.ah_attr.grh.hop_limit = 1;
    attr.ah_attr.grh.sgid_index = config.gid_idx;
    attr.ah_attr.grh.traffic_class = 0;
  }

  flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN |
          IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER;
  rc = ibv_modify_qp(qp, &attr, flags);
  if (rc) {
    LOG_ERROR("failed to modify QP state to RTR");
  }
  return rc;
}
/******************************************************************************
* Function: modify_qp_to_rts
*
* Input:
* qp: QP to transition
*
* Output: none
*
* Returns: 0 on success, ibv_modify_qp failure code on failure
*
* Description: Transition a QP from the RTR to RTS state
******************************************************************************/
static int modify_qp_to_rts(struct ibv_qp *qp) {
struct ibv_qp_attr attr;
int flags;
int rc;
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTS;
attr.timeout = 0x12;
attr.retry_cnt = 6;
attr.rnr_retry = 0;
attr.sq_psn = 0;
attr.max_rd_atomic = 1;
flags = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY |
IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC;
rc = ibv_modify_qp(qp, &attr, flags);
if (rc) {
LOG_ERROR("failed to modify QP state to RTS");
}
return rc;
}
/******************************************************************************
 * Function: connect_qp
 *
 * Input:
 * res: pointer to resources structure
 *
 * Output: res->remote_props holds the peer's connection data
 *
 * Returns: 0 on success, error code on failure
 *
 * Description:
 * Exchange buffer address, rkey, QP number, LID and GID with the peer over
 * the TCP socket, then take the local QP through INIT -> RTR -> RTS. One
 * Receive Request is posted (on both sides) while the QP is in INIT. A final
 * one-byte exchange makes sure both sides reached RTS before any traffic.
 ******************************************************************************/
static int connect_qp(struct resources *res) {
  struct cm_con_data_t local_con_data;
  struct cm_con_data_t remote_con_data;
  struct cm_con_data_t tmp_con_data;
  int rc = 0;
  char sync_char = 'Q';
  char temp_char;
  union ibv_gid my_gid;

  if (config.gid_idx >= 0) {
    rc = ibv_query_gid(res->ib_ctx, config.ib_port, config.gid_idx, &my_gid);
    if (rc) {
      LOG_ERROR("could not get gid for port %d, index %d",
                config.ib_port, config.gid_idx);
      return rc;
    }
  } else {
    memset(&my_gid, 0, sizeof my_gid);
  }

  /* exchange using TCP sockets info required to connect QPs */
  local_con_data.addr = htonll((uintptr_t)res->buf);
  local_con_data.rkey = htonl(res->mr->rkey);
  local_con_data.qp_num = htonl(res->qp->qp_num);
  local_con_data.lid = htons(res->port_attr.lid);
  /* the GID is always sent; RoCE/RoCEv2 connections need it */
  memcpy(local_con_data.gid, &my_gid, 16);
  LOG_INFO("Local LID = 0x%x", res->port_attr.lid);
  if (config.gid_idx >= 0) {
    uint8_t *p = local_con_data.gid;
    LOG_INFO("Local GID = "
             "%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:"
             "%02x%02x",
             p[0], p[1], p[2], p[3], p[4], p[5], p[6], p[7], p[8], p[9], p[10],
             p[11], p[12], p[13], p[14], p[15]);
  }

  /* Any non-zero result is a failure (a short write is reported as a
   * positive value by some implementations of the helper). */
  if (sock_sync_data(res->sock, sizeof(struct cm_con_data_t),
                     (char *)&local_con_data, (char *)&tmp_con_data) != 0) {
    LOG_ERROR("failed to exchange connection data between sides");
    return 1;
  }

  remote_con_data.addr = ntohll(tmp_con_data.addr);
  remote_con_data.rkey = ntohl(tmp_con_data.rkey);
  remote_con_data.qp_num = ntohl(tmp_con_data.qp_num);
  remote_con_data.lid = ntohs(tmp_con_data.lid);
  memcpy(remote_con_data.gid, tmp_con_data.gid, 16);
  /* save the remote side attributes, we will need it for the post SR */
  res->remote_props = remote_con_data;
  LOG_INFO("Remote address = 0x%" PRIx64 "", remote_con_data.addr);
  LOG_INFO("Remote rkey = 0x%x", remote_con_data.rkey);
  LOG_INFO("Remote QP number = 0x%x", remote_con_data.qp_num);
  LOG_INFO("Remote LID = 0x%x", remote_con_data.lid);
  if (config.gid_idx >= 0) {
    uint8_t *p = remote_con_data.gid;
    LOG_INFO("Remote GID = "
             "%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:"
             "%02x%02x",
             p[0], p[1], p[2], p[3], p[4], p[5], p[6], p[7], p[8], p[9], p[10],
             p[11], p[12], p[13], p[14], p[15]);
  }

  /* modify the QP to init */
  rc = modify_qp_to_init(res->qp);
  if (rc) {
    LOG_ERROR("change QP state to INIT failed");
    return rc;
  }

  /* both sides post an RR so either one is ready for an incoming message */
  rc = post_receive(res);
  if (rc) {
    LOG_ERROR("failed to post RR");
    return rc;
  }

  /* modify the QP to RTR */
  rc = modify_qp_to_rtr(res->qp, remote_con_data.qp_num, remote_con_data.lid,
                        remote_con_data.gid);
  if (rc) {
    LOG_ERROR("failed to modify QP state to RTR");
    return rc;
  }

  /* modify the QP to RTS */
  rc = modify_qp_to_rts(res->qp);
  if (rc) {
    LOG_ERROR("failed to modify QP state to RTS");
    return rc;
  }
  LOG_INFO("QP state was change to RTS");

  /* sync to make sure that both sides are in states that they can connect to
   * prevent packet loss; a dummy byte is sent back and forth */
  if (sock_sync_data(res->sock, 1, &sync_char, &temp_char)) {
    LOG_ERROR("sync error after QPs are were moved to RTS");
    rc = 1;
  }
  return rc;
}
/******************************************************************************
* Function: resources_destroy
*
* Input:
* res: pointer to resources structure
*
* Output: none
*
* Returns: 0 on success, 1 on failure
*
* Description: Cleanup and deallocate all resources used
******************************************************************************/
static int resources_destroy(struct resources *res) {
int rc = 0;
if (res->qp) {
if (ibv_destroy_qp(res->qp)) {
LOG_ERROR("failed to destroy QP");
rc = 1;
}
}
if (res->mr) {
if (ibv_dereg_mr(res->mr)) {
LOG_ERROR("failed to deregister MR");
rc = 1;
}
}
if (res->buf) {
if (!config.server_name) {
free(res->buf);
} else {
HIP_CALL(hipFree(res->buf));
}
}
if (res->cq) {
if (ibv_destroy_cq(res->cq)) {
LOG_ERROR("failed to destroy CQ");
rc = 1;
}
}
if (res->pd) {
if (ibv_dealloc_pd(res->pd)) {
LOG_ERROR("failed to deallocate PD");
rc = 1;
}
}
if (res->ib_ctx) {
if (ibv_close_device(res->ib_ctx)) {
LOG_ERROR("failed to close device context");
rc = 1;
}
}
if (res->sock >= 0) {
if (close(res->sock)) {
LOG_ERROR("failed to close socket");
rc = 1;
}
}
return rc;
}
/******************************************************************************
 * Function: print_config
 *
 * Input: none
 *
 * Output: none
 *
 * Returns: none
 *
 * Description:
 * Log the effective configuration. The server IP line is printed only in
 * client mode and the GID index line only when a GID index is configured.
 ******************************************************************************/
static void print_config(void) {
  /* dev_name is still NULL when no device was requested; never hand a NULL
   * pointer to %s. */
  const char *dev_name = config.dev_name ? config.dev_name : "(null)";

  LOG_INFO(" ------------------------------------------------");
  LOG_INFO(" Device name : \"%s\"", dev_name);
  LOG_INFO(" IB port : %d", config.ib_port);
  if (config.server_name) {
    LOG_INFO(" IP : %s", config.server_name);
  }
  LOG_INFO(" TCP port : %u", config.tcp_port);
  if (config.gid_idx >= 0) {
    LOG_INFO(" GID index : %d", config.gid_idx);
  }
  LOG_INFO(" ------------------------------------------------");
}
/******************************************************************************
 * Function: usage
 *
 * Input:
 * argv0: program name as invoked
 *
 * Output: none
 *
 * Returns: none
 *
 * Description:
 * Log a description of the command line syntax. The option names and the
 * default port shown match what main() parses and the initial value of
 * config.tcp_port.
 ******************************************************************************/
static void usage(const char *argv0) {
  LOG_INFO("Usage:");
  LOG_INFO(" %s start a server and wait for connection", argv0);
  LOG_INFO(" %s <host> connect to server at <host>", argv0);
  LOG_INFO("");
  LOG_INFO("Options:");
  LOG_INFO(" -p, --port <port> listen on/connect to port <port> "
           "(default 19875)");
  LOG_INFO(" -d, --ib-dev <dev> use IB device <dev> (default first "
           "device found)");
  LOG_INFO(" -i, --ib-port <port> use port <port> of IB device (default 1)");
  LOG_INFO(" -g, --gid-idx <gid index> gid index to be used in GRH "
           "(default not used)");
}
/******************************************************************************
* Function: main
*
* Input:
* argc: number of items in argv
* argv: command line parameters
*
* Output: none
*
* Returns: 0 on success, 1 on failure
*
* Description: Main program code
******************************************************************************/
int main(int argc, char *argv[]) {
struct resources res;
int rc = 1;
/* parse the command line parameters */
while (1) {
int c;
/* Designated Initializer */
static struct option long_options[] = {
{.name = "port", .has_arg = 1, .val = 'p'},
{.name = "ib-dev", .has_arg = 1, .val = 'd'},
{.name = "ib-port", .has_arg = 1, .val = 'i'},
{.name = "gid-idx", .has_arg = 1, .val = 'g'},
{.name = NULL, .has_arg = 0, .val = '\0'}};
c = getopt_long(argc, argv, "p:d:i:g:", long_options, NULL);
if (c == -1) {
break;
}
switch (c) {
case 'p':
config.tcp_port = strtoul(optarg, NULL, 0);
break;
case 'd':
config.dev_name = strdup(optarg);
break;
case 'i':
config.ib_port = strtoul(optarg, NULL, 0);
if (config.ib_port < 0) {
usage(argv[0]);
return 1;
}
break;
case 'g':
config.gid_idx = strtoul(optarg, NULL, 0);
if (config.gid_idx < 0) {
usage(argv[0]);
return 1;
}
break;
default:
usage(argv[0]);
return 1;
}
}
/* parse the last parameter (if exists) as the server name */
/*
* server_name is null means this node is a server,
* otherwise this node is a client which need to connect to
* the specific server
*/
if (optind == argc - 1) {
config.server_name = argv[optind];
} else if (optind < argc) {
usage(argv[0]);
return 1;
}
/* print the used parameters for info*/
print_config();
/* init all of the resources, so cleanup will be easy */
resources_init(&res);
/* create resources before using them */
if (resources_create(&res)) {
LOG_ERROR("failed to create resources");
goto main_exit;
}
/* connect the QPs */
if (connect_qp(&res)) {
LOG_ERROR("failed to connect QPs");
goto main_exit;
}
/* let the client post the sr */
if (config.server_name) {
if (post_send(&res, IBV_WR_RDMA_WRITE_WITH_IMM)) {
LOG_ERROR("failed to post sr");
goto main_exit;
}
}
/* in both sides we expect to get a completion */
if (poll_completion(&res)) {
LOG_ERROR("poll completion failed");
goto main_exit;
}
/* after polling the completion we have the message in the client buffer too
*/
LOG_INFO("Message is: '%s'", res.buf);
rc = 0;
main_exit:
if (resources_destroy(&res)) {
LOG_ERROR("failed to destroy resources");
rc = 1;
}
if (config.dev_name) {
free((char *)config.dev_name);
}
LOG_INFO("************test result is %d**************", rc);
return rc;
}