still draft, to be updated

实现了一个dynamic_map的retrieve_all

访存算子,经典过滤问题,predict为true的元素拷贝到out数组,重点是需要维护一个 atomic 的out index, https://developer.nvidia.com/blog/cuda-pro-tip-optimized-filtering-warp-aggregated-atomics/, 翻译:https://zhuanlan.zhihu.com/p/581078557

host端:

template <typename Key, typename Value, cuda::thread_scope Scope, typename Allocator>
template <typename KeyOut, typename ValueOut>
std::pair<KeyOut, ValueOut> dynamic_map<Key, Value, Scope, Allocator>::retrieve_all(
  KeyOut keys_out, ValueOut values_out, cudaStream_t stream) const
{
  auto constexpr block_size = 128;
  auto constexpr stride     = 1;

  auto const capacity       = get_capacity();
  auto grid_size            = (capacity + stride * block_size - 1) / (stride * block_size);

  std::vector<size_t> submap_cap_prefix(submaps_.size());
  std::inclusive_scan(submaps_.begin(), submaps_.end(), submap_cap_prefix.begin(),
                      [](auto const& sum, auto const& submap) { return sum + submap->get_capacity(); }, 
                      (size_t)0);
  thrust::device_vector<size_t> submap_cap_prefix_d(submap_cap_prefix);

  // 复用alloc_(用于slots_的alloc_)会比直接cudaMalloc快一个数量级,不需要重新分配内存
  // 单纯cudaMalloc会触发GPU driver/runtime 的 allocation 初始化、页表建立等
  using temp_allocator_type =
    typename std::allocator_traits<Allocator>::template rebind_alloc<char>;
  auto temp_allocator = temp_allocator_type{alloc_};
  auto d_num_out      = reinterpret_cast<unsigned long long*>(
    std::allocator_traits<temp_allocator_type>::allocate(temp_allocator, sizeof(unsigned long long)));
  CUCO_CUDA_TRY(cudaMemsetAsync(d_num_out, 0, sizeof(unsigned long long), stream));
  
  detail::retrieve_all<block_size><<<grid_size, block_size, 0, stream>>>(
    keys_out, values_out, submap_views_.data().get(), submaps_.size(), 
    capacity, d_num_out, submap_cap_prefix_d.data().get(), 
    empty_key_sentinel_, erased_key_sentinel_);

  size_t h_num_out;
  CUCO_CUDA_TRY(
    cudaMemcpyAsync(&h_num_out, d_num_out, sizeof(size_t), cudaMemcpyDeviceToHost, stream));
  CUCO_CUDA_TRY(cudaStreamSynchronize(stream));
  CUCO_CUDA_TRY(cudaFree(d_num_out))
  return {keys_out + h_num_out, values_out + h_num_out};
}
  1. naive实现,一个全局atomic
template <uint32_t block_size,
          typename OutputIt,
          typename viewT,
          typename PrefixT,
          typename Key>
CUCO_KERNEL void retrieve_all(OutputIt keys_out,
                              OutputIt values_out,
                              viewT* submap_views,
                              uint32_t num_submaps,
                              uint64_t capacity,
                              unsigned long long* d_num_out,
                              PrefixT* prefix_sum,
                              Key empty_key_sentinel,
                              Key erased_key_sentinel)
{
  auto tid    = blockDim.x * blockIdx.x + threadIdx.x;
  auto stride = blockDim.x * gridDim.x;

  for (; tid < capacity; tid += stride) {
    uint32_t submap_idx = 0;
    uint32_t submap_offset = tid;
    // prefix_sum长度一般就10以内,不需要二分之类的操作
    while (tid >= prefix_sum[submap_idx] && submap_idx < num_submaps ) ++submap_idx;

    if (submap_idx > 0) {
      submap_offset = tid - prefix_sum[submap_idx - 1];
    }
    auto const &current_slot  = submap_views[submap_idx].get_slots()[submap_offset];
    Key const existing_key    = current_slot.first.load(cuda::std::memory_order_relaxed);
    auto const is_filled =
      not(cuco::detail::bitwise_compare(existing_key, empty_key_sentinel) or
          cuco::detail::bitwise_compare(existing_key, erased_key_sentinel));

    if (is_filled) {
      auto idx   = atomicAdd(d_num_out, static_cast<unsigned long long>(1));
      auto value = current_slot.second.load(cuda::std::memory_order_relaxed);
      keys_out[idx]    = existing_key;
      values_out[idx]  = value;
    }
  }
}
  1. block内atomicAdd + 全局atomicAdd
// 一个block内用一个__shared__ local_count表示这个block中predict为true的数量
// local_pos表示当前线程在该block内第几个predict为true

template <uint32_t block_size,
          typename OutputIt,
          typename viewT,
          typename PrefixT,
          typename Key>
