D2H cudaMemcpyAsync Blocks Irrelevant Kernels

I understand that when cudaMemcpyAsync involves host memory, it becomes a blocking call. But what exactly does “blocking” mean? I was under the impression that it blocks the calling CPU thread until the copy is done, and nothing more. However, I ran into a use case showing that it also blocks unrelated empty kernel launches issued from a separate thread, on a separate stream.

The behaviour can be reproduced with the standalone program below; some of the constructs are borrowed from this other post.

I have a cpu_block_stream, on which I launch a host function that blocks the stream until a CPU semaphore is signalled. Then I record an event on it, make the default stream wait for that event, and perform a D2H memcpy on the default stream. From my understanding, this memcpy will block the main thread until a separate thread signals the CPU semaphore (I understand this is a weird sequence, but it is what I ran into, and it partly mirrors the PyTorch ProcessGroupNCCL call flow).

Then, in a separate thread function, if I only signal the CPU semaphore, it works as expected. However, if I launch an empty kernel on an unrelated stream before the cpu_event->Signal() line, that launch gets blocked (I suppose by the memcpy involving the host) and causes a deadlock.

Is this the expected behaviour? Does “blocking” mean that it blocks the entire CUDA world?

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

#include "cuda_runtime.h"
#include "errorcheck.h"

// cpu event construct
class Event {
 private:
  std::mutex mtx_condition_;
  std::condition_variable condition_;
  bool signalled = false;

 public:
  void Signal() {
    {
      std::lock_guard<decltype(mtx_condition_)> lock(mtx_condition_);
      signalled = true;
    }
    condition_.notify_all();
  }

  void Wait() {
    std::unique_lock<decltype(mtx_condition_)> lock(mtx_condition_);
    while (!signalled) {
      condition_.wait(lock);
    }
  }
};

// host function launched on cpu_block_stream that blocks it until cpu event signaled
void CUDART_CB block_op_host_fn(void* arg) {
  Event* evt = (Event*)arg;
  evt->Wait();
}

__global__ void NoOpKernel() {}

static void ThreadFn(cudaStream_t irrelevant_stream, Event* cpu_event) {
  std::cout << "Launching empty kernel on irrelevant_stream in ThreadFn" << std::endl;

  NoOpKernel<<<1, 1, 0, irrelevant_stream>>>(); // HANGS HERE, BLOCKED BY D2H MEM COPY

  std::cout << "Launched empty kernel on irrelevant_stream in ThreadFn" << std::endl;
  
  cpu_event->Signal(); // for unblocking cpu_block_stream, however code won't get here
}

int main() {
  // create minimal CPU & GPU buffer for the cudaMemcpyAsync later
  size_t numel = 1;
  float *h_input;
  float *input;
  h_input = (float*)malloc(numel * sizeof(float));
  CUDACHECK(cudaMalloc(&input, numel * sizeof(float)));

  cudaStream_t cpu_block_stream, irrelevant_stream;
  CUDACHECK(cudaStreamCreateWithFlags(&cpu_block_stream, cudaStreamNonBlocking));
  CUDACHECK(cudaStreamCreateWithFlags(&irrelevant_stream, cudaStreamNonBlocking));

  cudaEvent_t cpu_unblock_event;
  CUDACHECK(cudaEventCreate(&cpu_unblock_event));

  // launch the CPU function on cpu_block_stream to block it until cpu_event is signaled
  Event* cpu_event = new Event();
  CUDACHECK(cudaLaunchHostFunc(cpu_block_stream, block_op_host_fn, cpu_event));

  // record cpu_unblock_event, which will be triggered after the cpu_event gets signaled
  CUDACHECK(cudaEventRecord(cpu_unblock_event, cpu_block_stream));
  // make default stream wait for it
  CUDACHECK(cudaStreamWaitEvent(0, cpu_unblock_event));

  // launch separate thread that does two things:
  // 1. launch noop kernel on an irrelevant stream
  // 2. signal the cpu_event to unblock cpu_block_stream
  std::thread t = std::thread(ThreadFn, irrelevant_stream, cpu_event);  

  // issue async D2H memcpy on default stream
  // blocks the empty kernel in thread function and causes a deadlock
  std::cout << "issuing cudaMemcpyAsync\n" << std::endl;
  CUDACHECK(cudaMemcpyAsync(h_input, input, numel * sizeof(float), cudaMemcpyDeviceToHost, 0));

  // clean up
  CUDACHECK(cudaStreamSynchronize(0));
  t.join();
  delete cpu_event;
  free(h_input);
  CUDACHECK(cudaFree(input));
  CUDACHECK(cudaStreamDestroy(cpu_block_stream));
  CUDACHECK(cudaStreamDestroy(irrelevant_stream));
  CUDACHECK(cudaEventDestroy(cpu_unblock_event));
  return 0;
}

If you want sensible, useful stream-oriented behavior from cudaMemcpyAsync, any host pointers used in the call should refer to pinned (page-locked) memory, not pageable memory from malloc like this:

h_input = (float*)malloc(numel * sizeof(float));

This concept is covered in many places on the web, in the CUDA documentation, and in the CUDA concurrency section of this training series.
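A minimal sketch of the pinned-memory suggestion, for illustration only: the host buffers come from cudaMallocHost instead of malloc, and both copies are queued on a separately created stream. The names CHECK, CopyOneFloatPinned, and copy_stream are made up for this example and are not part of the original program. Nothing in this thread confirms that pinning alone removes the hang in the reproducer, so treat it as a starting point rather than a fix.

```cuda
#include <cstdio>
#include <cstdlib>
#include <cuda_runtime.h>

// Hypothetical error-check macro for this sketch (stands in for CUDACHECK).
#define CHECK(call)                                                        \
  do {                                                                     \
    cudaError_t err_ = (call);                                             \
    if (err_ != cudaSuccess) {                                             \
      fprintf(stderr, "%s failed: %s\n", #call, cudaGetErrorString(err_)); \
      exit(1);                                                             \
    }                                                                      \
  } while (0)

// Hypothetical helper: sends one float to the device and reads it back,
// using pinned host buffers for both cudaMemcpyAsync calls.
float CopyOneFloatPinned(float value) {
  float* h_src = nullptr;
  float* h_dst = nullptr;
  float* d_buf = nullptr;
  cudaStream_t copy_stream;

  // Pinned (page-locked) host allocations instead of malloc.
  CHECK(cudaMallocHost((void**)&h_src, sizeof(float)));
  CHECK(cudaMallocHost((void**)&h_dst, sizeof(float)));
  CHECK(cudaMalloc((void**)&d_buf, sizeof(float)));
  CHECK(cudaStreamCreateWithFlags(&copy_stream, cudaStreamNonBlocking));

  *h_src = value;
  *h_dst = 0.0f;

  // Both copies are queued on copy_stream, so they execute in issue order.
  CHECK(cudaMemcpyAsync(d_buf, h_src, sizeof(float),
                        cudaMemcpyHostToDevice, copy_stream));
  CHECK(cudaMemcpyAsync(h_dst, d_buf, sizeof(float),
                        cudaMemcpyDeviceToHost, copy_stream));

  // h_dst is only safe to read once the stream has drained.
  CHECK(cudaStreamSynchronize(copy_stream));
  float result = *h_dst;

  CHECK(cudaStreamDestroy(copy_stream));
  CHECK(cudaFree(d_buf));
  CHECK(cudaFreeHost(h_dst));
  CHECK(cudaFreeHost(h_src));
  return result;
}

int main() {
  printf("read back: %f\n", CopyOneFloatPinned(3.5f));
  return 0;
}
```

Pinned buffers must be released with cudaFreeHost, not free.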

And I certainly wouldn’t launch anything into the default stream if I wanted it not to interfere with concurrency, or to be able to overlap with something else.

The unmodified default stream has these semantics:

  • Work issued into the default stream will not begin until all previously issued work to that device is complete.
  • Work issued to the device after a work item issued into the default stream will not begin until the work item issued into the default stream has completed.
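The two rules above can be illustrated with a small chain of dependent kernels that uses no explicit events. This is a hedged sketch under stated assumptions: AddOne, RunChain, and CHECK are hypothetical names, the streams s1 and s2 are created with plain cudaStreamCreate (so they are ordinary blocking streams), and cudaStreamLegacy is used to name the legacy default stream explicitly regardless of compile flags. If the implicit ordering holds as described, each kernel sees the previous kernel's result and the last slot ends up holding 3.

```cuda
#include <cstdio>
#include <cstdlib>
#include <cuda_runtime.h>

// Hypothetical error-check macro for this sketch.
#define CHECK(call)                                                        \
  do {                                                                     \
    cudaError_t err_ = (call);                                             \
    if (err_ != cudaSuccess) {                                             \
      fprintf(stderr, "%s failed: %s\n", #call, cudaGetErrorString(err_)); \
      exit(1);                                                             \
    }                                                                      \
  } while (0)

__global__ void AddOne(const int* in, int* out) { *out = *in + 1; }

// Hypothetical helper: runs three dependent kernels on three different
// streams and returns the value written by the last one.
int RunChain() {
  int* d = nullptr;
  CHECK(cudaMalloc((void**)&d, 4 * sizeof(int)));
  CHECK(cudaMemset(d, 0, 4 * sizeof(int)));
  CHECK(cudaDeviceSynchronize());  // make sure the zero-fill is done

  cudaStream_t s1, s2;
  CHECK(cudaStreamCreate(&s1));  // blocking streams: no cudaStreamNonBlocking
  CHECK(cudaStreamCreate(&s2));

  // s1 work is issued first.
  AddOne<<<1, 1, 0, s1>>>(d + 0, d + 1);
  // Legacy default stream: does not begin until the s1 kernel is complete.
  AddOne<<<1, 1, 0, cudaStreamLegacy>>>(d + 1, d + 2);
  // s2 work issued afterwards: does not begin until the default-stream
  // kernel has completed.
  AddOne<<<1, 1, 0, s2>>>(d + 2, d + 3);
  CHECK(cudaGetLastError());
  CHECK(cudaDeviceSynchronize());

  int h[4] = {0, 0, 0, 0};
  CHECK(cudaMemcpy(h, d, sizeof(h), cudaMemcpyDeviceToHost));

  CHECK(cudaStreamDestroy(s1));
  CHECK(cudaStreamDestroy(s2));
  CHECK(cudaFree(d));
  return h[3];
}

int main() {
  printf("last slot: %d\n", RunChain());
  return 0;
}
```

Creating s1 and s2 with cudaStreamCreateWithFlags and cudaStreamNonBlocking instead would opt them out of this implicit synchronization, so the chain would then need explicit events to stay ordered.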

Thanks for the reply. If I just swap the default stream (0) in the program for a separately created copy_stream, the program works as expected.
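For reference, the swap might look roughly like the sketch below: the wait and the D2H copy are queued on a dedicated copy_stream rather than on stream 0. This is a simplified illustration, not the modified reproducer. The host-function blocking is left out, and FillKernel, producer_stream, produced, CopyAfterEvent, and CHECK are hypothetical names introduced here.

```cuda
#include <cstdio>
#include <cstdlib>
#include <cuda_runtime.h>

// Hypothetical error-check macro for this sketch.
#define CHECK(call)                                                        \
  do {                                                                     \
    cudaError_t err_ = (call);                                             \
    if (err_ != cudaSuccess) {                                             \
      fprintf(stderr, "%s failed: %s\n", #call, cudaGetErrorString(err_)); \
      exit(1);                                                             \
    }                                                                      \
  } while (0)

__global__ void FillKernel(float* out, float value) { *out = value; }

// Hypothetical helper: a producer stream writes a value, and a separate
// copy stream waits on an event before copying the value back to the host.
float CopyAfterEvent(float value) {
  float* h_pinned = nullptr;
  float* d_buf = nullptr;
  cudaStream_t producer_stream, copy_stream;
  cudaEvent_t produced;

  CHECK(cudaMallocHost((void**)&h_pinned, sizeof(float)));
  CHECK(cudaMalloc((void**)&d_buf, sizeof(float)));
  CHECK(cudaStreamCreateWithFlags(&producer_stream, cudaStreamNonBlocking));
  CHECK(cudaStreamCreateWithFlags(&copy_stream, cudaStreamNonBlocking));
  CHECK(cudaEventCreateWithFlags(&produced, cudaEventDisableTiming));

  // Produce the value, then mark the point the copy has to wait for.
  FillKernel<<<1, 1, 0, producer_stream>>>(d_buf, value);
  CHECK(cudaGetLastError());
  CHECK(cudaEventRecord(produced, producer_stream));

  // The wait and the copy go to copy_stream instead of stream 0.
  CHECK(cudaStreamWaitEvent(copy_stream, produced, 0));
  CHECK(cudaMemcpyAsync(h_pinned, d_buf, sizeof(float),
                        cudaMemcpyDeviceToHost, copy_stream));
  CHECK(cudaStreamSynchronize(copy_stream));
  float result = *h_pinned;

  CHECK(cudaEventDestroy(produced));
  CHECK(cudaStreamDestroy(copy_stream));
  CHECK(cudaStreamDestroy(producer_stream));
  CHECK(cudaFree(d_buf));
  CHECK(cudaFreeHost(h_pinned));
  return result;
}

int main() {
  printf("copied: %f\n", CopyAfterEvent(42.0f));
  return 0;
}
```

Only copy_stream is synchronized at the end, so work on other non-blocking streams is not drawn into the wait.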

All the uses of the default stream in the test program mimic the behaviour of PyTorch and ProcessGroupNCCL. When we do something like the snippet below, the async handle’s wait() call queues a cudaStreamWaitEvent() on the default stream, to wait for the event recorded after the collective launch. Checking the tensor’s value then implicitly invokes a D2H cudaMemcpyAsync on the default stream. I suppose they probably should not do this, as unnecessary synchronizations would happen?

dist.init_process_group(backend='nccl')
...
work = dist.all_reduce(tensor, async_op=True)
work.wait() # cudaStreamWaitEvent(0, all_reduce_end_event)
print(f'success ? {torch.all(tensor == world_size)}') # implicit cudaMemcpyAsync on stream 0

I suppose they probably should not do this as unnecessary synchronizations would happen?

  • Perhaps torch is modifying the default stream behavior.
  • Perhaps torch is launching (non-default-stream) work using streams created with the non-blocking flag.
