Hi,
I tried to speed up my program with CUDA streams. However, when profiling with nvvp I found that there are time gaps between consecutive kernel executions within the same CUDA stream, and the streams are not actually running in parallel: their work ends up executing sequentially. Is this unavoidable, or can it be optimized? Here are a screenshot from nvvp and my code.
#include <cuda_runtime_api.h>
#include "cublas_v2.h"
#include "cuda_runtime.h"
#include <iostream>
#define SMGR_N_STREAMS 8
#ifndef CUDA_CHECK
#define CUDA_CHECK(call) \
do { \
cudaError_t err_ = (call); \
if (err_ != cudaSuccess) { \
std::cout << "Cuda failure! Error=" << cudaGetErrorString(err_) << std::endl; \
} \
} while (0)
#endif
// cuBLAS: check the status returned by cuBLAS calls.
#ifndef CUBLAS_CHECK
#define CUBLAS_CHECK(call) \
do { \
cublasStatus_t err_ = (call); \
if (err_ != CUBLAS_STATUS_SUCCESS) { \
std::cout << "Cublas failure! Error=" << err_ << std::endl; \
} \
} while (0)
#endif
__device__ inline float sigmoid(float x) {
return 1.0f / (1.0f + expf(-x));
}
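// Single-block kernel: builds a per-expert histogram of gate_idx in shared memory,
// turns it into accumulated (prefix-sum) offsets, and writes, for every token, its
// destination row inside its expert's contiguous segment. The accumulated histogram
// is also written to global memory so the host can read the per-expert counts.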
__global__ void ScatterMappingKernel(const int* gate_idx, const int num_expert, const int idx_num, int* mapping,
int* acc_histogram) {
int idx = threadIdx.x;
extern __shared__ int his[];
if (idx < num_expert + 1) his[idx] = 0;
__syncthreads();
for (int i = threadIdx.x; i < idx_num; i += blockDim.x) {
// calc his
/*if (gate_idx[i] < 0 || gate_idx[i] > num_expert) return;*/
auto old = atomicAdd(&his[gate_idx[i] + 1], 1);
mapping[i] = old;
}
__syncthreads();
// acc his
if (threadIdx.x == 0) {
for (int i = 0; i < num_expert; i++) his[i + 1] += his[i];
}
__syncthreads();
for (int i = threadIdx.x; i < idx_num; i += blockDim.x) {
// calc his
mapping[i] += his[gate_idx[i]];
}
if (idx < num_expert + 1) acc_histogram[idx] = his[idx];
}
int ComputeScatterMapping(const int* gate_idx, const int num_expert, const int idx_num, int* mapping,
int* acc_histogram, cudaStream_t stream) {
int block_size = 0;
if (idx_num < 1024)
block_size = 256;
else if (idx_num < 4096)
block_size = 512;
else
block_size = 1024;
ScatterMappingKernel<<<1, block_size, (num_expert + 1) * sizeof(int), stream>>>(gate_idx, num_expert, idx_num,
mapping, acc_histogram);
return 0;
}
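// Copies each token's feature vector from its original position to the position given
// by the scatter mapping, so rows belonging to the same expert become contiguous.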
template <class T>
__global__ void ScatterMappingCopyKernel(const T* input, const int* mapping, const int dim, const int numel,
T* output) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
if (idx >= numel) return;
int s = idx / dim;
int i = idx % dim;
int mapping_idx = mapping[s];
output[mapping_idx * dim + i] = input[idx];
}
template <class T>
int ComputeScatterMappingCopyTpl(const T* input, const int* mapping, const int S, const int dim, T* output,
cudaStream_t stream) {
auto numel = S * dim;
int block_size = 256;
int grid_size = (numel + block_size - 1) / block_size;
ScatterMappingCopyKernel<T><<<grid_size, block_size, 0, stream>>>(input, mapping, dim, numel, output);
return 0;
}
int ComputeScatterMappingCopy(const float* input, const int* mapping, const int S, const int dim, float* output,
cudaStream_t stream) {
return ComputeScatterMappingCopyTpl(input, mapping, S, dim, output, stream);
}
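// Fused bias add + SiLU (x * sigmoid(x)) activation, applied element-wise after the
// first GEMM of each expert.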
template <typename T>
__global__ void BiasSiluKernel(const T* input, const T* bias, const int N, const int dim, T* output) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
if (idx < N) {
int bias_idx = idx % dim;
auto tmp = input[idx] + bias[bias_idx];
output[idx] = tmp * sigmoid(tmp);
}
}
template <typename T>
int ComputeBiasSiluTpl(const T* input, const T* bias, const int N, const int dim, T* output, cudaStream_t stream) {
constexpr int block_size = 512;
const int grid_size = (N + block_size - 1) / block_size;
BiasSiluKernel<T><<<grid_size, block_size, 0, stream>>>(input, bias, N, dim, output);
return 0;
}
int ComputeBiasSilu(const float* input, const float* bias, const int N, const int dim, float* output,
cudaStream_t stream) {
return ComputeBiasSiluTpl<float>(input, bias, N, dim, output, stream);
}
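// Plain bias add, applied element-wise after the second GEMM of each expert.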
template <typename T>
__global__ void BiasKernel(const T* input, const T* bias, const int N, const int dim, T* output) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
if (idx < N) {
int bias_idx = idx % dim;
output[idx] = input[idx] + bias[bias_idx];
}
}
template <typename T>
int ComputeBiasTpl(const T* input, const T* bias, const int N, const int dim, T* output, cudaStream_t stream) {
constexpr int block_size = 512;
const int grid_size = (N + block_size - 1) / block_size;
BiasKernel<T><<<grid_size, block_size, 0, stream>>>(input, bias, N, dim, output);
return 0;
}
int ComputeBias(const float* input, const float* bias, const int N, const int dim, float* output, cudaStream_t stream) {
return ComputeBiasTpl<float>(input, bias, N, dim, output, stream);
}
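// Inverse of the scatter copy: gathers each token's feature vector back from its
// expert-contiguous position to the original token order.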
template <class T>
__global__ void GatherrMappingCopyKernel(const T* input, const int* mapping, const int dim, const int numel,
T* output) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
if (idx >= numel) return;
int s = idx / dim;
int i = idx % dim;
int mapping_idx = mapping[s];
output[idx] = input[mapping_idx * dim + i];
}
template <class T>
int ComputeGatherMappingCopyTpl(const T* input, const int* mapping, const int S, const int dim, T* output,
cudaStream_t stream) {
auto numel = S * dim;
int block_size = 256;
int grid_size = (numel + block_size - 1) / block_size;
GatherrMappingCopyKernel<T><<<grid_size, block_size, 0, stream>>>(input, mapping, dim, numel, output);
return 0;
}
int ComputeGatherrMappingCopy(const float* input, const int* mapping, const int S, const int dim, float* output,
cudaStream_t stream) {
return ComputeGatherMappingCopyTpl(input, mapping, S, dim, output, stream);
}
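// Row-major GEMM wrapper: cuBLAS expects column-major storage, so A and B are swapped
// and the transpose flags reordered to compute C = op(A) * op(B) on row-major data.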
cublasStatus_t cublasGemm(cublasHandle_t handle, cublasOperation_t transa, cublasOperation_t transb,
int m, int n, int k,
const float alpha, const float* A, const float* B,
const float beta, float* C) {
int lda = k;
if (transa == CUBLAS_OP_T) lda = m;
int ldb = n;
if (transb == CUBLAS_OP_T) ldb = k;
int ldc = n;
auto status = cublasSgemm(handle, transb, transa,
n, m, k,
&alpha, B, ldb, A, lda,
&beta, C, ldc);
return status;
}
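// Test driver: fills random MoE-style inputs (seq tokens, 32 experts, dim == hidden_units == 512),
// scatters tokens by expert, runs two GEMMs plus bias/activation per expert, dispatching the
// experts round-robin over SMGR_N_STREAMS streams, and gathers the results back into the
// original token order. Note the w1 bias buffers are sized with dim, which only works here
// because dim == hidden_units.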
int main() {
int seq = 150;
int dim = 512;
int hidden_units = 512;
int input_volume = seq * dim;
int num_expert = 32;
int* gate_idx = (int*)malloc(seq * sizeof(int));
float* input = (float*)malloc(seq * dim * sizeof(float));
float* output = (float*)malloc(seq * dim * sizeof(float));
float* w1_weight = (float*)malloc(num_expert * hidden_units * dim * sizeof(float));
float* w2_weight = (float*)malloc(num_expert * hidden_units * dim * sizeof(float));
float* w1_bias = (float*)malloc(num_expert * dim * sizeof(float));
float* w2_bias = (float*)malloc(num_expert * dim * sizeof(float));
int* h_acc_his = (int*)malloc((num_expert + 1) * sizeof(int));
int* d_gate_idx;
float* d_input;
float* d_output;
float* w1_weight_ptr;
float* w2_weight_ptr;
float* w1_bias_ptr;
float* w2_bias_ptr;
int* mapping;
int* his;
int* acc_his;
float* input_buffer;
float* hidden_buffer;
cudaMalloc((void**)&d_gate_idx, seq * sizeof(int));
cudaMalloc((void**)&d_input, input_volume * sizeof(float));
cudaMalloc((void**)&d_output, input_volume * sizeof(float));
cudaMalloc((void**)&w1_weight_ptr, num_expert * dim * hidden_units * sizeof(float));
cudaMalloc((void**)&w2_weight_ptr, num_expert * dim * hidden_units * sizeof(float));
cudaMalloc((void**)&w1_bias_ptr, num_expert * dim * sizeof(float));
cudaMalloc((void**)&w2_bias_ptr, num_expert * dim * sizeof(float));
cudaMalloc((void**)&mapping, seq * sizeof(int));
cudaMalloc((void**)&his, (num_expert + 1) * sizeof(int));
cudaMalloc((void**)&acc_his, (num_expert + 1) * sizeof(int));
cudaMalloc((void**)&input_buffer, input_volume * sizeof(float));
cudaMalloc((void**)&hidden_buffer, seq * hidden_units * sizeof(float));
for(int i = 0; i < seq; i++) {
gate_idx[i] = rand() & 0x1f;
}
for(int i = 0; i < input_volume; i++) {
input[i] = static_cast <float> (rand()) / static_cast <float> (RAND_MAX);
}
for(int i = 0; i < num_expert * dim * hidden_units; i++) {
w1_weight[i] = static_cast <float> (rand()) / static_cast <float> (RAND_MAX);
w2_weight[i] = static_cast <float> (rand()) / static_cast <float> (RAND_MAX);
}
for(int i = 0; i < num_expert * dim; i++) {
w1_bias[i] = static_cast <float> (rand()) / static_cast <float> (RAND_MAX);
w2_bias[i] = static_cast <float> (rand()) / static_cast <float> (RAND_MAX);
}
cudaMemcpy(d_gate_idx, gate_idx, seq * sizeof(int), cudaMemcpyHostToDevice);
cudaMemcpy(d_input, input, input_volume * sizeof(float), cudaMemcpyHostToDevice);
cudaMemcpy(w1_weight_ptr, w1_weight, num_expert * dim * hidden_units * sizeof(float), cudaMemcpyHostToDevice);
cudaMemcpy(w2_weight_ptr, w2_weight, num_expert * dim * hidden_units * sizeof(float), cudaMemcpyHostToDevice);
cudaMemcpy(w1_bias_ptr, w1_bias, num_expert * dim * sizeof(float), cudaMemcpyHostToDevice);
cudaMemcpy(w2_bias_ptr, w2_bias, num_expert * dim * sizeof(float), cudaMemcpyHostToDevice);
int status = -1;
status = ComputeScatterMapping(d_gate_idx, num_expert, seq, mapping, acc_his, 0);
if (status != 0) {
std::cout << "compute_scatter_mapping error!" << std::endl;
return status;
}
const size_t word_size = sizeof(float);
status = ComputeScatterMappingCopy(d_input, mapping, seq, dim, input_buffer, 0);
if (status != 0) {
std::cout << "ComputeScatterMappingCopy error!" << std::endl;
return status;
}
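// Copy the accumulated histogram back to the host: the per-expert row counts are needed
// on the CPU to set up the GEMM sizes, so the default stream is synchronized here.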
cudaMemcpyAsync(h_acc_his, acc_his, sizeof(int) * (num_expert + 1), cudaMemcpyDeviceToHost, 0);
cudaStreamSynchronize(0);
cublasOperation_t transa = CUBLAS_OP_N;
cublasOperation_t transb = CUBLAS_OP_T;
cublasHandle_t* handles_;
cudaStream_t* streams_;
streams_ = new cudaStream_t[SMGR_N_STREAMS];
handles_ = new cublasHandle_t[SMGR_N_STREAMS];
for (size_t i = 0; i < SMGR_N_STREAMS; ++i) {
cudaStreamCreate(streams_ + i);
cublasCreate(handles_ + i);
cublasSetStream(handles_[i], streams_[i]);
}
printf("his acc is %d\n", h_acc_his[31]);
for (int i = 0; i < num_expert; i++) {
auto cur_stream = streams_[i % SMGR_N_STREAMS];
auto handle = handles_[i % SMGR_N_STREAMS];
int m = h_acc_his[i + 1] - h_acc_his[i];
if (m == 0) continue;
// float* input_buffer_ptr = input_buffer + h_acc_his[i] * idim;
// float* hidden_buffer_ptr = hidden_buffer + h_acc_his[i] * hidden_units;
auto input_buffer_ptr = input_buffer + h_acc_his[i] * dim;
auto hidden_buffer_ptr = hidden_buffer + h_acc_his[i] * hidden_units;
// Weights offset
auto w_offset = i * dim * hidden_units;
auto cur_w1_weight_ptr = w1_weight_ptr + w_offset;
auto cur_w1_bias_ptr = w1_bias_ptr + i * hidden_units;
auto cur_w2_weight_ptr = w2_weight_ptr + w_offset;
auto cur_w2_bias_ptr = w2_bias_ptr + i * dim;
// print_data(input_buffer_ptr, 10, "input_buffer_ptr");
// w1 gemm, tmp => output
CUBLAS_CHECK(cublasGemm(handle, transa, transb, m, hidden_units, dim, 1.0f, input_buffer_ptr, cur_w1_weight_ptr,
0.0f, hidden_buffer_ptr));
// print_data(hidden_buffer_ptr, 10, "w1_weight");
// w1 bias + activate, tmp2
status = ComputeBiasSilu(hidden_buffer_ptr, cur_w1_bias_ptr, m * hidden_units, hidden_units, hidden_buffer_ptr,
cur_stream);
if (status != 0) {
std::cout << "ComputeBiasSilu error!" << std::endl;
return status;
}
// print_data(hidden_buffer_ptr, 10, "silu");
// w2 gemm tmp2 => tmp1
CUBLAS_CHECK(cublasGemm(handle, transa, transb, m, dim, hidden_units, 1.0f, hidden_buffer_ptr, cur_w2_weight_ptr,
0.0f, input_buffer_ptr));
// w2 bias tmp1
status = ComputeBias(input_buffer_ptr, cur_w2_bias_ptr, m * dim, dim, input_buffer_ptr, cur_stream);
if (status != 0) {
std::cout << "ComputeBias error!" << std::endl;
return status;
}
// print_data(input_buffer_ptr, 10, "w2");
// cout << "=================" << endl;
}
for (int i = 0; i < SMGR_N_STREAMS; ++i) cudaStreamSynchronize(streams_[i]);
status = ComputeGatherrMappingCopy(input_buffer, mapping, seq, dim, d_output, 0);
if (status != 0) {
std::cout << "ComputeGatherrMappingCopy error!" << std::endl;
return status;
}
cudaMemcpy(output, d_output, input_volume * sizeof(float), cudaMemcpyDeviceToHost);
for (int i = 0; i < 10; i++) {
printf("%f ", output[i]);
}
printf("\n");
// print_data(output, 10, "output");
// cout << "=================" << endl;
for (size_t i = 0; i < SMGR_N_STREAMS; ++i) {
cudaStreamDestroy(streams_[i]);
cublasDestroy(handles_[i]);
}
delete[] streams_;
delete[] handles_;
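// Release the device and host allocations made above.
cudaFree(d_gate_idx);
cudaFree(d_input);
cudaFree(d_output);
cudaFree(w1_weight_ptr);
cudaFree(w2_weight_ptr);
cudaFree(w1_bias_ptr);
cudaFree(w2_bias_ptr);
cudaFree(mapping);
cudaFree(his);
cudaFree(acc_his);
cudaFree(input_buffer);
cudaFree(hidden_buffer);
free(gate_idx);
free(input);
free(output);
free(w1_weight);
free(w2_weight);
free(w1_bias);
free(w2_bias);
free(h_acc_his);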
return status;
}
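The only dependencies are the CUDA toolkit and cuBLAS; the code builds with something like `nvcc -O2 moe_streams.cu -lcublas -o moe_streams` (the file name is arbitrary, and an `-arch` flag matching your GPU may be needed).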