CUCO_KERNEL void retrieve_all(OutputIt keys_out,
                              OutputIt values_out,
                              viewT* submap_views,
                              uint32_t num_submaps,
                              uint64_t capacity,
                              unsigned long long* d_num_out,
                              PrefixT* prefix_sum,
                              Key empty_key_sentinel,
                              Key erased_key_sentinel)
{
  auto tid    = blockDim.x * blockIdx.x + threadIdx.x;
  auto stride = blockDim.x * gridDim.x;

  __shared__ unsigned int local_count;
  if (threadIdx.x == 0) {
    local_count = 0;
  }
  __syncthreads();

  for (; tid < capacity; tid += stride) {
    uint32_t submap_idx = 0;
    uint32_t submap_offset = tid;
    while (tid >= prefix_sum[submap_idx] && submap_idx < num_submaps ) ++submap_idx;

    if (submap_idx > 0) {
      submap_offset = tid - prefix_sum[submap_idx - 1];
    }
    auto const &current_slot  = submap_views[submap_idx].get_slots()[submap_offset];
    Key const existing_key    = current_slot.first.load(cuda::std::memory_order_relaxed);
    auto const is_filled =
      not(cuco::detail::bitwise_compare(existing_key, empty_key_sentinel) or
          cuco::detail::bitwise_compare(existing_key, erased_key_sentinel));

    unsigned int local_pos = 0;
    if (is_filled) {
      local_pos = atomicAdd_block(&local_count, 1);
    }
    __syncthreads();

    if (threadIdx.x == 0) {
      local_count = atomicAdd(d_num_out, local_count);
    }
    __syncthreads();

    if (is_filled) {
      auto value = current_slot.second.load(cuda::std::memory_order_relaxed);
      keys_out[local_count + local_pos]    = existing_key;
      values_out[local_count + local_pos]  = value;
    }
  }
}


// 类似原理,但是用cub::BlockScan实现
template <uint32_t block_size,
          typename OutputIt,
          typename viewT,
          typename PrefixT,
          typename Key>
CUCO_KERNEL void retrieve_all(OutputIt keys_out,
                              OutputIt values_out,
                              viewT* submap_views,
                              uint32_t num_submaps,
                              uint64_t capacity,
                              unsigned long long* d_num_out,
                              PrefixT* prefix_sum,
                              Key empty_key_sentinel,
                              Key erased_key_sentinel)
{
  using BlockScan = cub::BlockScan<unsigned int, block_size>;

  // Shared memory
  __shared__ typename BlockScan::TempStorage scan_temp_storage;
  __shared__ unsigned int block_base;

  auto tid    = blockDim.x * blockIdx.x + threadIdx.x;
  auto stride = blockDim.x * gridDim.x;

  for (; tid < capacity; tid += stride) {
    // Compute submap index and offset
    uint32_t submap_idx = 0;
    uint32_t submap_offset = tid;
    while (tid >= prefix_sum[submap_idx] && submap_idx < num_submaps) ++submap_idx;
    if (submap_idx > 0) {
      submap_offset = tid - prefix_sum[submap_idx - 1];
    }

    auto const& current_slot = submap_views[submap_idx].get_slots()[submap_offset];
    Key const existing_key   = current_slot.first.load(cuda::std::memory_order_relaxed);

    // Check key validity
    bool is_filled = not(cuco::detail::bitwise_compare(existing_key, empty_key_sentinel) ||
                         cuco::detail::bitwise_compare(existing_key, erased_key_sentinel));

    // Perform block-wide exclusive scan to compute local write index
    unsigned int local_idx = 0;
    unsigned int total_valid = 0;
    BlockScan(scan_temp_storage).ExclusiveSum(is_filled ? 1u : 0u, local_idx, total_valid);

    // Block leader calculates global offset
    if (threadIdx.x == 0) {
      block_base = atomicAdd(d_num_out, total_valid);
    }
    __syncthreads();

    if (is_filled) {
      auto value = current_slot.second.load(cuda::std::memory_order_relaxed);
      keys_out[block_base + local_idx]   = existing_key;
      values_out[block_base + local_idx] = value;
    }
  }
}

