Time intervals and non-concurrent in multi streaming

Hi,
I tried to speed up my program with CUDA streams. However, with the help of nvvp, I found that there are time intervals between executions in the same CUDA stream. Also, the streams are not completely parallel — they are executed sequentially. Are these gaps unavoidable, or can they be optimized? Here are a screenshot from nvvp and my code.

#include <cuda_runtime_api.h>
#include "cublas_v2.h"
#include "cuda_runtime.h"
#include <iostream>

#define SMGR_N_STREAMS 8

#ifndef CUDA_CHECK
// Checks a cudaError_t expression and prints the error string on failure.
// Evaluates the argument exactly once and is safe inside if/else bodies
// thanks to the do/while(0) wrapper (the original bare-if form was a
// dangling-else hazard and evaluated `status` twice).
#define CUDA_CHECK(status)                                                     \
    do {                                                                       \
      cudaError_t cuda_check_err_ = (status);                                  \
      if (cuda_check_err_ != cudaSuccess) {                                    \
        std::cout << "Cuda failure! Error=" << cudaGetErrorString(cuda_check_err_) << std::endl; \
      }                                                                        \
    } while (0)
#endif

// cublas: various checks for different function calls.
// cublas: various checks for different function calls.
#ifndef CUBLAS_CHECK
// Checks a cublasStatus_t expression and prints the numeric status on failure.
// Evaluates the argument exactly once; do/while(0) makes it a single statement
// so it composes safely with surrounding control flow.
#define CUBLAS_CHECK(status)                                 \
    do {                                                     \
      cublasStatus_t cublas_check_err_ = (status);           \
      if (cublas_check_err_ != CUBLAS_STATUS_SUCCESS) {      \
        std::cout << "Cublas failure! Error=" << cublas_check_err_ << std::endl; \
      }                                                      \
    } while (0)
#endif

// Logistic sigmoid for device code. Uses float literals and expf so the
// computation stays in single precision — the original `1.0` / `exp` silently
// promoted every call to double, which is very slow on most GPUs.
__device__ inline float sigmoid(const float& x) {
  return 1.0f / (1.0f + expf(-x));
}

// For each of `idx_num` tokens, computes its destination row after grouping
// tokens by expert, and writes the accumulated per-expert histogram.
// Launch: a SINGLE block (the histogram and prefix sum are block-local),
// with (num_expert + 1) * sizeof(int) dynamic shared memory.
// Assumes gate_idx values lie in [0, num_expert) — an early `return` on a bad
// index would skip __syncthreads() and hang the block, which is presumably why
// the original bounds check was commented out.
__global__ void ScatterMappingKernel(const int* gate_idx, const int num_expert, const int idx_num, int* mapping,
                                     int* acc_histogram) {
  // his[0] stays 0; his[e + 1] counts tokens routed to expert e.
  extern __shared__ int his[];

  // Strided init so the kernel remains correct even when
  // num_expert + 1 > blockDim.x (the original single-index init did not).
  for (int i = threadIdx.x; i < num_expert + 1; i += blockDim.x) his[i] = 0;

  __syncthreads();

  for (int i = threadIdx.x; i < idx_num; i += blockDim.x) {
    // mapping[i] = rank of token i among tokens assigned to the same expert.
    mapping[i] = atomicAdd(&his[gate_idx[i] + 1], 1);
  }

  __syncthreads();

  // Exclusive prefix sum over the per-expert counts. Single-threaded is fine:
  // num_expert is small compared to the cost of the rest of the kernel.
  if (threadIdx.x == 0) {
    for (int i = 0; i < num_expert; i++) his[i + 1] += his[i];
  }
  __syncthreads();

  // Final destination row = base offset of the token's expert + its rank.
  for (int i = threadIdx.x; i < idx_num; i += blockDim.x) {
    mapping[i] += his[gate_idx[i]];
  }

  // Publish the accumulated histogram (strided for the same reason as the init).
  for (int i = threadIdx.x; i < num_expert + 1; i += blockDim.x) acc_histogram[i] = his[i];
}

