Sorry for my ignorance; this is my first time asking for help here. Here is a minimal reproducible example:
/*
export LD_LIBRARY_PATH=/usr/local/cuda/lib64:$PWD/nccl_2.17.1-1+cuda11.0_x86_64/lib/:$LD_LIBRARY_PATH
export LIBRARY_PATH=/usr/local/cuda/lib64:$PWD/nccl_2.17.1-1+cuda11.0_x86_64/lib/:$LIBRARY_PATH
export C_INCLUDE_PATH=/usr/local/cuda/include/:$PWD/nccl_2.17.1-1+cuda11.0_x86_64/include/:$C_INCLUDE_PATH
export CPLUS_INCLUDE_PATH=/usr/local/cuda/include/:$PWD/nccl_2.17.1-1+cuda11.0_x86_64/include/:$CPLUS_INCLUDE_PATH
g++ send_recv.cc -lpthread -lcudart -lnccl
*/
#include <unistd.h>
#include <cassert>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include "cuda_runtime.h"
#include "nccl.h"
// Unique id shared by all ranks of the communicator. main() fills it in with
// ncclGetUniqueId() before starting any thread; every thread then passes it to
// ncclCommInitRank / ncclCommInitRankConfig.
ncclUniqueId ncclId;
// Evaluates a CUDA runtime call once and, if it did not return cudaSuccess,
// prints the file, line and CUDA error string to stderr and aborts the process.
// abort() is used instead of assert(false) so the check still terminates the
// program when NDEBUG is defined. The local uses a reserved-looking name so it
// cannot collide with identifiers appearing in `cmd`.
#define CUDACHECK(cmd)                                                        \
  do {                                                                        \
    cudaError_t cuda_check_err_ = (cmd);                                      \
    if (cuda_check_err_ != cudaSuccess) {                                     \
      fprintf(stderr, "Failed: Cuda error %s:%d '%s'\n", __FILE__, __LINE__,  \
              cudaGetErrorString(cuda_check_err_));                           \
      abort();                                                                \
    }                                                                         \
  } while (0)
// Evaluates an NCCL call once and, if it did not return ncclSuccess, prints the
// file, line and NCCL error string to stderr and aborts the process.
// abort() is used instead of assert(false) so the check still terminates the
// program when NDEBUG is defined. The local is deliberately not called `r`:
// callers in this file have their own variable named `r`, and a same-named
// macro local would shadow it inside `cmd`.
#define NCCLCHECK(cmd)                                                        \
  do {                                                                        \
    ncclResult_t nccl_check_res_ = (cmd);                                     \
    if (nccl_check_res_ != ncclSuccess) {                                     \
      fprintf(stderr, "Failed, NCCL error %s:%d '%s'\n", __FILE__, __LINE__,  \
              ncclGetErrorString(nccl_check_res_));                           \
      abort();                                                                \
    }                                                                         \
  } while (0)
// Compile-time switch for the host read-back in custom_recv(): non-zero issues
// the device-to-host copy on the same stream as ncclRecv, zero issues it on a
// separately created stream.
#define ASYNC 1
// Number of ranks in the communicator. Rank 0 sends; ranks 1..device_count-1
// receive. Each rank selects the CUDA device with the same index as its rank.
const int device_count = 4;
void custom_recv(int dev_id, int device_count) {
cudaSetDevice(dev_id);
ncclComm_t comm;
#if NCCL_VERSION >= 21700
ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
NCCLCHECK(ncclCommInitRankConfig(&comm, device_count, ncclId, dev_id, &config));
#else
NCCLCHECK(ncclCommInitRank(&comm, device_count, ncclId, dev_id));
#endif
cudaStream_t stream;
CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
void* buffer;
CUDACHECK(cudaMalloc(&buffer, sizeof(float)));
ncclGroupStart();
NCCLCHECK(ncclRecv(buffer, 1, ncclFloat32, 0, comm, stream));
ncclGroupEnd();
float host_buf;
#if ASYNC
CUDACHECK(cudaMemcpyAsync(&host_buf, buffer, sizeof(float), cudaMemcpyDeviceToHost, stream));
// void* buffer2;
// CUDACHECK(cudaMallocAsync(&buffer2, sizeof(float)*1024, stream));
// CUDACHECK(cudaStreamSynchronize(stream));
#else
cudaStream_t copy_stream;
CUDACHECK(cudaStreamCreateWithFlags(©_stream, cudaStreamNonBlocking));
CUDACHECK(cudaMemcpyAsync(&host_buf, buffer, sizeof(float), cudaMemcpyDeviceToHost, copy_stream));
CUDACHECK(cudaStreamSynchronize(stream));
#endif
assert(host_buf == 1);
end:
std::cout << "device " << dev_id << " recv done" << std::endl;
}
int main(int argc, char** argv) {
{
NCCLCHECK(ncclGetUniqueId(&ncclId));
// send thread, 0 -> 1 2 3
std::thread thr0([]() {
cudaSetDevice(0);
ncclComm_t comm;
#if NCCL_VERSION >= 21700
ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
NCCLCHECK(ncclCommInitRankConfig(&comm, device_count, ncclId, 0, &config));
#else
NCCLCHECK(ncclCommInitRank(&comm, device_count, ncclId, 0));
#endif
cudaStream_t stream;
CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
void* buffer[device_count];
for (int i = 1; i < device_count; i++) {
CUDACHECK(cudaMalloc(&buffer[i], sizeof(float)));
}
float r = 1;
for (int i = 1; i < device_count; i++) {
CUDACHECK(cudaMemcpy(buffer[i], &r, sizeof(float), cudaMemcpyHostToDevice));
}
NCCLCHECK(ncclGroupStart());
for (int i = 1; i < device_count; i++) {
NCCLCHECK(ncclSend(buffer[i], 1, ncclFloat32, i, comm, stream));
}
NCCLCHECK(ncclGroupEnd());
CUDACHECK(cudaStreamSynchronize(stream));
std::cout << "device 0 send done" << std::endl;
});
// recv thread: i <- 0
std::vector<std::thread> threads;
for (int i = 1; i < device_count; i++) {
threads.push_back(std::thread(custom_recv, i, device_count));
}
thr0.join();
for (auto& thr : threads) {
thr.join();
}
}
}