cuFFTDx synchronization behavior and cuda::pipeline

Hi,
I’m trying to accelerate my CUDA kernel. In the algorithm, I need to perform an FFT and other mathematical operations on matrix rows.
I have read about cuda::pipeline, and I want to make the data loads from global memory overlap with the FFT computation.
My question is: what is the synchronization behavior of the FFT.execute() method implemented in the cuFFTDx library? Does this method have __syncthreads() in it?
My understanding is that if FFT.execute() synchronizes all threads in the block, I won’t be able to use it inside cuda::pipeline in a way that lets the FFT overlap the memory loads. Is that true?
Thank you in advance

Are you wanting to execute multiple stages? For example, with two stages you load data, execute the FFT, execute the second mathematical operation, and write the output? The idea being that you can load the second chunk of data while you are executing your first FFT?

Yes.
I want to execute the first FFT (and other operations) while loading the second chunk of data, then execute the second FFT while loading the third chunk, and so on.
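Roughly, the pattern I have in mind looks like the toy sketch below. This is only an illustration of the overlap scheme: the names, the buffer sizes, and the trivial compute_row() are placeholders I made up, not the real FFT.execute() and row math.

#include <cooperative_groups.h>
#include <cooperative_groups/memcpy_async.h>
#include <cuda/pipeline>

#pragma nv_diag_suppress static_var_with_dynamic_init

// Placeholder standing in for FFT.execute() plus the other per-row operations.
__device__ void compute_row(float2 *row, int ncol) {
    for (int i = threadIdx.x; i < ncol; i += blockDim.x) {
        row[i].x *= 2.0f;
        row[i].y *= 2.0f;
    }
}

template<int NCOL_>
__global__ void overlap_sketch(float2 *gmem, int num_batches) {
    constexpr size_t stages = 2;
    __shared__ float2 smem[stages][NCOL_];   // double buffer, one row per stage
    __shared__ cuda::pipeline_shared_state<cuda::thread_scope_block, stages> state;
    auto block    = cooperative_groups::this_thread_block();
    auto pipeline = cuda::make_pipeline(block, &state);

    for (int batch = 0; batch < num_batches; ++batch) {
        // Asynchronously load row `batch` into one buffer ...
        pipeline.producer_acquire();
        cuda::memcpy_async(block, smem[batch % stages],
                           gmem + size_t(batch) * NCOL_,
                           sizeof(float2) * NCOL_, pipeline);
        pipeline.producer_commit();

        // ... while computing on the previously loaded row in the other buffer.
        if (batch > 0) {
            pipeline.consumer_wait();
            compute_row(smem[(batch - 1) % stages], NCOL_);
            pipeline.consumer_release();
        }
    }
    // Drain: compute on the last row.
    pipeline.consumer_wait();
    compute_row(smem[(num_batches - 1) % stages], NCOL_);
    pipeline.consumer_release();
}

In the real kernel, compute_row() would be FFT.execute() plus the other per-row math, and the results would be written back to global memory before releasing the stage.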

There are multiple __syncthreads() calls inside FFT.execute(). I’m not sure how that will affect cuda::pipeline. We will need to investigate.

Appreciate it. Thank you

Would you be willing to code up your desired use case (i.e., cuFFTDx + cuda::pipeline) and send us a reproducer?

Sorry for the late response. Here is the reproducer:

#include "cufftdx.hpp"
#include <cuda_runtime.h>
#include <iostream>
#include <cooperative_groups/memcpy_async.h>
#include <cuda/pipeline>

using namespace cufftdx;
using namespace std;

#define NROW 4*48*8
#define NCOL 1024

#pragma nv_diag_suppress static_var_with_dynamic_init


template<class FFT>
__launch_bounds__( FFT::max_threads_per_block, 4) __global__
    void fft_kernel( float2 *global_in, int batch_sz) {

        using complex_type = typename FFT::value_type;
        const int stride = size_of<FFT>::value / FFT::elements_per_thread;

        extern __shared__ complex_type shared_mem[];

        auto grid = cooperative_groups::this_grid();
        auto block = cooperative_groups::this_thread_block();
        constexpr size_t stages_count = 2;
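        // One NCOL-element shared-memory buffer per pipeline stage (double buffering):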
        size_t shared_offset[stages_count] = { 0, NCOL };
        __shared__ cuda::pipeline_shared_state<cuda::thread_scope::thread_scope_block,stages_count> shared_state;
        auto pipeline = cuda::make_pipeline(block, &shared_state);

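        // Starting element index in global memory of the row this block handles in iteration `batch`: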
        auto block_batch = [&](size_t batch) -> int {
            return (blockIdx.x + gridDim.x*batch)*NCOL;
        };

        // Kickstart pipe:
        pipeline.producer_acquire();
        cuda::memcpy_async(block, shared_mem + shared_offset[0], global_in +
            block_batch(0), sizeof(complex_type)*NCOL, pipeline);
        pipeline.producer_commit();

        for (size_t batch = 1; batch < batch_sz; ++batch) {
            // Stage indices for the compute and copy stages:
            size_t compute_stage_idx = (batch - 1) % 2;
            size_t copy_stage_idx = batch % 2;
            size_t global_idx = block_batch(batch);
            // Collectively acquire the pipeline head stage from all producer threads:
            pipeline.producer_acquire();
            // Submit async copies to the pipeline's head stage to be
            // computed in the next loop iteration
            cuda::memcpy_async(block, shared_mem + shared_offset[copy_stage_idx], global_in
            + global_idx, sizeof(complex_type)*NCOL, pipeline);
            // Collectively commit (advance) the pipeline's head stage
            pipeline.producer_commit();
            // Collectively wait for the operations committed to the
            // previous `compute` stage to complete:
            pipeline.consumer_wait();
            // Execute FFT
            FFT( ).execute(shared_mem + shared_offset[compute_stage_idx]);
            // Save results
            for (int i = 0; i < FFT::elements_per_thread; ++i){
                global_in[threadIdx.x + block_batch(batch -1) + i * stride].x = shared_mem[ shared_offset[compute_stage_idx] + threadIdx.x + i * stride].x;
                global_in[threadIdx.x + block_batch(batch -1) + i * stride].y = shared_mem[ shared_offset[compute_stage_idx] + threadIdx.x + i * stride].y;
            }
            // Collectively release the stage resources
            pipeline.consumer_release();
        }
        // last chunk
        pipeline.consumer_wait();
        FFT( ).execute(shared_mem + shared_offset[(batch_sz -1) % 2]);
        // Save results
        for (int i = 0; i < FFT::elements_per_thread; ++i){
            global_in[threadIdx.x + block_batch(batch_sz-1) + i * stride].x = shared_mem[ shared_offset[(batch_sz -1) % 2] + threadIdx.x + i * stride].x;
            global_in[threadIdx.x + block_batch(batch_sz-1) + i * stride].y = shared_mem[ shared_offset[(batch_sz -1) % 2] + threadIdx.x + i * stride].y;
        }
        pipeline.consumer_release();

    }



//*****************************************************************************************



void cufftdx_on_rows( float2 *data, int Nrow, int Ncol ) {

    // The FFT is defined by its size, precision, type, and direction. The Block()
    // operator indicates that the FFT will be executed at block level. Shared
    // memory is required for cooperation between the threads.

    using FFTComplete = decltype(Size<1024>() + Precision<float>() + Type<fft_type::c2c>()
        + Direction<fft_direction::forward>() + SM<750>());
  
    if(is_complete_fft<FFTComplete>::value == true) {
  
        // Add FFTs-per-block, elements-per-thread and the Block() operator to
        // create an executable block-level descriptor
        using FFT = decltype(FFTComplete()
                                    + FFTsPerBlock<1>()
                                    + Block()
                                    + ElementsPerThread<4>() 
                                                            );
        
        int numFFTinBlock = FFT::ffts_per_block;
        
        // Increase the dynamic shared memory limit if required (note: it must cover
        // both stages, i.e. the 2*FFT::shared_memory_size used at launch).
        // cudaFuncSetAttribute( fft_kernel<FFT>,
        //     cudaFuncAttributeMaxDynamicSharedMemorySize,
        //     2*FFT::shared_memory_size );

        int blocks_per_grid = 48*4;
        int batch_size = NROW/blocks_per_grid;

        cudaDeviceSynchronize();


        fft_kernel<FFT><<<blocks_per_grid, FFT::block_dim, 2*FFT::shared_memory_size>>>(data, batch_size);


        cudaDeviceSynchronize();
   
    }


}

//*****************************************************************************************

int main( int argc, char **argv ) {

    int Nrow = NROW;
    int Ncol = NCOL; 
    
    float2 *dataHostIn = new float2[Nrow*Ncol];
    float2 *dataHostOut_Dx = new float2[Nrow*Ncol];

    // Initialize the Nrow x Ncol matrix
    
    for(int i =0; i<Nrow*Ncol; i++){
        dataHostIn[i].x = 25*sin(17*i);
        dataHostIn[i].y = 64*sin(121*i);
    }

    cout << endl;


    float2 *dataDev_Dx;
    cudaMalloc(&dataDev_Dx, Nrow*Ncol*sizeof(float2));

    cudaMemcpy(dataDev_Dx, dataHostIn, Nrow*Ncol*sizeof(float2), cudaMemcpyHostToDevice);

    cufftdx_on_rows( dataDev_Dx, Nrow, Ncol );

    cudaMemcpy(dataHostOut_Dx, dataDev_Dx, Nrow*Ncol*sizeof(float2), cudaMemcpyDeviceToHost);

    cudaDeviceSynchronize();
   
    cout << endl;
    

    delete[] dataHostIn;
    delete[] dataHostOut_Dx;

    cudaFree(dataDev_Dx);  

    return 0;
}
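It should build with something like nvcc -std=c++17 --expt-relaxed-constexpr -arch=sm_75 -I<path to the cuFFTDx include directory> reproducer.cu -o reproducer (the file name, include path, and architecture are placeholders to adjust to the local setup).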

Hi, is there any news?
Thank you,
Ori

Hi Ori,
Thank you for the reproducer. It is in the queue to be investigated; there is no ETA at this time.

Thank you