// Launches ScatterMappingKernel on `stream`.
// Returns 0 on success, the cudaError_t value (non-zero) on launch failure.
int ComputeScatterMapping(const int* gate_idx, const int num_expert, const int idx_num, int* mapping,
                          int* acc_histogram, cudaStream_t stream) {
  if (idx_num <= 0) return 0;  // nothing to map

  // Heuristic block size scaled with the token count.
  int block_size = 256;
  if (idx_num >= 4096)
    block_size = 1024;
  else if (idx_num >= 1024)
    block_size = 512;

  // Single block: the shared histogram and its prefix sum are block-local.
  ScatterMappingKernel<<<1, block_size, (num_expert + 1) * sizeof(int), stream>>>(
      gate_idx, num_expert, idx_num, mapping, acc_histogram);

  // Surface launch-configuration errors (bad dims, too much shared memory, ...)
  // — kernel launches themselves never return an error code.
  cudaError_t err = cudaGetLastError();
  if (err != cudaSuccess) {
    std::cout << "ScatterMappingKernel launch failed: " << cudaGetErrorString(err) << std::endl;
    return static_cast<int>(err);
  }
  return 0;
}

// Scatters rows of `input` into `output` according to `mapping`:
// source row r (of width `dim`) lands at row mapping[r] of `output`.
// One thread per element; `numel` = rows * dim.
template <class T>
__global__ void ScatterMappingCopyKernel(const T* input, const int* mapping, const int dim, const int numel,
                                         T* output) {
  const int tid = blockIdx.x * blockDim.x + threadIdx.x;
  if (tid < numel) {
    const int row = tid / dim;
    const int col = tid % dim;
    output[mapping[row] * dim + col] = input[tid];
  }
}

// Launches ScatterMappingCopyKernel for an S x dim matrix on `stream`.
// Returns 0 on success, the cudaError_t value (non-zero) on launch failure.
template <class T>
int ComputeScatterMappingCopyTpl(const T* input, const int* mapping, const int S, const int dim, T* output,
                                 cudaStream_t stream) {
  const int numel = S * dim;
  if (numel <= 0) return 0;  // empty copy is a no-op (grid_size would be 0)

  constexpr int block_size = 256;
  const int grid_size = (numel + block_size - 1) / block_size;  // ceil-div

  ScatterMappingCopyKernel<T><<<grid_size, block_size, 0, stream>>>(input, mapping, dim, numel, output);

  // Kernel launches report configuration errors only via cudaGetLastError().
  cudaError_t err = cudaGetLastError();
  if (err != cudaSuccess) {
    std::cout << "ScatterMappingCopyKernel launch failed: " << cudaGetErrorString(err) << std::endl;
    return static_cast<int>(err);
  }
  return 0;
}

// float instantiation of the scatter-copy launcher.
int ComputeScatterMappingCopy(const float* input, const int* mapping, const int S, const int dim, float* output,
                              cudaStream_t stream) {
  return ComputeScatterMappingCopyTpl<float>(input, mapping, S, dim, output, stream);
}


// Adds a bias (indexed modulo `dim`, i.e. repeated per row) and applies
// SiLU: out = v * sigmoid(v). One thread per element over N elements.
template <typename T>
__global__ void BiasSiluKernel(const T* input, const T* bias, const int N, const int dim, T* output) {
  const int tid = blockIdx.x * blockDim.x + threadIdx.x;
  if (tid >= N) return;
  const auto biased = input[tid] + bias[tid % dim];
  output[tid] = biased * sigmoid(biased);
}