3. warp-aggregated atomics: warp(or cooperative group)粒度atomicAdd + block内atomicAdd+全局atomicAdd
```cpp
template <uint32_t block_size,
          typename OutputIt,
          typename viewT,
          typename PrefixT,
          typename Key>
CUCO_KERNEL void retrieve_all(OutputIt keys_out,
                              OutputIt values_out,
                              viewT* submap_views,
                              uint32_t num_submaps,
                              uint64_t capacity,
                              unsigned long long* d_num_out,
                              PrefixT* prefix_sum,
                              Key empty_key_sentinel,
                              Key erased_key_sentinel)
{
  auto tid    = blockDim.x * blockIdx.x + threadIdx.x;
  auto stride = blockDim.x * gridDim.x;

  __shared__ unsigned int block_count;
  __shared__ unsigned int block_base;
  if (threadIdx.x == 0) {
    block_count = 0;
    block_base = 0;
  }
  __syncthreads();

  unsigned int local_idx = 0;
  for (; tid < capacity; tid += stride) {
    uint32_t submap_idx = 0;
    uint32_t submap_offset = tid;
    while (tid >= prefix_sum[submap_idx] && submap_idx < num_submaps ) ++submap_idx;

    if (submap_idx > 0) {
      submap_offset = tid - prefix_sum[submap_idx - 1];
    }
    auto const &current_slot  = submap_views[submap_idx].get_slots()[submap_offset];
    Key const existing_key    = current_slot.first.load(cuda::std::memory_order_relaxed);
    auto const is_filled =
      not(cuco::detail::bitwise_compare(existing_key, empty_key_sentinel) or
          cuco::detail::bitwise_compare(existing_key, erased_key_sentinel));

    unsigned mask = __ballot_sync(0xffffffff, is_filled);
    int lane = threadIdx.x & 0x1f;
    int warp_prefix = __popc(mask & ((1u << lane) - 1));
    if (is_filled) local_idx = warp_prefix;

    unsigned int warp_vote = __popc(mask);
    unsigned int warp_base = 0;
    if (lane == 0 && warp_vote) {
      warp_base = atomicAdd_block(&block_count, warp_vote);
    }
    warp_base = __shfl_sync(0xffffffff, warp_base, 0);
    __syncthreads();

    if (threadIdx.x == 0) {
      block_base = atomicAdd(d_num_out, block_count);
    }
    __syncthreads();

    if (is_filled) {
      auto value = current_slot.second.load(cuda::std::memory_order_relaxed);
      keys_out[block_base + warp_base + local_idx]    = existing_key;
      values_out[block_base + warp_base + local_idx]  = value;
    }
  }
}

// 类似的,但是用cooperative group实现,实测还是tile_size=32最快,和warp没区别
// 写起来更modern一点
template <uint32_t block_size,
          uint32_t tile_size,
          typename OutputIt,
          typename viewT,
          typename PrefixT,
          typename Key>
CUCO_KERNEL void retrieve_all(OutputIt keys_out,
                              OutputIt values_out,
                              viewT* submap_views,
                              uint32_t num_submaps,
                              uint64_t capacity,
                              unsigned long long* d_num_out,
                              PrefixT* prefix_sum,
                              Key empty_key_sentinel,
                              Key erased_key_sentinel)
{
  auto tile   = cg::tiled_partition<tile_size>(cg::this_thread_block());
  auto block  = cg::this_thread_block();
  auto tid    = blockDim.x * blockIdx.x + threadIdx.x;
  auto stride = blockDim.x * gridDim.x;

  __shared__ unsigned int block_count;
  __shared__ unsigned int block_base;
  if (threadIdx.x == 0) {
    block_count = 0;
    block_base = 0;
  }
  block.sync();

  for (; tid < capacity; tid += stride) {
    uint32_t submap_idx = 0;
    uint32_t submap_offset = tid;
    while (tid >= prefix_sum[submap_idx] && submap_idx < num_submaps ) ++submap_idx;

    if (submap_idx > 0) {
      submap_offset = tid - prefix_sum[submap_idx - 1];
    }
    auto const &current_slot  = submap_views[submap_idx].get_slots()[submap_offset];
    Key const existing_key    = current_slot.first.load(cuda::std::memory_order_relaxed);
    auto const is_filled =
      not(cuco::detail::bitwise_compare(existing_key, empty_key_sentinel) or
          cuco::detail::bitwise_compare(existing_key, erased_key_sentinel));

    unsigned int tile_mask = tile.ballot(is_filled);
    unsigned int tile_rank = tile.thread_rank();
    unsigned int tile_vote = __popc(tile_mask);
    unsigned int tile_prefix = __popc(tile_mask & ((1u << tile_rank) - 1));
    
    unsigned int tile_base = 0;
    if (tile_rank == 0 && tile_mask) {
      tile_base = atomicAdd_block(&block_count, tile_vote);
    }
    tile_base = tile.shfl(tile_base, 0);
    block.sync();

    if (block.thread_rank() == 0) {
      block_base = atomicAdd(d_num_out, block_count);
    }
    block.sync();

    if (is_filled) {
      auto value = current_slot.second.load(cuda::std::memory_order_relaxed);
      keys_out[block_base + tile_base + tile_prefix]    = existing_key;
      values_out[block_base + tile_base + tile_prefix]  = value;
    }
  }
}

实测2/3速度差不多,在插入1亿数据后(实际总cap达到2亿),<key, value>都是cuda::atomic<int64_t>的情况下,retrieve_all cost 3ms左右;