still draft, to be updated
Implemented `retrieve_all` for `dynamic_map`.
A memory-bound kernel and a classic filtering (stream compaction) problem: copy every element whose predicate is true into the `out` array; the crux is maintaining an atomic `out` index. Reference: https://developer.nvidia.com/blog/cuda-pro-tip-optimized-filtering-warp-aggregated-atomics/ (Chinese translation: https://zhuanlan.zhihu.com/p/581078557).
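For orientation, the baseline version of this pattern is roughly the toy sketch below (a standalone kernel with an assumed `> 0` predicate, not the dynamic_map code that follows); all of the variants further down only change how that single global counter is contended:

```cpp
// Toy filtering kernel: every kept element costs one atomicAdd on the same global counter.
__global__ void filter_naive(int const* in, int* out, int n, unsigned long long* num_out)
{
  int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n && in[i] > 0) {                 // assumed predicate: keep positive values
    auto idx = atomicAdd(num_out, 1ULL);    // reserve one slot in the output
    out[idx] = in[i];
  }
}
```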
Host side:
```cpp
template <typename Key, typename Value, cuda::thread_scope Scope, typename Allocator>
template <typename KeyOut, typename ValueOut>
std::pair<KeyOut, ValueOut> dynamic_map<Key, Value, Scope, Allocator>::retrieve_all(
KeyOut keys_out, ValueOut values_out, cudaStream_t stream) const
{
auto constexpr block_size = 128;
auto constexpr stride = 1;
auto const capacity = get_capacity();
auto grid_size = (capacity + stride * block_size - 1) / (stride * block_size);
std::vector<size_t> submap_cap_prefix(submaps_.size());
std::inclusive_scan(submaps_.begin(), submaps_.end(), submap_cap_prefix.begin(),
[](auto const& sum, auto const& submap) { return sum + submap->get_capacity(); },
(size_t)0);
thrust::device_vector<size_t> submap_cap_prefix_d(submap_cap_prefix);
  // Reusing alloc_ (the allocator already used for slots_) is about an order of magnitude faster
  // than a plain cudaMalloc and requires no fresh device allocation; a bare cudaMalloc would
  // trigger GPU driver/runtime allocation setup, page-table creation, etc.
using temp_allocator_type =
typename std::allocator_traits<Allocator>::template rebind_alloc<char>;
auto temp_allocator = temp_allocator_type{alloc_};
auto d_num_out = reinterpret_cast<unsigned long long*>(
std::allocator_traits<temp_allocator_type>::allocate(temp_allocator, sizeof(unsigned long long)));
CUCO_CUDA_TRY(cudaMemsetAsync(d_num_out, 0, sizeof(unsigned long long), stream));
detail::retrieve_all<block_size><<<grid_size, block_size, 0, stream>>>(
keys_out, values_out, submap_views_.data().get(), submaps_.size(),
capacity, d_num_out, submap_cap_prefix_d.data().get(),
empty_key_sentinel_, erased_key_sentinel_);
  unsigned long long h_num_out;
  CUCO_CUDA_TRY(cudaMemcpyAsync(
    &h_num_out, d_num_out, sizeof(unsigned long long), cudaMemcpyDeviceToHost, stream));
  CUCO_CUDA_TRY(cudaStreamSynchronize(stream));
  // d_num_out was obtained from temp_allocator, so release it through the allocator, not cudaFree
  std::allocator_traits<temp_allocator_type>::deallocate(
    temp_allocator, reinterpret_cast<char*>(d_num_out), sizeof(unsigned long long));
return {keys_out + h_num_out, values_out + h_num_out};
}
```
1. Naive implementation: a single global atomic
```cpp
template <uint32_t block_size,
typename OutputIt,
typename viewT,
typename PrefixT,
typename Key>
CUCO_KERNEL void retrieve_all(OutputIt keys_out,
OutputIt values_out,
viewT* submap_views,
uint32_t num_submaps,
uint64_t capacity,
unsigned long long* d_num_out,
PrefixT* prefix_sum,
Key empty_key_sentinel,
Key erased_key_sentinel)
{
auto tid = blockDim.x * blockIdx.x + threadIdx.x;
auto stride = blockDim.x * gridDim.x;
for (; tid < capacity; tid += stride) {
uint32_t submap_idx = 0;
uint32_t submap_offset = tid;
    // prefix_sum typically has fewer than ~10 entries, so a linear scan is enough (no binary search needed)
    while (submap_idx < num_submaps && tid >= prefix_sum[submap_idx]) { ++submap_idx; }
if (submap_idx > 0) {
submap_offset = tid - prefix_sum[submap_idx - 1];
}
    auto const& current_slot = submap_views[submap_idx].get_slots()[submap_offset];
Key const existing_key = current_slot.first.load(cuda::std::memory_order_relaxed);
auto const is_filled =
not(cuco::detail::bitwise_compare(existing_key, empty_key_sentinel) or
cuco::detail::bitwise_compare(existing_key, erased_key_sentinel));
if (is_filled) {
auto idx = atomicAdd(d_num_out, static_cast<unsigned long long>(1));
auto value = current_slot.second.load(cuda::std::memory_order_relaxed);
keys_out[idx] = existing_key;
values_out[idx] = value;
}
}
}
```
2. Block-level atomicAdd + global atomicAdd
```cpp
// Each block keeps one __shared__ local_count: the number of predicate-true elements found by the block.
// local_pos is this thread's rank among the block's predicate-true threads.
template <uint32_t block_size,
typename OutputIt,
typename viewT,
typename PrefixT,
typename Key>
CUCO_KERNEL void retrieve_all(OutputIt keys_out,
OutputIt values_out,
viewT* submap_views,
uint32_t num_submaps,
uint64_t capacity,
unsigned long long* d_num_out,
PrefixT* prefix_sum,
Key empty_key_sentinel,
Key erased_key_sentinel)
{
auto tid = blockDim.x * blockIdx.x + threadIdx.x;
auto stride = blockDim.x * gridDim.x;
  __shared__ unsigned int local_count;
  // The loop body contains __syncthreads(), so every thread of the block must run the same
  // number of iterations; with the launch above this holds when capacity is a multiple of block_size.
  for (; tid < capacity; tid += stride) {
    // reset the per-block counter for this grid-stride iteration
    if (threadIdx.x == 0) { local_count = 0; }
    __syncthreads();
uint32_t submap_idx = 0;
uint32_t submap_offset = tid;
    while (submap_idx < num_submaps && tid >= prefix_sum[submap_idx]) { ++submap_idx; }
if (submap_idx > 0) {
submap_offset = tid - prefix_sum[submap_idx - 1];
}
    auto const& current_slot = submap_views[submap_idx].get_slots()[submap_offset];
Key const existing_key = current_slot.first.load(cuda::std::memory_order_relaxed);
auto const is_filled =
not(cuco::detail::bitwise_compare(existing_key, empty_key_sentinel) or
cuco::detail::bitwise_compare(existing_key, erased_key_sentinel));
unsigned int local_pos = 0;
if (is_filled) {
local_pos = atomicAdd_block(&local_count, 1);
}
__syncthreads();
    if (threadIdx.x == 0) {
      // the leader reserves a contiguous output range; local_count now holds the block's base offset
      local_count = atomicAdd(d_num_out, local_count);
    }
__syncthreads();
if (is_filled) {
auto value = current_slot.second.load(cuda::std::memory_order_relaxed);
keys_out[local_count + local_pos] = existing_key;
values_out[local_count + local_pos] = value;
    }
    // keep the next iteration's reset of local_count from racing with the reads above
    __syncthreads();
  }
}
// Same idea, but implemented with cub::BlockScan
template <uint32_t block_size,
typename OutputIt,
typename viewT,
typename PrefixT,
typename Key>
CUCO_KERNEL void retrieve_all(OutputIt keys_out,
OutputIt values_out,
viewT* submap_views,
uint32_t num_submaps,
uint64_t capacity,
unsigned long long* d_num_out,
PrefixT* prefix_sum,
Key empty_key_sentinel,
Key erased_key_sentinel)
{
using BlockScan = cub::BlockScan<unsigned int, block_size>;
// Shared memory
__shared__ typename BlockScan::TempStorage scan_temp_storage;
__shared__ unsigned int block_base;
auto tid = blockDim.x * blockIdx.x + threadIdx.x;
auto stride = blockDim.x * gridDim.x;
for (; tid < capacity; tid += stride) {
// Compute submap index and offset
uint32_t submap_idx = 0;
uint32_t submap_offset = tid;
    while (submap_idx < num_submaps && tid >= prefix_sum[submap_idx]) { ++submap_idx; }
if (submap_idx > 0) {
submap_offset = tid - prefix_sum[submap_idx - 1];
}
auto const& current_slot = submap_views[submap_idx].get_slots()[submap_offset];
Key const existing_key = current_slot.first.load(cuda::std::memory_order_relaxed);
// Check key validity
bool is_filled = not(cuco::detail::bitwise_compare(existing_key, empty_key_sentinel) ||
cuco::detail::bitwise_compare(existing_key, erased_key_sentinel));
// Perform block-wide exclusive scan to compute local write index
unsigned int local_idx = 0;
unsigned int total_valid = 0;
BlockScan(scan_temp_storage).ExclusiveSum(is_filled ? 1u : 0u, local_idx, total_valid);
// Block leader calculates global offset
if (threadIdx.x == 0) {
block_base = atomicAdd(d_num_out, total_valid);
}
__syncthreads();
if (is_filled) {
auto value = current_slot.second.load(cuda::std::memory_order_relaxed);
keys_out[block_base + local_idx] = existing_key;
values_out[block_base + local_idx] = value;
    }
    // sync before the next iteration reuses scan_temp_storage and overwrites block_base
    __syncthreads();
  }
}
```
3. Warp-aggregated atomics: warp-level (or cooperative-group-level) atomicAdd + block-level atomicAdd + global atomicAdd
```cpp
template <uint32_t block_size,
typename OutputIt,
typename viewT,
typename PrefixT,
typename Key>
CUCO_KERNEL void retrieve_all(OutputIt keys_out,
OutputIt values_out,
viewT* submap_views,
uint32_t num_submaps,
uint64_t capacity,
unsigned long long* d_num_out,
PrefixT* prefix_sum,
Key empty_key_sentinel,
Key erased_key_sentinel)
{
auto tid = blockDim.x * blockIdx.x + threadIdx.x;
auto stride = blockDim.x * gridDim.x;
  __shared__ unsigned int block_count;
  __shared__ unsigned int block_base;
  unsigned int local_idx = 0;
  for (; tid < capacity; tid += stride) {
    // reset the per-block counter for this grid-stride iteration
    if (threadIdx.x == 0) { block_count = 0; }
    __syncthreads();
uint32_t submap_idx = 0;
uint32_t submap_offset = tid;
    while (submap_idx < num_submaps && tid >= prefix_sum[submap_idx]) { ++submap_idx; }
if (submap_idx > 0) {
submap_offset = tid - prefix_sum[submap_idx - 1];
}
    auto const& current_slot = submap_views[submap_idx].get_slots()[submap_offset];
Key const existing_key = current_slot.first.load(cuda::std::memory_order_relaxed);
auto const is_filled =
not(cuco::detail::bitwise_compare(existing_key, empty_key_sentinel) or
cuco::detail::bitwise_compare(existing_key, erased_key_sentinel));
    // warp aggregation: one ballot tells every lane which lanes hold a filled slot
    unsigned mask = __ballot_sync(0xffffffff, is_filled);
    int lane = threadIdx.x & 0x1f;
    int warp_prefix = __popc(mask & ((1u << lane) - 1));  // filled lanes below this lane
    if (is_filled) local_idx = warp_prefix;
    unsigned int warp_vote = __popc(mask);  // filled lanes in the whole warp
unsigned int warp_base = 0;
if (lane == 0 && warp_vote) {
warp_base = atomicAdd_block(&block_count, warp_vote);
}
warp_base = __shfl_sync(0xffffffff, warp_base, 0);
__syncthreads();
if (threadIdx.x == 0) {
block_base = atomicAdd(d_num_out, block_count);
}
__syncthreads();
if (is_filled) {
auto value = current_slot.second.load(cuda::std::memory_order_relaxed);
keys_out[block_base + warp_base + local_idx] = existing_key;
values_out[block_base + warp_base + local_idx] = value;
    }
    // keep the next iteration's reset of block_count from racing with the reads above
    __syncthreads();
  }
}
// The same idea written with cooperative groups; in practice tile_size = 32 is still the fastest,
// i.e. no different from the raw-warp version, it just reads a bit more modern.
template <uint32_t block_size,
uint32_t tile_size,
typename OutputIt,
typename viewT,
typename PrefixT,
typename Key>
CUCO_KERNEL void retrieve_all(OutputIt keys_out,
OutputIt values_out,
viewT* submap_views,
uint32_t num_submaps,
uint64_t capacity,
unsigned long long* d_num_out,
PrefixT* prefix_sum,
Key empty_key_sentinel,
Key erased_key_sentinel)
{
auto tile = cg::tiled_partition<tile_size>(cg::this_thread_block());
auto block = cg::this_thread_block();
auto tid = blockDim.x * blockIdx.x + threadIdx.x;
auto stride = blockDim.x * gridDim.x;
  __shared__ unsigned int block_count;
  __shared__ unsigned int block_base;
  for (; tid < capacity; tid += stride) {
    // reset the per-block counter for this grid-stride iteration
    if (block.thread_rank() == 0) { block_count = 0; }
    block.sync();
uint32_t submap_idx = 0;
uint32_t submap_offset = tid;
    while (submap_idx < num_submaps && tid >= prefix_sum[submap_idx]) { ++submap_idx; }
if (submap_idx > 0) {
submap_offset = tid - prefix_sum[submap_idx - 1];
}
    auto const& current_slot = submap_views[submap_idx].get_slots()[submap_offset];
Key const existing_key = current_slot.first.load(cuda::std::memory_order_relaxed);
auto const is_filled =
not(cuco::detail::bitwise_compare(existing_key, empty_key_sentinel) or
cuco::detail::bitwise_compare(existing_key, erased_key_sentinel));
unsigned int tile_mask = tile.ballot(is_filled);
unsigned int tile_rank = tile.thread_rank();
unsigned int tile_vote = __popc(tile_mask);
unsigned int tile_prefix = __popc(tile_mask & ((1u << tile_rank) - 1));
unsigned int tile_base = 0;
if (tile_rank == 0 && tile_mask) {
tile_base = atomicAdd_block(&block_count, tile_vote);
}
tile_base = tile.shfl(tile_base, 0);
block.sync();
if (block.thread_rank() == 0) {
block_base = atomicAdd(d_num_out, block_count);
}
block.sync();
if (is_filled) {
auto value = current_slot.second.load(cuda::std::memory_order_relaxed);
keys_out[block_base + tile_base + tile_prefix] = existing_key;
values_out[block_base + tile_base + tile_prefix] = value;
    }
    // keep the next iteration's reset of block_count from racing with the reads above
    block.sync();
  }
}
```
In testing, approaches 2 and 3 are roughly the same speed: after inserting 100 million entries (actual total capacity around 200 million), with both key and value stored as cuda::atomic<int64_t>, retrieve_all takes about 3 ms.
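For completeness, a minimal sketch of how the host API above might be driven; the output sizing via `get_size()`, the `map` variable, and the use of the default stream are assumptions for illustration, not part of the code above:

```cpp
// Hypothetical call site: size the outputs to the current number of entries,
// run retrieve_all, then compute how many pairs were actually written.
cudaStream_t stream = 0;  // default stream, for illustration
thrust::device_vector<Key>   keys_out(map.get_size());
thrust::device_vector<Value> values_out(map.get_size());

auto [keys_end, values_end] =
  map.retrieve_all(keys_out.data().get(), values_out.data().get(), stream);

auto const num_retrieved = static_cast<std::size_t>(keys_end - keys_out.data().get());
keys_out.resize(num_retrieved);
values_out.resize(num_retrieved);
```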