// Launches BiasSiluKernel over N elements on `stream`.
// Returns 0 on success, the cudaError_t value (non-zero) on launch failure.
template <typename T>
int ComputeBiasSiluTpl(const T* input, const T* bias, const int N, const int dim, T* output, cudaStream_t stream) {
  if (N <= 0) return 0;  // empty launch would have grid_size == 0

  constexpr int block_size = 512;
  const int grid_size = (N + block_size - 1) / block_size;  // ceil-div
  BiasSiluKernel<T><<<grid_size, block_size, 0, stream>>>(input, bias, N, dim, output);

  // Kernel launches report configuration errors only via cudaGetLastError().
  cudaError_t err = cudaGetLastError();
  if (err != cudaSuccess) {
    std::cout << "BiasSiluKernel launch failed: " << cudaGetErrorString(err) << std::endl;
    return static_cast<int>(err);
  }
  return 0;
}

// float instantiation of the bias + SiLU launcher.
int ComputeBiasSilu(const float* input, const float* bias, const int N, const int dim, float* output,
                    cudaStream_t stream) {
  return ComputeBiasSiluTpl(input, bias, N, dim, output, stream);
}

// Adds a bias (indexed modulo `dim`, i.e. repeated per row) elementwise.
// One thread per element over N elements.
template <typename T>
__global__ void BiasKernel(const T* input, const T* bias, const int N, const int dim, T* output) {
  const int tid = blockIdx.x * blockDim.x + threadIdx.x;
  if (tid >= N) return;
  output[tid] = input[tid] + bias[tid % dim];
}

// Launches BiasKernel over N elements on `stream`.
// Returns 0 on success, the cudaError_t value (non-zero) on launch failure.
template <typename T>
int ComputeBiasTpl(const T* input, const T* bias, const int N, const int dim, T* output, cudaStream_t stream) {
  if (N <= 0) return 0;  // empty launch would have grid_size == 0

  constexpr int block_size = 512;
  const int grid_size = (N + block_size - 1) / block_size;  // ceil-div
  BiasKernel<T><<<grid_size, block_size, 0, stream>>>(input, bias, N, dim, output);

  // Kernel launches report configuration errors only via cudaGetLastError().
  cudaError_t err = cudaGetLastError();
  if (err != cudaSuccess) {
    std::cout << "BiasKernel launch failed: " << cudaGetErrorString(err) << std::endl;
    return static_cast<int>(err);
  }
  return 0;
}

// float instantiation of the bias launcher.
int ComputeBias(const float* input, const float* bias, const int N, const int dim, float* output, cudaStream_t stream) {
  return ComputeBiasTpl(input, bias, N, dim, output, stream);
}

// Gathers rows back to their original order: destination row r comes from
// row mapping[r] of `input` (the inverse of ScatterMappingCopyKernel).
// One thread per element; `numel` = rows * dim.
template <class T>
__global__ void GatherrMappingCopyKernel(const T* input, const int* mapping, const int dim, const int numel,
                                         T* output) {
  const int tid = blockIdx.x * blockDim.x + threadIdx.x;
  if (tid < numel) {
    const int row = tid / dim;
    const int col = tid % dim;
    output[tid] = input[mapping[row] * dim + col];
  }
}

// Launches GatherrMappingCopyKernel for an S x dim matrix on `stream`.
// Returns 0 on success, the cudaError_t value (non-zero) on launch failure.
template <class T>
int ComputeGatherMappingCopyTpl(const T* input, const int* mapping, const int S, const int dim, T* output,
                                cudaStream_t stream) {
  const int numel = S * dim;
  if (numel <= 0) return 0;  // empty copy is a no-op (grid_size would be 0)

  constexpr int block_size = 256;
  const int grid_size = (numel + block_size - 1) / block_size;  // ceil-div

  GatherrMappingCopyKernel<T><<<grid_size, block_size, 0, stream>>>(input, mapping, dim, numel, output);

  // Kernel launches report configuration errors only via cudaGetLastError().
  cudaError_t err = cudaGetLastError();
  if (err != cudaSuccess) {
    std::cout << "GatherrMappingCopyKernel launch failed: " << cudaGetErrorString(err) << std::endl;
    return static_cast<int>(err);
  }
  return 0;
}

