Overview
I need to run several successive iterations of segmented reduction (reduce by key) over the same data. Because the reductions are back to back, I would very much like to avoid writing the intermediate results to global memory and reloading them between passes.
As far as I know, every library routine that offers segmented reduction (Thrust, CUB, ModernGPU) does so only as a __host__
call over data in global memory, i.e. it requires a pointer to global memory. I want to do the segmented reduction inside a kernel, without launching a new kernel, so that I can work on the data already loaded into each thread's registers. A sketch of the host-side API I'm trying to avoid is shown below.
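For reference, this is the shape of what the libraries offer today, shown with CUB's DeviceSegmentedReduce (the pointers, offsets, and segment count here are only placeholders): a host-side call over device-global pointers, which is exactly what I want to avoid between iterations.

#include <cub/cub.cuh>
#include <cuda_runtime.h>

// d_in / d_out / d_offsets are device-global pointers; num_segments is a placeholder.
void segmented_sum_from_host(int *d_in, int *d_out, int *d_offsets, int num_segments)
{
    void  *d_temp_storage = nullptr;
    size_t temp_storage_bytes = 0;

    // The first call only sizes the temporary storage, the second does the reduction.
    cub::DeviceSegmentedReduce::Sum(d_temp_storage, temp_storage_bytes,
                                    d_in, d_out, num_segments,
                                    d_offsets, d_offsets + 1);
    cudaMalloc(&d_temp_storage, temp_storage_bytes);
    cub::DeviceSegmentedReduce::Sum(d_temp_storage, temp_storage_bytes,
                                    d_in, d_out, num_segments,
                                    d_offsets, d_offsets + 1);
    cudaFree(d_temp_storage);
}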
Related Questions
- This Stack Overflow Q&A gives a decent iterative solution, but I'd like to use a library rather than re-invent the wheel, as suggested here.
- In this Google Groups question, it's mentioned that a block-wide BlockReduceByKey will be added to CUB in the next release, but it doesn't appear to be available yet.
- This forum question seems to be asking something very similar, but got no response.
- The CUDA samples are mentioned in a number of questions. They do handle variable-sized arrays, but only sizes fixed at compile time through template parameters, not segments that are each a unique, runtime-determined size.
- A cg::labeled_partition would work, except that this code needs to run on Pascal hardware and labeled partitions require compute capability 7.0+ (a sketch of that approach follows this list).
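For what it's worth, this is roughly what the labeled_partition route would look like on newer hardware. It's only a sketch, assuming compute capability 7.0+ and the cg::reduce helper from <cooperative_groups/reduce.h>; it covers a warp's worth of values at a time and is off the table for Pascal.

#include <cooperative_groups.h>
#include <cooperative_groups/reduce.h>
namespace cg = cooperative_groups;

// Each thread contributes one value plus the key of the segment it belongs to;
// threads that share a key are grouped together and their values summed.
__device__ int warp_segmented_sum(int value, int segment_key)
{
    cg::coalesced_group active = cg::coalesced_threads();
    auto segment = cg::labeled_partition(active, segment_key);
    return cg::reduce(segment, value, cg::plus<int>());
}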
CUB Library
- DeviceSegmentedReduce can only be called from the host, so it can't reuse data that has already been loaded out of global memory.
- BlockAdjacentDifference has a deprecated function, FlagHeads, that I could probably use to make this work. The deprecation notice suggests using the SubtractLeft or SubtractRight functions instead, but those don't offer any way to work with head flags, so I don't see how they're actually a replacement (the only workaround I can see is sketched below).
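The closest thing I can see is abusing SubtractLeft's user-supplied difference operator so that it emits a flag instead of a difference, something like the sketch below. FlagOnChange is my own functor, not a CUB type, and the tile_predecessor overload is used so the first item is compared against something too; I'm not sure this is what the deprecation notice actually intends.

// SubtractLeft invokes the operator as op(current item, item to the left);
// returning 0/1 instead of a difference turns its output array into head flags.
struct FlagOnChange {
    int threshold;
    __device__ FlagOnChange(int threshold) : threshold(threshold) {}
    __device__ int operator()(const int &current, const int &left) const {
        return ((left < threshold) != (current < threshold)) ? 1 : 0;
    }
};

// Usage inside a kernel, with the same types as the kernel further below:
//   int flags[array_size];
//   BlockAdjacentDifference(temp_storage.adjDiff)
//       .SubtractLeft(ratio, flags, FlagOnChange(0), previous_stride_end);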
Purpose
I am attempting to generate point-and-figure or candlestick charts in CUDA. Given a long array of input data, I need to test for changes in direction (increasing vs. decreasing) but ignore small fluctuations in price until the cumulative change is greater than 4 from the previous high or low point.
I start by finding the adjacent difference along the data, and then successively remove the smaller fluctuations. This leaves only the increasing and decreasing trends where the change is >= 4.
Example
This is what the flow of data should look like:
adj_diff[] = [..., -1, 1, 1, -1, 1, 2, 0, -1, 1, 1, ...]
seg_keys[] = [..., 0, 1, 1, 0, 1, 1, 1, 0, 1, 1, ...]
reduced[ ] = [..., (-1), (2), (-1), (3), (-1), (2), ...]
This process is repeated until all fluctuations smaller than 4 have been eliminated.
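One pass of the reduction above can be reproduced on the host with thrust::reduce_by_key, since runs of consecutive equal keys form exactly the segments shown. This is only to illustrate the desired result of a single pass; the arrays are the slice from the example.

#include <thrust/device_vector.h>
#include <thrust/reduce.h>
#include <cstdio>

int main()
{
    int h_diff[] = { -1, 1, 1, -1, 1, 2, 0, -1, 1, 1 };
    int h_keys[] = {  0, 1, 1,  0, 1, 1, 1,  0, 1, 1 };

    thrust::device_vector<int> diff(h_diff, h_diff + 10);
    thrust::device_vector<int> keys(h_keys, h_keys + 10);
    thrust::device_vector<int> out_keys(10), out_vals(10);

    // reduce_by_key sums runs of consecutive equal keys.
    auto ends = thrust::reduce_by_key(keys.begin(), keys.end(), diff.begin(),
                                      out_keys.begin(), out_vals.begin());

    int n = ends.second - out_vals.begin();
    for (int i = 0; i < n; ++i)
        printf("%d ", static_cast<int>(out_vals[i]));   // prints: -1 2 -1 3 -1 2
    printf("\n");
    return 0;
}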
Code
Here is a sample of what I've written so far. I've included what I would like to be able to do, even though that functionality doesn't exist in the CUB library.
Structs
#include <cub/cub.cuh>

// BLOCK_SIZE is assumed to be defined at compile time; 128 is only an example value.
#ifndef BLOCK_SIZE
#define BLOCK_SIZE 128
#endif

// Forward difference: SubtractLeft invokes this as op(current item, item to the left).
struct CustomDifference {
    template <typename DataType>
    __device__ DataType operator()(const DataType &lhs, const DataType &rhs) const
    { return lhs - rhs; }
};
// Flags a boundary where the value crosses the threshold from below to
// at-or-above (or back again). Used as the FlagOp for BlockDiscontinuity::FlagHeads.
struct Increasing {
    int threshold;
    __device__ Increasing(int threshold) : threshold(threshold) {}
    __device__ bool operator()(const int &a, const int &b) const {
        return ((a < threshold) && (b >= threshold)) ||
               ((a >= threshold) && (b < threshold));
    }
};

// Mirror of Increasing for the downward direction.
struct Decreasing {
    int threshold;
    __device__ Decreasing(int threshold) : threshold(threshold) {}
    __device__ bool operator()(const int &a, const int &b) const {
        return ((a > threshold) && (b <= threshold)) ||
               ((a <= threshold) && (b > threshold));
    }
};
Segmented Reduction
__global__ void block_segmented_reduce_kernel(float* input_1, float* input_2, int* results, const uint length)
{
    const uint tid = threadIdx.x;      // thread index within the block (previous_stride_end is per-block shared state)
    const size_t array_size = 4;       // items per thread
    const size_t stride = BLOCK_SIZE * array_size;

    __shared__ volatile int previous_stride_end;
    if (tid == 0)
        previous_stride_end = 0;

    // --- CUB Templates
    typedef cub::BlockLoad<float, BLOCK_SIZE, array_size, cub::BLOCK_LOAD_DIRECT> BlockLoad_F;
    typedef cub::BlockLoad<int, BLOCK_SIZE, array_size, cub::BLOCK_LOAD_DIRECT>   BlockLoad_I;
    typedef cub::BlockStore<int, BLOCK_SIZE, array_size, cub::BLOCK_STORE_DIRECT> BlockStore;
    typedef cub::BlockAdjacentDifference<int, BLOCK_SIZE> BlockAdjacentDifference;
    typedef cub::BlockDiscontinuity<int, BLOCK_SIZE>      BlockDiscontinuity;
    typedef cub::BlockReduce<int, BLOCK_SIZE>             BlockReduce;

    // --- Shared Memory
    __shared__ union TempStorage
    {
        typename BlockLoad_F::TempStorage load_f;
        typename BlockLoad_I::TempStorage load_i;
        typename BlockStore::TempStorage store;
        typename BlockAdjacentDifference::TempStorage adjDiff;
        typename BlockDiscontinuity::TempStorage heads;
        typename BlockReduce::TempStorage reduce;
    } temp_storage;

    __shared__ int sm_unreduced_values[stride];
    __shared__ int sm_reduced_values[stride];

    // --- Thread Data
    float thread_input_1[array_size];
    float thread_input_2[array_size];
    int ratio[array_size];
    int adj_diff[array_size];
    int flag_heads[array_size];

    int valid_items = length;
    int write_index = 0;

    // --- Block-Stride Loop: each pass handles one tile of `stride` items
    for (uint index = 0; index < length; index += stride)
    {
        float* p_input_1 = input_1 + index;
        float* p_input_2 = input_2 + index;

        // --- Block Load [array_size items per thread]; out-of-bounds items default to -1
        BlockLoad_F(temp_storage.load_f).Load(p_input_1, thread_input_1, valid_items, -1);
        __syncthreads();    // temp_storage is reused by the next load
        BlockLoad_F(temp_storage.load_f).Load(p_input_2, thread_input_2, valid_items, -1);
        __syncthreads();

        // --- Ratio
        for (int i = 0; i < array_size; i++)
            ratio[i] = static_cast<int>(thread_input_1[i] / thread_input_2[i]);

        // --- Adjacent Difference, seeded with the last item of the previous tile
        BlockAdjacentDifference(temp_storage.adjDiff).SubtractLeft(ratio, adj_diff, CustomDifference(), previous_stride_end);
        __syncthreads();
        BlockStore(temp_storage.store).Store(sm_unreduced_values, adj_diff, valid_items);
        __syncthreads();

        /* ---------------------------------------------------------------
         * This is what I would like to do, if segmented block functions
         * were possible. BlockReduce has no flag/segmented overload, so the
         * two Reduce() calls below do not exist in CUB.
         * (A possible hand-rolled substitute is sketched after this kernel.)
         * --------------------------------------------------------------- */
        int unreduced[array_size];
        int count_unreduced = stride;
        int num_segments;
        for (int i = 0; i < 5; i++)
        {
            // ------------ Increasing ------------- //
            // --- Load shared memory into local threads
            BlockLoad_I(temp_storage.load_i).Load(sm_unreduced_values, unreduced, count_unreduced);
            __syncthreads();
            // --- Generate head flags
            BlockDiscontinuity(temp_storage.heads).FlagHeads(flag_heads, unreduced, Increasing(i));
            __syncthreads();
            // (The Sum() aggregate is only valid in thread 0 and would need broadcasting.)
            num_segments = BlockReduce(temp_storage.reduce).Sum(flag_heads);
            __syncthreads();
            // --- Segmented Reduction (desired, does not exist):
            // reduce the data by segment and write the per-segment sums back to shared memory
            BlockReduce(temp_storage.reduce).Reduce(unreduced, cub::Sum(), flag_heads, sm_reduced_values, num_segments);
            __syncthreads();

            // ------------ Decreasing ------------- //
            // --- Load shared memory into local threads
            BlockLoad_I(temp_storage.load_i).Load(sm_reduced_values, unreduced, num_segments);
            __syncthreads();
            // --- Generate head flags
            BlockDiscontinuity(temp_storage.heads).FlagHeads(flag_heads, unreduced, Decreasing(-1 * i));
            __syncthreads();
            num_segments = BlockReduce(temp_storage.reduce).Sum(flag_heads);
            __syncthreads();
            // --- Segmented Reduction (desired, does not exist)
            BlockReduce(temp_storage.reduce).Reduce(unreduced, cub::Sum(), flag_heads, sm_unreduced_values, num_segments);
            __syncthreads();
        }

        // --- Save the last ratio of this tile to seed the next tile's difference
        if (tid == (BLOCK_SIZE - 1))
            previous_stride_end = ratio[array_size - 1];
        __syncthreads();

        // --- Write results to global memory
        BlockStore(temp_storage.store).Store(&results[write_index], adj_diff, valid_items);
        __syncthreads();

        // --- Update offsets for the next tile
        write_index += stride;
        valid_items -= stride;
    }
}
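For completeness, here is a rough sketch of how a block-wide segmented sum could be pieced together from cub::BlockScan instead. This is not a CUB API: BlockSegmentedSum is a made-up helper name, it assumes one item per thread, and it assumes item 0 is always flagged as a head (which is what BlockDiscontinuity::FlagHeads does when no predecessor is supplied). I'd much prefer a library routine over maintaining something like this.

// Hypothetical helper, not part of CUB: block-wide segmented sum, one item per
// thread. `value` is this thread's item, `head_flag` is 1 if the item starts a
// new segment. Per-segment sums are written to out[0 .. num_segments-1] and the
// segment count is returned to every thread.
template <int BLOCK_SIZE>
__device__ int BlockSegmentedSum(int value, int head_flag, int *out)
{
    typedef cub::BlockScan<int, BLOCK_SIZE> BlockScan;
    __shared__ typename BlockScan::TempStorage scan_storage;
    __shared__ int seg_start_excl[BLOCK_SIZE];   // running sum just before each segment head
    __shared__ int heads[BLOCK_SIZE];            // head flags, so thread i can see flag i+1

    const int i = threadIdx.x;
    heads[i] = head_flag;

    // Inclusive prefix sum of the values across the block.
    int incl_val;
    BlockScan(scan_storage).InclusiveSum(value, incl_val);
    __syncthreads();

    // 1-based segment rank of every item, plus the total segment count.
    int seg_rank, num_segments;
    BlockScan(scan_storage).InclusiveSum(head_flag, seg_rank, num_segments);
    __syncthreads();

    // Each segment head records the running sum just before its segment starts.
    if (head_flag)
        seg_start_excl[seg_rank - 1] = incl_val - value;
    __syncthreads();

    // The last item of each segment (a "tail") emits that segment's sum.
    bool is_tail = (i == BLOCK_SIZE - 1) || (heads[i + 1] == 1);
    if (is_tail)
        out[seg_rank - 1] = incl_val - seg_start_excl[seg_rank - 1];
    __syncthreads();

    return num_segments;
}

The idea is simply that each segment's sum equals the block-wide running sum at the segment's last item minus the running sum just before its first item.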