Is there a block equivalent to cub::DeviceSegmentedReduce

Overview

I need to do multiple iterations of segmented reduction (reduce by key) on some data. Because I need to do multiple subsequent segmented reductions, I would very much like to avoid writing it back to global memory and then reloading it.

As far as I know, all the library functions that offer segmented reduction (thrust, cub, mgpu) offer it only for data from global memory as a __host__ call. IE, they require a pointer to global memory. I want to do a segmented reduction inside a kernel without having to launch a new kernel so I can utilize the data already loaded into the registers for each thread.

Related Questions

  • This Stack Overflow Q.A gives a decent iterative solution. I’d like to utilize a library so as not to re-invent the wheel as suggested here
  • In this google groups question, its mentioned that a block-wide BlockReduceByKey will be added to CUB in the next release, but doesn’t seem to be available.
  • This forum question seems to be asking something very similar, but no response.
  • The cuda samples are mentioned in a number of questions. They do address variable sized arrays, but only variable template sized arrays. Not each segment being a unique size.
  • A cg::labeled_partition would work, except that it needs to run on Pascal hardware. Labeled partitions require compute capability 7.0+.

CUB Library

  • DeviceSegmentedReduce can only be called from the host. I’d like to use data that’s already been loaded in from global memory.
  • BlockAdjacentDifference has a deprecated function FlagHeads that I could probably utilize to make it work. The deprecated function suggests using the SubtractLeft or SubtractRight functions instead, but these don’t have any way to take in Head Flags, so I don’t see how it’s actually a replacement.

Purpose

I am attempting to generate point-and-figure or candlestick charts in CUDA. Given a long array of input data, I need to test for changes in direction (increasing vs. decreasing) but ignore small fluctuations in price until the cumulative change is greater than 4 from the previous high or low point.

I start by finding the adjacent difference along the data, and then successively remove the smaller fluctuations. This leaves only the increasing and decreasing trends where the change is >= 4.

Example

This is what the flow of data should look like:

adj_diff[] = [..., -1, 1, 1, -1, 1, 2, 0, -1, 1, 1, ...]
seg_keys[] = [...,  0, 1, 1,  0, 1, 1, 1,  0, 1, 1, ...]
reduced[ ] = [...,(-1)(2)(-1)(3)(-1)(2), ...]

This process is repeated untill all fluctuations less than 4 have been eliminated.

Code

Here is a sample of what I’ve written so far. I’ve included what I would like to be able to do, but it doesn’t exist in the CUB library.

Structs

struct CustomDifference {
    template <typename DataType>
    __device__ DataType operator()(DataType &lhs, DataType &rhs) 
        { return lhs - rhs; }
};

struct Increasing {
    int threshold;
    __device__ Increasing(int threshold) : threshold(threshold) {}
    __device__ bool operator()(const int &a, const int &b) {
        return ((a < threshold) && (b >= threshold)) ||
               ((a >= threshold) && (b < threshold)) ;
    }
};

struct Decreasing {
    int threshold;

    __device__ Decreasing(int threshold) : threshold(threshold) {}
    __device__ bool operator()(const int &a, const int &b) {
        return ((a > threshold) && (b <= threshold)) ||
               ((a <= threshold) && (b > threshold));
    }
};

Segmented Reduction

