Processing Shared Work Queue Using CUDA Atomic Operations and Grid Synchronization

I’m trying to write a kernel whose threads iteratively process items in a work queue. My understanding is that I should be able to do this by using atomic operations to manipulate the work queue, and using grid synchronization via cooperative groups to ensure all threads are at the same iteration (I ensure the number of thread blocks doesn’t exceed the device capacity for the kernel). However, sometimes I observe that work items are skipped or processed multiple times during an iteration.

The following is a working example to show this. In this example, an array with the size of input_len is created, which holds work items 0 to input_len - 1. The processWorkItems kernel processes these items for max_iter iterations. Each work item can put itself and its previous and next work items in the work queue, but marked array is used to ensure that during an iteration, each work item is added to the work queue at most once. What should happen in the end is that the sum of values in histogram be equal to input_len * max_iter, and no value in histogram be greater than 1. But I observe that occasionally both of these criteria are violated in the output, which implies that I’m not getting atomic operations and/or proper synchronization. I would appreciate it if someone could point out the flaws in my reasoning and/or implementation. My OS is Ubuntu 18.04, CUDA version is 10.1, and I’ve run experiments on P100, V100, and RTX 2080 Ti GPUs, and observed similar behavior.

The command I use for compiling for RTX 2080 Ti:

nvcc -O3 -o atomicsync atomicsync.cu --gpu-architecture=compute_75 -rdc=true

Some inputs and outputs for test:

./atomicsync 50 1000 1000
Skipped 0.01% of items. 5 extra item processing.

./atomicsync 500 1000 1000
Skipped 0.00% of items. 6 extra item processing.

./atomicsync 5000 1000 1000
Skipped 0.00% of items. 14 extra item processing.

atomicsync.cu:

#include <stdio.h>
#include <cooperative_groups.h>

#define checkCudaErrors(val) check ( (val), #val, __FILE__, __LINE__ )
template< typename T >
void check(T result, char const *const func, const char *const file, int const line)
{
    if (result)
    {
        fprintf(stderr, "CUDA error at %s:%d code=%d(%s) \"%s\" \n", file, line, static_cast<unsigned int>(result), cudaGetErrorString(result), func);
        cudaDeviceReset();
        exit(EXIT_FAILURE);
    }
}

__device__ inline void addWorkItem(int input_len, int item, int item_adder, int iter, int *queue, int *queue_size, int *marked) {
	int already_marked = atomicExch(&marked[item], 1);
	if(already_marked == 0) {
		int idx = atomicAdd(&queue_size[iter + 1], 1);
		queue[(iter + 1) * input_len + idx] = item;
	}
}

__global__ void processWorkItems(int input_len, int max_iter, int *histogram, int *queue, int *queue_size, int *marked) {
	auto grid = cooperative_groups::this_grid();

	const int items_per_block = (input_len + gridDim.x - 1) / gridDim.x;

	for(int iter = 0; iter < max_iter; ++iter) {
		while(true) {
			// Grab work item to process
			int idx = atomicSub(&queue_size[iter], 1);
			--idx;
			if(idx < 0) {
				break;
			}
			int item = queue[iter * input_len + idx];

			// Keep track of processed work items
			 ++histogram[iter * input_len + item];

			// Add previous, self, and next work items to work queue
			if(item > 0) {
				addWorkItem(input_len, item - 1, item, iter, queue, queue_size, marked);
			}
			addWorkItem(input_len, item, item, iter, queue, queue_size, marked);
			if(item + 1 < input_len) {
				addWorkItem(input_len, item + 1, item, iter, queue, queue_size, marked);
			}
		}
		__threadfence_system();
		grid.sync();

		// Reset marked array for next iteration
		for(int i = 0; i < items_per_block; ++i) {
			if(blockIdx.x * items_per_block + i < input_len) {
				marked[blockIdx.x * items_per_block + i] = 0;
			}
		}
		__threadfence_system();
		grid.sync();
	}
}

int main(int argc, char* argv[])
{
	int input_len = atoi(argv[1]);
	int max_iter = atoi(argv[2]);
	int num_blocks = atoi(argv[3]);

	// A histogram to keep track of work items that have been processed in each iteration
	int histogram_host[input_len * max_iter];
	memset(histogram_host, 0, sizeof(int) * input_len * max_iter);
	int *histogram_device;
	checkCudaErrors(cudaMalloc(&histogram_device, sizeof(int) * input_len * max_iter));
	checkCudaErrors(cudaMemcpy(histogram_device, histogram_host, sizeof(int) * input_len * max_iter, cudaMemcpyHostToDevice));

	// Size of the work queue for each iteration
	int queue_size_host[max_iter + 1];
	queue_size_host[0] = input_len;
	memset(&queue_size_host[1], 0, sizeof(int) * max_iter);
	int *queue_size_device;
    checkCudaErrors(cudaMalloc(&queue_size_device, sizeof(int) * (max_iter + 1)));
	checkCudaErrors(cudaMemcpy(queue_size_device, queue_size_host, sizeof(int) * (max_iter + 1), cudaMemcpyHostToDevice));

	// Work queue
	int queue_host[input_len * (max_iter + 1)];
	for(int i = 0; i < input_len; ++i) {
		queue_host[i] = i;
	}
	memset(&queue_host[input_len], 0, sizeof(int) * input_len * max_iter);
	int *queue_device;
    checkCudaErrors(cudaMalloc(&queue_device, sizeof(int) * input_len * (max_iter + 1)));
	checkCudaErrors(cudaMemcpy(queue_device, queue_host, sizeof(int) * input_len * (max_iter + 1), cudaMemcpyHostToDevice));

	// An array used to keep track of work items already added to the work queue to
	// avoid multiple additions of a work item in the same iteration
	int marked_host[input_len];
	memset(marked_host, 0, sizeof(int) * input_len);
	int *marked_device;
    checkCudaErrors(cudaMalloc(&marked_device, sizeof(int) * input_len));
	checkCudaErrors(cudaMemcpy(marked_device, marked_host, sizeof(int) * input_len, cudaMemcpyHostToDevice));

	const dim3 threads(1, 1, 1);
	const dim3 blocks(num_blocks, 1, 1);

	processWorkItems<<<blocks, threads>>>(input_len, max_iter, histogram_device, queue_device, queue_size_device, marked_device);
	checkCudaErrors(cudaDeviceSynchronize());

	checkCudaErrors(cudaMemcpy(histogram_host, histogram_device, sizeof(int) * input_len * max_iter, cudaMemcpyDeviceToHost));

	int extra = 0;
	double deficit = 0;
	for(int i = 0; i < input_len; ++i) {
		int cnt = 0;
		for(int iter = 0; iter < max_iter; ++iter) {
			if(histogram_host[iter * input_len + i] > 1) {
				++extra;
			}
			cnt += histogram_host[iter * input_len + i];
		}
		deficit += max_iter - cnt;
	}
	printf("Skipped %.2f%% of items. %d extra item processing.\n", deficit / (input_len * max_iter) * 100, extra);

	checkCudaErrors(cudaFree(histogram_device));
	checkCudaErrors(cudaFree(queue_device));
	checkCudaErrors(cudaFree(queue_size_device));
	checkCudaErrors(cudaFree(marked_device));

	return 0;
}