// float instantiation of the gather-copy launcher.
int ComputeGatherrMappingCopy(const float* input, const int* mapping, const int S, const int dim, float* output,
                              cudaStream_t stream) {
  return ComputeGatherMappingCopyTpl<float>(input, mapping, S, dim, output, stream);
}

// Row-major GEMM wrapper over column-major cublasSgemm:
// computes C(m x n) = alpha * op(A)(m x k) * op(B)(k x n) + beta * C for
// row-major A, B, C. Trick: a row-major matrix viewed column-major is its
// transpose, so C^T = op(B)^T * op(A)^T is obtained by swapping the operand
// order and the m/n dimensions in the cuBLAS call.
// NOTE(review): alpha/beta are passed by address below, so the handle's
// pointer mode must be CUBLAS_POINTER_MODE_HOST (the cuBLAS default).
cublasStatus_t cublasGemm(cublasHandle_t handle, cublasOperation_t transa, cublasOperation_t transb,
                          int m, int n, int k,
                          const float alpha, const float* A, const float* B, 
                          const float beta, float* C) {
  // Leading dimensions = row length of each row-major operand as stored:
  // A is m x k (lda = k) unless transposed, B is k x n (ldb = n) unless
  // transposed, C is always m x n (ldc = n).
  int lda = k;
  if (transa == CUBLAS_OP_T) lda = m;
  int ldb = n;
  if (transb == CUBLAS_OP_T) ldb = k;
  int ldc = n;

  // Operands and m/n swapped per the transpose trick described above.
  auto status = cublasSgemm(handle, transb, transa, 
                            n, m, k, 
                            &alpha, B, ldb, A, lda,
                            &beta, C, ldc);
  return status;
}

