nppiCountInRange returns NPP_RANGE_ERROR when called from multiple threads (CUDA 9.1)

I am working on an image-processing application that makes liberal use of CPU threads, CUDA streams, and async memcpy operations to keep GPU utilization high. One part uses a thread pool (std::async in the reproduction case below) to count the non-zero pixels in tiles of an image using nppiCountInRange.

This code runs fine when:

  • CPU threading is disabled, or
  • NPP uses the default stream, i.e. nppSetStream(cudaStreamPerThread) is not called after context creation (see the snippet below)
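
For clarity, a minimal sketch of the one-line difference between the two configurations (the failing variant is what the reproduction below does):

// working: leave NPP on the legacy default stream, i.e. make no nppSetStream call
// failing: redirect NPP to the per-thread default stream right after context creation
nppSetStream(cudaStreamPerThread);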

Minimal reproduction case:

#include <algorithm>
#include <cassert>
#include <cstdlib>
#include <cuda.h>
#include <cuda_runtime.h>
#include <future>
#include <iostream>
#include <nppcore.h>
#include <nppi.h>
#include <tuple>
#include <vector>

std::ostream& operator<<(std::ostream& os, const NppiSize& sz)
{
  os << "(" << sz.width << "," << sz.height << ")";
  return os;
}

std::ostream& operator<<(std::ostream& os, const NppiPoint& p)
{
  os << "(" << p.x << "," << p.y << ")";
  return os;
}


#define CHECK_NPP(rc)                                                                                                  \
  do                                                                                                                   \
  {                                                                                                                    \
    const auto _rc = rc;                                                                                               \
    assert(_rc == NPP_NO_ERROR);                                                                                       \
  } while (false)

#define CHECK_CU(rc)                                                                                                   \
  do                                                                                                                   \
  {                                                                                                                    \
    const auto _rc = rc;                                                                                               \
    assert(_rc == CUDA_SUCCESS);                                                                                       \
  } while (false)


int main(int argc, char** argv)
{
  if (argc < 2)
  {
    std::cerr << "Usage: " << argv[0] << " D" << std::endl << " where D is the CUDA device ID" << std::endl;
    return 1;
  }

  CHECK_CU(cuInit(0));

  CUdevice cuDevice;
  CHECK_CU(cuDeviceGet(&cuDevice, atoi(argv[1])));
  CUcontext cuContext;
  CHECK_CU(cuCtxCreate(&cuContext, CU_CTX_SCHED_AUTO, cuDevice));

  nppSetStream(cudaStreamPerThread);

  const auto inputDim = NppiSize{1920, 1088};
  int inputPitch;
  auto inputData = nppiMalloc_8u_C1(inputDim.width, inputDim.height, &inputPitch);
  assert(inputData);
  std::cout << "Input image: " << inputDim << ", pitch: " << inputPitch << std::endl;
  CHECK_NPP(nppiSet_8u_C1R(0, inputData, inputPitch, inputDim));

  const auto tileDim = NppiSize{16, 16};

  std::vector<std::tuple<NppiPoint, std::future<int32_t>>> results;

  for (int y = 0; y < inputDim.height; y += tileDim.height)
  {
    for (int x = 0; x < inputDim.width; x += tileDim.width)
    {
      const auto pt = NppiPoint{x, y};
      results.push_back(std::make_tuple(
        pt, std::async([=]() {
          // make the context created in main() current on this async worker thread
          CHECK_CU(cuCtxPushCurrent(cuContext));

          const auto clampedDim = NppiSize{std::min(tileDim.width, inputDim.width - pt.x),
                                           std::min(tileDim.height, inputDim.height - pt.y)};
          auto tileData = inputData + pt.y * inputPitch + pt.x;

          int bufferSize;
          // scratch buffer is sized for the full-image ROI, an upper bound for any tile
          CHECK_NPP(nppiCountInRangeGetBufferHostSize_8u_C1R(inputDim, &bufferSize));
          CUdeviceptr buffer_d;
          CHECK_CU(cuMemAlloc(&buffer_d, bufferSize));

          CUdeviceptr result_d;
          CHECK_CU(cuMemAlloc(&result_d, sizeof(int)));
          CHECK_NPP(nppiCountInRange_8u_C1R(tileData, inputPitch, clampedDim, reinterpret_cast<int*>(result_d), 1,
                                            255, reinterpret_cast<Npp8u*>(buffer_d)));

          int result;
          CHECK_CU(cuMemcpyDtoH(&result, result_d, sizeof(int)));

          CHECK_CU(cuMemFree(result_d));
          CHECK_CU(cuMemFree(buffer_d));

          return result;
        })));
    }
  }

  for (auto& r : results)
  {
    std::cout << std::get<NppiPoint>(r) << ": " << std::get<std::future<int32_t>>(r).get() << std::endl;
  }

  nppiFree(inputData);

  return 0;
}

Update: revised reproduction code in response to a reply on the associated bug ticket (https://developer.nvidia.com/nvidia_bug/2085264).

I am now manually creating a CUDA stream per thread and using an explicit thread pool based on boost.asio (in place of std::async). Before each task runs, the pool ensures the current NPP stream matches the worker thread's stream (as suggested) by doing:

cudaStream_t currentNppStream = nppGetStream();
if (newNppStream != currentNppStream)
{
  // drain any NPP work already queued on the old stream before switching
  cudaStreamSynchronize(currentNppStream);
  nppSetStream(newNppStream);
}

The nppiCountInRange_8u_C1R call still returns NPP_RANGE_ERROR. Uncommenting the USE_DEFAULT_STREAM #define (so that everything executes on stream 0) still works as before.
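
For what it's worth, a minimal sketch of why I suspect the per-thread nppSetStream calls race (this assumes the stream managed by nppGetStream/nppSetStream is a single process-global value rather than thread-local state, which I have not been able to confirm from the documentation):

// hypothetical interleaving of two pool workers, assuming one global NPP stream:
//
//   worker A                         worker B
//   nppSetStream(streamA);
//                                    nppSetStream(streamB);   // silently overrides A
//   nppiCountInRange_8u_C1R(...);    // A's kernels and scratch buffer now run on
//                                    // streamB, concurrent with B's own NPP work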

#include <algorithm>
#include <atomic>
#include <boost/asio.hpp>
#include <boost/functional/hash.hpp>
#include <boost/io/ios_state.hpp>
#include <boost/thread.hpp>
#include <cassert>
#include <cstdlib>
#include <cuda.h>
#include <cuda_runtime.h>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <nppcore.h>
#include <nppi.h>
#include <thread>
#include <tuple>
#include <type_traits>
#include <unordered_map>

//#define USE_DEFAULT_STREAM

#define CHECK_NPP(rc)                                                                                                  \
  do                                                                                                                   \
  {                                                                                                                    \
    const auto _rc = rc;                                                                                               \
    assert(_rc == NPP_NO_ERROR);                                                                                       \
  } while (false)

#define CHECK_CU(rc)                                                                                                   \
  do                                                                                                                   \
  {                                                                                                                    \
    const auto _rc = rc;                                                                                               \
    assert(_rc == CUDA_SUCCESS);                                                                                       \
  } while (false)

inline bool operator==(const NppiPoint& lhs, const NppiPoint& rhs)
{
  return lhs.x == rhs.x && lhs.y == rhs.y;
}

namespace std
{
template<>
struct hash<NppiPoint>
{
  using argument_type = NppiPoint;
  using result_type = std::size_t;

  result_type operator()(argument_type const& s) const noexcept
  {
    return boost::hash_value(std::make_pair(s.x, s.y));
  }
};
}

class CudaThreadPool
{
  class Worker
  {
  private:
    boost::asio::io_service& ioService_;
    const std::atomic<bool>& stop_;

  public:
    Worker(boost::asio::io_service& ioService, const std::atomic<bool>& stop) : ioService_(ioService), stop_(stop)
    {
    }

    void run()
    {
      while (!stop_)
      {
        try
        {
          ioService_.run();
          break;
        }
        catch (const std::exception& e)
        {
          std::cerr << "Unhandled exception in worker: " << e.what() << "\n";
          assert(false);
        }
      }
    }
  };

  std::atomic<bool> stop_{false};

  boost::thread_group workers_;
  boost::asio::io_service ioService_;
  CUcontext cuContext_;
  std::unique_ptr<boost::asio::io_service::work> idleWork_;

  std::mutex m_;
  std::unordered_map<std::thread::id, CUstream> streamMap_;

public:
  CudaThreadPool(CUcontext ctx)
    : cuContext_{ctx}, idleWork_(std::make_unique<boost::asio::io_service::work>(ioService_))
  {
    for (size_t i = 0; i < std::thread::hardware_concurrency(); ++i)
    {
      auto worker = std::make_shared<Worker>(ioService_, stop_);
      workers_.create_thread([=]() { return worker->run(); });
    }
  }

  ~CudaThreadPool()
  {
    stop_ = true;
    idleWork_.reset();
    ioService_.stop();
    workers_.join_all();
    for (auto& kv : streamMap_)
    {
      std::cout << "destroying cuda stream: " << kv.second << "\n";
      CHECK_CU(cuStreamDestroy(kv.second));
    }
  }

  cudaStream_t getCudaStream(std::thread::id tid)
  {
#ifdef USE_DEFAULT_STREAM
    return 0;
#else
    std::unique_lock<std::mutex> l{m_};
    auto[it, inserted] = streamMap_.insert(std::make_pair(tid, cudaStream_t{}));
    cudaStream_t* stream = &it->second;
    l.unlock();

    if (inserted)
    {
      CHECK_CU(cuStreamCreate(stream, CU_STREAM_NON_BLOCKING));
      std::cout << "created cuda stream: " << *stream << "\n";
    }

    return *stream;
#endif
  }

  template<typename Fn, typename... Args>
  std::future<std::invoke_result_t<Fn, Args...>> async(Fn&& fn, Args&&... args)
  {
    using Task = std::packaged_task<std::invoke_result_t<Fn, Args...>()>;
    auto task = std::make_shared<Task>(std::bind(std::forward<Fn>(fn), std::forward<Args>(args)...));
    auto result = task->get_future();

    auto taskWrapper = [=]() {
      // make the pool's shared context current on this worker thread
      CHECK_CU(cuCtxPushCurrent(cuContext_));
      const auto tid = std::this_thread::get_id();
      auto currentNppStream = nppGetStream();
      auto threadCudaStream = getCudaStream(tid);
      if (currentNppStream != threadCudaStream)
      {
        std::cout << "tid stream: " << currentNppStream << " -> " << threadCudaStream << "\n";
        cudaStreamSynchronize(currentNppStream);
        nppSetStream(threadCudaStream);
      }
      (*task)();
    };

    ioService_.post(taskWrapper);
    return result;
  }
};

std::ostream& operator<<(std::ostream& os, const NppiSize& sz)
{
  os << "(" << sz.width << "," << sz.height << ")";
  return os;
}

std::ostream& operator<<(std::ostream& os, const NppiPoint& p)
{
  os << "(" << p.x << "," << p.y << ")";
  return os;
}

int main(int argc, char** argv)
{
  if (argc < 2)
  {
    std::cerr << "Usage: " << argv[0] << " D" << std::endl << " where D is the CUDA device ID" << std::endl;
    return 1;
  }

  CHECK_CU(cuInit(0));

  CUdevice cuDevice;
  CHECK_CU(cuDeviceGet(&cuDevice, atoi(argv[1])));
  CUcontext cuContext;
  CHECK_CU(cuCtxCreate(&cuContext, CU_CTX_SCHED_AUTO, cuDevice));

  auto threadPool = CudaThreadPool{cuContext};

  const auto inputDim = NppiSize{1920, 1088};
  int inputPitch;
  auto inputData = nppiMalloc_8u_C1(inputDim.width, inputDim.height, &inputPitch);
  assert(inputData);
  std::cout << "Input image: " << inputDim << ", pitch: " << inputPitch << std::endl;
  CHECK_NPP(nppiSet_8u_C1R(0, inputData, inputPitch, inputDim));

  const auto tileDim = NppiSize{16, 16};

  std::unordered_map<NppiPoint, std::future<std::tuple<int32_t, std::thread::id>>> results;

  for (int y = 0; y < inputDim.height; y += tileDim.height)
  {
    for (int x = 0; x < inputDim.width; x += tileDim.width)
    {
      const auto pt = NppiPoint{x, y};
      results.insert(std::make_pair(pt, threadPool.async([=]() {
        const auto clampedDim =
          NppiSize{std::min(tileDim.width, inputDim.width - pt.x), std::min(tileDim.height, inputDim.height - pt.y)};
        auto tileData = inputData + pt.y * inputPitch + pt.x;

        int bufferSize;
        // scratch buffer is sized for the full-image ROI, an upper bound for any tile
        CHECK_NPP(nppiCountInRangeGetBufferHostSize_8u_C1R(inputDim, &bufferSize));
        CUdeviceptr buffer_d;
        CHECK_CU(cuMemAlloc(&buffer_d, bufferSize));

        CUdeviceptr result_d;
        CHECK_CU(cuMemAlloc(&result_d, sizeof(int)));
        CHECK_NPP(nppiCountInRange_8u_C1R(tileData, inputPitch, clampedDim, reinterpret_cast<int*>(result_d), 1, 255,
                                          reinterpret_cast<Npp8u*>(buffer_d)));

        int result;
        CHECK_CU(cuMemcpyDtoH(&result, result_d, sizeof(int)));

        CHECK_CU(cuMemFree(result_d));
        CHECK_CU(cuMemFree(buffer_d));

        return std::make_tuple(result, std::this_thread::get_id());
      })));
    }
  }

  for (auto& kv : results)
  {
    const auto[result, tid] = kv.second.get();
    auto iosState = boost::io::ios_all_saver{std::cout};
    std::cout << kv.first << ": " << result << ", tid: 0x" << std::hex << tid << std::endl;
  }

  nppiFree(inputData);

  return 0;
}

The above examples use -std=c++17; see below for a C++14-adapted version of the second one.

This was compiled and linked on my Ubuntu 16.04 machine with:

g++-5 nppiCountInRange_bug_cpp14.cc -std=c++14 -I /usr/local/cuda-9.1/include -lcuda -lcudart -lnppist -lnppc -lnppidei -lnppisu -L/usr/local/cuda-9.1/lib64 -lboost_system -lboost_thread
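
Run it with the CUDA device ID as the only argument, e.g.:

./a.out 0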
#include <algorithm>
#include <atomic>
#include <boost/asio.hpp>
#include <boost/functional/hash.hpp>
#include <boost/io/ios_state.hpp>
#include <boost/thread.hpp>
#include <cassert>
#include <cstdlib>
#include <cuda.h>
#include <cuda_runtime.h>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <nppcore.h>
#include <nppi.h>
#include <thread>
#include <tuple>
#include <type_traits>
#include <unordered_map>

//#define USE_DEFAULT_STREAM

#define CHECK_NPP(rc)                                                                                                  \
  do                                                                                                                   \
  {                                                                                                                    \
    const auto _rc = rc;                                                                                               \
    assert(_rc == NPP_NO_ERROR);                                                                                       \
  } while (false)

#define CHECK_CU(rc)                                                                                                   \
  do                                                                                                                   \
  {                                                                                                                    \
    const auto _rc = rc;                                                                                               \
    assert(_rc == CUDA_SUCCESS);                                                                                       \
  } while (false)


inline bool operator==(const NppiPoint& lhs, const NppiPoint& rhs)
{
  return lhs.x == rhs.x && lhs.y == rhs.y;
}


namespace std
{
template<>
struct hash<NppiPoint>
{
  using argument_type = NppiPoint;
  using result_type = std::size_t;

  result_type operator()(argument_type const& s) const noexcept
  {
    return boost::hash_value(std::make_pair(s.x, s.y));
  }
};
}


class CudaThreadPool
{
  class Worker
  {
  private:
    boost::asio::io_service& ioService_;
    const std::atomic<bool>& stop_;

  public:
    Worker(boost::asio::io_service& ioService, const std::atomic<bool>& stop) : ioService_(ioService), stop_(stop)
    {
    }

    void run()
    {
      while (!stop_)
      {
        try
        {
          ioService_.run();
          break;
        }
        catch (const std::exception& e)
        {
          std::cerr << "Unhandled exception in worker: " << e.what() << "\n";
          assert(false);
        }
      }
    }
  };

  std::atomic<bool> stop_{false};

  boost::thread_group workers_;
  boost::asio::io_service ioService_;
  CUcontext cuContext_;
  std::unique_ptr<boost::asio::io_service::work> idleWork_;

  std::mutex m_;
  using ThreadCUStreamMap = std::unordered_map<std::thread::id, CUstream>;
  ThreadCUStreamMap streamMap_;

public:
  CudaThreadPool(CUcontext ctx)
    : cuContext_{ctx}, idleWork_(std::make_unique<boost::asio::io_service::work>(ioService_))
  {
    for (size_t i = 0; i < std::thread::hardware_concurrency(); ++i)
    {
      auto worker = std::make_shared<Worker>(ioService_, stop_);
      workers_.create_thread([=]() { return worker->run(); });
    }
  }

  ~CudaThreadPool()
  {
    stop_ = true;
    idleWork_.reset();
    ioService_.stop();
    workers_.join_all();
    for (auto& kv : streamMap_)
    {
      std::cout << "destroying cuda stream: " << kv.second << "\n";
      CHECK_CU(cuStreamDestroy(kv.second));
    }
  }

  cudaStream_t getCudaStream(std::thread::id tid)
  {
#ifdef USE_DEFAULT_STREAM
    return 0;
#else
    std::unique_lock<std::mutex> l{m_};
    ThreadCUStreamMap::iterator it;
    bool inserted;
    std::tie(it, inserted) = streamMap_.insert(std::make_pair(tid, cudaStream_t{}));
    cudaStream_t* stream = &it->second;
    l.unlock();

    if (inserted)
    {
      CHECK_CU(cuStreamCreate(stream, CU_STREAM_NON_BLOCKING));
      std::cout << "created cuda stream: " << *stream << "\n";
    }

    return *stream;
#endif
  }

  template<typename Fn, typename... Args>
  std::future<std::result_of_t<Fn(Args...)>> async(Fn&& fn, Args&&... args)
  {
    using Task = std::packaged_task<std::result_of_t<Fn(Args...)>()>;
    auto task = std::make_shared<Task>(std::bind(std::forward<Fn>(fn), std::forward<Args>(args)...));
    auto result = task->get_future();

    auto taskWrapper = [=]() {
      // make the pool's shared context current on this worker thread
      CHECK_CU(cuCtxPushCurrent(cuContext_));
      const auto tid = std::this_thread::get_id();
      auto currentNppStream = nppGetStream();
      auto threadCudaStream = getCudaStream(tid);
      if (currentNppStream != threadCudaStream)
      {
        std::cout << "tid stream: " << currentNppStream << " -> " << threadCudaStream << "\n";
        cudaStreamSynchronize(currentNppStream);
        nppSetStream(threadCudaStream);
      }
      (*task)();
    };

    ioService_.post(taskWrapper);
    return result;
  }
};


std::ostream& operator<<(std::ostream& os, const NppiSize& sz)
{
  os << "(" << sz.width << "," << sz.height << ")";
  return os;
}


std::ostream& operator<<(std::ostream& os, const NppiPoint& p)
{
  os << "(" << p.x << "," << p.y << ")";
  return os;
}


int main(int argc, char** argv)
{
  if (argc < 2)
  {
    std::cerr << "Usage: " << argv[0] << " D" << std::endl << " where D is the CUDA device ID" << std::endl;
    return 1;
  }

  CHECK_CU(cuInit(0));

  CUdevice cuDevice;
  CHECK_CU(cuDeviceGet(&cuDevice, atoi(argv[1])));
  CUcontext cuContext;
  CHECK_CU(cuCtxCreate(&cuContext, CU_CTX_SCHED_AUTO, cuDevice));

  CudaThreadPool threadPool{cuContext};

  const auto inputDim = NppiSize{1920, 1088};
  int inputPitch;
  auto inputData = nppiMalloc_8u_C1(inputDim.width, inputDim.height, &inputPitch);
  assert(inputData);
  std::cout << "Input image: " << inputDim << ", pitch: " << inputPitch << std::endl;
  CHECK_NPP(nppiSet_8u_C1R(0, inputData, inputPitch, inputDim));

  const auto tileDim = NppiSize{16, 16};

  std::unordered_map<NppiPoint, std::future<std::tuple<int32_t, std::thread::id>>> results;

  for (int y = 0; y < inputDim.height; y += tileDim.height)
  {
    for (int x = 0; x < inputDim.width; x += tileDim.width)
    {
      const auto pt = NppiPoint{x, y};
      results.insert(std::make_pair(pt, threadPool.async([=]() {
        const auto clampedDim =
          NppiSize{std::min(tileDim.width, inputDim.width - pt.x), std::min(tileDim.height, inputDim.height - pt.y)};
        auto tileData = inputData + pt.y * inputPitch + pt.x;

        int bufferSize;
        // scratch buffer is sized for the full-image ROI, an upper bound for any tile
        CHECK_NPP(nppiCountInRangeGetBufferHostSize_8u_C1R(inputDim, &bufferSize));
        CUdeviceptr buffer_d;
        CHECK_CU(cuMemAlloc(&buffer_d, bufferSize));

        CUdeviceptr result_d;
        CHECK_CU(cuMemAlloc(&result_d, sizeof(int)));
        CHECK_NPP(nppiCountInRange_8u_C1R(tileData, inputPitch, clampedDim, reinterpret_cast<int*>(result_d), 1, 255,
                                          reinterpret_cast<Npp8u*>(buffer_d)));

        int result;
        CHECK_CU(cuMemcpyDtoH(&result, result_d, sizeof(int)));

        CHECK_CU(cuMemFree(result_d));
        CHECK_CU(cuMemFree(buffer_d));

        return std::make_tuple(result, std::this_thread::get_id());
      })));
    }
  }

  for (auto& kv : results)
  {
    int32_t result;
    std::thread::id tid;
    std::tie(result, tid) = kv.second.get();
    auto iosState = boost::io::ios_all_saver{std::cout};
    std::cout << kv.first << ": " << result << ", tid: 0x" << std::hex << tid << std::endl;
  }

  nppiFree(inputData);

  return 0;
}