cuCollections
Still a draft, to be updated.

Implemented a `retrieve_all` memory-bound operator for `dynamic_map`. This is the classic filtering (stream compaction) problem: copy every element whose predicate is true into an output array. The crux is maintaining an atomic output index. See https://developer.nvidia.com/blog/cuda-pro-tip-optimized-filtering-warp-aggregated-atomics/ (Chinese translation: https://zhuanlan.zhihu.com/p/581078557).

Host side:

```cpp
template <typename Key, typename Value, cuda::thread_scope Scope, typename Allocator>
template <typename KeyOut, typename ValueOut>
std::pair<KeyOut, ValueOut> dynamic_map<Key, Value, Scope, Allocator>::retrieve_all(
  KeyOut keys_out, ValueOut values_out, cudaStream_t stream) const
{
  auto constexpr block_size = 128;
  auto constexpr stride     = 1;
  auto const capacity       = get_capacity();
  auto const grid_size      = (capacity + stride * block_size - 1) / (stride * block_size);

  // Inclusive prefix sum of submap capacities; the kernel uses it to map a
  // global slot index to (submap index, offset within submap).
  // std::inclusive_scan is from <numeric> (C++17).
  std::vector<size_t> submap_cap_prefix(submaps_.size());
  std::inclusive_scan(
    submaps_.begin(),
    submaps_.end(),
    submap_cap_prefix.begin(),
    [](auto const& sum, auto const& submap) { return sum + submap->get_capacity(); },
    size_t{0});
  thrust::device_vector<size_t> submap_cap_prefix_d(submap_cap_prefix);

  // Reusing alloc_ (the allocator already used for slots_) is about an order of
  // magnitude faster than a bare cudaMalloc, which triggers GPU driver/runtime
  // allocation initialization, page-table setup, etc.
  using temp_allocator_type =
    typename std::allocator_traits<Allocator>::template rebind_alloc<char>;
  auto temp_allocator = temp_allocator_type{alloc_};
  auto d_num_out      = reinterpret_cast<unsigned long long*>(
    std::allocator_traits<temp_allocator_type>::allocate(temp_allocator,
                                                         sizeof(unsigned long long)));
  CUCO_CUDA_TRY(cudaMemsetAsync(d_num_out, 0, sizeof(unsigned long long), stream));

  detail::retrieve_all<block_size><<<grid_size, block_size, 0, stream>>>(
    keys_out,
    values_out,
    submap_views_.data().get(),
    submaps_.size(),
    capacity,
    d_num_out,
    submap_cap_prefix_d.data().get(),
    empty_key_sentinel_,
    erased_key_sentinel_);

  unsigned long long h_num_out = 0;
  CUCO_CUDA_TRY(cudaMemcpyAsync(
    &h_num_out, d_num_out, sizeof(h_num_out), cudaMemcpyDeviceToHost, stream));
  CUCO_CUDA_TRY(cudaStreamSynchronize(stream));
  // Release through the same allocator that produced d_num_out (not cudaFree).
  std::allocator_traits<temp_allocator_type>::deallocate(
    temp_allocator, reinterpret_cast<char*>(d_num_out), sizeof(unsigned long long));

  return {keys_out + h_num_out, values_out + h_num_out};
}
```
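For context, a hypothetical call site (a sketch only: the `map` and `stream` names, the `int64_t` key/value types, and the `thrust::device_vector` buffers are assumptions; the outputs must hold at least `get_size()` elements, the number of filled slots):

```cpp
// Hypothetical usage sketch: map is a cuco::dynamic_map<int64_t, int64_t>,
// stream is an existing cudaStream_t.
thrust::device_vector<int64_t> retrieved_keys(map.get_size());
thrust::device_vector<int64_t> retrieved_values(map.get_size());

// Returns past-the-end iterators for the retrieved ranges.
auto const [keys_end, values_end] =
  map.retrieve_all(retrieved_keys.begin(), retrieved_values.begin(), stream);

auto const num_retrieved = thrust::distance(retrieved_keys.begin(), keys_end);
```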
1. Naive implementation: a single global atomic

```cpp
template <uint32_t block_size, typename OutputIt, typename viewT, typename PrefixT, typename Key>
CUCO_KERNEL void retrieve_all(OutputIt keys_out,
                              OutputIt values_out,
                              viewT* submap_views,
                              uint32_t num_submaps,
                              uint64_t capacity,
                              unsigned long long* d_num_out,
                              PrefixT* prefix_sum,
                              Key empty_key_sentinel,
                              Key erased_key_sentinel)
{
  auto tid          = blockDim.x * blockIdx.x + threadIdx.x;
  auto const stride = blockDim.x * gridDim.x;

  for (; tid < capacity; tid += stride) {
    // Map the global slot index to (submap index, offset within that submap).
    // prefix_sum typically has fewer than ~10 entries, so a linear scan is
    // enough; binary search and the like would not pay off.
    uint32_t submap_idx    = 0;
    uint32_t submap_offset = tid;
    while (submap_idx < num_submaps && tid >= prefix_sum[submap_idx]) { ++submap_idx; }
    if (submap_idx > 0) { submap_offset = tid - prefix_sum[submap_idx - 1]; }

    auto const& current_slot = submap_views[submap_idx].get_slots()[submap_offset];
    Key const existing_key   = current_slot.first.load(cuda::std::memory_order_relaxed);
    auto const is_filled =
      not(cuco::detail::bitwise_compare(existing_key, empty_key_sentinel) or
          cuco::detail::bitwise_compare(existing_key, erased_key_sentinel));

    if (is_filled) {
      // Every surviving thread contends on the single global counter.
      auto const idx   = atomicAdd(d_num_out, static_cast<unsigned long long>(1));
      auto const value = current_slot.second.load(cuda::std::memory_order_relaxed);
      keys_out[idx]    = existing_key;
      values_out[idx]  = value;
    }
  }
}
```

2. Block-level atomicAdd + global atomicAdd

A `__shared__` counter `local_count` holds the number of true predicates within the block, and `local_pos` is the calling thread's rank among them; only thread 0 touches the global counter.

```cpp
template <uint32_t block_size, typename OutputIt, typename viewT, typename PrefixT, typename Key>
CUCO_KERNEL void retrieve_all(OutputIt keys_out,
                              OutputIt values_out,
                              viewT* submap_views,
                              uint32_t num_submaps,
                              uint64_t capacity,
                              unsigned long long* d_num_out,
                              PrefixT* prefix_sum,
                              Key empty_key_sentinel,
                              Key erased_key_sentinel)
{
  auto tid          = blockDim.x * blockIdx.x + threadIdx.x;
  auto const stride = blockDim.x * gridDim.x;

  __shared__ unsigned int local_count;
  __shared__ unsigned int block_base;

  // The loop bound is rounded up so that every thread of the block iterates the
  // same number of times: __syncthreads() below must be reached block-wide.
  for (; tid - threadIdx.x < capacity; tid += stride) {
    bool is_filled = false;
    Key existing_key{};
    uint32_t submap_idx    = 0;
    uint32_t submap_offset = tid;
    if (tid < capacity) {
      while (submap_idx < num_submaps && tid >= prefix_sum[submap_idx]) { ++submap_idx; }
      if (submap_idx > 0) { submap_offset = tid - prefix_sum[submap_idx - 1]; }
      existing_key = submap_views[submap_idx].get_slots()[submap_offset].first.load(
        cuda::std::memory_order_relaxed);
      is_filled = not(cuco::detail::bitwise_compare(existing_key, empty_key_sentinel) or
                      cuco::detail::bitwise_compare(existing_key, erased_key_sentinel));
    }

    // Reset the block counter at every grid-stride iteration.
    if (threadIdx.x == 0) { local_count = 0; }
    __syncthreads();

    unsigned int local_pos = 0;
    if (is_filled) { local_pos = atomicAdd_block(&local_count, 1); }
    __syncthreads();

    // Thread 0 reserves a contiguous output range for the whole block.
    if (threadIdx.x == 0) { block_base = atomicAdd(d_num_out, local_count); }
    __syncthreads();

    if (is_filled) {
      auto const& current_slot = submap_views[submap_idx].get_slots()[submap_offset];
      auto const value = current_slot.second.load(cuda::std::memory_order_relaxed);
      keys_out[block_base + local_pos]   = existing_key;
      values_out[block_base + local_pos] = value;
    }
  }
}
```

The same principle, implemented with `cub::BlockScan`:

```cpp
template <uint32_t block_size, typename OutputIt, typename viewT, typename PrefixT, typename Key>
CUCO_KERNEL void retrieve_all(OutputIt keys_out,
                              OutputIt values_out,
                              viewT* submap_views,
                              uint32_t num_submaps,
                              uint64_t capacity,
                              unsigned long long* d_num_out,
                              PrefixT* prefix_sum,
                              Key empty_key_sentinel,
                              Key erased_key_sentinel)
{
  using BlockScan = cub::BlockScan<unsigned int, block_size>;

  // Shared memory
  __shared__ typename BlockScan::TempStorage scan_temp_storage;
  __shared__ unsigned int block_base;

  auto tid          = blockDim.x * blockIdx.x + threadIdx.x;
  auto const stride = blockDim.x * gridDim.x;

  // Rounded-up loop bound: BlockScan and __syncthreads() are block-wide
  // collectives, so every thread of the block must participate.
  for (; tid - threadIdx.x < capacity; tid += stride) {
    // Compute submap index and offset (linear scan; prefix_sum is tiny).
    bool is_filled = false;
    Key existing_key{};
    uint32_t submap_idx    = 0;
    uint32_t submap_offset = tid;
    if (tid < capacity) {
      while (submap_idx < num_submaps && tid >= prefix_sum[submap_idx]) { ++submap_idx; }
      if (submap_idx > 0) { submap_offset = tid - prefix_sum[submap_idx - 1]; }
      existing_key = submap_views[submap_idx].get_slots()[submap_offset].first.load(
        cuda::std::memory_order_relaxed);
      is_filled = not(cuco::detail::bitwise_compare(existing_key, empty_key_sentinel) ||
                      cuco::detail::bitwise_compare(existing_key, erased_key_sentinel));
    }

    // Block-wide exclusive scan: each valid thread gets its local write index;
    // total_valid is the number of valid slots across the block.
    unsigned int local_idx   = 0;
    unsigned int total_valid = 0;
    BlockScan(scan_temp_storage).ExclusiveSum(is_filled ? 1u : 0u, local_idx, total_valid);

    // Block leader reserves the global output range.
    if (threadIdx.x == 0) { block_base = atomicAdd(d_num_out, total_valid); }
    __syncthreads();

    if (is_filled) {
      auto const& current_slot = submap_views[submap_idx].get_slots()[submap_offset];
      auto const value = current_slot.second.load(cuda::std::memory_order_relaxed);
      keys_out[block_base + local_idx]   = existing_key;
      values_out[block_base + local_idx] = value;
    }
    __syncthreads();  // scan_temp_storage is reused in the next iteration
  }
}
```

3. Warp-aggregated atomics: warp-level (or cooperative-group) atomicAdd + block-level atomicAdd + global atomicAdd
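The core trick is the warp-aggregated atomic from the NVIDIA post linked above. A minimal standalone sketch (essentially the blog's `atomicAggInc`), before applying it to `retrieve_all`:

```cpp
#include <cooperative_groups.h>
namespace cg = cooperative_groups;

// One atomicAdd per group of currently active lanes instead of one per thread:
// lane 0 reserves space for the whole group, then each lane takes its own slot.
__device__ int atomicAggInc(int* ctr)
{
  auto g = cg::coalesced_threads();
  int warp_res;
  if (g.thread_rank() == 0) { warp_res = atomicAdd(ctr, g.size()); }
  return g.shfl(warp_res, 0) + g.thread_rank();
}
```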
Applied to `retrieve_all`:

```cpp
template <uint32_t block_size, typename OutputIt, typename viewT, typename PrefixT, typename Key>
CUCO_KERNEL void retrieve_all(OutputIt keys_out,
                              OutputIt values_out,
                              viewT* submap_views,
                              uint32_t num_submaps,
                              uint64_t capacity,
                              unsigned long long* d_num_out,
                              PrefixT* prefix_sum,
                              Key empty_key_sentinel,
                              Key erased_key_sentinel)
{
  auto tid          = blockDim.x * blockIdx.x + threadIdx.x;
  auto const stride = blockDim.x * gridDim.x;

  __shared__ unsigned int block_count;
  __shared__ unsigned int block_base;

  // Rounded-up loop bound: every thread of the block (and hence every lane of
  // each warp) reaches the barriers and the full-mask warp intrinsics below.
  for (; tid - threadIdx.x < capacity; tid += stride) {
    bool is_filled = false;
    Key existing_key{};
    uint32_t submap_idx    = 0;
    uint32_t submap_offset = tid;
    if (tid < capacity) {
      while (submap_idx < num_submaps && tid >= prefix_sum[submap_idx]) { ++submap_idx; }
      if (submap_idx > 0) { submap_offset = tid - prefix_sum[submap_idx - 1]; }
      existing_key = submap_views[submap_idx].get_slots()[submap_offset].first.load(
        cuda::std::memory_order_relaxed);
      is_filled = not(cuco::detail::bitwise_compare(existing_key, empty_key_sentinel) or
                      cuco::detail::bitwise_compare(existing_key, erased_key_sentinel));
    }

    // Reset the block counter at every grid-stride iteration.
    if (threadIdx.x == 0) { block_count = 0; }
    __syncthreads();

    // Warp aggregation: one ballot + popc gives every lane its rank among the
    // warp's valid lanes; lane 0 performs a single block-level atomicAdd.
    unsigned const mask            = __ballot_sync(0xffffffff, is_filled);
    int const lane                 = threadIdx.x & 0x1f;
    unsigned int const warp_prefix = __popc(mask & ((1u << lane) - 1));
    unsigned int const warp_vote   = __popc(mask);

    unsigned int warp_base = 0;
    if (lane == 0 && warp_vote) { warp_base = atomicAdd_block(&block_count, warp_vote); }
    warp_base = __shfl_sync(0xffffffff, warp_base, 0);
    __syncthreads();

    // One global atomicAdd per block.
    if (threadIdx.x == 0) { block_base = atomicAdd(d_num_out, block_count); }
    __syncthreads();

    if (is_filled) {
      auto const& current_slot = submap_views[submap_idx].get_slots()[submap_offset];
      auto const value = current_slot.second.load(cuda::std::memory_order_relaxed);
      keys_out[block_base + warp_base + warp_prefix]   = existing_key;
      values_out[block_base + warp_base + warp_prefix] = value;
    }
  }
}
```

The same idea implemented with cooperative groups. Measured, tile_size = 32 is still the fastest, no different from raw warp intrinsics, but it reads more modern:

```cpp
// Assumes: #include <cooperative_groups.h> and namespace cg = cooperative_groups;
template <uint32_t block_size, uint32_t tile_size, typename OutputIt, typename viewT, typename PrefixT, typename Key>
CUCO_KERNEL void retrieve_all(OutputIt keys_out,
                              OutputIt values_out,
                              viewT* submap_views,
                              uint32_t num_submaps,
                              uint64_t capacity,
                              unsigned long long* d_num_out,
                              PrefixT* prefix_sum,
                              Key empty_key_sentinel,
                              Key erased_key_sentinel)
{
  auto block = cg::this_thread_block();
  auto tile  = cg::tiled_partition<tile_size>(block);

  auto tid          = blockDim.x * blockIdx.x + threadIdx.x;
  auto const stride = blockDim.x * gridDim.x;

  __shared__ unsigned int block_count;
  __shared__ unsigned int block_base;

  // Rounded-up loop bound: every thread of the block reaches block.sync().
  for (; tid - threadIdx.x < capacity; tid += stride) {
    bool is_filled = false;
    Key existing_key{};
    uint32_t submap_idx    = 0;
    uint32_t submap_offset = tid;
    if (tid < capacity) {
      while (submap_idx < num_submaps && tid >= prefix_sum[submap_idx]) { ++submap_idx; }
      if (submap_idx > 0) { submap_offset = tid - prefix_sum[submap_idx - 1]; }
      existing_key = submap_views[submap_idx].get_slots()[submap_offset].first.load(
        cuda::std::memory_order_relaxed);
      is_filled = not(cuco::detail::bitwise_compare(existing_key, empty_key_sentinel) or
                      cuco::detail::bitwise_compare(existing_key, erased_key_sentinel));
    }

    // Reset the block counter at every grid-stride iteration.
    if (block.thread_rank() == 0) { block_count = 0; }
    block.sync();

    // Tile-level aggregation: ballot + popc within the tile.
    unsigned int const tile_mask   = tile.ballot(is_filled);
    unsigned int const tile_rank   = tile.thread_rank();
    unsigned int const tile_vote   = __popc(tile_mask);
    unsigned int const tile_prefix = __popc(tile_mask & ((1u << tile_rank) - 1));

    unsigned int tile_base = 0;
    if (tile_rank == 0 && tile_mask) { tile_base = atomicAdd_block(&block_count, tile_vote); }
    tile_base = tile.shfl(tile_base, 0);
    block.sync();

    if (block.thread_rank() == 0) { block_base = atomicAdd(d_num_out, block_count); }
    block.sync();

    if (is_filled) {
      auto const& current_slot = submap_views[submap_idx].get_slots()[submap_offset];
      auto const value = current_slot.second.load(cuda::std::memory_order_relaxed);
      keys_out[block_base + tile_base + tile_prefix]   = existing_key;
      values_out[block_base + tile_base + tile_prefix] = value;
    }
  }
}
```
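Only the `tile_size` template parameter changes at the launch site, so comparing sizes is cheap. A hypothetical sketch, reusing the launch arguments from the host code above (`d_num_out` would need resetting between runs):

```cpp
// Hypothetical tile-size sweep; tile_size must be a power of two <= 32.
// In the measurements here, 32 was fastest and matched the raw-warp version.
detail::retrieve_all<block_size, 16><<<grid_size, block_size, 0, stream>>>(
  keys_out, values_out, submap_views_.data().get(), submaps_.size(), capacity,
  d_num_out, submap_cap_prefix_d.data().get(), empty_key_sentinel_, erased_key_sentinel_);

detail::retrieve_all<block_size, 32><<<grid_size, block_size, 0, stream>>>(
  keys_out, values_out, submap_views_.data().get(), submaps_.size(), capacity,
  d_num_out, submap_cap_prefix_d.data().get(), empty_key_sentinel_, erased_key_sentinel_);
```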
Measured, variants 2 and 3 are about equally fast: after inserting 100M entries (actual total capacity grows to ~200M), with both key and value slots being `cuda::atomic<int64_t>`, `retrieve_all` costs around 3 ms.
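How such a number can be obtained, as a minimal CUDA-event timing sketch (assumes the hypothetical `map` / `retrieved_keys` / `retrieved_values` setup from the usage sketch after the host code):

```cpp
cudaEvent_t start, stop;
CUCO_CUDA_TRY(cudaEventCreate(&start));
CUCO_CUDA_TRY(cudaEventCreate(&stop));

CUCO_CUDA_TRY(cudaEventRecord(start, stream));
map.retrieve_all(retrieved_keys.begin(), retrieved_values.begin(), stream);
CUCO_CUDA_TRY(cudaEventRecord(stop, stream));
CUCO_CUDA_TRY(cudaEventSynchronize(stop));

float ms = 0.0f;
CUCO_CUDA_TRY(cudaEventElapsedTime(&ms, start, stop));  // ~3 ms in the setup above

CUCO_CUDA_TRY(cudaEventDestroy(start));
CUCO_CUDA_TRY(cudaEventDestroy(stop));
```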