// Mixture-of-experts FFN demo: routes `seq` tokens to `num_expert` experts,
// runs each expert's two-layer MLP (gemm -> bias+SiLU -> gemm -> bias) on one
// of SMGR_N_STREAMS round-robin streams, then gathers rows back to token order.
int main() {
  const int seq = 150;
  const int dim = 512;
  const int hidden_units = 512;
  const int input_volume = seq * dim;
  const int num_expert = 32;

  // ---- Host buffers -------------------------------------------------------
  int* gate_idx = (int*)malloc(seq * sizeof(int));
  float* input = (float*)malloc(seq * dim * sizeof(float));
  float* output = (float*)malloc(seq * dim * sizeof(float));
  float* w1_weight = (float*)malloc(num_expert * hidden_units * dim * sizeof(float));
  float* w2_weight = (float*)malloc(num_expert * hidden_units * dim * sizeof(float));
  float* w1_bias = (float*)malloc(num_expert * dim * sizeof(float));
  float* w2_bias = (float*)malloc(num_expert * dim * sizeof(float));
  int* h_acc_his = (int*)malloc((num_expert + 1) * sizeof(int));

  // ---- Device buffers -----------------------------------------------------
  int* d_gate_idx;
  float* d_input;
  float* d_output;
  float* w1_weight_ptr;
  float* w2_weight_ptr;
  float* w1_bias_ptr;
  float* w2_bias_ptr;

  int* mapping;         // per-token destination row after grouping by expert
  int* acc_his;         // acc_his[e]..acc_his[e+1] bounds expert e's rows
  float* input_buffer;  // token rows permuted into expert order
  float* hidden_buffer; // per-expert hidden activations

  CUDA_CHECK(cudaMalloc((void**)&d_gate_idx, seq * sizeof(int)));
  CUDA_CHECK(cudaMalloc((void**)&d_input, input_volume * sizeof(float)));
  CUDA_CHECK(cudaMalloc((void**)&d_output, input_volume * sizeof(float)));
  CUDA_CHECK(cudaMalloc((void**)&w1_weight_ptr, num_expert * dim * hidden_units * sizeof(float)));
  CUDA_CHECK(cudaMalloc((void**)&w2_weight_ptr, num_expert * dim * hidden_units * sizeof(float)));
  CUDA_CHECK(cudaMalloc((void**)&w1_bias_ptr, num_expert * dim * sizeof(float)));
  CUDA_CHECK(cudaMalloc((void**)&w2_bias_ptr, num_expert * dim * sizeof(float)));

  CUDA_CHECK(cudaMalloc((void**)&mapping, seq * sizeof(int)));
  CUDA_CHECK(cudaMalloc((void**)&acc_his, (num_expert + 1) * sizeof(int)));
  CUDA_CHECK(cudaMalloc((void**)&input_buffer, input_volume * sizeof(float)));
  CUDA_CHECK(cudaMalloc((void**)&hidden_buffer, seq * hidden_units * sizeof(float)));

  // Random routing and data. `% num_expert` replaces the original `& 0x1f`,
  // which was only correct while num_expert happened to be 32.
  for (int i = 0; i < seq; i++) {
    gate_idx[i] = rand() % num_expert;
  }

  for (int i = 0; i < input_volume; i++) {
    input[i] = static_cast<float>(rand()) / static_cast<float>(RAND_MAX);
  }

  for (int i = 0; i < num_expert * dim * hidden_units; i++) {
    w1_weight[i] = static_cast<float>(rand()) / static_cast<float>(RAND_MAX);
    w2_weight[i] = static_cast<float>(rand()) / static_cast<float>(RAND_MAX);
  }

  for (int i = 0; i < num_expert * dim; i++) {
    w1_bias[i] = static_cast<float>(rand()) / static_cast<float>(RAND_MAX);
    w2_bias[i] = static_cast<float>(rand()) / static_cast<float>(RAND_MAX);
  }

  CUDA_CHECK(cudaMemcpy(d_gate_idx, gate_idx, seq * sizeof(int), cudaMemcpyHostToDevice));
  CUDA_CHECK(cudaMemcpy(d_input, input, input_volume * sizeof(float), cudaMemcpyHostToDevice));
  CUDA_CHECK(cudaMemcpy(w1_weight_ptr, w1_weight, num_expert * dim * hidden_units * sizeof(float), cudaMemcpyHostToDevice));
  CUDA_CHECK(cudaMemcpy(w2_weight_ptr, w2_weight, num_expert * dim * hidden_units * sizeof(float), cudaMemcpyHostToDevice));
  CUDA_CHECK(cudaMemcpy(w1_bias_ptr, w1_bias, num_expert * dim * sizeof(float), cudaMemcpyHostToDevice));
  CUDA_CHECK(cudaMemcpy(w2_bias_ptr, w2_bias, num_expert * dim * sizeof(float), cudaMemcpyHostToDevice));

  // Build mapping + accumulated histogram, then permute tokens to expert order.
  int status = ComputeScatterMapping(d_gate_idx, num_expert, seq, mapping, acc_his, 0);
  if (status != 0) {
    std::cout << "compute_scatter_mapping error!" << std::endl;
    return status;
  }
  status = ComputeScatterMappingCopy(d_input, mapping, seq, dim, input_buffer, 0);
  if (status != 0) {
    std::cout << "ComputeScatterMappingCopy error!" << std::endl;
    return status;
  }

  // h_acc_his is pageable, so this async copy is effectively synchronous, but
  // keep the explicit sync so the host read below is unambiguously safe.
  CUDA_CHECK(cudaMemcpyAsync(h_acc_his, acc_his, sizeof(int) * (num_expert + 1), cudaMemcpyDeviceToHost, 0));
  CUDA_CHECK(cudaStreamSynchronize(0));

  cublasOperation_t transa = CUBLAS_OP_N;
  cublasOperation_t transb = CUBLAS_OP_T;

  // One stream + one cuBLAS handle per slot; experts round-robin over them.
  cudaStream_t* streams_ = new cudaStream_t[SMGR_N_STREAMS];
  cublasHandle_t* handles_ = new cublasHandle_t[SMGR_N_STREAMS];
  for (size_t i = 0; i < SMGR_N_STREAMS; ++i) {
    CUDA_CHECK(cudaStreamCreate(streams_ + i));
    CUBLAS_CHECK(cublasCreate(handles_ + i));
    CUBLAS_CHECK(cublasSetStream(handles_[i], streams_[i]));
  }

  // Total routed tokens = acc_his[num_expert] (the original printed index 31,
  // an off-by-one: that is the offset of the last expert, not the total).
  printf("his acc is %d\n", h_acc_his[num_expert]);

  for (int i = 0; i < num_expert; i++) {
    auto cur_stream = streams_[i % SMGR_N_STREAMS];
    auto handle = handles_[i % SMGR_N_STREAMS];
    int m = h_acc_his[i + 1] - h_acc_his[i];  // rows routed to expert i
    if (m == 0) continue;

    // Expert i's slice of the permuted activations.
    auto input_buffer_ptr = input_buffer + h_acc_his[i] * dim;
    auto hidden_buffer_ptr = hidden_buffer + h_acc_his[i] * hidden_units;

    // Expert i's weights and biases.
    auto w_offset = i * dim * hidden_units;
    auto cur_w1_weight_ptr = w1_weight_ptr + w_offset;
    auto cur_w1_bias_ptr = w1_bias_ptr + i * hidden_units;
    auto cur_w2_weight_ptr = w2_weight_ptr + w_offset;
    auto cur_w2_bias_ptr = w2_bias_ptr + i * dim;

    // w1 gemm: hidden = input * W1^T
    CUBLAS_CHECK(cublasGemm(handle, transa, transb, m, hidden_units, dim, 1.0f, input_buffer_ptr, cur_w1_weight_ptr,
                            0.0f, hidden_buffer_ptr));

    // w1 bias + SiLU, in place on the hidden buffer.
    status = ComputeBiasSilu(hidden_buffer_ptr, cur_w1_bias_ptr, m * hidden_units, hidden_units, hidden_buffer_ptr,
                             cur_stream);
    if (status != 0) {
      std::cout << "ComputeBiasSilu error!" << std::endl;
      return status;
    }

    // w2 gemm: reuse the input slice as the output buffer (safe: ops on the
    // same stream execute in order). The original left this call unchecked.
    CUBLAS_CHECK(cublasGemm(handle, transa, transb, m, dim, hidden_units, 1.0f, hidden_buffer_ptr, cur_w2_weight_ptr,
                            0.0f, input_buffer_ptr));

    // w2 bias, in place.
    status = ComputeBias(input_buffer_ptr, cur_w2_bias_ptr, m * dim, dim, input_buffer_ptr, cur_stream);
    if (status != 0) {
      std::cout << "ComputeBias error!" << std::endl;
      return status;
    }
  }
  // Wait for all expert work before the gather reads input_buffer.
  for (int i = 0; i < SMGR_N_STREAMS; ++i) CUDA_CHECK(cudaStreamSynchronize(streams_[i]));

  status = ComputeGatherrMappingCopy(input_buffer, mapping, seq, dim, d_output, 0);
  if (status != 0) {
    std::cout << "ComputeGatherrMappingCopy error!" << std::endl;
    return status;
  }
  // Blocking copy also synchronizes the gather on the default stream.
  CUDA_CHECK(cudaMemcpy(output, d_output, input_volume * sizeof(float), cudaMemcpyDeviceToHost));
  for (int i = 0; i < 10; i++) {
    printf("%f ", output[i]);
  }
  printf("\n");

  // ---- Cleanup (the original leaked every allocation) ---------------------
  for (size_t i = 0; i < SMGR_N_STREAMS; ++i) {
    CUBLAS_CHECK(cublasDestroy(handles_[i]));
    CUDA_CHECK(cudaStreamDestroy(streams_[i]));
  }
  delete[] streams_;
  delete[] handles_;

  CUDA_CHECK(cudaFree(d_gate_idx));
  CUDA_CHECK(cudaFree(d_input));
  CUDA_CHECK(cudaFree(d_output));
  CUDA_CHECK(cudaFree(w1_weight_ptr));
  CUDA_CHECK(cudaFree(w2_weight_ptr));
  CUDA_CHECK(cudaFree(w1_bias_ptr));
  CUDA_CHECK(cudaFree(w2_bias_ptr));
  CUDA_CHECK(cudaFree(mapping));
  CUDA_CHECK(cudaFree(acc_his));
  CUDA_CHECK(cudaFree(input_buffer));
  CUDA_CHECK(cudaFree(hidden_buffer));

  free(gate_idx);
  free(input);
  free(output);
  free(w1_weight);
  free(w2_weight);
  free(w1_bias);
  free(w2_bias);
  free(h_acc_his);

  return status;
}