__global__ void block_segmented_reduce_kernel(float* input_1, float* input_2, int* results, const uint length)
{
    const uint tid = (blockDim.x * blockIdx.x) + threadIdx.x;
    const size_t array_size = 4;
    const size_t stride = BLOCK_SIZE * array_size;

    __shared__ volatile int previous_stride_end;
    if (tid == 0)
        previous_stride_end = 0;

    // --- CUB Templates
    typedef cub::BlockLoad<float, BLOCK_SIZE, array_size, cub::BLOCK_LOAD_DIRECT> BlockLoad_F;
    typedef cub::BlockLoad<int, BLOCK_SIZE, array_size, cub::BLOCK_LOAD_DIRECT> BlockLoad_I;
    typedef cub::BlockStore<int, BLOCK_SIZE, 4, cub::BLOCK_STORE_DIRECT> BlockStore;
    typedef cub::BlockAdjacentDifference<int, BLOCK_SIZE> BlockAdjacentDifference;
    typedef cub::BlockDiscontinuity<int, BLOCK_SIZE> BlockDiscontinuity;
    typedef cub::BlockReduce<int, BLOCK_SIZE> BlockReduce;

    // --- Shared Memory
    __shared__ union TempStorage
    {
        typename BlockLoad_F::TempStorage load_f;
        typename BlockLoad_I::TempStorage load_i;
        typename BlockStore::TempStorage store;
        typename BlockAdjacentDifference::TempStorage adjDiff;
        typename BlockDiscontinuity::TempStorage heads;
        typename BlockReduce::TempStorage reduce;
    } temp_storage;

    __shared__ int sm_unreduced_values[stride];
    __shared__ int sm_reduced_values[stride];

    // --- Thread Data
    float thread_input_1[array_size];
    float thread_input_2[array_size];
    int ratio[array_size];
    int adj_diff[array_size];
    int flag_heads[array_size];

    int valid_items = length;
    int write_index = 0;

    // --- Block-Stride Loop
    for (uint index=0; index < length; index += stride)
    {
        float* p_input_1 = input_1 + index;
        float* p_input_2 = input_2 + index;

        // --- Block Load [4]
        BlockLoad_F(temp_storage.load_f).Load(p_input_1, thread_input_1, valid_items, -1);
        BlockLoad_F(temp_storage.load_f).Load(p_input_2, thread_input_2, valid_items, -1);
        __syncthreads();

        // --- Ratio
        for (int i=0; i<array_size; i++)
            ratio[i] = static_cast<int>(thread_input_1[i] / thread_input_2[i]);

        // --- Adjacent Difference
        BlockAdjacentDifference(temp_storage.adjDiff).SubtractLeft(ratio, adj_diff, CustomDifference(), previous_stride_end);
        BlockStore(temp_storage.store).Store(sm_unreduced_values, adj_diff, valid_items);

        /* ---------------------------------------------------------------
        *      This is what I would like to do, if segmented block functions are possible.
        * --------------------------------------------------------------- */

        int unreduced[4];
        int count_unreduced = stride;
        int num_segments;

        for (int i=0; i<5; i++)
        {
            // ------------ Increasing ------------- //

            // --- Load shared memory into local threads
            BlockLoad_I(temp_storage.load_i).Load(sm_unreduced_values, unreduced, count_unreduced);

            // --- Generate Head Flags
            BlockDiscontinuity(temp_storage.heads).FlagHeads(flag_heads, unreduced, Increasing(i));
            num_segments = BlockReduce(temp_storage.reduce).Sum(flag_heads);

            // --- Segmented Reduction
            //      Reduce data loaded from shared memory by segment, and write reduced sums back to shared memory
            BlockReduce(temp_storage.reduce).Reduce(unreduced, cub::Sum(), flag_heads, sm_reduced_values, num_segments);


            // ------------ Decreasing ------------- //

            // --- Load shared memory into local threads
            BlockLoad_I(temp_storage.load_i).Load(sm_reduced_values, unreduced, num_segments);

            // --- Generate head flags
            BlockDiscontinuity(temp_storage.heads).FlagHeads(flag_heads, unreduced, Decreasing(-1 * i));
            num_segments = BlockReduce(temp_storage.reduce).Sum(flag_heads);

            // --- Segmented Reduction
            //      Reduce data loaded from shared memory by segment, and write reduced sums back to shared memory
            BlockReduce(temp_storage.reduce).Reduce(unreduced, cub::Sum(), flag_heads, sm_unreduced_values, num_segments);
        }

        // --- Save last position for future iterations
        if (tid == (BLOCK_SIZE - 1))
            previous_stride_end = ratio[array_size - 1];
        __syncthreads();

        // --- Write Results to global memory
        BlockStore(temp_storage.store).Store(&results[write_index], adj_diff, valid_items);
        __syncthreads();

        // --- Update Pointers
        write_index += stride;
        valid_items -= stride;
    }
}
  1. Are you wanting this to happen in the scope of a single block (i.e. each block does its own segmented reduce, independent of other blocks) or are you wanting many blocks to cooperate on a single segmented reduce (which is more-or-less what the cub::Device… function does).

  2. Are you looking for only an implementation using cub? Or any available library implementation? Or any implementation (including straight CUDA code)?

  3. You mention “I’d like to use data that’s already been loaded in from global memory.” What do you mean by that? Can the data be in global memory? If not in global memory, where is it? (shared memory?) Or is the only expectation that you can embed this in device code (not necessary to call from host)?

  4. You say: " This Stack Overflow Q.A gives a decent iterative solution." That’s puzzling to me. You’re asking about a segmented reduce, that is effectively a segmented scan. What do you mean by “iterative”? It has multiple steps, to be sure, but how does that relate to your question? How does that statement relate to “I’d like to utilize a library”? (It uses a library!?) Do you mean you want a single library call to do all the work?

  5. It might be important to indicate the expected range of the average run length. A run is a sequence of adjacent values that have identical keys, i.e. would be reduced to the same result element. It might also be useful to know typical data set lengths. Obviously you can write an algorithm that is independent of these considerations, but an optimal approach (better relative performance) might possibly be different depending on these data set characteristics.

