Hi,
I created a demo to test CUDA stream priorities. My goal is to verify that the CUDA stream scheduler always picks work from the higher-priority stream's queue first, and only falls back to the lower-priority streams once that queue is empty. Please see the test code below:
#include <cuda_runtime.h>
#include <cuda_runtime_api.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <math.h>
#include <time.h>
#include <unistd.h>
#include <thread>
#include <chrono>
#include <iostream>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <functional>
static std::mutex stage_mutex;
static std::condition_variable stage_cond;
static bool g_begin = false;
static int vector_size = 1024 * 1024 * 2;
class Timer {
public:
template<typename F, typename... Args>
Timer(int total_time, int interval_time, F&& callback, Args&&... args) :
total_time(total_time), interval_time(interval_time),
callback(std::bind(std::forward<F>(callback), std::forward<Args>(args)...)) {
kernelCount = 0;
totalKernelTime = 0;
}
void start() {
std::chrono::milliseconds interval(interval_time);
std::chrono::milliseconds duration(total_time);
auto start_time = std::chrono::steady_clock::now();
auto current_time = start_time;
while (current_time - start_time <= duration) {
std::this_thread::sleep_for(interval);
auto start = std::chrono::steady_clock::now();
callback();
current_time = std::chrono::steady_clock::now();
totalKernelTime += std::chrono::duration_cast<std::chrono::milliseconds>(current_time - start).count();
kernelCount++;
}
}
float getMeanKernelTime() {
return (totalKernelTime / static_cast<float>(kernelCount));
}
int getKernelCount() {
return kernelCount;
}
private:
int total_time; //ms
int interval_time; //ms
std::function<void()> callback;
int kernelCount;
std::chrono::milliseconds::rep totalKernelTime;
};
void cudaCheck(cudaError_t ret, std::ostream& err = std::cerr) {
if (ret != cudaSuccess) {
err << "Cuda failure: " << cudaGetErrorString(ret) << std::endl;
abort();
}
}
void sumArrays(float* a, float* b, float* res, const int size) {
for (int i = 0; i < size; i += 4) {
res[i] = a[i] + b[i];
res[i + 1] = a[i + 1] + b[i + 1];
res[i + 2] = a[i + 2] + b[i + 2];
res[i + 3] = a[i + 3] + b[i + 3];
}
}
void initialData(float* ip, int size) {
time_t t;
srand((unsigned)time(&t));
for (int i = 0; i < size; i++) {
ip[i] = (float)(rand() & 0xffff) / 1000.0f;
}
}
void checkResult(float* hostRef, float* gpuRef, const int N) {
double epsilon = 1.0E-8;
for (int i = 0; i < N; i++) {
if (fabs(hostRef[i] - gpuRef[i]) > epsilon) {
printf("Results don't match!\n");
printf("%f(hostRef[%d]) != %f(gpuRef[%d])\n", hostRef[i], i, gpuRef[i], i);
return;
}
}
printf("Check result success!\n");
}
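// Note: both kernels below are launched with <<<1, 1>>>, so a single GPU thread
// sums the whole array serially, which makes each kernel long-running.
// The bodies are identical; only the names differ, so the two streams can be
// told apart in the profiler timeline.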
__global__ void sumArraysLowP(const float* a, const float* b, float* c, int32_t n) {
for (int i = 0; i < n; i++) {
c[i] = a[i] + b[i];
}
}
__global__ void sumArraysHighP(const float* a, const float* b, float* c, int32_t n) {
for (int i = 0; i < n; i++) {
c[i] = a[i] + b[i];
}
}
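// Parallel variant (one element per thread, repeated 100000 times); it is not
// launched anywhere in this test.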
__global__ void sumArraysGPU(const float* a, const float* b, float* c, int32_t n) {
int32_t idx = threadIdx.x + blockIdx.x * blockDim.x;
if (idx < n) {
for (int i = 0; i < 100000; i++) {
c[idx] = a[idx] + b[idx];
}
}
}
void launchKernelLowP(float *a_d, float *a_h, float *b_d, float *b_h, float *res_d, float *res_from_gpu_h, int nElem, int nByte, cudaStream_t stream) {
// cudaMemcpyAsync(a_d, a_h, nByte, cudaMemcpyHostToDevice, stream);
// cudaMemcpyAsync(b_d, b_h, nByte, cudaMemcpyHostToDevice, stream);
sumArraysLowP<<<1, 1, 0, stream>>>(a_d, b_d, res_d, nElem);
// cudaMemcpyAsync(res_from_gpu_h, res_d, nByte, cudaMemcpyDeviceToHost, stream);
// cudaCheck(cudaStreamSynchronize(stream));
}
void launchKernelHighP(float *a_d, float *a_h, float *b_d, float *b_h, float *res_d, float *res_from_gpu_h, int nElem, int nByte, cudaStream_t stream) {
// cudaMemcpyAsync(a_d, a_h, nByte, cudaMemcpyHostToDevice, stream);
// cudaMemcpyAsync(b_d, b_h, nByte, cudaMemcpyHostToDevice, stream);
sumArraysHighP<<<1, 1, 0, stream>>>(a_d, b_d, res_d, nElem);
// cudaMemcpyAsync(res_from_gpu_h, res_d, nByte, cudaMemcpyDeviceToHost, stream);
// cudaCheck(cudaStreamSynchronize(stream));
}
void worker(int threadIdx, cudaStream_t stream, int total_time, int interval) {
int priority;
cudaStreamGetPriority(stream, &priority);
printf("Stream priority of worker: %d\n", priority);
cudaEvent_t startEvent, stopEvent;
cudaCheck(cudaEventCreate(&startEvent));
cudaCheck(cudaEventCreate(&stopEvent));
int nElem = vector_size;
// printf("Vector size:%d\n", nElem);
int nByte = sizeof(float) * nElem;
float* a_h = (float*)malloc(nByte);
float* b_h = (float*)malloc(nByte);
float* res_h = (float*)malloc(nByte);
float* res_from_gpu_h = (float*)malloc(nByte);
memset(res_h, 0, nByte);
memset(res_from_gpu_h, 0, nByte);
float *a_d, *b_d, *res_d;
cudaMalloc((float**)&a_d, nByte);
cudaMalloc((float**)&b_d, nByte);
cudaMalloc((float**)&res_d, nByte);
initialData(a_h, nElem);
initialData(b_h, nElem);
// printf("Thread %d begin to wait for signal\n", threadIdx);
{
std::unique_lock<std::mutex> lock(stage_mutex);
while (!g_begin) {
std::cv_status cvRet = stage_cond.wait_for(lock, std::chrono::seconds(5));
if (cvRet == std::cv_status::timeout) {
break;
}
}
}
// auto start = std::chrono::high_resolution_clock::now();
cudaMemcpyAsync(a_d, a_h, nByte, cudaMemcpyHostToDevice, stream);
cudaMemcpyAsync(b_d, b_h, nByte, cudaMemcpyHostToDevice, stream);
Timer timer(total_time, interval, launchKernelLowP, a_d, a_h, b_d, b_h, res_d, res_from_gpu_h, nElem, nByte, stream);
timer.start();
// auto end = std::chrono::high_resolution_clock::now();
std::cout << "Time form thread " << threadIdx << " : " << "launched kernel count: " << timer.getKernelCount() << ", mean latency :" <<timer.getMeanKernelTime() << " ms" << std::endl;
cudaMemcpyAsync(res_from_gpu_h, res_d, nByte, cudaMemcpyDeviceToHost, stream);
cudaCheck(cudaStreamSynchronize(stream));
sumArrays(a_h, b_h, res_h, nElem);
checkResult(res_h, res_from_gpu_h, nElem);
// Destroy the CUDA events and release device/host memory
cudaEventDestroy(startEvent);
cudaEventDestroy(stopEvent);
cudaFree(a_d);
cudaFree(b_d);
cudaFree(res_d);
free(a_h);
free(b_h);
free(res_h);
free(res_from_gpu_h);
}
void higherPriorityWorker(cudaStream_t stream, int total_time, int interval) {
int priority;
cudaStreamGetPriority(stream, &priority);
printf("Stream priority of higherPriorityWorker: %d\n", priority);
cudaEvent_t startEvent, stopEvent;
cudaCheck(cudaEventCreate(&startEvent));
cudaCheck(cudaEventCreate(&stopEvent));
int nElem = vector_size;
// printf("Vector size:%d\n", nElem);
int nByte = sizeof(float) * nElem;
float* a_h = (float*)malloc(nByte);
float* b_h = (float*)malloc(nByte);
float* res_h = (float*)malloc(nByte);
float* res_from_gpu_h = (float*)malloc(nByte);
memset(res_h, 0, nByte);
memset(res_from_gpu_h, 0, nByte);
float *a_d, *b_d, *res_d;
cudaMalloc((float**)&a_d, nByte);
cudaMalloc((float**)&b_d, nByte);
cudaMalloc((float**)&res_d, nByte);
initialData(a_h, nElem);
initialData(b_h, nElem);
// printf("Thread %d begin to wait for signal\n", threadIdx);
{
std::unique_lock<std::mutex> lock(stage_mutex);
while (!g_begin) {
std::cv_status cvRet = stage_cond.wait_for(lock, std::chrono::seconds(5));
if (cvRet == std::cv_status::timeout) {
break;
}
}
}
// auto start = std::chrono::high_resolution_clock::now();
cudaMemcpyAsync(a_d, a_h, nByte, cudaMemcpyHostToDevice, stream);
cudaMemcpyAsync(b_d, b_h, nByte, cudaMemcpyHostToDevice, stream);
Timer timer(total_time, interval, launchKernelHighP, a_d, a_h, b_d, b_h, res_d, res_from_gpu_h, nElem, nByte, stream);
timer.start();
cudaMemcpyAsync(res_from_gpu_h, res_d, nByte, cudaMemcpyDeviceToHost, stream);
cudaCheck(cudaStreamSynchronize(stream));
// auto end = std::chrono::high_resolution_clock::now();
std::cout << "Time form higher priority thread" << " : " << "launched kernel count: " << timer.getKernelCount() << ", mean latency :" <<timer.getMeanKernelTime() << " ms" << std::endl;
// std::cout << "Time form higher priority thread : " << " " << timer.getMeanKernelTime() << " ms" << std::endl;
// Destroy the CUDA events and release device/host memory
cudaEventDestroy(startEvent);
cudaEventDestroy(stopEvent);
cudaFree(a_d);
cudaFree(b_d);
cudaFree(res_d);
free(a_h);
free(b_h);
free(res_h);
free(res_from_gpu_h);
}
int main(int argc, char** argv) {
printf("Usage: ./priority_test [thread_num] [priority] [total_time] [interval] [highPriorityInterval].\n");
// get the lowest and highest stream priorities supported by the device
int g_lowestPriority;
int g_highestPriority;
cudaDeviceGetStreamPriorityRange(&g_lowestPriority, &g_highestPriority);
printf("lowest priority: %d, highest priority: %d\n", g_lowestPriority, g_highestPriority);
cudaSetDevice(0);
int thread_num = 1;
int priority = -1;
int total_time = 5000;
int interval = 100;
int highPriorityInterval = 100;
// parse optional command-line arguments; unspecified ones keep the defaults above
if (argc > 1) {
thread_num = atoi(argv[1]);
}
if (argc > 2) {
priority = atoi(argv[2]);
}
if (argc > 3) {
total_time = atoi(argv[3]);
}
if (argc > 4) {
interval = atoi(argv[4]);
}
if (argc > 5) {
highPriorityInterval = atoi(argv[5]);
}
printf("Thread num : %d\n", thread_num);
printf("priority : %d\n", priority);
std::vector<cudaStream_t> streams;
for (int i = 0; i < thread_num; i++) {
cudaStream_t stream;
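// Low-priority streams are created with priority 0, the default stream priority
// (numerically greater values mean lower priority).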
cudaStreamCreateWithPriority(&stream, cudaStreamNonBlocking, 0);
// cudaStreamCreate(&stream);
streams.push_back(stream);
}
// create the low-priority worker threads
std::vector<std::thread> threads;
for (int i = 0; i < thread_num; i++) {
threads.push_back(std::thread(worker, i, streams[i], total_time, interval));
}
// create worker with higher priority
cudaStream_t higherPriorityStream;
cudaStreamCreateWithPriority(&higherPriorityStream, cudaStreamNonBlocking, priority);
// cudaStreamCreate(&higherPriorityStream);
threads.push_back(std::thread(higherPriorityWorker, higherPriorityStream, total_time, highPriorityInterval));
{
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::unique_lock<std::mutex> lock(stage_mutex);
g_begin = true;
printf("Notify all threads to start\n");
stage_cond.notify_all();
}
auto start = std::chrono::high_resolution_clock::now();
// join thread
for (int i = 0; i < thread_num + 1; i++) {
threads[i].join();
}
auto end = std::chrono::high_resolution_clock::now();
std::cout << "Total time: " << ((end - start).count() / 1e6) << " ms" << std::endl;
// destroy streams
cudaStreamDestroy(higherPriorityStream);
for (int i = 0; i < thread_num; i++) {
cudaStreamDestroy(streams[i]);
}
cudaDeviceReset();
return 0;
}
To build and run this code, please use the command line below:
nvcc -o priority_test priority_test.cu
./priority_test 2 -5 5000 500 10
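For reference, the nsys trace discussed below can be captured with a command along these lines (the output file name is just an example):
nsys profile -o priority_test_report ./priority_test 2 -5 5000 500 10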
The test launches two threads with low-priority CUDA streams and one thread with a higher-priority CUDA stream. A timer is set for a duration of 5000 ms. During this duration, the threads with lower priority launch a kernel every 500 ms, while the thread with higher priority launches a kernel every 10 ms.
In this test, I expected the stream scheduler to take tasks from the higher-priority stream's queue first, and to take tasks from the lower-priority streams' queues only when the higher-priority queue is empty. However, based on the nsys profiling results, the low-priority streams do not appear to be preempted by the high-priority stream: their kernels still run every 500 ms.
Please see the profile results in the image below.
Where did I go wrong in this process? Or is the assumption itself incorrect, namely that the CUDA stream scheduler always prioritizes the high-priority stream?
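As a side note, the startEvent/stopEvent pair in worker() and higherPriorityWorker() is created but never recorded. In case it helps the discussion, here is a rough sketch (not part of the program above; the helper name is made up) of how I could time a single kernel on the GPU with events, to cross-check the host-side latencies reported by the Timer class:
// Sketch only: measure the GPU execution time of one kernel launch on a stream.
// Reuses sumArraysLowP and cudaCheck from the program above.
float timeKernelOnStream(const float* a_d, const float* b_d, float* res_d, int nElem, cudaStream_t stream) {
cudaEvent_t start, stop;
cudaCheck(cudaEventCreate(&start));
cudaCheck(cudaEventCreate(&stop));
cudaCheck(cudaEventRecord(start, stream)); // enqueue a "start" marker on the stream
sumArraysLowP<<<1, 1, 0, stream>>>(a_d, b_d, res_d, nElem);
cudaCheck(cudaEventRecord(stop, stream)); // enqueue a "stop" marker after the kernel
cudaCheck(cudaEventSynchronize(stop)); // block the host until the kernel has finished
float ms = 0.0f;
cudaCheck(cudaEventElapsedTime(&ms, start, stop)); // elapsed time between the two events, in ms
cudaCheck(cudaEventDestroy(start));
cudaCheck(cudaEventDestroy(stop));
return ms;
}
Since cudaEventElapsedTime measures the interval between the two events on the GPU timeline, this number should be independent of host-side launch overhead.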