okay thanks
For those who are interested:
CUDA call that reproduces the issue:
#include <stdio.h>
#include <cuda.h>
#include <cuda_runtime_api.h>
/*
 * cudaCheckErrors(msg): query the CUDA runtime with cudaGetLastError() and,
 * if the result is anything other than cudaSuccess, print `msg`, the runtime's
 * error string and the source location to stderr, then terminate with exit(1).
 * Wrapped in do { } while (0) so it behaves as a single statement.
 */
#define cudaCheckErrors(msg)                                          \
    do {                                                              \
        const cudaError_t lastCudaStatus = cudaGetLastError();        \
        if (cudaSuccess != lastCudaStatus) {                          \
            fprintf(stderr, "Fatal error: %s (%s at %s:%d)\n",        \
                    msg, cudaGetErrorString(lastCudaStatus),          \
                    __FILE__, __LINE__);                              \
            fprintf(stderr, "*** FAILED - ABORTING\n");               \
            exit(1);                                                  \
        }                                                             \
    } while (0)
/*
 * Restrict the calling thread to CPU `c` and print the CPU it is running on.
 *
 * c: zero-based CPU index; must be in [0, CPU_SETSIZE).
 *
 * Failures are reported on stderr but are not fatal: the function still
 * prints the CPU the thread is currently running on whenever that can be
 * determined, so the caller can see that the pinning did not take effect.
 */
void select_cpu(int c)
{
    if (c < 0 || c >= CPU_SETSIZE)
    {
        fprintf(stderr, "select_cpu: cpu index %d out of range [0, %d)\n", c, CPU_SETSIZE);
    }
    else
    {
        cpu_set_t set;
        CPU_ZERO(&set);
        CPU_SET(c, &set);
        /* pid 0 means "the calling thread" */
        if (sched_setaffinity(0, sizeof(set), &set) != 0)
            perror("sched_setaffinity");
    }
    /* sched_getcpu() returns -1 on error, so keep the result signed */
    int cpu = sched_getcpu();
    if (cpu < 0)
    {
        perror("sched_getcpu");
        return;
    }
    printf("thread using cpu %d\n", cpu);
}
int main()
{
select_cpu(5);
int devId = 1;
cudaSetDevice(devId);
cudaDeviceProp prop;
checkCuda( cudaGetDeviceProperties(&prop,devId) );
printf("RDMA and GPU Device: %s\n", prop.name);
int sharedCPU_GPU;
cudaHostRegister((void*)&sharedCPU_GPU, 4, cudaHostRegisterDefault);
return 0;
}
Receive application: RoCEv2 using a UD queue pair and a ping-pong buffer
#define _GNU_SOURCE
//for CPU_SET
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netdb.h>
#include <malloc.h>
#include <getopt.h>
#include <arpa/inet.h>
#include <time.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <infiniband/verbs.h>
#include "ud.h"
//display period: the receive loop prints sample buffer contents when (i-1) % TEST == 0
#define TEST 1000
//4096-byte global scratch buffer; not referenced by the code visible in this file
char displayBuf[4096];
/*
 * RoCEv2 UD receiver using a two-bank ("ping-pong") set of receive work
 * requests.
 *
 * Usage: <prog> <gpu> <dev_index> <nWr> <bufLen> <nIter>
 *   gpu       - accepted for command-line compatibility, not used here
 *   dev_index - index into the list returned by ibv_get_device_list()
 *   nWr       - total number of receive work requests (two banks of nWr/2)
 *   bufLen    - payload bytes per work request
 *   nIter     - number of passes over the whole ring (2*nIter bank posts)
 *
 * The first bank is posted before the loop; each loop iteration posts the
 * other bank, waits for nWr/2 completions, checks the immediate-data
 * sequence number for gaps, optionally prints two sample payloads, and
 * clears the first byte of every slot of the bank that just completed.
 *
 * Returns 0 on completion and -1 on any setup or runtime error.
 */
