I’m trying to write a kernel whose threads iteratively process items in a work queue. My understanding is that I should be able to do this by using atomic operations to manipulate the work queue, and using grid synchronization via cooperative groups to ensure all threads are at the same iteration (I ensure the number of thread blocks doesn’t exceed the device capacity for the kernel). However, sometimes I observe that work items are skipped or processed multiple times during an iteration.
The following is a working example to show this. In this example, an array with the size of input_len
is created, which holds work items 0
to input_len - 1
. The processWorkItems
kernel processes these items for max_iter
iterations. Each work item can put itself and its previous and next work items in the work queue, but marked
array is used to ensure that during an iteration, each work item is added to the work queue at most once. What should happen in the end is that the sum of values in histogram
be equal to input_len * max_iter
, and no value in histogram
be greater than 1. But I observe that occasionally both of these criteria are violated in the output, which implies that I’m not getting atomic operations and/or proper synchronization. I would appreciate it if someone could point out the flaws in my reasoning and/or implementation. My OS is Ubuntu 18.04, CUDA version is 10.1, and I’ve run experiments on P100, V100, and RTX 2080 Ti GPUs, and observed similar behavior.
The command I use for compiling for RTX 2080 Ti:
nvcc -O3 -o atomicsync atomicsync.cu --gpu-architecture=compute_75 -rdc=true
Some inputs and outputs for test:
./atomicsync 50 1000 1000
Skipped 0.01% of items. 5 extra item processing.
./atomicsync 500 1000 1000
Skipped 0.00% of items. 6 extra item processing.
./atomicsync 5000 1000 1000
Skipped 0.00% of items. 14 extra item processing.
atomicsync.cu:
#include <stdio.h>
#include <cooperative_groups.h>
#define checkCudaErrors(val) check ( (val), #val, __FILE__, __LINE__ )
template< typename T >
void check(T result, char const *const func, const char *const file, int const line)
{
if (result)
{
fprintf(stderr, "CUDA error at %s:%d code=%d(%s) \"%s\" \n", file, line, static_cast<unsigned int>(result), cudaGetErrorString(result), func);
cudaDeviceReset();
exit(EXIT_FAILURE);
}
}
__device__ inline void addWorkItem(int input_len, int item, int item_adder, int iter, int *queue, int *queue_size, int *marked) {
int already_marked = atomicExch(&marked[item], 1);
if(already_marked == 0) {
int idx = atomicAdd(&queue_size[iter + 1], 1);
queue[(iter + 1) * input_len + idx] = item;
}
}
__global__ void processWorkItems(int input_len, int max_iter, int *histogram, int *queue, int *queue_size, int *marked) {
auto grid = cooperative_groups::this_grid();
const int items_per_block = (input_len + gridDim.x - 1) / gridDim.x;
for(int iter = 0; iter < max_iter; ++iter) {
while(true) {
// Grab work item to process
int idx = atomicSub(&queue_size[iter], 1);
--idx;
if(idx < 0) {
break;
}
int item = queue[iter * input_len + idx];
// Keep track of processed work items
++histogram[iter * input_len + item];
// Add previous, self, and next work items to work queue
if(item > 0) {
addWorkItem(input_len, item - 1, item, iter, queue, queue_size, marked);
}
addWorkItem(input_len, item, item, iter, queue, queue_size, marked);
if(item + 1 < input_len) {
addWorkItem(input_len, item + 1, item, iter, queue, queue_size, marked);
}
}
__threadfence_system();
grid.sync();
// Reset marked array for next iteration
for(int i = 0; i < items_per_block; ++i) {
if(blockIdx.x * items_per_block + i < input_len) {
marked[blockIdx.x * items_per_block + i] = 0;
}
}
__threadfence_system();
grid.sync();
}
}
int main(int argc, char* argv[])
{
int input_len = atoi(argv[1]);
int max_iter = atoi(argv[2]);
int num_blocks = atoi(argv[3]);
// A histogram to keep track of work items that have been processed in each iteration
int histogram_host[input_len * max_iter];
memset(histogram_host, 0, sizeof(int) * input_len * max_iter);
int *histogram_device;
checkCudaErrors(cudaMalloc(&histogram_device, sizeof(int) * input_len * max_iter));
checkCudaErrors(cudaMemcpy(histogram_device, histogram_host, sizeof(int) * input_len * max_iter, cudaMemcpyHostToDevice));
// Size of the work queue for each iteration
int queue_size_host[max_iter + 1];
queue_size_host[0] = input_len;
memset(&queue_size_host[1], 0, sizeof(int) * max_iter);
int *queue_size_device;
checkCudaErrors(cudaMalloc(&queue_size_device, sizeof(int) * (max_iter + 1)));
checkCudaErrors(cudaMemcpy(queue_size_device, queue_size_host, sizeof(int) * (max_iter + 1), cudaMemcpyHostToDevice));
// Work queue
int queue_host[input_len * (max_iter + 1)];
for(int i = 0; i < input_len; ++i) {
queue_host[i] = i;
}
memset(&queue_host[input_len], 0, sizeof(int) * input_len * max_iter);
int *queue_device;
checkCudaErrors(cudaMalloc(&queue_device, sizeof(int) * input_len * (max_iter + 1)));
checkCudaErrors(cudaMemcpy(queue_device, queue_host, sizeof(int) * input_len * (max_iter + 1), cudaMemcpyHostToDevice));
// An array used to keep track of work items already added to the work queue to
// avoid multiple additions of a work item in the same iteration
int marked_host[input_len];
memset(marked_host, 0, sizeof(int) * input_len);
int *marked_device;
checkCudaErrors(cudaMalloc(&marked_device, sizeof(int) * input_len));
checkCudaErrors(cudaMemcpy(marked_device, marked_host, sizeof(int) * input_len, cudaMemcpyHostToDevice));
const dim3 threads(1, 1, 1);
const dim3 blocks(num_blocks, 1, 1);
processWorkItems<<<blocks, threads>>>(input_len, max_iter, histogram_device, queue_device, queue_size_device, marked_device);
checkCudaErrors(cudaDeviceSynchronize());
checkCudaErrors(cudaMemcpy(histogram_host, histogram_device, sizeof(int) * input_len * max_iter, cudaMemcpyDeviceToHost));
int extra = 0;
double deficit = 0;
for(int i = 0; i < input_len; ++i) {
int cnt = 0;
for(int iter = 0; iter < max_iter; ++iter) {
if(histogram_host[iter * input_len + i] > 1) {
++extra;
}
cnt += histogram_host[iter * input_len + i];
}
deficit += max_iter - cnt;
}
printf("Skipped %.2f%% of items. %d extra item processing.\n", deficit / (input_len * max_iter) * 100, extra);
checkCudaErrors(cudaFree(histogram_device));
checkCudaErrors(cudaFree(queue_device));
checkCudaErrors(cudaFree(queue_size_device));
checkCudaErrors(cudaFree(marked_device));
return 0;
}