I agree that they are not “completely parallel”. However they are not purely sequential, either. You can witness overlap between (at least 2) streams in your picture. So you are getting concurrency - which is one of the benefits of using streams.

This sort of question/reasoning/logic comes up frequently, and I don’t understand it. I guess what you are suggesting is that the timeline of all 8 streams should be completely overlapped and all work issuance should be back-to-back - no gaps anywhere.

I’m not sure where that expectation comes from; it’s illogical to me. With simple extensions, it implies that the machine has no limits of any kind and has infinite capacity.

Let’s look at 2 aspects of possible limits. In order to have all 8 streams fully overlapped and all work issuance back-to-back (i.e. no gaps, anywhere) you would probably be able to find instances of anywhere between 2 and 8 kernels that are running simultaneously (concurrently). You might also find instances of 2 or more cudaMemcpyAsync ops that are in the same direction and overlapped.

  1. cudaMemcpyAsync ops in the same direction (let’s say D->H) targeting a single GPU never overlap. That is a machine limit, and there is nothing you can do with streams or anything else to modify that behavior. They always serialize.

  2. kernel overlap (concurrency) is a function of machine limits also. If you launch enough threads in a kernel, so as to fully occupy the machine, it is not logical that you should expect another kernel to run concurrently - there is no room for it. Once you completely fill up a box, you cannot put more things into it - there is no room. The box has limits. A GPU has limits. One of those limits is the number of threads and blocks that it can be processing at any given instant in time (a concept related to occupancy).