int main(int argc, char**argv)
{
    if (argc < 6)
    {
        fprintf(stderr, "usage: %s <gpu> <dev_index> <nWr> <bufLen> <nIter>\n",
                argc > 0 ? argv[0] : "ud_recv");
        return -1;
    }
    //argv[1] (gpu) is intentionally ignored
    int d = atoi(argv[2]);
    int nWr = atoi(argv[3]);
    int bufLen = atoi(argv[4]);
    int nIter = atoi(argv[5]);
    //each bank holds nWr/2 requests, so at least 2 requests are needed
    if (d < 0 || nWr < 2 || bufLen <= 0)
    {
        fprintf(stderr, "invalid arguments: dev_index must be >= 0, nWr >= 2, bufLen > 0\n");
        return -1;
    }
    //size_t arithmetic so that nWr*bufLen cannot overflow int
    size_t memorySize = (size_t)nWr*bufLen;
    select_cpu(3);
    struct timespec startTime;
    struct ibv_device **dev_list;
    struct ibv_device *ib_dev;
    struct ibv_pd *pd;
    struct ibv_mr *mr,*mr_gth;
    struct ibv_cq *cq;
    struct ibv_qp *qp;
    struct ibv_qp_init_attr qp_init_attr;
    struct ibv_context *context;
    int num_devices = 0;
    int page_size=sysconf(_SC_PAGESIZE);
    int ib_port = 1 ;
    void* addr = malloc(memorySize);
    if (!addr)
    {
        fprintf(stderr, "Couldn't allocate receive buffer\n");
        return -1;
    }
    //mark every slot as empty
    for (int i=0; i<nWr; i++)
        *((char*)addr+(size_t)i*bufLen) = 0;
    //40 extra bytes: a UD receive is preceded by the 40-byte GRH.
    //All work requests share this one scratch area; its content is never read.
    char *buf_gth = memalign(page_size, 40);
    if (!buf_gth)
    {
        fprintf(stderr, "Couldn't allocate GRH buffer\n");
        return -1;
    }
    //get devices list
    dev_list = ibv_get_device_list(&num_devices);
    if (!dev_list)
    {
        perror("Failed to get IB devices list");
        return -1;
    }
    //bounds-check the requested index before dereferencing the list
    if (d >= num_devices)
    {
        fprintf(stderr, "No IB devices found\n");
        return -1;
    }
    ib_dev = dev_list[d];
    if (!ib_dev)
    {
        fprintf(stderr, "No IB devices found\n");
        return -1;
    }
    //create context
    context = ibv_open_device(ib_dev);
    if (!context)
    {
        fprintf(stderr, "Couldn't get context for %s\n",
                ibv_get_device_name(ib_dev));
        return -1;
    }
    printf("using device %s\n",ibv_get_device_name(ib_dev));
    //Protection Domain
    pd = ibv_alloc_pd(context);
    if (!pd)
    {
        fprintf(stderr, "Couldn't allocate PD\n");
        return -1;
    }
    //Memory Regions: payload ring and GRH scratch area
    mr = ibv_reg_mr(pd, addr, memorySize, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE);
    mr_gth = ibv_reg_mr(pd, buf_gth, 40, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE);
    if (!mr || !mr_gth)
    {
        fprintf(stderr, "Couldn't register MR\n");
        return -1;
    }
    //Completion Queue
    printf("ibv_create_cq\n");
    cq = ibv_create_cq(context, nWr, NULL,NULL, 0);
    if (!cq)
    {
        fprintf(stderr, "Couldn't create CQ\n");
        return -1;
    }
    //Queue Pair
    printf("creating QP\n");
    memset(&qp_init_attr, 0, sizeof(struct ibv_qp_init_attr));
    qp_init_attr.send_cq = cq;
    qp_init_attr.recv_cq = cq;
    qp_init_attr.cap.max_send_wr = 1;
    qp_init_attr.cap.max_send_sge = 1;
    qp_init_attr.cap.max_recv_wr = 2*nWr;
    qp_init_attr.cap.max_recv_sge = 2;
    qp_init_attr.qp_type = IBV_QPT_UD;
    qp = ibv_create_qp(pd,&qp_init_attr);
    if (!qp)
    {
        fprintf(stderr, "---------Error, ibv_create_qp() failed\n");
        return -1;
    }
    printf("ibv_create_qp qnum %u\n",(unsigned)qp->qp_num);
    struct ibv_qp_attr attr = {
        .qp_state = IBV_QPS_INIT,
        .pkey_index = 0,
        .port_num = ib_port,
        .qkey = 0xCAFECAFE
    };
    printf("ibv_modify_qp to INIT\n");
    if (ibv_modify_qp(qp, &attr,
                      IBV_QP_STATE |
                      IBV_QP_PKEY_INDEX |
                      IBV_QP_PORT |
                      IBV_QP_QKEY
                     ))
    {
        fprintf(stderr, ">>>>>>>>>Failed to modify QP to INIT\n");
        return -1;
    }
    //Modify QP to RTR
    printf("ibv_modify_qp to RTR\n");
    memset(&attr, 0, sizeof(attr));
    attr.qp_state = IBV_QPS_RTR;
    if (ibv_modify_qp(qp, &attr, IBV_QP_STATE))
    {
        fprintf(stderr, "Failed to modify QP to RTR\n");
        return -1;
    }
    //Build the receive work requests: two scatter entries each
    //(shared GRH scratch area, then the payload slot)
    struct ibv_sge *sg = (struct ibv_sge*) memalign(page_size,2*(size_t)nWr*sizeof(struct ibv_sge));
    struct ibv_recv_wr *wr = (struct ibv_recv_wr*) memalign(page_size,(size_t)nWr*sizeof(struct ibv_recv_wr)),*bad_wr;
    struct ibv_wc *wc = (struct ibv_wc*) memalign(page_size,(size_t)nWr*sizeof(struct ibv_wc));
    if (!sg || !wr || !wc)
    {
        fprintf(stderr, "Couldn't allocate work request arrays\n");
        return -1;
    }
    memset(sg,0,2*(size_t)nWr*sizeof(struct ibv_sge));
    memset(wr,0,(size_t)nWr*sizeof(struct ibv_recv_wr));
    memset(wc,0,(size_t)nWr*sizeof(struct ibv_wc));
    for (int i = 0 ; i < nWr ; i++)
    {
        wr[i].wr_id = i;
        //the chain is cut in the middle and at the end: two independent banks
        wr[i].next = (i == (nWr -1) || i == (nWr/2 -1)) ? NULL : &wr[i+1];
        wr[i].sg_list = &(sg[2*i]);
        wr[i].num_sge = 2;
        sg[2*i].length = 40;
        sg[2*i].lkey = mr_gth->lkey;
        sg[2*i].addr = (uint64_t) buf_gth;
        sg[2*i+1].length = bufLen;
        sg[2*i+1].lkey = mr->lkey;
        sg[2*i+1].addr = ((uint64_t) addr) + (uint64_t)i * bufLen;
    }
    //post the first bank (nWr/2 requests)
    if (ibv_post_recv(qp, wr, &bad_wr))
    {
        printf("error ibv_post_recv 1\n");
        return -1;
    }
    //NOTE(review): startTime is captured but never read in the visible code
    clock_gettime(CLOCK_MONOTONIC, &startTime);
    //main loop, post (2*nIter-1)*nWr/2
    //start with i=1 because one bank has already been posted
    int bank = 0,p,n;
    //sequence tracking must live outside the loop so the previous value survives
    int onum = 0,inum = 0,have_prev = 0;
    for (int i=1;i<2*nIter;i++)
    {
        if (ibv_post_recv(qp, bank ? &wr[0] : &wr[nWr/2], &bad_wr))
        {
            printf("error ibv_post_recv 2\n");
            return -1;
        }
        //wait for one full bank of completions
        int last_imm = 0;
        p=0;
        do
        {
            n = ibv_poll_cq(cq, nWr/2-p, wc);
            if (n < 0)
            {
                fprintf(stderr, "error ibv_poll_cq\n");
                return -1;
            }
            for (int k = 0 ; k < n ; k++)
            {
                if (wc[k].status != IBV_WC_SUCCESS)
                {
                    fprintf(stderr, "Failed status %s (%d) for wr_id %d\n",
                            ibv_wc_status_str(wc[k].status),wc[k].status,(int)wc[k].wr_id);
                    return -1;
                }
            }
            //remember the immediate value of the newest completion seen so far
            if (n > 0)
                last_imm = ntohl(wc[n-1].imm_data);
            p +=n;
        } while (p < nWr/2);
        //testing missing packet: consecutive banks should end nWr/2 apart
        onum = inum;
        inum = last_imm;
        if (have_prev && (inum-onum) > (nWr/2))
            printf("lost data %d\n",inum);
        have_prev = 1;
        bank = !bank;
        //display some data; payload comes from the network, so it is printed
        //as data (bounded by bufLen) and never used as a format string
        if (((i-1)%TEST) ==0)
        {
            printf("%.*s", bufLen, ((char*)addr)+(size_t)0*bufLen);
            printf("%.*s", bufLen, ((char*)addr)+(size_t)(nWr/2-1)*bufLen);
        }
        //clear the first byte of each slot in the bank that just completed
        for (int ii = (bank ? 0 : nWr/2) ; ii < (bank ? nWr/2 : nWr) ; ii++)
            *((char*)addr + (size_t)ii*bufLen) = 0;
    }
    //release verbs resources in reverse order of creation
    ibv_destroy_qp(qp);
    ibv_destroy_cq(cq);
    ibv_dereg_mr(mr_gth);
    ibv_dereg_mr(mr);
    ibv_dealloc_pd(pd);
    ibv_close_device(context);
    ibv_free_device_list(dev_list);
    free(wc);
    free(wr);
    free(sg);
    free(buf_gth);
    free(addr);
    return 0;
}
Send application
(the destination IP address is hard-coded)
#define _GNU_SOURCE
//CPU_SET
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netdb.h>
#include <malloc.h>
#include <getopt.h>
#include <arpa/inet.h>
#include <time.h>
#include <infiniband/verbs.h>
#include "../ud_post_recv/ud.h"
int main(int argc, char**argv)
{
struct timespec startTime, endTime;
int d = atoi(argv[1]);
int nWr = atoi(argv[2]);
int bufLen = atoi(argv[3]);
int nIter = atoi(argv[4]);
int remote_qpn = atoi(argv[5]);
struct ibv_device **dev_list;
struct ibv_device *ib_dev;
struct ibv_comp_channel *channel=NULL;
struct ibv_pd *pd;
struct ibv_mr *mr,*mr2,*mr3,*mr4;
struct ibv_cq *cq;
struct ibv_qp *qp;
struct ibv_qp_init_attr qp_init_attr;
struct ibv_context *context;
int page_size = sysconf(_SC_PAGESIZE);
int send_flags = IBV_SEND_SIGNALED;
int ib_port=1;
select_cpu(5);
printf("ud send v1.0\n");
//memory allocation
char *buf = malloc(bufLen*nWr);
if (!buf)
{
fprintf(stderr, "Couldn't allocate work buf.\n");
exit(0);
}
for (int i=0;i<nWr;i++)
{
sprintf(buf+i*bufLen,"--------------------init line number %04d---image num %d---",i,i / nWr) ;
}
//get device
dev_list = ibv_get_device_list(NULL);
if (!dev_list)
{
perror("Failed to get IB devices list");
return -1;
}
ib_dev = dev_list[d];
if (!ib_dev) {
fprintf(stderr, "No IB devices found\n");
return -1;
}
//create context
context = ibv_open_device(ib_dev);
if (!context) {
fprintf(stderr, "Couldn't get context for %s\n",
ibv_get_device_name(ib_dev));
return -1;
}
printf("using device %s\n",ibv_get_device_name(ib_dev));
struct ibv_port_attr port_info = {};
if (ibv_query_port(context, ib_port, &port_info))
{
fprintf(stderr, "Unable to query port info for port %d\n", ib_port);
return -1;
}
int mtu = 1 << (port_info.active_mtu + 7);
printf("MTU (%d)\n", mtu);
if (bufLen > mtu)
{
fprintf(stderr, "Requested size larger than port MTU (%d)\n", mtu);
return -1;
}
//Protection Domain
printf("ibv_alloc_pd\n");
pd = ibv_alloc_pd(context);
if (!pd) {
fprintf(stderr, "Couldn't allocate PD\n");
return -1;
}
//Memory Region
printf("ibv_reg_mr\n");
mr = ibv_reg_mr(pd, buf, bufLen*nWr , IBV_ACCESS_LOCAL_WRITE);
if (!mr)
{
fprintf(stderr, "Couldn't register MR\n");
return -1;
}
//Completion Queue
printf("ibv_create_cq %p %p %d\n",context,channel,nWr);
cq = ibv_create_cq(context, nWr, NULL,channel, 0);
if (!cq)
{
fprintf(stderr, "Couldn't create CQ\n");
return -1;
}
//Queue Pair
printf("creating QP\n");
memset(&qp_init_attr, 0, sizeof(struct ibv_qp_init_attr));
qp_init_attr.send_cq = cq;
qp_init_attr.recv_cq = cq;
qp_init_attr.cap.max_send_wr = nWr;
qp_init_attr.cap.max_send_sge = 1;
//~ qp_init_attr.cap.max_inline_data = 512;
//qp_init_attr.srq = NULL;
qp_init_attr.cap.max_recv_wr = 1;
qp_init_attr.cap.max_recv_sge = 1;
qp_init_attr.qp_type = IBV_QPT_UD;
printf("ibv_create_qp\n");
qp = ibv_create_qp(pd,&qp_init_attr);
if (!qp)
{
fprintf(stderr, "---------Error, ibv_create_qp() failed\n");
return -1;
}
struct ibv_qp_attr attr = {
.qp_state = IBV_QPS_INIT,
.pkey_index = 0,
.port_num = ib_port,
.qkey = 0xCAFECAFE
};
printf("ibv_modify_qp to INIT\n");
if (ibv_modify_qp(qp, &attr,
IBV_QP_STATE |
IBV_QP_PKEY_INDEX |
IBV_QP_PORT |
IBV_QP_QKEY
))
{
fprintf(stderr, ">>>>>>>>>Failed to modify QP to INIT\n");
return -1;
}
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTR;
if (ibv_modify_qp(qp, &attr, IBV_QP_STATE))
{
fprintf(stderr, "Failed to modify QP to RTR\n");
return -1;
}
//Modify QP to RTS
printf("ibv_modify_qp to RTS\n");
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTS;
attr.sq_psn = 1234;
if (ibv_modify_qp(qp, &attr, IBV_QP_STATE|IBV_QP_SQ_PSN))
{
fprintf(stderr, "Failed to modify QP to RTS\n");
return -1;
}
//Create ah
struct ibv_ah *ah;
struct ibv_ah_attr ah_attr;
memset(&ah_attr, 0, sizeof(ah_attr));
union ibv_gid gid;
if (ibv_query_gid(context, ib_port, 1, &gid))
{
fprintf(stderr, "ibv_query_gid\n");
return -1;
}
//hard coded ip addr
gid.raw[10] = 255;
gid.raw[11] = 255;
gid.raw[12] = 192;
gid.raw[13] = 168;
gid.raw[14] = 3;
gid.raw[15] = 14;
ah_attr.is_global = 1;
ah_attr.grh.dgid = gid;
ah_attr.grh.sgid_index = 0;
ah_attr.grh.hop_limit = 0xFF;
ah_attr.grh.traffic_class = 1;
ah_attr.dlid = 0;
ah_attr.sl = 0;
ah_attr.src_path_bits = 0;
ah_attr.port_num = ib_port;
ah = ibv_create_ah(pd, &ah_attr);
if (!ah)
{
fprintf(stderr, "Error, ibv_create_ah() failed\n");
return -1;
}
printf("ibv_post_send nWR %d to %d\n", nWr, remote_qpn);
struct ibv_sge* sg = (struct ibv_sge*) memalign(page_size, nWr*sizeof(struct ibv_sge));
struct ibv_send_wr* wr = (struct ibv_send_wr*) memalign(page_size, nWr*sizeof(struct ibv_send_wr)),*bad_wr;
memset(sg, 0, nWr*sizeof(struct ibv_sge));
memset(wr, 0, nWr*sizeof(struct ibv_send_wr));
for(int i=0; i<nWr; i++)
{
wr[i].wr_id = i;
wr[i].opcode = IBV_WR_SEND_WITH_IMM;
wr[i].send_flags = IBV_SEND_SIGNALED; //try UNSIGNALED qp_init_attr.sq_sig_all=0
wr[i].wr.ud.remote_qpn = remote_qpn;
wr[i].wr.ud.ah = ah;
wr[i].wr.ud.remote_qkey = 0xCAFECAFE;
wr[i].next = (i == (nWr -1) || i == (nWr/2 -1)) ? NULL : &wr[i+1];
wr[i].imm_data = htonl(i);
wr[i].sg_list = &sg[i];
wr[i].num_sge = 1;
sg[i].length = bufLen;
sg[i].lkey = mr->lkey;
sg[i].addr = (uint64_t) buf+i*bufLen;
}
struct ibv_wc wc[nWr];
int bank=0;
clock_gettime(CLOCK_MONOTONIC, &startTime);
if (ibv_post_send(qp, wr, &bad_wr))
{
fprintf(stderr, "Error, ibv_post_send() failed\n");
return -1;
}
for(int j=1;j<(2*nIter);j++)
{
if (ibv_post_send(qp, bank ? &wr[0]:&wr[nWr/2], &bad_wr))
{
fprintf(stderr, "Error, ibv_post_send() failed\n");
return -1;
}
int num_comp=0;
do
{
num_comp += ibv_poll_cq(cq, nWr/2-num_comp, wc);
} while (num_comp<nWr/2);
if (wc[0].status != IBV_WC_SUCCESS)
{
fprintf(stderr, "Failed status %s (%d) for wr_id %d\n",
ibv_wc_status_str(wc[0].status),wc[0].status, (int)wc[0].wr_id);
return 0;
}
bank = !bank;
//taint data buffer
for (int ii = (bank ? 0 : nWr/2) ; ii < (bank ? nWr/2 : nWr) ; ii++)
{
sprintf(buf+ii*bufLen,"Line number %04d of image %d\n", ii,(j+1)/2) ;
wr[ii].imm_data = htonl(ii+((int)(j+1)/2)*nWr);
}
if ((j%1000)==0)
{
clock_gettime(CLOCK_MONOTONIC, &endTime);
double dt_ms = (endTime.tv_nsec-startTime.tv_nsec)/1000000.0 + (endTime.tv_sec-startTime.tv_sec)*1000.0;
double gbps = (double) 1000*nWr/2*bufLen*8 / dt_ms * 1e3 / 1e9;
printf("BW \t\t%01f Gbits/s\r", gbps);
fflush(stdout);
clock_gettime(CLOCK_MONOTONIC, &startTime);
}
}
}