Are you looking for a segmented reduction (given segment begin and end, reduce all elements in the segment) , or are you looking for a reduction by key (given key-value pairs, reduce all values of consecutive equal keys) ?

Reduce-by-key can be implemented as a prefix-scan with custom scan operation.

@Robert_Crovella thanks for asking great clarifying questions. When I’ve been strugling with the details for a while, its easy to forget which parts of the broader context aren’t clear from my question.

1. Are you wanting this to happen in the scope of a single block?

Yes. Ideally this would happen within a single block. The way the rest of the project is currently structured, each block is responsible for one pair of input float arrays. The floats represent asset prices that are evaluated to create the eventual point-and-figure chart. Each asset in the list is paired with each other asset in the list. I’m trying to structure it such that one block does the analysis for one pair.

The example kernel I gave would only run one block, but in the full version there would be ~1,000 - 10,000 blocks, depending on the number of asset pairs.

2. Are you looking for an implementation using cub?

No, I’m open to whatever solution is both feasible and performant. My original implementation used thrust exclusively.

// --- pseudo algorithm
for(i=0; i<5; i++) {
    // generate keys
    thrust::transform(data.begin(), data.end(), keys.begin, functor(i));
    // reduce
    thrust::reduce_by_key(keys.begin(), keys.end(), data.begin(), key_out.begin(), val_out.begin();
}

Custom functors are used to determine if data is increasing or decreasing with a fluctuation >= i (1-4)

3. You mention “I’d like to use data that’s already been loaded in from global memory.” What do you mean by that?

You are correct that it is loaded from global memory the first time, but I’d like to only pay that cost once. Once the data is loaded into the block, I need to be able to run multiple reduce_by_key operations on the same data; incrementally reducing fluctuations to determine the increasing / decreasing trends and eliminate noise.

If I understand correctly, the thrust functions can only accept a thrust vector iterator as input and output. So when I call reduce_by_key(data.begin(), ...). The function loads the data from global memory, operates on the data, and then writes it back to global memory before doing the next round of reductions. This seems a high cost to pay since I need to then do another reduction on the output that was just generated.

4. What do you mean by “iterative”?

You are again correct that I was referring to the multiple steps. I should have been more clear. The problem is not that there are multiple steps, but that the later steps require the prior results from previous segments to be available.

  • In the example given, the segment bounds are known ahead of time.
  • In my case, the segments are determined at runtime after doing
ratio[i] = price_1[i] / price_2[i];
cub::BlockAdjacenctDifference<>(ratio, adj_diff);

This means that one segment could end up spanning more data than I can load into the block at a time using cub::BlockLoad. I tried accounting for this and stored all the previous results in shared memory. Since the cub functions rely on shared memory as well for temp_storage the algorithm I tried to write was severely limited by the memory available.

Reducing the amount of data loaded at one time reduced the shared memory required for temp_storage, but that increased the likelihood that a segment would span multiple blocks, which increased the demand for shared memory. I wasn’t able to find a solution that was stable without running into memory limits.

That’s the main reason I wanted to utilize a library as much as possible. I know the algorithm can be solved, but I also know that developers “much smarter than me” have written and tuned a lot of the library functions and have probably already considered that case and found good solutions. Since that QA was given almost 5 years ago, I was hoping there was a library function available similar to thrust’s reduce_by_key which handles arbitrarily long segments of data so that I wouldn’t have to reinvent the wheel. I’m also perfectly happy to utilize straight CUDA code, but given the trouble I’ve had getting the algorithm to work on arbitrarily long segments I thought a library call would be more stable.

5. Expected Ranges

  • Total array length : 10K → 30K floats
  • Average run length : 1 → 500

The run length can vary significantly. Comparing any two assets might have fluctuations and noise almost constantly, while another pair of assets might have multiple years of steady growth or decline or no change at all relative to each other.

6. Additional note

I realize my question was quite verbose, and I appreciate you taking the time to read and ask good clarifying questions. Given the amount of time I’ve spent trying to find a better solution that is more stable I wanted to lay out as many of the issues I’ve run into as possible.

I don’t know if the best path forward will be a library function that already exists that I’m unaware of, or if there’s a straight CUDA solution I’ve missed. I don’t mean to devalue your time by asking too many questions; I wanted to give as much background as possible for what I’ve tried, what’s worked, and where I’ve gotten stuck. I am well aware that often those of us will relatively less experience will get stuck at something glaringly obvious. I’ve tried to get as far as I can on my own with this problem and I am beyond grateful for the questions and answers you’ve provided on this and other forums that have answered the multitude of other questions I’ve had over the years.

One possible approach:

Given a value array:

{0, 2, 1, -2, 0, 3, 4}

and a flag array which marks the end of each segment (inclusive):

{0, 1, 1, 0, 1, 0, 1}

we could perform an ordinary prefix sum on the value array:

{0, 2, 3, 1, 1, 4, 8}

and then compute the segment sum results by subtracting values at the flag positions:

prefix sum:                {0, 2, 3, 1, 1, 4, 8}
flags:                     {0, 1, 1, 0, 1, 0, 1}
segment sums:              {   2, 1,   -2,    7}

it might not be obvious how to get to the segment sums. You could do a typical parallel stream-compaction operation on the prefix sums, selecting the values indicated in the flag array. Then do an adjacent difference, prepending 0 to the stream-compacted array.

prefix sum:                {0, 2, 3, 1, 1, 4, 8}
flags:                     {0, 1, 1, 0, 1, 0, 1}
stream-compacted:          {2, 3, 1, 8}
prepend 0:                 {0, 2, 3, 1, 8}
adjacent diff:             {   2, 1,-2, 7}

Therefore, given a parallel prefix sum implementation at the block level (which cub provides) and using it for both the values prefix-sum as well as the stream compaction op, that could be a roadmap to create a block-level segmented sum using other “primitives” - prefix sum, indexed copy, and adjacent difference.

striker159 may know of a more elegant approach

I would suggest following the approach used in cub for the device-wide reduce-by-key. https://github.com/NVIDIA/cccl/blob/main/cub/cub/agent/agent_reduce_by_key.cuh#L486

The approach is similar to Robert_Crovella’s with the improvement that the output offsets of compacted elements can be computed in the same prefix scan operation that computes the segment sums.

Keys are adjacent_differences. Values is a constant array of ones.

keys :    {0,2,1,-2,0,3,4}
values :   {1,1,1,1,1,1,1}
head flags:{1,0,1,1,0,1,0}

You can create (head flag, value) pairs and compute an exclusive scan with the following scan op (https://github.com/NVIDIA/cccl/blob/main/cub/cub/thread/thread_operators.cuh#L327)

scan_op(pairA, pairB){
  result.flag = pairA.flag + pairB.flag;
  if(pairB.flag == 1) //B is new segment
      result.value = pairB.value;
  else // same segment
      result.value = pairA.value + pairB.value
}

This would give

keys :          {0,2,1,-2,0,3,4}
values :        {1,1,1,1,1,1,1}
head flags:     {1,0,1,1,0,1,0}
scanned_values: {1,2,1,1,2,1,2}
scanned_flags:  {0,1,1,2,3,3,4}

Then if headflags[i] == 1 , copy scanned_values[i] to output[scanned_flags[i]]
(edit: I think I made a mistake computing scanned_values. They seem to be offset by 1 position)

The tricky part is how to handle more elements than fit into the registers of a block. You need to do multiple iterations and communicate some data between iterations. If the first key of the current iteration is equal to the last key of the previous iteration the first head flag must be set to 0 and the value must be initialized with the previous count. The final count must overwrite the count that was previously written to output.

@striker159 and @Robert_Crovella.
Thank you both for the suggestions and clarification. I was able to get the code working with the help of your suggestions, and the examples provided in CUB.

Clarification

I ran into some behavior I didn’t understand. I wonder if you could add some clarification.

When I used the scan_op as suggested, it combined segments together even if there was a head flag in between, indicating that they should be two separate segments.

Problem

struct custom_inequality{
    __device__
    bool operator()(const int &a, const int &b) {
        return ((a > 0) && (b <= 0) || (b > 0) && (a <= 0));
    }
};

__device__ 
void block_segmented_reduce_kernel(...) {
    ...
    BlockLoad().Load(p_input, input);

    // --- Section Flag Heads [+/-]
    BlockDiscontinuity().FlagHeads(flags, input, custom_ineq());

    // --- Zip items
    for (i : items_per_thread)
        scan_items[item] = { flags[item], input[item] };

    BlockScan().InclusiveScan(scan_items, scan_results, scan_op);
input    : [ 1  1 -1 -1 ][ 1  1  1  1 ][-1  1  1  1 ][ 1  1  1  1 ]
flags    : [ 1  0  1  0 ][ 1  0  0  0 ][ 1  1  0  0 ][ 0  0  0  0 ]

segment  : [ 1  1  2  2 ][ 3  3  3  3 ][ 4  5  5  5 ][ 5  5  5  5 ]
scan_val : [ 1  2 -1 -2 ][ 1  2  3  4 ][-1  1  2  3 ][ 8  9 10 11 ]

Notice section 5 (thread 3) continues the sum from section 3 (thread 2).

I was able to get the correct result by doing 1 of 2 things.

1. Prefix scan the flags first

  • prefix scan the flags
    • this gives the segment for each item
  • prefix scan the pairs
    • if keys match, sum values
scan_op(A, B) 
    result.key = B.key;

    if (A.key == B.key)
        result.value == A.value + B.value;
    else
        result.value = B.value;

2. Replace if(B.key == 1) with if(B.key)
This second solution is what I’d like clarification on.

When I changed the scan_op functor slightly, I got the correct results without needing to do a prefix scan on the flags first.

This updated functor produces the correct result every time:

typedef cub::KeyValuePair<int, int> kv_pair;
__device__ 
kv_pair scan_op(kv_pair A, kv_pair B) {
    kv_pair result;
    result.key = A.key + B.key;

    if (B.key)
        result.value = B.value;
    else
        result.value = A.value + B.value;

    return result;
}

By only changing the conditional check, the results are always correct. Segments are not combined.

// if (pair_B.key == 1)
if (pair_B.key)

Can you help me understand why evaluating B.key == 1 behaved differently than evaluating B.key ?

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.