I’ve noted that for some of your kernel launches, the grid size is data-dependent - I wasn’t able to quickly discover it by inspection. Likewise I note that you are using shared memory, which is another limit to occupancy and kernel concurrency.

I didn’t observe any obvious errors in your code, and furthermore you are getting some overlap/concurrency (which could only come about if you were using streams more-or-less correctly) so without spending a lot more time on your code and also knowing what GPU type you are running on (which you haven’t indicated, but is important), I wouldn’t be able to say anything else. My guess would be that you are running into machine limits that are preventing additional concurrency/overlap beyond what is already evident in your nvvp output.

After some additional checking there don’t appear to be any cudaMemcpyAsync operations in your main work issuance loop (I was looking at the colors in nvvp output but don’t have those memorized) so the primary area to focus for understanding would probably be limiters to kernel concurrency.

Also, you don’t appear to be using streams with your CUBLAS ops, so those are being issued into the NULL stream. That is going to introduce synchronization points in your timelines, which will be additional barriers to overlap/concurrency. I believe the cublas kernels may be the purple ones in your diagram, and those do not overlap with anything AFAICT, which is the behavior we expect for work issued into the NULL stream.

Issuing CUBLAS work into streams might be the first thing to look at.

1 Like

Thank you for your reply!

I tried to hugely reduce the number of blocks and threads in my kernels by reducing the amount of data processed; however, the situation didn’t improve.

My device is Tesla T4, and my environment is:
Cuda 10.2
Cudnn 8.0.4
Driver Version: 450.102.04
OS centos-7

I called cublasSetStream in my code and I thought it would work.