diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 77f9959fdbe..ec6cada1cda 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -393,7 +393,6 @@ if(ARROW_COMPUTE)
     compute/exec/key_encode.cc
     compute/exec/key_hash.cc
     compute/exec/key_map.cc
-    compute/exec/options.cc
     compute/exec/order_by_impl.cc
     compute/exec/partition_util.cc
     compute/exec/options.cc
diff --git a/cpp/src/arrow/compute/exec/bloom_filter.cc b/cpp/src/arrow/compute/exec/bloom_filter.cc
index 61031725457..7b348ff687c 100644
--- a/cpp/src/arrow/compute/exec/bloom_filter.cc
+++ b/cpp/src/arrow/compute/exec/bloom_filter.cc
@@ -311,21 +311,22 @@ Status BloomFilterBuilder_SingleThreaded::Begin(size_t /*num_threads*/,
 }
 
 Status BloomFilterBuilder_SingleThreaded::PushNextBatch(size_t /*thread_index*/,
-                                                        int num_rows,
+                                                        int64_t num_rows,
                                                         const uint32_t* hashes) {
   PushNextBatchImp(num_rows, hashes);
   return Status::OK();
 }
 
 Status BloomFilterBuilder_SingleThreaded::PushNextBatch(size_t /*thread_index*/,
-                                                        int num_rows,
+                                                        int64_t num_rows,
                                                         const uint64_t* hashes) {
   PushNextBatchImp(num_rows, hashes);
   return Status::OK();
 }
 
 template <typename T>
-void BloomFilterBuilder_SingleThreaded::PushNextBatchImp(int num_rows, const T* hashes) {
+void BloomFilterBuilder_SingleThreaded::PushNextBatchImp(int64_t num_rows,
+                                                         const T* hashes) {
   build_target_->Insert(hardware_flags_, num_rows, hashes);
 }
 
@@ -340,29 +341,40 @@ Status BloomFilterBuilder_Parallel::Begin(size_t num_threads, int64_t hardware_f
   log_num_prtns_ = std::min(kMaxLogNumPrtns, bit_util::Log2(num_threads));
 
   thread_local_states_.resize(num_threads);
-  prtn_locks_.Init(1 << log_num_prtns_);
+  prtn_locks_.Init(num_threads, 1 << log_num_prtns_);
 
   RETURN_NOT_OK(build_target->CreateEmpty(num_rows, pool));
 
   return Status::OK();
 }
 
-Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int num_rows,
+Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int64_t num_rows,
                                                   const uint32_t* hashes) {
   PushNextBatchImp(thread_id, num_rows, hashes);
   return Status::OK();
 }
 
-Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int num_rows,
+Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int64_t num_rows,
                                                   const uint64_t* hashes) {
   PushNextBatchImp(thread_id, num_rows, hashes);
   return Status::OK();
 }
 
 template <typename T>
-void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int num_rows,
+void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int64_t num_rows,
                                                    const T* hashes) {
-  int num_prtns = 1 << log_num_prtns_;
+  // Partition IDs are calculated using the higher bits of the block ID. This
+  // ensures that each block is contained entirely within a partition and prevents
+  // concurrent access to a block.
+  constexpr int kLogBlocksKeptTogether = 7;
+  constexpr int kPrtnIdBitOffset =
+      BloomFilterMasks::kLogNumMasks + 6 + kLogBlocksKeptTogether;
+
+  const int log_num_prtns_max =
+      std::max(0, build_target_->log_num_blocks() - kLogBlocksKeptTogether);
+  const int log_num_prtns_mod = std::min(log_num_prtns_, log_num_prtns_max);
+  int num_prtns = 1 << log_num_prtns_mod;
+
   ThreadLocalState& local_state = thread_local_states_[thread_id];
   local_state.partition_ranges.resize(num_prtns + 1);
   local_state.partitioned_hashes_64.resize(num_rows);
@@ -373,13 +385,10 @@ void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int num_row
 
   PartitionSort::Eval(
       num_rows, num_prtns, partition_ranges,
-      [hashes, num_prtns](int row_id) {
-        constexpr int kLogBlocksKeptTogether = 7;
-        constexpr int kPrtnIdBitOffset =
-            BloomFilterMasks::kLogNumMasks + 6 + kLogBlocksKeptTogether;
+      [=](int64_t row_id) {
         return (hashes[row_id] >> (kPrtnIdBitOffset)) & (num_prtns - 1);
       },
-      [hashes, partitioned_hashes](int row_id, int output_pos) {
+      [=](int64_t row_id, int output_pos) {
        partitioned_hashes[output_pos] = hashes[row_id];
      });
 
@@ -393,7 +402,7 @@ void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int num_row
   while (num_unprocessed_partitions > 0) {
     int locked_prtn_id;
     int locked_prtn_id_pos;
-    prtn_locks_.AcquirePartitionLock(num_unprocessed_partitions,
+    prtn_locks_.AcquirePartitionLock(thread_id, num_unprocessed_partitions,
                                      unprocessed_partition_ids,
                                      /*limit_retries=*/false, /*max_retries=*/-1,
                                      &locked_prtn_id, &locked_prtn_id_pos);
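For intuition, the partition ID comes from hash bits that sit strictly above the bits used to pick a mask and a block, so every hash that lands in a given Bloom-filter block maps to the same partition and two threads holding different partition locks can never touch the same block. A standalone sketch of that bit arithmetic (the constants here are illustrative stand-ins, not Arrow's actual values):

#include <cstdint>

constexpr int kLogNumMasks = 10;           // bits spent choosing a mask (stand-in value)
constexpr int kLogBlocksKeptTogether = 7;  // 2^7 consecutive blocks share one partition
constexpr int kPrtnIdBitOffset = kLogNumMasks + 6 + kLogBlocksKeptTogether;

// All hashes that touch a given 64-bit block agree on these upper bits, so the
// partition ID is constant per block and a partition lock serializes block access.
int PartitionId(uint64_t hash, int log_num_prtns) {
  const int num_prtns = 1 << log_num_prtns;
  return static_cast<int>((hash >> kPrtnIdBitOffset) & (num_prtns - 1));
}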
diff --git a/cpp/src/arrow/compute/exec/bloom_filter.h b/cpp/src/arrow/compute/exec/bloom_filter.h
index b89343373ae..06920c6c14f 100644
--- a/cpp/src/arrow/compute/exec/bloom_filter.h
+++ b/cpp/src/arrow/compute/exec/bloom_filter.h
@@ -261,49 +261,51 @@ class ARROW_EXPORT BloomFilterBuilder {
                        int64_t num_rows, int64_t num_batches,
                        BlockedBloomFilter* build_target) = 0;
   virtual int64_t num_tasks() const { return 0; }
-  virtual Status PushNextBatch(size_t thread_index, int num_rows,
+  virtual Status PushNextBatch(size_t thread_index, int64_t num_rows,
                                const uint32_t* hashes) = 0;
-  virtual Status PushNextBatch(size_t thread_index, int num_rows,
+  virtual Status PushNextBatch(size_t thread_index, int64_t num_rows,
                                const uint64_t* hashes) = 0;
   virtual void CleanUp() {}
   static std::unique_ptr<BloomFilterBuilder> Make(BloomFilterBuildStrategy strategy);
 };
 
-class BloomFilterBuilder_SingleThreaded : public BloomFilterBuilder {
+class ARROW_EXPORT BloomFilterBuilder_SingleThreaded : public BloomFilterBuilder {
  public:
   Status Begin(size_t num_threads, int64_t hardware_flags, MemoryPool* pool,
                int64_t num_rows, int64_t num_batches,
                BlockedBloomFilter* build_target) override;
 
-  Status PushNextBatch(size_t /*thread_index*/, int num_rows,
+  Status PushNextBatch(size_t /*thread_index*/, int64_t num_rows,
                        const uint32_t* hashes) override;
 
-  Status PushNextBatch(size_t /*thread_index*/, int num_rows,
+  Status PushNextBatch(size_t /*thread_index*/, int64_t num_rows,
                        const uint64_t* hashes) override;
 
 private:
   template <typename T>
-  void PushNextBatchImp(int num_rows, const T* hashes);
+  void PushNextBatchImp(int64_t num_rows, const T* hashes);
 
   int64_t hardware_flags_;
   BlockedBloomFilter* build_target_;
 };
 
-class BloomFilterBuilder_Parallel : public BloomFilterBuilder {
+class ARROW_EXPORT BloomFilterBuilder_Parallel : public BloomFilterBuilder {
  public:
   Status Begin(size_t num_threads, int64_t hardware_flags, MemoryPool* pool,
                int64_t num_rows, int64_t num_batches,
                BlockedBloomFilter* build_target) override;
 
-  Status PushNextBatch(size_t thread_id, int num_rows, const uint32_t* hashes) override;
+  Status PushNextBatch(size_t thread_id, int64_t num_rows,
+                       const uint32_t* hashes) override;
 
-  Status PushNextBatch(size_t thread_id, int num_rows, const uint64_t* hashes) override;
+  Status PushNextBatch(size_t thread_id, int64_t num_rows,
+                       const uint64_t* hashes) override;
 
   void CleanUp() override;
 
 private:
   template <typename T>
-  void PushNextBatchImp(size_t thread_id, int num_rows, const T* hashes);
+  void PushNextBatchImp(size_t thread_id, int64_t num_rows, const T* hashes);
 
   int64_t hardware_flags_;
   BlockedBloomFilter* build_target_;
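As a quick usage sketch of the builder interface above (single-threaded path, error handling trimmed; it assumes the 64-bit hashes were already computed elsewhere, e.g. by Hashing64):

// Sketch only: build a filter from one batch of precomputed hashes.
Status BuildFromHashes(const uint64_t* hashes, int64_t num_rows, int64_t hardware_flags,
                       MemoryPool* pool, BlockedBloomFilter* filter) {
  std::unique_ptr<BloomFilterBuilder> builder =
      BloomFilterBuilder::Make(BloomFilterBuildStrategy::SINGLE_THREADED);
  RETURN_NOT_OK(builder->Begin(/*num_threads=*/1, hardware_flags, pool, num_rows,
                               /*num_batches=*/1, filter));
  RETURN_NOT_OK(builder->PushNextBatch(/*thread_index=*/0, num_rows, hashes));
  builder->CleanUp();
  return Status::OK();
}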
diff --git a/cpp/src/arrow/compute/exec/bloom_filter_test.cc b/cpp/src/arrow/compute/exec/bloom_filter_test.cc
index a3b5cded153..5e1dd0218cf 100644
--- a/cpp/src/arrow/compute/exec/bloom_filter_test.cc
+++ b/cpp/src/arrow/compute/exec/bloom_filter_test.cc
@@ -24,6 +24,7 @@
 #include
 #include "arrow/compute/exec/bloom_filter.h"
 #include "arrow/compute/exec/key_hash.h"
+#include "arrow/compute/exec/task_util.h"
 #include "arrow/compute/exec/test_util.h"
 #include "arrow/compute/exec/util.h"
 #include "arrow/util/bitmap_ops.h"
@@ -32,39 +33,106 @@ namespace arrow {
 namespace compute {
 
-Status BuildBloomFilter(BloomFilterBuildStrategy strategy, int64_t hardware_flags,
-                        MemoryPool* pool, int64_t num_rows,
-                        std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
-                        std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
-                        BlockedBloomFilter* target) {
-  constexpr int batch_size_max = 32 * 1024;
-  int64_t num_batches = bit_util::CeilDiv(num_rows, batch_size_max);
-
-  auto builder = BloomFilterBuilder::Make(strategy);
-
-  std::vector<uint32_t> thread_local_hashes32;
-  std::vector<uint64_t> thread_local_hashes64;
-  thread_local_hashes32.resize(batch_size_max);
-  thread_local_hashes64.resize(batch_size_max);
-
-  RETURN_NOT_OK(builder->Begin(/*num_threads=*/1, hardware_flags, pool, num_rows,
-                               bit_util::CeilDiv(num_rows, batch_size_max), target));
-
-  for (int64_t i = 0; i < num_batches; ++i) {
+constexpr int kBatchSizeMax = 32 * 1024;
+Status BuildBloomFilter_Serial(
+    std::unique_ptr<BloomFilterBuilder>& builder, int64_t num_rows, int64_t num_batches,
+    std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
+    std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
+    BlockedBloomFilter* target) {
+  std::vector<uint32_t> hashes32(kBatchSizeMax);
+  std::vector<uint64_t> hashes64(kBatchSizeMax);
+  for (int64_t i = 0; i < num_batches; i++) {
     size_t thread_index = 0;
     int batch_size = static_cast<int>(
-        std::min(num_rows - i * batch_size_max, static_cast<int64_t>(batch_size_max)));
+        std::min(num_rows - i * kBatchSizeMax, static_cast<int64_t>(kBatchSizeMax)));
     if (target->NumHashBitsUsed() > 32) {
-      uint64_t* hashes = thread_local_hashes64.data();
-      get_hash64_impl(i * batch_size_max, batch_size, hashes);
+      uint64_t* hashes = hashes64.data();
+      get_hash64_impl(i * kBatchSizeMax, batch_size, hashes);
       RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
     } else {
-      uint32_t* hashes = thread_local_hashes32.data();
-      get_hash32_impl(i * batch_size_max, batch_size, hashes);
+      uint32_t* hashes = hashes32.data();
+      get_hash32_impl(i * kBatchSizeMax, batch_size, hashes);
       RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
     }
   }
+  return Status::OK();
+}
+
+Status BuildBloomFilter_Parallel(
+    std::unique_ptr<BloomFilterBuilder>& builder, size_t num_threads, int64_t num_rows,
+    int64_t num_batches, std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
+    std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
+    BlockedBloomFilter* target) {
+  ThreadIndexer thread_indexer;
+  std::unique_ptr<TaskScheduler> scheduler = TaskScheduler::Make();
+  std::vector<std::vector<uint32_t>> thread_local_hashes32(num_threads);
+  std::vector<std::vector<uint64_t>> thread_local_hashes64(num_threads);
+  for (std::vector<uint32_t>& h : thread_local_hashes32) h.resize(kBatchSizeMax);
+  for (std::vector<uint64_t>& h : thread_local_hashes64) h.resize(kBatchSizeMax);
+
+  std::condition_variable cv;
+  std::mutex mutex;
+  auto group = scheduler->RegisterTaskGroup(
+      [&](size_t thread_index, int64_t task_id) -> Status {
+        int batch_size = static_cast<int>(std::min(num_rows - task_id * kBatchSizeMax,
+                                                   static_cast<int64_t>(kBatchSizeMax)));
+        if (target->NumHashBitsUsed() > 32) {
+          uint64_t* hashes = thread_local_hashes64[thread_index].data();
+          get_hash64_impl(task_id * kBatchSizeMax, batch_size, hashes);
+          RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
+        } else {
+          uint32_t* hashes = thread_local_hashes32[thread_index].data();
+          get_hash32_impl(task_id * kBatchSizeMax, batch_size, hashes);
+          RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
+        }
+        return Status::OK();
+      },
+      [&](size_t thread_index) -> Status {
+        std::unique_lock<std::mutex> lk(mutex);
+        cv.notify_all();
+        return Status::OK();
+      });
+  scheduler->RegisterEnd();
+  auto tp = arrow::internal::GetCpuThreadPool();
+  RETURN_NOT_OK(scheduler->StartScheduling(
+      0,
+      [&](std::function<Status(size_t)> func) -> Status {
+        return tp->Spawn([&, func]() {
+          size_t tid = thread_indexer();
+          ARROW_DCHECK_OK(func(tid));
+        });
+      },
+      static_cast<int>(num_threads), false));
+  {
+    std::unique_lock<std::mutex> lk(mutex);
+    RETURN_NOT_OK(scheduler->StartTaskGroup(0, group, num_batches));
+    cv.wait(lk);
+  }
+  return Status::OK();
+}
+
+Status BuildBloomFilter(BloomFilterBuildStrategy strategy, int64_t hardware_flags,
+                        MemoryPool* pool, int64_t num_rows,
+                        std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
+                        std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
+                        BlockedBloomFilter* target) {
+  int64_t num_batches = bit_util::CeilDiv(num_rows, kBatchSizeMax);
+
+  bool is_serial = strategy == BloomFilterBuildStrategy::SINGLE_THREADED;
+  auto builder = BloomFilterBuilder::Make(strategy);
+  size_t num_threads = is_serial ? 1 : arrow::GetCpuThreadPoolCapacity();
+  RETURN_NOT_OK(builder->Begin(num_threads, hardware_flags, pool, num_rows,
+                               bit_util::CeilDiv(num_rows, kBatchSizeMax), target));
+
+  if (is_serial)
+    RETURN_NOT_OK(BuildBloomFilter_Serial(builder, num_rows, num_batches,
+                                          std::move(get_hash32_impl),
+                                          std::move(get_hash64_impl), target));
+  else
+    RETURN_NOT_OK(BuildBloomFilter_Parallel(builder, num_threads, num_rows, num_batches,
+                                            std::move(get_hash32_impl),
+                                            std::move(get_hash64_impl), target));
 
   builder->CleanUp();
 
   return Status::OK();
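Note that the parallel helper above blocks on a bare cv.wait(lk). Lost wakeups are avoided because the mutex is held while the task group is started, so the finished-callback cannot fire its notification before the waiter is actually waiting, but a spurious wakeup would still return early. A more defensive variant of the same handshake, as a self-contained sketch (the done flag is an addition for illustration, not part of this patch):

#include <condition_variable>
#include <mutex>
#include <thread>

int main() {
  std::condition_variable cv;
  std::mutex mutex;
  bool done = false;  // guarded by mutex

  std::thread worker([&] {
    // ... run the tasks ...
    std::lock_guard<std::mutex> lk(mutex);
    done = true;
    cv.notify_all();
  });

  {
    std::unique_lock<std::mutex> lk(mutex);
    // Kick off work while holding the lock so the notification cannot be missed,
    // and use a predicate so a spurious wakeup cannot return early.
    cv.wait(lk, [&] { return done; });
  }
  worker.join();
  return 0;
}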
diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc
index b7a9c7e1bb0..bb197f8db84 100644
--- a/cpp/src/arrow/compute/exec/exec_plan.cc
+++ b/cpp/src/arrow/compute/exec/exec_plan.cc
@@ -98,6 +98,9 @@ struct ExecPlanImpl : public ExecPlan {
     // producers precede consumers
     sorted_nodes_ = TopoSort();
 
+    for (ExecNode* node : sorted_nodes_) {
+      RETURN_NOT_OK(node->PrepareToProduce());
+    }
 
     std::vector<Future<>> futures;
diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h
index be2f23ad24b..dcf271bd360 100644
--- a/cpp/src/arrow/compute/exec/exec_plan.h
+++ b/cpp/src/arrow/compute/exec/exec_plan.h
@@ -212,6 +212,16 @@ class ARROW_EXPORT ExecNode {
   // A node with multiple outputs will also need to ensure it is applying backpressure if
   // any of its outputs is asking to pause
 
+  /// \brief Perform any needed initialization
+  ///
+  /// This hook runs between the creation of the ExecPlan and the call to
+  /// StartProducing; Bloom filter pushdown, for example, is wired up here. The
+  /// order in which ExecNodes execute this method is undefined, but the calls are
+  /// made synchronously.
+  ///
+  /// At this point a node can rely on all inputs & outputs (and the input schemas)
+  /// being well defined.
+  virtual Status PrepareToProduce() { return Status::OK(); }
+
   /// \brief Start producing
   ///
   /// This must only be called once. If this fails, then other lifecycle
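To make the new lifecycle step concrete, here is a self-contained mock (not Arrow's actual classes) of the contract exec_plan.cc now enforces: every node gets a PrepareToProduce() call, in topological order, before any StartProducing():

#include <iostream>
#include <memory>
#include <vector>

struct Node {
  virtual ~Node() = default;
  virtual bool PrepareToProduce() { return true; }  // default: nothing to do
  virtual bool StartProducing() = 0;
};

struct JoinNode : Node {
  bool PrepareToProduce() override {
    // The whole plan exists at this point, so cross-node wiring such as
    // picking a Bloom-filter pushdown target is safe here.
    std::cout << "join: decide Bloom filter pushdown target\n";
    return true;
  }
  bool StartProducing() override { return true; }
};

int main() {
  std::vector<std::unique_ptr<Node>> sorted_nodes;  // producers precede consumers
  sorted_nodes.push_back(std::make_unique<JoinNode>());
  for (auto& n : sorted_nodes)
    if (!n->PrepareToProduce()) return 1;
  for (auto& n : sorted_nodes)
    if (!n->StartProducing()) return 1;
  return 0;
}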
diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc
index 3207bb96984..15a006c81d5 100644
--- a/cpp/src/arrow/compute/exec/hash_join.cc
+++ b/cpp/src/arrow/compute/exec/hash_join.cc
@@ -26,6 +26,7 @@
 #include
 
 #include "arrow/compute/exec/hash_join_dict.h"
+#include "arrow/compute/exec/key_hash.h"
 #include "arrow/compute/exec/task_util.h"
 #include "arrow/compute/kernels/row_encoder.h"
 
@@ -39,13 +40,16 @@ class HashJoinBasicImpl : public HashJoinImpl {
   struct ThreadLocalState;
 
  public:
+  HashJoinBasicImpl() : num_expected_bloom_filters_(0) {}
+
   Status InputReceived(size_t thread_index, int side, ExecBatch batch) override {
     if (cancelled_) {
       return Status::Cancelled("Hash join cancelled");
     }
     EVENT(span_, "InputReceived");
-    if (QueueBatchIfNeeded(side, batch)) {
+    ARROW_ASSIGN_OR_RAISE(bool queued, QueueBatchIfNeeded(thread_index, side, batch));
+    if (queued) {
       return Status::OK();
     } else {
       ARROW_DCHECK(side == 0);
@@ -62,7 +66,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
     bool proceed;
     {
       std::lock_guard<std::mutex> lock(finished_mutex_);
-      proceed = !left_side_finished_ && left_queue_finished_;
+      proceed = !left_side_finished_ && left_queue_probe_finished_;
       left_side_finished_ = true;
     }
     if (proceed) {
@@ -83,50 +87,70 @@ class HashJoinBasicImpl : public HashJoinImpl {
   }
 
   Status Init(ExecContext* ctx, JoinType join_type, bool use_sync_execution,
-              size_t num_threads, HashJoinSchema* schema_mgr,
+              size_t /*num_threads*/, HashJoinSchema* schema_mgr,
               std::vector<JoinKeyCmp> key_cmp, Expression filter,
               OutputBatchCallback output_batch_callback,
               FinishedCallback finished_callback,
-              TaskScheduler::ScheduleImpl schedule_task_callback) override {
-    num_threads = std::max(num_threads, static_cast<size_t>(1));
+              TaskScheduler::ScheduleImpl schedule_task_callback,
+              HashJoinImpl* pushdown_target, std::vector<int> column_map) override {
+    // TODO(ARROW-15732)
+    // Each side of join might have an IO thread being called from.
+    // As of right now, we ignore the `num_threads` argument, so later we will have to
+    // re-add `num_threads_ = num_threads;`
+    num_threads_ = GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1;
 
     START_COMPUTE_SPAN(span_, "HashJoinBasicImpl",
                        {{"detail", filter.ToString()},
                         {"join.kind", ToString(join_type)},
-                        {"join.threads", static_cast<uint32_t>(num_threads)}});
+                        {"join.threads", static_cast<uint32_t>(num_threads_)}});
 
     ctx_ = ctx;
     join_type_ = join_type;
-    num_threads_ = num_threads;
     schema_mgr_ = schema_mgr;
     key_cmp_ = std::move(key_cmp);
     filter_ = std::move(filter);
     output_batch_callback_ = std::move(output_batch_callback);
     finished_callback_ = std::move(finished_callback);
-    // TODO(ARROW-15732)
-    // Each side of join might have an IO thread being called from.
-    local_states_.resize(GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1);
+    local_states_.resize(num_threads_);
     for (size_t i = 0; i < local_states_.size(); ++i) {
       local_states_[i].is_initialized = false;
       local_states_[i].is_has_match_initialized = false;
     }
-    dict_probe_.Init(GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1);
+    dict_probe_.Init(num_threads_);
+
+    pushdown_target_ = pushdown_target;
+    column_map_ = std::move(column_map);
+    if (pushdown_target_) pushdown_target_->ExpectBloomFilter();
+
+    right_input_row_count_ = 0;
 
     has_hash_table_ = false;
     num_batches_produced_.store(0);
     cancelled_ = false;
     right_side_finished_ = false;
     left_side_finished_ = false;
-    left_queue_finished_ = false;
+    bloom_filters_ready_ = false;
+    left_queue_bloom_finished_ = false;
+    left_queue_probe_finished_ = false;
 
     scheduler_ = TaskScheduler::Make();
+    if (pushdown_target_) {
+      bloom_filter_ = arrow::internal::make_unique<BlockedBloomFilter>();
+      bloom_filter_builder_ = BloomFilterBuilder::Make(
+          use_sync_execution ? BloomFilterBuildStrategy::SINGLE_THREADED
+                             : BloomFilterBuildStrategy::PARALLEL);
+    }
+
+    RegisterBuildBloomFilter();
     RegisterBuildHashTable();
+    RegisterBloomFilterQueuedBatches();
     RegisterProbeQueuedBatches();
     RegisterScanHashTable();
     scheduler_->RegisterEnd();
+
     RETURN_NOT_OK(scheduler_->StartScheduling(
         0 /*thread index*/, std::move(schedule_task_callback),
-        static_cast<int>(2 * num_threads) /*concurrent tasks*/, use_sync_execution));
+        static_cast<int>(2 * num_threads_) /*concurrent tasks*/, use_sync_execution));
 
     return Status::OK();
   }
 
@@ -138,6 +162,32 @@ class HashJoinBasicImpl : public HashJoinImpl {
     scheduler_->Abort(std::move(pos_abort_callback));
   }
 
+  // Called by a downstream node after it has constructed a Bloom filter
+  // that this node can use to filter its input.
+  Status PushBloomFilter(size_t thread_index, std::unique_ptr<BlockedBloomFilter> filter,
+                         std::vector<int> column_map) override {
+    bool proceed;
+    {
+      std::lock_guard<std::mutex> lock_bloom(bloom_filters_mutex_);
+      pushed_bloom_filters_.emplace_back(std::move(filter));
+      bloom_filter_column_maps_.emplace_back(std::move(column_map));
+      proceed = pushed_bloom_filters_.size() == num_expected_bloom_filters_;
+      ARROW_DCHECK(pushed_bloom_filters_.size() <= num_expected_bloom_filters_);
+    }
+    if (proceed) {
+      size_t num_batches;
+      {
+        std::lock_guard<std::mutex> lock(left_batches_mutex_);
+        num_batches = left_batches_.size();
+        bloom_filters_ready_ = true;
+      }
+      RETURN_NOT_OK(BloomFilterQueuedBatches(thread_index, num_batches));
+    }
+    return Status::OK();
+  }
+
+  void ExpectBloomFilter() override { num_expected_bloom_filters_ += 1; }
+
  private:
   void InitEncoder(int side, HashJoinProjection projection_handle, RowEncoder* encoder) {
     std::vector<ValueDescr> data_types;
@@ -152,7 +202,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
     encoder->Clear();
   }
 
-  void InitLocalStateIfNeeded(size_t thread_index) {
+  Status InitLocalStateIfNeeded(size_t thread_index) {
     DCHECK_LT(thread_index, local_states_.size());
     ThreadLocalState& local_state = local_states_[thread_index];
     if (!local_state.is_initialized) {
@@ -162,9 +212,11 @@ class HashJoinBasicImpl : public HashJoinImpl {
       if (has_payload) {
         InitEncoder(0, HashJoinProjection::PAYLOAD, &local_state.exec_batch_payloads);
       }
-
+      RETURN_NOT_OK(local_state.temp_stack.Init(
+          ctx_->memory_pool(), 4 * util::MiniBatch::kMiniBatchLength * sizeof(uint32_t)));
       local_state.is_initialized = true;
     }
+    return Status::OK();
   }
 
   Status EncodeBatch(int side, HashJoinProjection projection_handle, RowEncoder* encoder,
@@ -432,7 +484,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
         (schema_mgr_->proj_maps[1].num_cols(HashJoinProjection::PAYLOAD) > 0);
 
     ThreadLocalState& local_state = local_states_[thread_index];
-    InitLocalStateIfNeeded(thread_index);
+    RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
 
     ExecBatch left_key;
     ExecBatch left_payload;
@@ -544,7 +596,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
 
   Status ProbeBatch(size_t thread_index, const ExecBatch& batch) {
     ThreadLocalState& local_state = local_states_[thread_index];
-    InitLocalStateIfNeeded(thread_index);
+    RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
 
     local_state.exec_batch_keys.Clear();
 
@@ -604,8 +656,84 @@ class HashJoinBasicImpl : public HashJoinImpl {
     return Status::OK();
   }
 
+  Status ApplyBloomFiltersToBatch(size_t thread_index, ExecBatch& batch) {
+    if (batch.length == 0) return Status::OK();
+    int64_t bit_vector_bytes = bit_util::BytesForBits(batch.length);
+    std::vector<uint8_t> selected(bit_vector_bytes);
+    std::vector<uint32_t> hashes(batch.length);
+    std::vector<uint8_t> bv(bit_vector_bytes);
+
+    RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
+    // Start with full selection for the current batch
+    memset(selected.data(), 0xff, bit_vector_bytes);
+    for (size_t ifilter = 0; ifilter < num_expected_bloom_filters_; ifilter++) {
+      std::vector<Datum> keys(bloom_filter_column_maps_[ifilter].size());
+      for (size_t i = 0; i < keys.size(); i++) {
+        int input_idx = bloom_filter_column_maps_[ifilter][i];
+        keys[i] = batch[input_idx];
+      }
+      ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(keys)));
+      RETURN_NOT_OK(Hashing32::HashBatch(
+          key_batch, hashes.data(), ctx_->cpu_info()->hardware_flags(),
+          &local_states_[thread_index].temp_stack, 0, key_batch.length));
+
+      pushed_bloom_filters_[ifilter]->Find(ctx_->cpu_info()->hardware_flags(),
+                                           key_batch.length, hashes.data(), bv.data());
+      arrow::internal::BitmapAnd(bv.data(), 0, selected.data(), 0, key_batch.length, 0,
+                                 selected.data());
+    }
+    auto selected_buffer =
+        arrow::internal::make_unique<Buffer>(selected.data(), bit_vector_bytes);
+    ArrayData selected_arraydata(boolean(), batch.length,
+                                 {nullptr, std::move(selected_buffer)});
+    Datum selected_datum(selected_arraydata);
+    FilterOptions options;
+    size_t first_nonscalar = batch.values.size();
+    for (size_t i = 0; i < batch.values.size(); i++) {
+      if (!batch.values[i].is_scalar()) {
+        ARROW_ASSIGN_OR_RAISE(batch.values[i],
+                              Filter(batch.values[i], selected_datum, options, ctx_));
+        first_nonscalar = std::min(first_nonscalar, i);
+        ARROW_DCHECK_EQ(batch.values[i].length(), batch.values[first_nonscalar].length());
+      }
+    }
+    // If they're all Scalar, then the length of the batch is the number of set bits
+    if (first_nonscalar == batch.values.size())
+      batch.length = arrow::internal::CountSetBits(selected.data(), 0, batch.length);
+    else
+      batch.length = batch.values[first_nonscalar].length();
+    return Status::OK();
+  }
+
   int64_t BuildHashTable_num_tasks() { return 1; }
 
+  Status BuildBloomFilter_exec_task(size_t thread_index, int64_t task_id) {
+    const ExecBatch& input_batch = right_batches_[task_id];
+    SchemaProjectionMap key_to_in =
+        schema_mgr_->proj_maps[1].map(HashJoinProjection::KEY, HashJoinProjection::INPUT);
+    std::vector<Datum> key_columns(key_to_in.num_cols);
+    for (size_t i = 0; i < key_columns.size(); i++) {
+      int input_idx = key_to_in.get(static_cast<int>(i));
+      key_columns[i] = input_batch[input_idx];
+    }
+    ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(key_columns)));
+
+    RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
+    ThreadLocalState& tls = local_states_[thread_index];
+    util::TempVectorHolder<uint32_t> hash_holder(&tls.temp_stack,
+                                                 util::MiniBatch::kMiniBatchLength);
+    uint32_t* hashes = hash_holder.mutable_data();
+    for (int64_t i = 0; i < key_batch.length; i += util::MiniBatch::kMiniBatchLength) {
+      int64_t length = std::min(static_cast<int64_t>(key_batch.length - i),
+                                static_cast<int64_t>(util::MiniBatch::kMiniBatchLength));
+      RETURN_NOT_OK(Hashing32::HashBatch(key_batch, hashes,
+                                         ctx_->cpu_info()->hardware_flags(),
+                                         &tls.temp_stack, i, length));
+      RETURN_NOT_OK(bloom_filter_builder_->PushNextBatch(thread_index, length, hashes));
+    }
+    return Status::OK();
+  }
+
   Status BuildHashTable_exec_task(size_t thread_index, int64_t /*task_id*/) {
     const std::vector<ExecBatch>& batches = right_batches_;
     if (batches.empty()) {
@@ -651,23 +779,45 @@ class HashJoinBasicImpl : public HashJoinImpl {
     return Status::OK();
   }
 
+  Status BuildBloomFilter_on_finished(size_t thread_index) {
+    if (cancelled_) return Status::Cancelled("Hash join cancelled");
+    ARROW_DCHECK(pushdown_target_);
+    RETURN_NOT_OK(pushdown_target_->PushBloomFilter(
+        thread_index, std::move(bloom_filter_), std::move(column_map_)));
+    return BuildHashTable(thread_index);
+  }
+
   Status BuildHashTable_on_finished(size_t thread_index) {
     if (cancelled_) {
       return Status::Cancelled("Hash join cancelled");
     }
 
+    right_batches_.clear();
+
+    bool proceed;
     {
       std::lock_guard<std::mutex> lock(left_batches_mutex_);
+      std::lock_guard<std::mutex> lock_finish(finished_mutex_);
+      left_queue_bloom_finished_ =
+          left_queue_bloom_finished_ || num_expected_bloom_filters_ == 0;
+      proceed = !has_hash_table_ && left_queue_bloom_finished_;
       has_hash_table_ = true;
     }
-
-    right_batches_.clear();
-
-    RETURN_NOT_OK(ProbeQueuedBatches(thread_index));
+    if (proceed) RETURN_NOT_OK(ProbeQueuedBatches(thread_index));
 
     return Status::OK();
   }
 
+  void RegisterBuildBloomFilter() {
+    task_group_bloom_ = scheduler_->RegisterTaskGroup(
+        [this](size_t thread_index, int64_t task_id) -> Status {
+          return BuildBloomFilter_exec_task(thread_index, task_id);
+        },
+        [this](size_t thread_index) -> Status {
+          return BuildBloomFilter_on_finished(thread_index);
+        });
+  }
+
   void RegisterBuildHashTable() {
     task_group_build_ = scheduler_->RegisterTaskGroup(
         [this](size_t thread_index, int64_t task_id) -> Status {
@@ -678,11 +828,63 @@ class HashJoinBasicImpl : public HashJoinImpl {
         });
   }
 
+  Status BuildBloomFilter(size_t thread_index) {
+    RETURN_NOT_OK(bloom_filter_builder_->Begin(
+        num_threads_, ctx_->cpu_info()->hardware_flags(), ctx_->memory_pool(),
+        right_input_row_count_, right_batches_.size(), bloom_filter_.get()));
+
+    return scheduler_->StartTaskGroup(thread_index, task_group_bloom_,
+                                      right_batches_.size());
+  }
+
   Status BuildHashTable(size_t thread_index) {
     return scheduler_->StartTaskGroup(thread_index, task_group_build_,
                                       BuildHashTable_num_tasks());
   }
 
+  Status BloomFilterQueuedBatches_exec_task(size_t thread_index, int64_t task_id) {
+    if (cancelled_) return Status::Cancelled("Hash join cancelled");
+    ExecBatch batch;
+    {
+      std::lock_guard<std::mutex> lock(left_batches_mutex_);
+      batch = std::move(left_batches_[task_id]);
+      ARROW_DCHECK(!batch.values.empty());
+    }
+    RETURN_NOT_OK(ApplyBloomFiltersToBatch(thread_index, batch));
+    {
+      std::lock_guard<std::mutex> lock(left_batches_mutex_);
+      left_batches_[task_id] = std::move(batch);
+    }
+    return Status::OK();
+  }
+
+  Status BloomFilterQueuedBatches_on_finished(size_t thread_index) {
+    if (cancelled_) return Status::Cancelled("Hash join cancelled");
+    bool proceed;
+    {
+      std::lock_guard<std::mutex> lock(finished_mutex_);
+      proceed = !left_queue_bloom_finished_ && has_hash_table_;
+      left_queue_bloom_finished_ = true;
+    }
+    if (proceed) return ProbeQueuedBatches(thread_index);
+    return Status::OK();
+  }
+
+  void RegisterBloomFilterQueuedBatches() {
+    task_group_bloom_filter_queued_ = scheduler_->RegisterTaskGroup(
+        [this](size_t thread_index, int64_t task_id) -> Status {
+          return BloomFilterQueuedBatches_exec_task(thread_index, task_id);
+        },
+        [this](size_t thread_index) -> Status {
+          return BloomFilterQueuedBatches_on_finished(thread_index);
+        });
+  }
+
+  Status BloomFilterQueuedBatches(size_t thread_index, size_t num_batches) {
+    return scheduler_->StartTaskGroup(thread_index, task_group_bloom_filter_queued_,
+                                      num_batches);
+  }
+
   int64_t ProbeQueuedBatches_num_tasks() {
     return static_cast<int64_t>(left_batches_.size());
   }
@@ -704,8 +906,9 @@ class HashJoinBasicImpl : public HashJoinImpl {
     bool proceed;
     {
       std::lock_guard<std::mutex> lock(finished_mutex_);
-      proceed = left_side_finished_ && !left_queue_finished_;
-      left_queue_finished_ = true;
+      ARROW_DCHECK(left_queue_bloom_finished_);
+      proceed = left_side_finished_ && !left_queue_probe_finished_;
+      left_queue_probe_finished_ = true;
     }
     if (proceed) {
       RETURN_NOT_OK(OnLeftSideAndQueueFinished(thread_index));
@@ -751,7 +954,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
                                          hash_table_scan_unit_ * (task_id + 1)));
 
     ThreadLocalState& local_state = local_states_[thread_index];
-    InitLocalStateIfNeeded(thread_index);
+    RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
 
     std::vector<uint32_t>& id_left = local_state.no_match;
     std::vector<uint32_t>& id_right = local_state.match;
@@ -809,22 +1012,40 @@ class HashJoinBasicImpl : public HashJoinImpl {
                                       ScanHashTable_num_tasks());
   }
 
-  bool QueueBatchIfNeeded(int side, ExecBatch batch) {
+  Result<bool> QueueBatchIfNeeded(size_t thread_index, int side, ExecBatch& batch) {
     if (side == 0) {
-      std::lock_guard<std::mutex> lock(left_batches_mutex_);
-      if (has_hash_table_) {
-        return false;
+      // We don't want to do the filtering while holding the lock, since that can get
+      // expensive.
+      bool needs_filtering;
+      {
+        std::lock_guard<std::mutex> lock(left_batches_mutex_);
+        bloom_filters_ready_ = bloom_filters_ready_ || num_expected_bloom_filters_ == 0;
+        needs_filtering = bloom_filters_ready_ && num_expected_bloom_filters_ != 0;
       }
-      left_batches_.emplace_back(std::move(batch));
-      return true;
+      if (needs_filtering) RETURN_NOT_OK(ApplyBloomFiltersToBatch(thread_index, batch));
+
+      bool queued;
+      {
+        std::lock_guard<std::mutex> lock(left_batches_mutex_);
+        queued = !bloom_filters_ready_ || !has_hash_table_;
+        if (queued) left_batches_.emplace_back(std::move(batch));
+      }
+      return queued;
     } else {
       std::lock_guard<std::mutex> lock(right_batches_mutex_);
+      right_input_row_count_ += batch.length;
       right_batches_.emplace_back(std::move(batch));
       return true;
     }
   }
 
-  Status OnRightSideFinished(size_t thread_index) { return BuildHashTable(thread_index); }
+  Status OnRightSideFinished(size_t thread_index) {
+    if (pushdown_target_ == nullptr) {
+      return BuildHashTable(thread_index);
+    } else {
+      return BuildBloomFilter(thread_index);
+    }
+  }
 
   Status OnLeftSideAndQueueFinished(size_t thread_index) {
     return ScanHashTable(thread_index);
@@ -875,7 +1096,9 @@ class HashJoinBasicImpl : public HashJoinImpl {
   std::vector<JoinKeyCmp> key_cmp_;
   Expression filter_;
 
   std::unique_ptr<TaskScheduler> scheduler_;
+  int task_group_bloom_;
   int task_group_build_;
+  int task_group_bloom_filter_queued_;
   int task_group_queued_;
   int task_group_scan_;
 
@@ -896,6 +1119,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
     std::vector<uint32_t> match_right;
     bool is_has_match_initialized;
     std::vector<uint8_t> has_match;
+    util::TempVectorStack temp_stack;
   };
   std::vector<ThreadLocalState> local_states_;
 
@@ -916,15 +1140,29 @@ class HashJoinBasicImpl : public HashJoinImpl {
   bool has_hash_table_;
   std::mutex left_batches_mutex_;
 
+  size_t right_input_row_count_;  // Sum of the lengths of ExecBatches in right_batches_
   std::vector<ExecBatch> right_batches_;
   std::mutex right_batches_mutex_;
 
+  // Bloom filter state
+  //
+  std::unique_ptr<BloomFilterBuilder> bloom_filter_builder_;
+  std::unique_ptr<BlockedBloomFilter> bloom_filter_;
+  std::vector<int> column_map_;
+  std::vector<std::unique_ptr<BlockedBloomFilter>> pushed_bloom_filters_;
+  std::vector<std::vector<int>> bloom_filter_column_maps_;
+  std::mutex bloom_filters_mutex_;
+  size_t num_expected_bloom_filters_;
+  HashJoinImpl* pushdown_target_;
+
   std::atomic<int64_t> num_batches_produced_;
 
   bool cancelled_;
 
+  bool bloom_filters_ready_;
   bool right_side_finished_;
   bool left_side_finished_;
-  bool left_queue_finished_;
+  bool left_queue_bloom_finished_;
+  bool left_queue_probe_finished_;
   std::mutex finished_mutex_;
 };
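BuildBloomFilter_exec_task above hashes each build batch in minibatch-sized slices so the per-thread TempVectorStack scratch buffer stays small and cache-resident. The slicing pattern in isolation, as a sketch (kMiniBatchLength is a stand-in for util::MiniBatch::kMiniBatchLength, and ProcessFn for the hash-and-push step):

#include <algorithm>
#include <cstdint>
#include <vector>

constexpr int64_t kMiniBatchLength = 1024;  // stand-in value

// Process [0, num_rows) in fixed-size slices so the scratch buffer can be
// small, reused across slices, and hot in cache.
template <typename ProcessFn>
void ForEachMiniBatch(int64_t num_rows, const ProcessFn& process) {
  std::vector<uint32_t> scratch(kMiniBatchLength);
  for (int64_t i = 0; i < num_rows; i += kMiniBatchLength) {
    int64_t length = std::min(num_rows - i, kMiniBatchLength);
    process(i, length, scratch.data());  // e.g. hash rows [i, i + length), then push
  }
}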
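ApplyBloomFiltersToBatch starts from an all-ones selection bitmap and ANDs in one verdict bitmap per pushed filter, so a row survives only if every filter reports a possible match. The combining step reduced to plain bytes (a sketch; byte-wise AND stands in for arrow::internal::BitmapAnd, which also handles bit offsets):

#include <cstdint>
#include <vector>

// A row survives only if every Bloom filter says "might be present".
std::vector<uint8_t> CombineSelections(
    const std::vector<std::vector<uint8_t>>& per_filter_bits, size_t num_bytes) {
  std::vector<uint8_t> selected(num_bytes, 0xff);  // start from full selection
  for (const std::vector<uint8_t>& bv : per_filter_bits) {
    for (size_t i = 0; i < num_bytes; ++i) selected[i] &= bv[i];
  }
  return selected;
}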
diff --git a/cpp/src/arrow/compute/exec/hash_join.h b/cpp/src/arrow/compute/exec/hash_join.h
index 12455f0c6d0..9739cbc6436 100644
--- a/cpp/src/arrow/compute/exec/hash_join.h
+++ b/cpp/src/arrow/compute/exec/hash_join.h
@@ -21,6 +21,7 @@
 #include
 #include
 
+#include "arrow/compute/exec/bloom_filter.h"
 #include "arrow/compute/exec/options.h"
 #include "arrow/compute/exec/schema_util.h"
 #include "arrow/compute/exec/task_util.h"
@@ -107,7 +108,12 @@ class HashJoinImpl {
                       std::vector<JoinKeyCmp> key_cmp, Expression filter,
                       OutputBatchCallback output_batch_callback,
                       FinishedCallback finished_callback,
-                      TaskScheduler::ScheduleImpl schedule_task_callback) = 0;
+                      TaskScheduler::ScheduleImpl schedule_task_callback,
+                      HashJoinImpl* pushdown_target, std::vector<int> column_map) = 0;
+  virtual void ExpectBloomFilter() = 0;
+  virtual Status PushBloomFilter(size_t thread_index,
+                                 std::unique_ptr<BlockedBloomFilter> filter,
+                                 std::vector<int> column_map) = 0;
   virtual Status InputReceived(size_t thread_index, int side, ExecBatch batch) = 0;
   virtual Status InputFinished(size_t thread_index, int side) = 0;
   virtual void Abort(TaskScheduler::AbortContinuationImpl pos_abort_callback) = 0;
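The interface encodes a simple two-phase handshake: during Init, each downstream join calls ExpectBloomFilter() on its pushdown target, and later hands over the built filter with PushBloomFilter(); the target only starts filtering queued batches once the expected count has arrived. A self-contained mock of that counting protocol (the two method names mirror the interface, everything else is invented for illustration):

#include <cstddef>
#include <memory>
#include <mutex>
#include <vector>

struct MockFilter {};  // stands in for BlockedBloomFilter

class MockJoin {
 public:
  void ExpectBloomFilter() { ++num_expected_; }  // called during Init wiring

  // Called by each downstream join once its build side is done.
  void PushBloomFilter(std::unique_ptr<MockFilter> filter) {
    bool all_arrived;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      filters_.push_back(std::move(filter));
      all_arrived = filters_.size() == num_expected_;
    }
    if (all_arrived) {
      // now safe to start filtering queued probe-side batches
    }
  }

 private:
  std::mutex mutex_;
  size_t num_expected_ = 0;
  std::vector<std::unique_ptr<MockFilter>> filters_;
};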
diff --git a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc
index 3d4271b6cb9..8d8be7f904f 100644
--- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc
@@ -36,6 +36,7 @@ namespace arrow {
 namespace compute {
 
 struct BenchmarkSettings {
+  bool bloom_filter = false;
   int num_threads = 1;
   JoinType join_type = JoinType::INNER;
   int batch_size = 1024;
@@ -57,6 +58,7 @@ class JoinBenchmark {
     SchemaBuilder l_schema_builder, r_schema_builder;
     std::vector<FieldRef> left_keys, right_keys;
+    std::vector<JoinKeyCmp> key_cmp;
     for (size_t i = 0; i < settings.key_types.size(); i++) {
       std::string l_name = "lk" + std::to_string(i);
       std::string r_name = "rk" + std::to_string(i);
@@ -93,6 +95,7 @@ class JoinBenchmark {
 
       left_keys.push_back(FieldRef(l_name));
       right_keys.push_back(FieldRef(r_name));
+      key_cmp.push_back(JoinKeyCmp::EQ);
     }
 
     for (size_t i = 0; i < settings.build_payload_types.size(); i++) {
@@ -126,6 +129,22 @@ class JoinBenchmark {
 
     join_ = *HashJoinImpl::MakeBasic();
 
+    HashJoinImpl* bloom_filter_pushdown_target = nullptr;
+    std::vector<int> key_input_map;
+
+    bool bloom_filter_does_not_apply_to_join =
+        settings.join_type == JoinType::LEFT_ANTI ||
+        settings.join_type == JoinType::LEFT_OUTER ||
+        settings.join_type == JoinType::FULL_OUTER;
+    if (settings.bloom_filter && !bloom_filter_does_not_apply_to_join) {
+      bloom_filter_pushdown_target = join_.get();
+      SchemaProjectionMap probe_key_to_input = schema_mgr_->proj_maps[0].map(
+          HashJoinProjection::KEY, HashJoinProjection::INPUT);
+      int num_keys = probe_key_to_input.num_cols;
+      for (int i = 0; i < num_keys; i++)
+        key_input_map.push_back(probe_key_to_input.get(i));
+    }
+
     omp_set_num_threads(settings.num_threads);
     auto schedule_callback = [](std::function<Status(size_t)> func) -> Status {
 #pragma omp task
@@ -135,8 +154,9 @@ class JoinBenchmark {
 
     DCHECK_OK(join_->Init(
         ctx_.get(), settings.join_type, !is_parallel, settings.num_threads,
-        schema_mgr_.get(), {JoinKeyCmp::EQ}, std::move(filter), [](ExecBatch) {},
-        [](int64_t x) {}, schedule_callback));
+        schema_mgr_.get(), std::move(key_cmp), std::move(filter), [](ExecBatch) {},
+        [](int64_t x) {}, schedule_callback, bloom_filter_pushdown_target,
+        std::move(key_input_map)));
   }
 
   void RunJoin() {
@@ -268,6 +288,16 @@ static void BM_HashJoinBasic_NullPercentage(benchmark::State& st) {
 
   HashJoinBasicBenchmarkImpl(st, settings);
 }
+
+static void BM_HashJoinBasic_BloomFilter(benchmark::State& st, bool bloom_filter) {
+  BenchmarkSettings settings;
+  settings.bloom_filter = bloom_filter;
+  settings.selectivity = static_cast<double>(st.range(0)) / 100.0;
+  settings.num_build_batches = static_cast<int>(st.range(1));
+  settings.num_probe_batches = settings.num_build_batches;
+
+  HashJoinBasicBenchmarkImpl(st, settings);
+}
 #endif
 
 std::vector<int64_t> hashtable_krows = benchmark::CreateRange(1, 4096, 8);
@@ -395,6 +425,16 @@ BENCHMARK(BM_HashJoinBasic_BuildParallelism)
 BENCHMARK(BM_HashJoinBasic_NullPercentage)
     ->ArgNames({"Null Percentage"})
     ->DenseRange(0, 100, 10);
+
+std::vector<std::string> bloomfilter_argnames = {"Selectivity", "HashTable krows"};
+std::vector<std::vector<int64_t>> bloomfilter_args = {
+    benchmark::CreateDenseRange(0, 100, 10), hashtable_krows};
+BENCHMARK_CAPTURE(BM_HashJoinBasic_BloomFilter, "Bloom Filter", true)
+    ->ArgNames(bloomfilter_argnames)
+    ->ArgsProduct(bloomfilter_args);
+BENCHMARK_CAPTURE(BM_HashJoinBasic_BloomFilter, "No Bloom Filter", false)
+    ->ArgNames(bloomfilter_argnames)
+    ->ArgsProduct(bloomfilter_args);
 #else
 
 BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int32}", {int32()})
diff --git a/cpp/src/arrow/compute/exec/hash_join_graphs.py b/cpp/src/arrow/compute/exec/hash_join_graphs.py
index ff7ab63187d..16b4c3e5fd4 100755
--- a/cpp/src/arrow/compute/exec/hash_join_graphs.py
+++ b/cpp/src/arrow/compute/exec/hash_join_graphs.py
@@ -148,7 +148,7 @@ def plot_3d(test, sorted_argnames):
     num_cols = int(math.ceil(num_graphs / num_rows))
 
     graphs = set(test.args[sorted_argnames[0]])
-    for j, graph in enumerate(sorted(graphs, key=string_numeric_sort_key)):
+    for j, graph in enumerate(sorted(graphs, key=try_as_numeric)):
         ax = plt.subplot(num_rows, num_cols, j + 1)
         filtered_test = Test()
         indices = range(len(test.times))
diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc
index 0282e387c42..d8d729dd1ae 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include
+#include
 
 #include "arrow/compute/exec/exec_plan.h"
 #include "arrow/compute/exec/hash_join.h"
@@ -466,7 +467,8 @@ class HashJoinNode : public ExecNode {
         key_cmp_(join_options.key_cmp),
         filter_(std::move(filter)),
         schema_mgr_(std::move(schema_mgr)),
-        impl_(std::move(impl)) {
+        impl_(std::move(impl)),
+        disable_bloom_filter_(join_options.disable_bloom_filter) {
     complete_.store(false);
   }
@@ -571,24 +573,138 @@ class HashJoinNode : public ExecNode {
     }
   }
 
-  Status StartProducing() override {
-    START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(),
-                       {{"node.label", label()},
-                        {"node.detail", ToString()},
-                        {"node.kind", kind_name()}});
-    END_SPAN_ON_FUTURE_COMPLETION(span_, finished(), this);
+  // The Bloom filter is built on the build side of some upstream join. For a join to
+  // evaluate the Bloom filter on its input columns, it has to rearrange its input
+  // columns to match the column order of the Bloom filter.
+  //
+  // The first element of the pair is the HashJoin to actually perform the pushdown
+  // into. The second element is a mapping such that column_map[i] is the index of
+  // key i in the first element's input.
+  // If the Bloom filter should be disabled, returns nullptr and an empty vector, and
+  // sets the disable_bloom_filter_ flag.
+  std::pair<HashJoinImpl*, std::vector<int>> GetPushdownTarget() {
+#if !ARROW_LITTLE_ENDIAN
+    // TODO(ARROW-16591): Debug bloom_filter.cc to enable it on big endian. It
+    // probably just needs a few byte swaps in the proper spots.
+    disable_bloom_filter_ = true;
+    return {nullptr, {}};
+#else
+    // A build-side Bloom filter tells us if a row is definitely not in the build side.
+    // Depending on the join type, that verdict lets us either early-eliminate or
+    // early-accept a probe-side row. LEFT_OUTER and FULL_OUTER joins output every
+    // probe-side row regardless, so a build-side filter would only enable early
+    // output, and LEFT_ANTI outputs a row exactly when there is no match, so again
+    // only early output. We don't implement early output for now, so these join
+    // types must be disallowed.
+    bool bloom_filter_does_not_apply_to_join = join_type_ == JoinType::LEFT_ANTI ||
+                                               join_type_ == JoinType::LEFT_OUTER ||
+                                               join_type_ == JoinType::FULL_OUTER;
+    disable_bloom_filter_ = disable_bloom_filter_ || bloom_filter_does_not_apply_to_join;
+
+    SchemaProjectionMap build_keys_to_input =
+        schema_mgr_->proj_maps[1].map(HashJoinProjection::KEY, HashJoinProjection::INPUT);
+    // The Bloom filter currently doesn't support dictionaries.
+    for (int i = 0; i < build_keys_to_input.num_cols; i++) {
+      int idx = build_keys_to_input.get(i);
+      bool is_dict =
+          inputs_[1]->output_schema()->field(idx)->type()->id() == Type::DICTIONARY;
+      if (is_dict) {
+        disable_bloom_filter_ = true;
+        break;
+      }
+    }
+
+    bool all_comparisons_is = true;
+    for (JoinKeyCmp cmp : key_cmp_) all_comparisons_is &= (cmp == JoinKeyCmp::IS);
+
+    if ((join_type_ == JoinType::RIGHT_OUTER || join_type_ == JoinType::FULL_OUTER) &&
+        all_comparisons_is)
+      disable_bloom_filter_ = true;
+
+    if (disable_bloom_filter_) return {nullptr, {}};
+
+    // We currently only push Bloom filters down the probe side, and only if that
+    // input is also a join.
+    SchemaProjectionMap probe_key_to_input =
+        schema_mgr_->proj_maps[0].map(HashJoinProjection::KEY, HashJoinProjection::INPUT);
+    int num_keys = probe_key_to_input.num_cols;
+
+    // A mapping such that bloom_to_target[i] is the index of key i in the pushdown
+    // target's input
+    std::vector<int> bloom_to_target(num_keys);
+    HashJoinNode* pushdown_target = this;
+    for (int i = 0; i < num_keys; i++) bloom_to_target[i] = probe_key_to_input.get(i);
+
+    for (ExecNode* candidate = inputs()[0]; candidate->kind_name() == this->kind_name();
+         candidate = candidate->inputs()[0]) {
+      auto* candidate_as_join = checked_cast<HashJoinNode*>(candidate);
+      SchemaProjectionMap candidate_output_to_input =
+          candidate_as_join->schema_mgr_->proj_maps[0].map(HashJoinProjection::OUTPUT,
+                                                           HashJoinProjection::INPUT);
+
+      // Check if any of the keys are missing; if so, stop the walk.
+      bool break_outer = false;
+      for (int i = 0; i < num_keys; i++) {
+        // Since all of the probe-side columns come before the build-side columns,
+        // if the index of an output is greater than the number of probe-side input
+        // columns, it must have come from the candidate's build side.
+        if (bloom_to_target[i] >= candidate_output_to_input.num_cols) {
+          break_outer = true;
+          break;
+        }
+        int candidate_input_idx = candidate_output_to_input.get(bloom_to_target[i]);
+        // The output column has to have come from somewhere...
+        ARROW_DCHECK_NE(candidate_input_idx, schema_mgr_->kMissingField());
+      }
+      if (break_outer) break;
+
+      // The Bloom filter will filter out nulls, which may cause a Right/Full Outer
+      // Join to incorrectly output some rows with nulls padding the probe-side
+      // columns, i.e. rows with all-null keys. This is normally not an issue with EQ,
+      // but if all comparisons are IS (i.e. all-null keys are accepted), this could
+      // produce incorrect rows.
+      bool can_produce_build_side_nulls =
+          candidate_as_join->join_type_ == JoinType::RIGHT_OUTER ||
+          candidate_as_join->join_type_ == JoinType::FULL_OUTER;
+
+      if (all_comparisons_is || can_produce_build_side_nulls) break;
+
+      // All keys are present; we can update the mapping.
+      for (int i = 0; i < num_keys; i++) {
+        int candidate_input_idx = candidate_output_to_input.get(bloom_to_target[i]);
+        bloom_to_target[i] = candidate_input_idx;
+      }
+      pushdown_target = candidate_as_join;
+    }
+    return std::make_pair(pushdown_target->impl_.get(), std::move(bloom_to_target));
+#endif  // ARROW_LITTLE_ENDIAN
+  }
 
+  Status PrepareToProduce() override {
     bool use_sync_execution = !(plan_->exec_context()->executor());
     size_t num_threads = use_sync_execution ? 1 : thread_indexer_.Capacity();
 
-    RETURN_NOT_OK(impl_->Init(
+    HashJoinImpl* pushdown_target = nullptr;
+    std::vector<int> column_map;
+    std::tie(pushdown_target, column_map) = GetPushdownTarget();
+
+    return impl_->Init(
         plan_->exec_context(), join_type_, use_sync_execution, num_threads,
         schema_mgr_.get(), key_cmp_, filter_,
         [this](ExecBatch batch) { this->OutputBatchCallback(batch); },
         [this](int64_t total_num_batches) { this->FinishedCallback(total_num_batches); },
         [this](std::function<Status(size_t)> func) -> Status {
          return this->ScheduleTaskCallback(std::move(func));
-        }));
+        },
+        pushdown_target, std::move(column_map));
+  }
+
+  Status StartProducing() override {
+    START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(),
+                       {{"node.label", label()},
+                        {"node.detail", ToString()},
+                        {"node.kind", kind_name()}});
+    END_SPAN_ON_FUTURE_COMPLETION(span_, finished(), this);
 
     return Status::OK();
   }
@@ -662,6 +778,7 @@ class HashJoinNode : public ExecNode {
   std::unique_ptr<HashJoinSchema> schema_mgr_;
   std::unique_ptr<HashJoinImpl> impl_;
   util::AsyncTaskGroup task_group_;
+  bool disable_bloom_filter_;
 };
 
 namespace internal {
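The remapping loop in GetPushdownTarget is easiest to see with concrete numbers. Suppose this join's probe keys sit at output columns {3, 0} of an upstream join, and that upstream join maps output 3 to its input 5 and output 0 to its input 1; after one step of the walk, bloom_to_target becomes {5, 1}. A toy version (all index values invented):

#include <cassert>
#include <vector>

int main() {
  // candidate_output_to_input.get(i), flattened into a plain array
  std::vector<int> candidate_output_to_input = {1, 4, 2, 5};
  std::vector<int> bloom_to_target = {3, 0};  // key i -> column in our own input

  // One step of the pushdown walk: rewrite each key through the candidate join.
  for (size_t i = 0; i < bloom_to_target.size(); i++)
    bloom_to_target[i] = candidate_output_to_input[bloom_to_target[i]];

  assert(bloom_to_target[0] == 5 && bloom_to_target[1] == 1);
  return 0;
}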
diff --git a/cpp/src/arrow/compute/exec/hash_join_node_test.cc b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
index 99a8fc01e0a..f8da71c7b54 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node_test.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
@@ -1000,11 +1000,14 @@ TEST(HashJoin, Random) {
   Random64Bit rng(42);
 #if defined(THREAD_SANITIZER) || defined(ARROW_VALGRIND)
   const int num_tests = 15;
+#elif defined(ADDRESS_SANITIZER)
+  const int num_tests = 25;
 #else
   const int num_tests = 100;
 #endif
   for (int test_id = 0; test_id < num_tests; ++test_id) {
     bool parallel = (rng.from_range(0, 1) == 1);
+    bool disable_bloom_filter = (rng.from_range(0, 1) == 1);
     auto exec_ctx = arrow::internal::make_unique<ExecContext>(
         default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr);
 
@@ -1110,7 +1113,8 @@ TEST(HashJoin, Random) {
     }
 
     ARROW_SCOPED_TRACE(join_type_name, " ", key_cmp_str,
-                       " parallel = ", (parallel ? "true" : "false"));
+                       " parallel = ", (parallel ? "true" : "false"),
+                       " bloom_filter = ", (disable_bloom_filter ? "false" : "true"));
 
     // Run reference join implementation
     std::vector<bool> null_in_key_vectors[2];
@@ -1130,7 +1134,7 @@ TEST(HashJoin, Random) {
 
     // Turn the last key comparison into a residual filter expression
     Expression filter = literal(true);
-    if (key_cmp.size() > 1 && rng.from_range(0, 1) == 0) {
+    if (key_cmp.size() > 1 && rng.from_range(0, 4) == 0) {
       for (size_t i = 0; i < key_cmp.size(); i++) {
         FieldRef left = key_fields[0][i];
         FieldRef right = key_fields[1][i];
@@ -1158,6 +1162,7 @@ TEST(HashJoin, Random) {
     HashJoinNodeOptions join_options{
         join_type,        key_fields[0],    key_fields[1], output_fields[0],
         output_fields[1], key_cmp,          filter};
+    join_options.disable_bloom_filter = disable_bloom_filter;
     std::vector<std::shared_ptr<Field>> output_schema_fields;
     for (int i = 0; i < 2; ++i) {
       for (size_t col = 0; col < output_fields[i].size(); ++col) {
@@ -1168,6 +1173,7 @@ TEST(HashJoin, Random) {
     }
     std::shared_ptr<Schema> output_schema =
        std::make_shared<Schema>(std::move(output_schema_fields));
+
     ASSERT_OK_AND_ASSIGN(
         auto batches, HashJoinWithExecPlan(
                           rng, parallel, join_options, output_schema,
@@ -1901,5 +1907,150 @@ TEST(HashJoin, TrivialResidualFilter) {
   }
 }
 
+HashJoinNodeOptions GenerateHashJoinNodeOptions(Random64Bit& rng, int num_left_cols,
+                                                int num_right_cols) {
+  HashJoinNodeOptions opts;
+  opts.join_type = static_cast<JoinType>(rng.from_range(0, 7));
+  bool is_left_join = opts.join_type == JoinType::LEFT_SEMI ||
+                      opts.join_type == JoinType::LEFT_ANTI ||
+                      opts.join_type == JoinType::LEFT_OUTER;
+  bool is_right_join = opts.join_type == JoinType::RIGHT_SEMI ||
+                       opts.join_type == JoinType::RIGHT_ANTI ||
+                       opts.join_type == JoinType::RIGHT_OUTER;
+
+  int num_keys = rng.from_range(1, std::min(num_left_cols, num_right_cols));
+  for (int i = 0; i < num_left_cols; i++) {
+    bool is_out = rng.from_range(0, 2) != 2;
+    if (is_out && !is_right_join) opts.left_output.push_back(FieldRef(i));
+  }
+  for (int i = 0; i < num_right_cols; i++) {
+    bool is_out = rng.from_range(0, 2) == 2;
+    if (is_out && !is_left_join) opts.right_output.push_back(FieldRef(i));
+  }
+  // We need at least one output
+  if (opts.right_output.empty() && opts.left_output.empty()) {
+    if (is_left_join) {
+      int col = rng.from_range(0, num_left_cols - 1);
+      opts.left_output.push_back(FieldRef(col));
+    } else if (is_right_join) {
+      int col = rng.from_range(0, num_right_cols - 1);
+      opts.right_output.push_back(FieldRef(col));
+    } else {
+      if (rng.from_range(0, 1) == 0) {
+        int col = rng.from_range(0, num_left_cols - 1);
+        opts.left_output.push_back(FieldRef(col));
+      } else {
+        int col = rng.from_range(0, num_right_cols - 1);
+        opts.right_output.push_back(FieldRef(col));
+      }
+    }
+  }
+
+  for (int i = 0; i < num_keys; i++) {
+    int left = rng.from_range(0, num_left_cols - 1);
+    int right = rng.from_range(0, num_right_cols - 1);
+    bool is_or_eq = rng.from_range(0, 1) == 0;
+    opts.left_keys.push_back(FieldRef(left));
+    opts.right_keys.push_back(FieldRef(right));
+    opts.key_cmp.push_back(is_or_eq ? JoinKeyCmp::IS : JoinKeyCmp::EQ);
+  }
+  return opts;
+}
+
+void TestSingleChainOfHashJoins(Random64Bit& rng) {
+  int num_joins = rng.from_range(2, 5);
+  std::vector<HashJoinNodeOptions> opts;
+  int num_left_cols = rng.from_range(1, 8);
+  int num_right_cols = rng.from_range(1, 8);
+  HashJoinNodeOptions first_opt =
+      GenerateHashJoinNodeOptions(rng, num_left_cols, num_right_cols);
+  opts.push_back(std::move(first_opt));
+
+  std::unordered_map<std::string, std::string> metadata_map;
+  metadata_map["min"] = "0";
+  metadata_map["max"] = "10";
+  auto metadata = key_value_metadata(metadata_map);
+  std::vector<std::shared_ptr<Field>> left_fields;
+  for (int i = 0; i < num_left_cols; i++)
+    left_fields.push_back(field(std::string("l") + std::to_string(i), int32(), metadata));
+  std::vector<std::shared_ptr<Field>> first_right_fields;
+  for (int i = 0; i < num_right_cols; i++)
+    first_right_fields.push_back(
+        field(std::string("r_0_") + std::to_string(i), int32(), metadata));
+
+  BatchesWithSchema input_left = MakeRandomBatches(schema(std::move(left_fields)));
+  std::vector<BatchesWithSchema> input_right;
+  input_right.push_back(MakeRandomBatches(schema(std::move(first_right_fields))));
+
+  for (int i = 1; i < num_joins; i++) {
+    int num_right_cols = rng.from_range(1, 8);
+    HashJoinNodeOptions opt =
+        GenerateHashJoinNodeOptions(rng,
+                                    static_cast<int>(opts[i - 1].left_output.size() +
+                                                     opts[i - 1].right_output.size()),
+                                    num_right_cols);
+    opts.push_back(std::move(opt));
+
+    std::vector<std::shared_ptr<Field>> right_fields;
+    for (int j = 0; j < num_right_cols; j++)
+      right_fields.push_back(
+          field(std::string("r_") + std::to_string(i) + "_" + std::to_string(j), int32(),
+                metadata));
+    BatchesWithSchema input = MakeRandomBatches(schema(std::move(right_fields)));
+    input_right.push_back(std::move(input));
+  }
+
+  std::vector<ExecBatch> reference;
+  for (bool bloom_filters : {false, true}) {
+    bool kParallel = true;
+    ARROW_SCOPED_TRACE(bloom_filters ? "bloom filtered" : "unfiltered");
+    auto exec_ctx = arrow::internal::make_unique<ExecContext>(
+        default_memory_pool(), kParallel ? arrow::internal::GetCpuThreadPool() : nullptr);
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get()));
+
+    ExecNode* left_source;
+    ASSERT_OK_AND_ASSIGN(
+        left_source,
+        MakeExecNode("source", plan.get(), {},
+                     SourceNodeOptions{input_left.schema,
+                                       input_left.gen(kParallel, /*slow=*/false)}));
+    std::vector<ExecNode*> joins(num_joins);
+    for (int i = 0; i < num_joins; i++) {
+      opts[i].disable_bloom_filter = !bloom_filters;
+      ExecNode* right_source;
+      ASSERT_OK_AND_ASSIGN(
+          right_source,
+          MakeExecNode("source", plan.get(), {},
+                       SourceNodeOptions{input_right[i].schema,
+                                         input_right[i].gen(kParallel, /*slow=*/false)}));
+
+      std::vector<ExecNode*> inputs;
+      if (i == 0)
+        inputs = {left_source, right_source};
+      else
+        inputs = {joins[i - 1], right_source};
+      ASSERT_OK_AND_ASSIGN(joins[i],
+                           MakeExecNode("hashjoin", plan.get(), inputs, opts[i]));
+    }
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+    ASSERT_OK(
+        MakeExecNode("sink", plan.get(), {joins.back()}, SinkNodeOptions{&sink_gen}));
+    ASSERT_FINISHES_OK_AND_ASSIGN(auto result, StartAndCollect(plan.get(), sink_gen));
+    if (!bloom_filters)
+      reference = std::move(result);
+    else
+      AssertExecBatchesEqual(joins.back()->output_schema(), reference, result);
+  }
+}
+
+TEST(HashJoin, ChainedIntegerHashJoins) {
+  Random64Bit rng(42);
+  int num_tests = 30;
+  for (int i = 0; i < num_tests; i++) {
+    ARROW_SCOPED_TRACE("Test ", std::to_string(i));
+    TestSingleChainOfHashJoins(rng);
+  }
+}
+
 }  // namespace compute
 }  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/key_encode.h b/cpp/src/arrow/compute/exec/key_encode.h
index f9de31d9c21..58d8fb233f0 100644
--- a/cpp/src/arrow/compute/exec/key_encode.h
+++ b/cpp/src/arrow/compute/exec/key_encode.h
@@ -21,6 +21,8 @@
 #include
 #include
 
+#include "arrow/array/data.h"
+#include "arrow/compute/exec.h"
 #include "arrow/compute/exec/util.h"
 #include "arrow/compute/light_array.h"
 #include "arrow/memory_pool.h"
diff --git a/cpp/src/arrow/compute/exec/key_hash.cc b/cpp/src/arrow/compute/exec/key_hash.cc
index 125fd3912e1..e81ed64b6f2 100644
--- a/cpp/src/arrow/compute/exec/key_hash.cc
+++ b/cpp/src/arrow/compute/exec/key_hash.cc
@@ -22,6 +22,7 @@
 #include
 #include
 
+#include "arrow/compute/exec/key_encode.h"
 #include "arrow/util/bit_util.h"
 #include "arrow/util/ubsan.h"
 
@@ -456,6 +457,19 @@ void Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
   }
 }
 
+Status Hashing32::HashBatch(const ExecBatch& key_batch, uint32_t* hashes,
+                            int64_t hardware_flags, util::TempVectorStack* temp_stack,
+                            int64_t offset, int64_t length) {
+  std::vector<KeyColumnArray> column_arrays;
+  RETURN_NOT_OK(ColumnArraysFromExecBatch(key_batch, offset, length, &column_arrays));
+
+  KeyEncoder::KeyEncoderContext ctx;
+  ctx.hardware_flags = hardware_flags;
+  ctx.stack = temp_stack;
+  HashMultiColumn(column_arrays, &ctx, hashes);
+  return Status::OK();
+}
+
 inline uint64_t Hashing64::Avalanche(uint64_t acc) {
   acc ^= (acc >> 33);
   acc *= PRIME64_2;
@@ -875,5 +889,18 @@ void Hashing64::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
   }
 }
 
+Status Hashing64::HashBatch(const ExecBatch& key_batch, uint64_t* hashes,
+                            int64_t hardware_flags, util::TempVectorStack* temp_stack,
+                            int64_t offset, int64_t length) {
+  std::vector<KeyColumnArray> column_arrays;
+  RETURN_NOT_OK(ColumnArraysFromExecBatch(key_batch, offset, length, &column_arrays));
+
+  KeyEncoder::KeyEncoderContext ctx;
+  ctx.hardware_flags = hardware_flags;
+  ctx.stack = temp_stack;
+  HashMultiColumn(column_arrays, &ctx, hashes);
+  return Status::OK();
+}
+
 }  // namespace compute
 }  // namespace arrow
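A minimal sketch of calling the new HashBatch entry point (it assumes a key batch and an already-initialized TempVectorStack; this mirrors how hash_join.cc uses the API but is not taken verbatim from the patch):

// Sketch: hash all rows of `key_batch` into `hashes`.
Status HashKeys(const ExecBatch& key_batch, int64_t hardware_flags,
                util::TempVectorStack* temp_stack, std::vector<uint32_t>* hashes) {
  hashes->resize(key_batch.length);
  return Hashing32::HashBatch(key_batch, hashes->data(), hardware_flags, temp_stack,
                              /*offset=*/0, /*length=*/key_batch.length);
}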
diff --git a/cpp/src/arrow/compute/exec/key_hash.h b/cpp/src/arrow/compute/exec/key_hash.h
index 719f3dfd460..05f39bb729a 100644
--- a/cpp/src/arrow/compute/exec/key_hash.h
+++ b/cpp/src/arrow/compute/exec/key_hash.h
@@ -48,6 +48,10 @@ class ARROW_EXPORT Hashing32 {
   static void HashMultiColumn(const std::vector<KeyColumnArray>& cols,
                               KeyEncoder::KeyEncoderContext* ctx, uint32_t* out_hash);
 
+  static Status HashBatch(const ExecBatch& key_batch, uint32_t* hashes,
+                          int64_t hardware_flags, util::TempVectorStack* temp_stack,
+                          int64_t offset, int64_t length);
+
  private:
   static const uint32_t PRIME32_1 = 0x9E3779B1;
   static const uint32_t PRIME32_2 = 0x85EBCA77;
@@ -156,6 +160,10 @@ class ARROW_EXPORT Hashing64 {
   static void HashMultiColumn(const std::vector<KeyColumnArray>& cols,
                               KeyEncoder::KeyEncoderContext* ctx, uint64_t* hashes);
 
+  static Status HashBatch(const ExecBatch& key_batch, uint64_t* hashes,
+                          int64_t hardware_flags, util::TempVectorStack* temp_stack,
+                          int64_t offset, int64_t length);
+
  private:
   static const uint64_t PRIME64_1 = 0x9E3779B185EBCA87ULL;
   static const uint64_t PRIME64_2 = 0xC2B2AE3D27D4EB4FULL;
diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index 3c901be0d2e..48cbf9d3710 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -281,14 +281,16 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
       JoinType in_join_type, std::vector<FieldRef> in_left_keys,
       std::vector<FieldRef> in_right_keys, Expression filter = literal(true),
       std::string output_suffix_for_left = default_output_suffix_for_left,
-      std::string output_suffix_for_right = default_output_suffix_for_right)
+      std::string output_suffix_for_right = default_output_suffix_for_right,
+      bool disable_bloom_filter = false)
       : join_type(in_join_type),
         left_keys(std::move(in_left_keys)),
         right_keys(std::move(in_right_keys)),
         output_all(true),
         output_suffix_for_left(std::move(output_suffix_for_left)),
         output_suffix_for_right(std::move(output_suffix_for_right)),
-        filter(std::move(filter)) {
+        filter(std::move(filter)),
+        disable_bloom_filter(disable_bloom_filter) {
     this->key_cmp.resize(this->left_keys.size());
     for (size_t i = 0; i < this->left_keys.size(); ++i) {
       this->key_cmp[i] = JoinKeyCmp::EQ;
@@ -299,7 +301,8 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
       std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output,
       std::vector<FieldRef> right_output, Expression filter = literal(true),
       std::string output_suffix_for_left = default_output_suffix_for_left,
-      std::string output_suffix_for_right = default_output_suffix_for_right)
+      std::string output_suffix_for_right = default_output_suffix_for_right,
+      bool disable_bloom_filter = false)
       : join_type(join_type),
         left_keys(std::move(left_keys)),
         right_keys(std::move(right_keys)),
@@ -308,7 +311,8 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
         right_output(std::move(right_output)),
         output_suffix_for_left(std::move(output_suffix_for_left)),
         output_suffix_for_right(std::move(output_suffix_for_right)),
-        filter(std::move(filter)) {
+        filter(std::move(filter)),
+        disable_bloom_filter(disable_bloom_filter) {
     this->key_cmp.resize(this->left_keys.size());
     for (size_t i = 0; i < this->left_keys.size(); ++i) {
       this->key_cmp[i] = JoinKeyCmp::EQ;
@@ -320,7 +324,8 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
       std::vector<FieldRef> right_output, std::vector<JoinKeyCmp> key_cmp,
       Expression filter = literal(true),
       std::string output_suffix_for_left = default_output_suffix_for_left,
-      std::string output_suffix_for_right = default_output_suffix_for_right)
+      std::string output_suffix_for_right = default_output_suffix_for_right,
+      bool disable_bloom_filter = false)
       : join_type(join_type),
         left_keys(std::move(left_keys)),
         right_keys(std::move(right_keys)),
@@ -330,17 +335,20 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
         key_cmp(std::move(key_cmp)),
         output_suffix_for_left(std::move(output_suffix_for_left)),
         output_suffix_for_right(std::move(output_suffix_for_right)),
-        filter(std::move(filter)) {}
+        filter(std::move(filter)),
+        disable_bloom_filter(disable_bloom_filter) {}
+
+  HashJoinNodeOptions() = default;
 
   // type of join (inner, left, semi...)
-  JoinType join_type;
+  JoinType join_type = JoinType::INNER;
   // key fields from left input
   std::vector<FieldRef> left_keys;
   // key fields from right input
   std::vector<FieldRef> right_keys;
   // if set all valid fields from both left and right input will be output
   // (and field ref vectors for output fields will be ignored)
-  bool output_all;
+  bool output_all = false;
   // output fields passed from left input
   std::vector<FieldRef> left_output;
   // output fields passed from right input
@@ -358,7 +366,9 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
   // the filter are not included. The filter is applied against the
   // concatenated input schema (left fields then right fields) and can reference
   // fields that are not included in the output.
-  Expression filter;
+  Expression filter = literal(true);
+  // whether or not to disable Bloom filters in this join
+  bool disable_bloom_filter = false;
 };
 
 /// \brief Make a node which select top_k/bottom_k rows passed through it
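With the new defaulted constructor and member defaults, options can now be built field by field; for example (a usage sketch, with invented field names):

#include "arrow/compute/exec/options.h"

using arrow::FieldRef;
using arrow::compute::HashJoinNodeOptions;
using arrow::compute::JoinKeyCmp;

HashJoinNodeOptions MakeJoinOptions() {
  HashJoinNodeOptions opts;  // defaults: INNER join, trivial filter, Bloom filter on
  opts.left_keys = {FieldRef("l_key")};
  opts.right_keys = {FieldRef("r_key")};
  opts.key_cmp = {JoinKeyCmp::EQ};
  opts.left_output = {FieldRef("l_key")};
  opts.right_output = {FieldRef("r_payload")};
  opts.disable_bloom_filter = true;  // opt out of the new pushdown
  return opts;
}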
diff --git a/cpp/src/arrow/compute/exec/partition_util.cc b/cpp/src/arrow/compute/exec/partition_util.cc
index ed5e37edca3..e99007c45a3 100644
--- a/cpp/src/arrow/compute/exec/partition_util.cc
+++ b/cpp/src/arrow/compute/exec/partition_util.cc
@@ -21,24 +21,25 @@
 namespace arrow {
 namespace compute {
 
-PartitionLocks::PartitionLocks()
-    : num_prtns_(0),
-      locks_(nullptr),
-      rand_seed_{0, 0, 0, 0, 0, 0, 0, 0},
-      rand_engine_(rand_seed_) {}
+PartitionLocks::PartitionLocks() : num_prtns_(0), locks_(nullptr), rngs_(nullptr) {}
 
 PartitionLocks::~PartitionLocks() { CleanUp(); }
 
-void PartitionLocks::Init(int num_prtns) {
+void PartitionLocks::Init(size_t num_threads, int num_prtns) {
   num_prtns_ = num_prtns;
   locks_.reset(new PartitionLock[num_prtns]);
+  rngs_.reset(new arrow::random::pcg32_fast[num_threads]);
   for (int i = 0; i < num_prtns; ++i) {
     locks_[i].lock.store(false);
   }
+  arrow::random::pcg32_fast seed_gen(0);
+  std::uniform_int_distribution<uint32_t> seed_dist;
+  for (size_t i = 0; i < num_threads; i++) rngs_[i].seed(seed_dist(seed_gen));
 }
 
 void PartitionLocks::CleanUp() {
   locks_.reset();
+  rngs_.reset();
   num_prtns_ = 0;
 }
 
@@ -48,23 +49,23 @@
 std::atomic<bool>* PartitionLocks::lock_ptr(int prtn_id) {
   return &(locks_[prtn_id].lock);
 }
 
-int PartitionLocks::random_int(int num_values) {
-  return rand_distribution_(rand_engine_) % num_values;
+int PartitionLocks::random_int(size_t thread_id, int num_values) {
+  return std::uniform_int_distribution<int>{0, num_values - 1}(rngs_[thread_id]);
 }
 
-bool PartitionLocks::AcquirePartitionLock(int num_prtns_to_try,
+bool PartitionLocks::AcquirePartitionLock(size_t thread_id, int num_prtns_to_try,
                                           const int* prtn_ids_to_try, bool limit_retries,
                                           int max_retries, int* locked_prtn_id,
                                           int* locked_prtn_id_pos) {
   int trial = 0;
   while (!limit_retries || trial <= max_retries) {
-    int prtn_id_pos = random_int(num_prtns_to_try);
+    int prtn_id_pos = random_int(thread_id, num_prtns_to_try);
     int prtn_id = prtn_ids_to_try[prtn_id_pos];
     std::atomic<bool>* lock = lock_ptr(prtn_id);
     bool expected = false;
-    if (lock->compare_exchange_weak(expected, true)) {
+    if (lock->compare_exchange_weak(expected, true, std::memory_order_acquire)) {
       *locked_prtn_id = prtn_id;
       *locked_prtn_id_pos = prtn_id_pos;
       return true;
@@ -81,7 +82,7 @@ bool PartitionLocks::AcquirePartitionLock(int num_prtns_to_try,
 void PartitionLocks::ReleasePartitionLock(int prtn_id) {
   ARROW_DCHECK(prtn_id >= 0 && prtn_id < num_prtns_);
   std::atomic<bool>* lock = lock_ptr(prtn_id);
-  lock->store(false);
+  lock->store(false, std::memory_order_release);
 }
 
 }  // namespace compute
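Note: the explicit memory orders above form the usual spinlock protocol. A successful
compare-exchange with memory_order_acquire synchronizes-with the previous holder's
memory_order_release store, so writes made inside the critical section are visible to
the next thread that takes the lock; the previous default (seq_cst) was stronger than
needed. The same pattern in isolation:

    #include <atomic>

    std::atomic<bool> locked{false};

    bool TryLock() {
      bool expected = false;
      // Acquire on success: observe all writes published by the last Unlock().
      return locked.compare_exchange_weak(expected, true, std::memory_order_acquire);
    }

    void Unlock() {
      // Release: publish this thread's writes to the next lock holder.
      locked.store(false, std::memory_order_release);
    }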
diff --git a/cpp/src/arrow/compute/exec/partition_util.h b/cpp/src/arrow/compute/exec/partition_util.h
index 6efda4aeeb0..07fb91f2f1a 100644
--- a/cpp/src/arrow/compute/exec/partition_util.h
+++ b/cpp/src/arrow/compute/exec/partition_util.h
@@ -24,6 +24,7 @@
 #include
 #include "arrow/buffer.h"
 #include "arrow/compute/exec/util.h"
+#include "arrow/util/pcg_random.h"
 
 namespace arrow {
 namespace compute {
@@ -59,14 +60,14 @@ class PartitionSort {
   ///   out_arr: [2, 5, 3, 5, 4, 7]
   ///   prtn_ranges: [0, 1, 5, 6]
   template <class INPUT_PRTN_ID_FN, class OUTPUT_POS_FN>
-  static void Eval(int num_rows, int num_prtns, uint16_t* prtn_ranges,
+  static void Eval(int64_t num_rows, int num_prtns, uint16_t* prtn_ranges,
                    INPUT_PRTN_ID_FN prtn_id_impl, OUTPUT_POS_FN output_pos_impl) {
     ARROW_DCHECK(num_rows > 0 && num_rows <= (1 << 15));
     ARROW_DCHECK(num_prtns >= 1 && num_prtns <= (1 << 15));
 
     memset(prtn_ranges, 0, (num_prtns + 1) * sizeof(uint16_t));
 
-    for (int i = 0; i < num_rows; ++i) {
+    for (int64_t i = 0; i < num_rows; ++i) {
       int prtn_id = static_cast<int>(prtn_id_impl(i));
       ++prtn_ranges[prtn_id + 1];
     }
@@ -78,7 +79,7 @@ class PartitionSort {
       sum = sum_next;
     }
 
-    for (int i = 0; i < num_rows; ++i) {
+    for (int64_t i = 0; i < num_rows; ++i) {
       int prtn_id = static_cast<int>(prtn_id_impl(i));
       int pos = prtn_ranges[prtn_id + 1]++;
       output_pos_impl(i, pos);
@@ -93,12 +94,14 @@ class PartitionLocks {
   ~PartitionLocks();
   /// \brief Initializes the control, must be called before use
   ///
+  /// \param num_threads Maximum number of threads that will access the partitions
   /// \param num_prtns Number of partitions to synchronize
-  void Init(int num_prtns);
+  void Init(size_t num_threads, int num_prtns);
   /// \brief Cleans up the control, it should not be used after this call
   void CleanUp();
   /// \brief Acquire a partition to work on one
   ///
+  /// \param thread_id The index of the thread trying to acquire the partition lock
   /// \param num_prtns Length of prtns_to_try, must be <= num_prtns used in Init
   /// \param prtns_to_try An array of partitions that still have remaining work
   /// \param limit_retries If false, this method will spinwait forever until success
@@ -109,15 +112,15 @@ class PartitionLocks {
   ///         without successfully acquiring a lock
   ///
   /// This method is thread safe
-  bool AcquirePartitionLock(int num_prtns, const int* prtns_to_try, bool limit_retries,
-                            int max_retries, int* locked_prtn_id,
+  bool AcquirePartitionLock(size_t thread_id, int num_prtns, const int* prtns_to_try,
+                            bool limit_retries, int max_retries, int* locked_prtn_id,
                             int* locked_prtn_id_pos);
   /// \brief Release a partition so that other threads can work on it
   void ReleasePartitionLock(int prtn_id);
 
  private:
   std::atomic<bool>* lock_ptr(int prtn_id);
-  int random_int(int num_values);
+  int random_int(size_t thread_id, int num_values);
 
   struct PartitionLock {
     static constexpr int kCacheLineBytes = 64;
@@ -126,10 +129,7 @@ class PartitionLocks {
   };
   int num_prtns_;
   std::unique_ptr<PartitionLock[]> locks_;
-
-  std::seed_seq rand_seed_;
-  std::mt19937 rand_engine_;
-  std::uniform_int_distribution<int> rand_distribution_;
+  std::unique_ptr<arrow::random::pcg32_fast[]> rngs_;
 };
 
 }  // namespace compute
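Note: replacing the single shared std::mt19937 with one pcg32_fast per thread removes
the unsynchronized concurrent use of the engine once several threads retry at the
same time. A sketch of how a worker drives the reworked API (not part of the patch;
the thread count, partition set, and thread_id 0 are illustrative):

    arrow::compute::PartitionLocks locks;
    locks.Init(/*num_threads=*/4, /*num_prtns=*/8);
    int prtns_to_try[] = {0, 3, 5};  // partitions with remaining work
    int locked_prtn_id, locked_prtn_id_pos;
    if (locks.AcquirePartitionLock(/*thread_id=*/0, /*num_prtns=*/3, prtns_to_try,
                                   /*limit_retries=*/true, /*max_retries=*/8,
                                   &locked_prtn_id, &locked_prtn_id_pos)) {
      // ... exclusive access to partition locked_prtn_id ...
      locks.ReleasePartitionLock(locked_prtn_id);
    }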
diff --git a/cpp/src/arrow/compute/exec/util.cc b/cpp/src/arrow/compute/exec/util.cc
index f6ac70ad45a..ae70cfcd46f 100644
--- a/cpp/src/arrow/compute/exec/util.cc
+++ b/cpp/src/arrow/compute/exec/util.cc
@@ -29,6 +29,38 @@ using bit_util::CountTrailingZeros;
 
 namespace util {
 
+inline uint64_t bit_util::SafeLoadUpTo8Bytes(const uint8_t* bytes, int num_bytes) {
+  // This will not be correct on big-endian architectures.
+#if !ARROW_LITTLE_ENDIAN
+  ARROW_DCHECK(false);
+#endif
+  ARROW_DCHECK(num_bytes >= 0 && num_bytes <= 8);
+  if (num_bytes == 8) {
+    return util::SafeLoad(reinterpret_cast<const uint64_t*>(bytes));
+  } else {
+    uint64_t word = 0;
+    for (int i = 0; i < num_bytes; ++i) {
+      word |= static_cast<uint64_t>(bytes[i]) << (8 * i);
+    }
+    return word;
+  }
+}
+
+inline void bit_util::SafeStoreUpTo8Bytes(uint8_t* bytes, int num_bytes, uint64_t value) {
+  // This will not be correct on big-endian architectures.
+#if !ARROW_LITTLE_ENDIAN
+  ARROW_DCHECK(false);
+#endif
+  ARROW_DCHECK(num_bytes >= 0 && num_bytes <= 8);
+  if (num_bytes == 8) {
+    util::SafeStore(reinterpret_cast<uint64_t*>(bytes), value);
+  } else {
+    for (int i = 0; i < num_bytes; ++i) {
+      bytes[i] = static_cast<uint8_t>(value >> (8 * i));
+    }
+  }
+}
+
 inline void bit_util::bits_to_indexes_helper(uint64_t word, uint16_t base_index,
                                              int* num_indexes, uint16_t* indexes) {
   int n = *num_indexes;
@@ -86,8 +118,8 @@ void bit_util::bits_to_indexes_internal(int64_t hardware_flags, const int num_bits,
 #endif
   // Optionally process the last partial word with masking out bits outside range
   if (tail) {
-    uint64_t word =
-        util::SafeLoad(&reinterpret_cast<const uint64_t*>(bits)[num_bits / unroll]);
+    const uint8_t* bits_tail = bits + (num_bits - tail) / 8;
+    uint64_t word = SafeLoadUpTo8Bytes(bits_tail, (tail + 7) / 8);
     if (bit_to_search == 0) {
       word = ~word;
     }
@@ -109,8 +141,7 @@ void bit_util::bits_to_indexes(int bit_to_search, int64_t hardware_flags, int num_bits,
   *num_indexes = 0;
   uint16_t base_index = 0;
   if (bit_offset != 0) {
-    uint64_t bits_head =
-        util::SafeLoad(reinterpret_cast<const uint64_t*>(bits)) >> bit_offset;
+    uint64_t bits_head = bits[0] >> bit_offset;
     int bits_in_first_byte = std::min(num_bits, 8 - bit_offset);
     bits_to_indexes(bit_to_search, hardware_flags, bits_in_first_byte,
                     reinterpret_cast<const uint8_t*>(&bits_head), num_indexes, indexes);
@@ -143,8 +174,7 @@ void bit_util::bits_filter_indexes(int bit_to_search, int64_t hardware_flags,
   bit_offset %= 8;
   if (bit_offset != 0) {
     int num_indexes_head = 0;
-    uint64_t bits_head =
-        util::SafeLoad(reinterpret_cast<const uint64_t*>(bits)) >> bit_offset;
+    uint64_t bits_head = bits[0] >> bit_offset;
     int bits_in_first_byte = std::min(num_bits, 8 - bit_offset);
     bits_filter_indexes(bit_to_search, hardware_flags, bits_in_first_byte,
                         reinterpret_cast<const uint8_t*>(&bits_head), input_indexes,
@@ -185,8 +215,7 @@ void bit_util::bits_to_bytes(int64_t hardware_flags, const int num_bits,
   bits += bit_offset / 8;
   bit_offset %= 8;
   if (bit_offset != 0) {
-    uint64_t bits_head =
-        util::SafeLoad(reinterpret_cast<const uint64_t*>(bits)) >> bit_offset;
+    uint64_t bits_head = bits[0] >> bit_offset;
     int bits_in_first_byte = std::min(num_bits, 8 - bit_offset);
     bits_to_bytes(hardware_flags, bits_in_first_byte,
                   reinterpret_cast<const uint8_t*>(&bits_head), bytes);
@@ -207,7 +236,7 @@ void bit_util::bits_to_bytes(int64_t hardware_flags, const int num_bits,
 #endif
   // Processing 8 bits at a time
   constexpr int unroll = 8;
-  for (int i = num_processed / unroll; i < (num_bits + unroll - 1) / unroll; ++i) {
+  for (int i = num_processed / unroll; i < num_bits / unroll; ++i) {
     uint8_t bits_next = bits[i];
     // Clear the lowest bit and then make 8 copies of remaining 7 bits, each 7 bits apart
     // from the previous.
@@ -219,6 +248,19 @@ void bit_util::bits_to_bytes(int64_t hardware_flags, const int num_bits,
     unpacked *= 255;
     util::SafeStore(&reinterpret_cast<uint64_t*>(bytes)[i], unpacked);
   }
+  int tail = num_bits % unroll;
+  if (tail) {
+    uint8_t bits_next = bits[(num_bits - tail) / unroll];
+    // Clear the lowest bit and then make 8 copies of remaining 7 bits, each 7 bits apart
+    // from the previous.
+    uint64_t unpacked = static_cast<uint64_t>(bits_next & 0xfe) *
+                        ((1ULL << 7) | (1ULL << 14) | (1ULL << 21) | (1ULL << 28) |
+                         (1ULL << 35) | (1ULL << 42) | (1ULL << 49));
+    unpacked |= (bits_next & 1);
+    unpacked &= 0x0101010101010101ULL;
+    unpacked *= 255;
+    SafeStoreUpTo8Bytes(bytes + num_bits - tail, tail, unpacked);
+  }
 }
 
 void bit_util::bytes_to_bits(int64_t hardware_flags, const int num_bits,
@@ -250,7 +292,7 @@ void bit_util::bytes_to_bits(int64_t hardware_flags, const int num_bits,
 #endif
   // Process 8 bits at a time
   constexpr int unroll = 8;
-  for (int i = num_processed / unroll; i < (num_bits + unroll - 1) / unroll; ++i) {
+  for (int i = num_processed / unroll; i < num_bits / unroll; ++i) {
     uint64_t bytes_next = util::SafeLoad(&reinterpret_cast<const uint64_t*>(bytes)[i]);
     bytes_next &= 0x0101010101010101ULL;
     bytes_next |= (bytes_next >> 7);  // Pairs of adjacent output bits in individual bytes
@@ -258,6 +300,15 @@ void bit_util::bytes_to_bits(int64_t hardware_flags, const int num_bits,
     bytes_next |= (bytes_next >> 28);  // All 8 output bits in the lowest byte
     bits[i] = static_cast<uint8_t>(bytes_next & 0xff);
   }
+  int tail = num_bits % unroll;
+  if (tail) {
+    uint64_t bytes_next = SafeLoadUpTo8Bytes(bytes + num_bits - tail, tail);
+    bytes_next &= 0x0101010101010101ULL;
+    bytes_next |= (bytes_next >> 7);   // Pairs of adjacent output bits in individual bytes
+    bytes_next |= (bytes_next >> 14);  // 4 adjacent output bits in individual bytes
+    bytes_next |= (bytes_next >> 28);  // All 8 output bits in the lowest byte
+    bits[num_bits / 8] = static_cast<uint8_t>(bytes_next & 0xff);
+  }
 }
 
 bool bit_util::are_all_bytes_zero(int64_t hardware_flags, const uint8_t* bytes,
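Note: the tail paths added above exist so that the final partial word is never read or
written with a full 8-byte access, which could touch memory past the end of the buffer;
the loops now stop at the last complete word and the remainder is assembled byte by
byte (little-endian only). A standalone sketch of the load half, with a hypothetical
name:

    #include <cstdint>

    // Load num_bytes (0..8) little-endian bytes into the low bits of a word,
    // without ever dereferencing past bytes[num_bytes - 1].
    uint64_t LoadTailWord(const uint8_t* bytes, int num_bytes) {
      uint64_t word = 0;
      for (int i = 0; i < num_bytes; ++i) {
        word |= static_cast<uint64_t>(bytes[i]) << (8 * i);
      }
      return word;
    }

For example, bytes = {0x12, 0x34, 0x56} with num_bytes = 3 yields 0x563412, leaving
bits 24..63 zero for the caller to mask or ignore.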
diff --git a/cpp/src/arrow/compute/exec/util.h b/cpp/src/arrow/compute/exec/util.h
index 4e7550582a4..839a8a7d29c 100644
--- a/cpp/src/arrow/compute/exec/util.h
+++ b/cpp/src/arrow/compute/exec/util.h
@@ -92,7 +92,7 @@ class TempVectorStack {
   Status Init(MemoryPool* pool, int64_t size) {
     num_vectors_ = 0;
     top_ = 0;
-    buffer_size_ = size;
+    buffer_size_ = PaddedAllocationSize(size) + kPadding + 2 * sizeof(uint64_t);
     ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool));
     // Ensure later operations don't accidentally read uninitialized memory.
     std::memset(buffer->mutable_data(), 0xFF, size);
@@ -193,6 +193,8 @@ class bit_util {
                             uint32_t num_bytes);
 
  private:
+  inline static uint64_t SafeLoadUpTo8Bytes(const uint8_t* bytes, int num_bytes);
+  inline static void SafeStoreUpTo8Bytes(uint8_t* bytes, int num_bytes, uint64_t value);
   inline static void bits_to_indexes_helper(uint64_t word, uint16_t base_index,
                                             int* num_indexes, uint16_t* indexes);
   inline static void bits_filter_indexes_helper(uint64_t word,
diff --git a/cpp/src/arrow/compute/light_array.cc b/cpp/src/arrow/compute/light_array.cc
index 390dcc6cbf1..4fae01eb6bf 100644
--- a/cpp/src/arrow/compute/light_array.cc
+++ b/cpp/src/arrow/compute/light_array.cc
@@ -138,7 +138,7 @@ Result<KeyColumnMetadata> ColumnMetadataFromDataType(
 }
 
 Result<KeyColumnArray> ColumnArrayFromArrayData(
-    const std::shared_ptr<ArrayData>& array_data, int start_row, int num_rows) {
+    const std::shared_ptr<ArrayData>& array_data, int64_t start_row, int64_t num_rows) {
   ARROW_ASSIGN_OR_RAISE(KeyColumnMetadata metadata,
                         ColumnMetadataFromDataType(array_data->type));
   KeyColumnArray column_array = KeyColumnArray(
@@ -165,7 +165,8 @@ Status ColumnMetadatasFromExecBatch(const ExecBatch& batch,
   return Status::OK();
 }
 
-Status ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows,
+Status ColumnArraysFromExecBatch(const ExecBatch& batch, int64_t start_row,
+                                 int64_t num_rows,
                                  std::vector<KeyColumnArray>* column_arrays) {
   int num_columns = static_cast<int>(batch.values.size());
   column_arrays->resize(num_columns);
diff --git a/cpp/src/arrow/compute/light_array.h b/cpp/src/arrow/compute/light_array.h
index 0856e3e8aa5..dd13aa0647f 100644
--- a/cpp/src/arrow/compute/light_array.h
+++ b/cpp/src/arrow/compute/light_array.h
@@ -171,7 +171,7 @@ ARROW_EXPORT Result<KeyColumnMetadata> ColumnMetadataFromDataType(
 /// The caller should ensure this is only called on "key" columns.
 /// \see ColumnMetadataFromDataType for details
 ARROW_EXPORT Result<KeyColumnArray> ColumnArrayFromArrayData(
-    const std::shared_ptr<ArrayData>& array_data, int start_row, int num_rows);
+    const std::shared_ptr<ArrayData>& array_data, int64_t start_row, int64_t num_rows);
 
 /// \brief Create KeyColumnMetadata instances from an ExecBatch
 ///
@@ -188,8 +188,8 @@ ARROW_EXPORT Status ColumnMetadatasFromExecBatch(
 ///
 /// All columns in `batch` must be eligible "key" columns and have an array shape
 /// \see ColumnArrayFromArrayData for more details
-ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row,
-                                              int num_rows,
+ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch, int64_t start_row,
+                                              int64_t num_rows,
                                               std::vector<KeyColumnArray>* column_arrays);
 
 /// \brief Create KeyColumnArray instances from an ExecBatch