diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 15d8574c6e1..a90f3fe49c5 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -381,6 +381,7 @@ if(ARROW_COMPUTE) compute/cast.cc compute/exec.cc compute/exec/aggregate.cc + compute/exec/accumulation_queue.cc compute/exec/aggregate_node.cc compute/exec/bloom_filter.cc compute/exec/exec_plan.cc diff --git a/cpp/src/arrow/compute/exec/accumulation_queue.cc b/cpp/src/arrow/compute/exec/accumulation_queue.cc new file mode 100644 index 00000000000..192db529428 --- /dev/null +++ b/cpp/src/arrow/compute/exec/accumulation_queue.cc @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/compute/exec/accumulation_queue.h" + +#include + +namespace arrow { +namespace util { +using arrow::compute::ExecBatch; +AccumulationQueue::AccumulationQueue(AccumulationQueue&& that) { + this->batches_ = std::move(that.batches_); + this->row_count_ = that.row_count_; + that.Clear(); +} + +AccumulationQueue& AccumulationQueue::operator=(AccumulationQueue&& that) { + this->batches_ = std::move(that.batches_); + this->row_count_ = that.row_count_; + that.Clear(); + return *this; +} + +void AccumulationQueue::Concatenate(AccumulationQueue&& that) { + this->batches_.reserve(this->batches_.size() + that.batches_.size()); + std::move(that.batches_.begin(), that.batches_.end(), + std::back_inserter(this->batches_)); + this->row_count_ += that.row_count_; + that.Clear(); +} + +void AccumulationQueue::InsertBatch(ExecBatch batch) { + row_count_ += batch.length; + batches_.emplace_back(std::move(batch)); +} + +void AccumulationQueue::Clear() { + row_count_ = 0; + batches_.clear(); +} + +ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; } +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/accumulation_queue.h b/cpp/src/arrow/compute/exec/accumulation_queue.h new file mode 100644 index 00000000000..4b23e5ffcac --- /dev/null +++ b/cpp/src/arrow/compute/exec/accumulation_queue.h @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "arrow/compute/exec.h" + +namespace arrow { +namespace util { +using arrow::compute::ExecBatch; + +/// \brief A container that accumulates batches until they are ready to +/// be processed. +class AccumulationQueue { + public: + AccumulationQueue() : row_count_(0) {} + ~AccumulationQueue() = default; + + // We should never be copying ExecBatch around + AccumulationQueue(const AccumulationQueue&) = delete; + AccumulationQueue& operator=(const AccumulationQueue&) = delete; + + AccumulationQueue(AccumulationQueue&& that); + AccumulationQueue& operator=(AccumulationQueue&& that); + + void Concatenate(AccumulationQueue&& that); + void InsertBatch(ExecBatch batch); + int64_t row_count() { return row_count_; } + size_t batch_count() { return batches_.size(); } + bool empty() const { return batches_.empty(); } + void Clear(); + ExecBatch& operator[](size_t i); + + private: + int64_t row_count_; + std::vector batches_; +}; + +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc index 9c124674d75..1ebe11e7046 100644 --- a/cpp/src/arrow/compute/exec/hash_join.cc +++ b/cpp/src/arrow/compute/exec/hash_join.cc @@ -40,70 +40,16 @@ class HashJoinBasicImpl : public HashJoinImpl { struct ThreadLocalState; public: - HashJoinBasicImpl() : num_expected_bloom_filters_(0) {} - - Status InputReceived(size_t thread_index, int side, ExecBatch batch) override { - if (cancelled_) { - return Status::Cancelled("Hash join cancelled"); - } - 
EVENT(span_, "InputReceived"); - - ARROW_ASSIGN_OR_RAISE(bool queued, QueueBatchIfNeeded(thread_index, side, batch)); - if (queued) { - return Status::OK(); - } else { - ARROW_DCHECK(side == 0); - return ProbeBatch(thread_index, batch); - } - } - - Status InputFinished(size_t thread_index, int side) override { - if (cancelled_) { - return Status::Cancelled("Hash join cancelled"); - } - EVENT(span_, "InputFinished", {{"side", side}}); - if (side == 0) { - bool proceed; - { - std::lock_guard lock(finished_mutex_); - proceed = !left_side_finished_ && left_queue_probe_finished_; - left_side_finished_ = true; - } - if (proceed) { - RETURN_NOT_OK(OnLeftSideAndQueueFinished(thread_index)); - } - } else { - bool proceed; - { - std::lock_guard lock(finished_mutex_); - proceed = !right_side_finished_; - right_side_finished_ = true; - } - if (proceed) { - RETURN_NOT_OK(OnRightSideFinished(thread_index)); - } - } - return Status::OK(); - } - - Status Init(ExecContext* ctx, JoinType join_type, bool use_sync_execution, - size_t /*num_threads*/, HashJoinSchema* schema_mgr, - std::vector key_cmp, Expression filter, - OutputBatchCallback output_batch_callback, - FinishedCallback finished_callback, - TaskScheduler::ScheduleImpl schedule_task_callback, - HashJoinImpl* pushdown_target, std::vector column_map) override { - // TODO(ARROW-15732) - // Each side of join might have an IO thread being called from. 
- // As of right now, we ignore the `num_threads` argument, so later we will have to - // re-add `num_threads_ = num_threads;` - num_threads_ = GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1; - + Status Init(ExecContext* ctx, JoinType join_type, size_t num_threads, + HashJoinSchema* schema_mgr, std::vector key_cmp, + Expression filter, OutputBatchCallback output_batch_callback, + FinishedCallback finished_callback, TaskScheduler* scheduler) override { START_COMPUTE_SPAN(span_, "HashJoinBasicImpl", {{"detail", filter.ToString()}, {"join.kind", ToString(join_type)}, - {"join.threads", static_cast(num_threads_)}}); + {"join.threads", static_cast(num_threads)}}); + num_threads_ = num_threads; ctx_ = ctx; join_type_ = join_type; schema_mgr_ = schema_mgr; @@ -111,6 +57,7 @@ class HashJoinBasicImpl : public HashJoinImpl { filter_ = std::move(filter); output_batch_callback_ = std::move(output_batch_callback); finished_callback_ = std::move(finished_callback); + scheduler_ = scheduler; local_states_.resize(num_threads_); for (size_t i = 0; i < local_states_.size(); ++i) { local_states_[i].is_initialized = false; @@ -119,39 +66,12 @@ class HashJoinBasicImpl : public HashJoinImpl { dict_probe_.Init(num_threads_); - pushdown_target_ = pushdown_target; - column_map_ = std::move(column_map); - if (pushdown_target_) pushdown_target_->ExpectBloomFilter(); - - right_input_row_count_ = 0; has_hash_table_ = false; num_batches_produced_.store(0); cancelled_ = false; - right_side_finished_ = false; - left_side_finished_ = false; - bloom_filters_ready_ = false; - left_queue_bloom_finished_ = false; - left_queue_probe_finished_ = false; - - scheduler_ = TaskScheduler::Make(); - if (pushdown_target_) { - bloom_filter_ = arrow::internal::make_unique(); - bloom_filter_builder_ = BloomFilterBuilder::Make( - use_sync_execution ? 
BloomFilterBuildStrategy::SINGLE_THREADED - : BloomFilterBuildStrategy::PARALLEL); - } - RegisterBuildBloomFilter(); RegisterBuildHashTable(); - RegisterBloomFilterQueuedBatches(); - RegisterProbeQueuedBatches(); RegisterScanHashTable(); - scheduler_->RegisterEnd(); - - RETURN_NOT_OK(scheduler_->StartScheduling( - 0 /*thread index*/, std::move(schedule_task_callback), - static_cast(2 * num_threads_) /*concurrent tasks*/, use_sync_execution)); - return Status::OK(); } @@ -162,32 +82,6 @@ class HashJoinBasicImpl : public HashJoinImpl { scheduler_->Abort(std::move(pos_abort_callback)); } - // Called by a downstream node after they have constructed a bloom filter - // that this node can use to filter inputs. - Status PushBloomFilter(size_t thread_index, std::unique_ptr filter, - std::vector column_map) override { - bool proceed; - { - std::lock_guard lock_bloom(bloom_filters_mutex_); - pushed_bloom_filters_.emplace_back(std::move(filter)); - bloom_filter_column_maps_.emplace_back(std::move(column_map)); - proceed = pushed_bloom_filters_.size() == num_expected_bloom_filters_; - ARROW_DCHECK(pushed_bloom_filters_.size() <= num_expected_bloom_filters_); - } - if (proceed) { - size_t num_batches; - { - std::lock_guard lock(left_batches_mutex_); - num_batches = left_batches_.size(); - bloom_filters_ready_ = true; - } - RETURN_NOT_OK(BloomFilterQueuedBatches(thread_index, num_batches)); - } - return Status::OK(); - } - - void ExpectBloomFilter() override { num_expected_bloom_filters_ += 1; } - private: void InitEncoder(int side, HashJoinProjection projection_handle, RowEncoder* encoder) { std::vector data_types; @@ -212,8 +106,6 @@ class HashJoinBasicImpl : public HashJoinImpl { if (has_payload) { InitEncoder(0, HashJoinProjection::PAYLOAD, &local_state.exec_batch_payloads); } - RETURN_NOT_OK(local_state.temp_stack.Init( - ctx_->memory_pool(), 4 * util::MiniBatch::kMiniBatchLength * sizeof(uint32_t))); local_state.is_initialized = true; } return Status::OK(); @@ -594,7 
+486,7 @@ class HashJoinBasicImpl : public HashJoinImpl { } } - Status ProbeBatch(size_t thread_index, const ExecBatch& batch) { + Status ProbeSingleBatch(size_t thread_index, ExecBatch batch) override { ThreadLocalState& local_state = local_states_[thread_index]; RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index)); @@ -656,96 +548,18 @@ class HashJoinBasicImpl : public HashJoinImpl { return Status::OK(); } - Status ApplyBloomFiltersToBatch(size_t thread_index, ExecBatch& batch) { - if (batch.length == 0) return Status::OK(); - int64_t bit_vector_bytes = bit_util::BytesForBits(batch.length); - std::vector selected(bit_vector_bytes); - std::vector hashes(batch.length); - std::vector bv(bit_vector_bytes); - - RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index)); - // Start with full selection for the current batch - memset(selected.data(), 0xff, bit_vector_bytes); - for (size_t ifilter = 0; ifilter < num_expected_bloom_filters_; ifilter++) { - std::vector keys(bloom_filter_column_maps_[ifilter].size()); - for (size_t i = 0; i < keys.size(); i++) { - int input_idx = bloom_filter_column_maps_[ifilter][i]; - keys[i] = batch[input_idx]; - if (keys[i].is_scalar()) { - ARROW_ASSIGN_OR_RAISE( - keys[i], - MakeArrayFromScalar(*keys[i].scalar(), batch.length, ctx_->memory_pool())); - } - } - ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(keys))); - RETURN_NOT_OK(Hashing32::HashBatch( - key_batch, hashes.data(), ctx_->cpu_info()->hardware_flags(), - &local_states_[thread_index].temp_stack, 0, key_batch.length)); - - pushed_bloom_filters_[ifilter]->Find(ctx_->cpu_info()->hardware_flags(), - key_batch.length, hashes.data(), bv.data()); - arrow::internal::BitmapAnd(bv.data(), 0, selected.data(), 0, key_batch.length, 0, - selected.data()); - } - auto selected_buffer = - arrow::internal::make_unique(selected.data(), bit_vector_bytes); - ArrayData selected_arraydata(boolean(), batch.length, - {nullptr, std::move(selected_buffer)}); - Datum 
selected_datum(selected_arraydata); - FilterOptions options; - size_t first_nonscalar = batch.values.size(); - for (size_t i = 0; i < batch.values.size(); i++) { - if (!batch.values[i].is_scalar()) { - ARROW_ASSIGN_OR_RAISE(batch.values[i], - Filter(batch.values[i], selected_datum, options, ctx_)); - first_nonscalar = std::min(first_nonscalar, i); - ARROW_DCHECK_EQ(batch.values[i].length(), batch.values[first_nonscalar].length()); - } - } - // If they're all Scalar, then the length of the batch is the number of set bits - if (first_nonscalar == batch.values.size()) - batch.length = arrow::internal::CountSetBits(selected.data(), 0, batch.length); - else - batch.length = batch.values[first_nonscalar].length(); - return Status::OK(); - } - - int64_t BuildHashTable_num_tasks() { return 1; } - - Status BuildBloomFilter_exec_task(size_t thread_index, int64_t task_id) { - const ExecBatch& input_batch = right_batches_[task_id]; - SchemaProjectionMap key_to_in = - schema_mgr_->proj_maps[1].map(HashJoinProjection::KEY, HashJoinProjection::INPUT); - std::vector key_columns(key_to_in.num_cols); - for (size_t i = 0; i < key_columns.size(); i++) { - int input_idx = key_to_in.get(static_cast(i)); - key_columns[i] = input_batch[input_idx]; - if (key_columns[i].is_scalar()) { - ARROW_ASSIGN_OR_RAISE( - key_columns[i], MakeArrayFromScalar(*key_columns[i].scalar(), - input_batch.length, ctx_->memory_pool())); - } - } - ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(key_columns))); - - RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index)); - ThreadLocalState& tls = local_states_[thread_index]; - util::TempVectorHolder hash_holder(&tls.temp_stack, - util::MiniBatch::kMiniBatchLength); - uint32_t* hashes = hash_holder.mutable_data(); - for (int64_t i = 0; i < key_batch.length; i += util::MiniBatch::kMiniBatchLength) { - int64_t length = std::min(static_cast(key_batch.length - i), - static_cast(util::MiniBatch::kMiniBatchLength)); - 
RETURN_NOT_OK(Hashing32::HashBatch(key_batch, hashes, - ctx_->cpu_info()->hardware_flags(), - &tls.temp_stack, i, length)); - RETURN_NOT_OK(bloom_filter_builder_->PushNextBatch(thread_index, length, hashes)); - } - return Status::OK(); + void RegisterBuildHashTable() { + task_group_build_ = scheduler_->RegisterTaskGroup( + [this](size_t thread_index, int64_t task_id) -> Status { + return BuildHashTable_exec_task(thread_index, task_id); + }, + [this](size_t thread_index) -> Status { + return BuildHashTable_on_finished(thread_index); + }); } Status BuildHashTable_exec_task(size_t thread_index, int64_t /*task_id*/) { - const std::vector& batches = right_batches_; + AccumulationQueue batches = std::move(build_batches_); if (batches.empty()) { hash_table_empty_ = true; } else { @@ -756,7 +570,7 @@ class HashJoinBasicImpl : public HashJoinImpl { InitEncoder(1, HashJoinProjection::PAYLOAD, &hash_table_payloads_); } hash_table_empty_ = true; - for (size_t ibatch = 0; ibatch < batches.size(); ++ibatch) { + for (size_t ibatch = 0; ibatch < batches.batch_count(); ++ibatch) { if (cancelled_) { return Status::Cancelled("Hash join cancelled"); } @@ -789,159 +603,30 @@ class HashJoinBasicImpl : public HashJoinImpl { return Status::OK(); } - Status BuildBloomFilter_on_finished(size_t thread_index) { - if (cancelled_) return Status::Cancelled("Hash join cancelled"); - ARROW_DCHECK(pushdown_target_); - RETURN_NOT_OK(pushdown_target_->PushBloomFilter( - thread_index, std::move(bloom_filter_), std::move(column_map_))); - return BuildHashTable(thread_index); - } - Status BuildHashTable_on_finished(size_t thread_index) { - if (cancelled_) { - return Status::Cancelled("Hash join cancelled"); - } - - right_batches_.clear(); - - bool proceed; - { - std::lock_guard lock(left_batches_mutex_); - std::lock_guard lock_finish(finished_mutex_); - left_queue_bloom_finished_ = - left_queue_bloom_finished_ || num_expected_bloom_filters_ == 0; - proceed = !has_hash_table_ && 
left_queue_bloom_finished_; - has_hash_table_ = true; - } - if (proceed) RETURN_NOT_OK(ProbeQueuedBatches(thread_index)); - - return Status::OK(); - } - - void RegisterBuildBloomFilter() { - task_group_bloom_ = scheduler_->RegisterTaskGroup( - [this](size_t thread_index, int64_t task_id) -> Status { - return BuildBloomFilter_exec_task(thread_index, task_id); - }, - [this](size_t thread_index) -> Status { - return BuildBloomFilter_on_finished(thread_index); - }); - } - - void RegisterBuildHashTable() { - task_group_build_ = scheduler_->RegisterTaskGroup( - [this](size_t thread_index, int64_t task_id) -> Status { - return BuildHashTable_exec_task(thread_index, task_id); - }, - [this](size_t thread_index) -> Status { - return BuildHashTable_on_finished(thread_index); - }); + ARROW_DCHECK_EQ(build_batches_.batch_count(), 0); + has_hash_table_ = true; + return build_finished_callback_(thread_index); } - Status BuildBloomFilter(size_t thread_index) { - RETURN_NOT_OK(bloom_filter_builder_->Begin( - num_threads_, ctx_->cpu_info()->hardware_flags(), ctx_->memory_pool(), - right_input_row_count_, right_batches_.size(), bloom_filter_.get())); - - return scheduler_->StartTaskGroup(thread_index, task_group_bloom_, - right_batches_.size()); - } - - Status BuildHashTable(size_t thread_index) { + Status BuildHashTable(size_t thread_index, AccumulationQueue batches, + BuildFinishedCallback on_finished) override { + build_finished_callback_ = std::move(on_finished); + build_batches_ = std::move(batches); return scheduler_->StartTaskGroup(thread_index, task_group_build_, - BuildHashTable_num_tasks()); - } - - Status BloomFilterQueuedBatches_exec_task(size_t thread_index, int64_t task_id) { - if (cancelled_) return Status::Cancelled("Hash join cancelled"); - ExecBatch batch; - { - std::lock_guard lock(left_batches_mutex_); - batch = std::move(left_batches_[task_id]); - ARROW_DCHECK(!batch.values.empty()); - } - RETURN_NOT_OK(ApplyBloomFiltersToBatch(thread_index, batch)); - { - 
std::lock_guard lock(left_batches_mutex_); - left_batches_[task_id] = std::move(batch); - } - return Status::OK(); - } - - Status BloomFilterQueuedBatches_on_finished(size_t thread_index) { - if (cancelled_) return Status::Cancelled("Hash join cancelled"); - bool proceed; - { - std::lock_guard lock(finished_mutex_); - proceed = !left_queue_bloom_finished_ && has_hash_table_; - left_queue_bloom_finished_ = true; - } - if (proceed) return ProbeQueuedBatches(thread_index); - return Status::OK(); - } - - void RegisterBloomFilterQueuedBatches() { - task_group_bloom_filter_queued_ = scheduler_->RegisterTaskGroup( - [this](size_t thread_index, int64_t task_id) -> Status { - return BloomFilterQueuedBatches_exec_task(thread_index, task_id); - }, - [this](size_t thread_index) -> Status { - return BloomFilterQueuedBatches_on_finished(thread_index); - }); - } - - Status BloomFilterQueuedBatches(size_t thread_index, size_t num_batches) { - return scheduler_->StartTaskGroup(thread_index, task_group_bloom_filter_queued_, - num_batches); - } - - int64_t ProbeQueuedBatches_num_tasks() { - return static_cast(left_batches_.size()); - } - - Status ProbeQueuedBatches_exec_task(size_t thread_index, int64_t task_id) { - if (cancelled_) { - return Status::Cancelled("Hash join cancelled"); - } - return ProbeBatch(thread_index, std::move(left_batches_[task_id])); - } - - Status ProbeQueuedBatches_on_finished(size_t thread_index) { - if (cancelled_) { - return Status::Cancelled("Hash join cancelled"); - } - - left_batches_.clear(); - - bool proceed; - { - std::lock_guard lock(finished_mutex_); - ARROW_DCHECK(left_queue_bloom_finished_); - proceed = left_side_finished_ && !left_queue_probe_finished_; - left_queue_probe_finished_ = true; - } - if (proceed) { - RETURN_NOT_OK(OnLeftSideAndQueueFinished(thread_index)); - } - - return Status::OK(); + /*num_tasks=*/1); } - void RegisterProbeQueuedBatches() { - task_group_queued_ = scheduler_->RegisterTaskGroup( + void RegisterScanHashTable() { + 
task_group_scan_ = scheduler_->RegisterTaskGroup( [this](size_t thread_index, int64_t task_id) -> Status { - return ProbeQueuedBatches_exec_task(thread_index, task_id); + return ScanHashTable_exec_task(thread_index, task_id); }, [this](size_t thread_index) -> Status { - return ProbeQueuedBatches_on_finished(thread_index); + return ScanHashTable_on_finished(thread_index); }); } - Status ProbeQueuedBatches(size_t thread_index) { - return scheduler_->StartTaskGroup(thread_index, task_group_queued_, - ProbeQueuedBatches_num_tasks()); - } - int64_t ScanHashTable_num_tasks() { if (!has_hash_table_ || hash_table_empty_) { return 0; @@ -1006,58 +691,13 @@ class HashJoinBasicImpl : public HashJoinImpl { return Status::OK(); } - void RegisterScanHashTable() { - task_group_scan_ = scheduler_->RegisterTaskGroup( - [this](size_t thread_index, int64_t task_id) -> Status { - return ScanHashTable_exec_task(thread_index, task_id); - }, - [this](size_t thread_index) -> Status { - return ScanHashTable_on_finished(thread_index); - }); - } - Status ScanHashTable(size_t thread_index) { MergeHasMatch(); return scheduler_->StartTaskGroup(thread_index, task_group_scan_, ScanHashTable_num_tasks()); } - Result QueueBatchIfNeeded(size_t thread_index, int side, ExecBatch& batch) { - if (side == 0) { - // We don't want to do the filtering while holding the lock, since that can get - // expensive. 
- bool needs_filtering; - { - std::lock_guard lock(left_batches_mutex_); - bloom_filters_ready_ = bloom_filters_ready_ || num_expected_bloom_filters_ == 0; - needs_filtering = bloom_filters_ready_ && num_expected_bloom_filters_ != 0; - } - if (needs_filtering) RETURN_NOT_OK(ApplyBloomFiltersToBatch(thread_index, batch)); - - bool queued; - { - std::lock_guard lock(left_batches_mutex_); - queued = !bloom_filters_ready_ || !has_hash_table_; - if (queued) left_batches_.emplace_back(std::move(batch)); - } - return queued; - } else { - std::lock_guard lock(right_batches_mutex_); - right_input_row_count_ += batch.length; - right_batches_.emplace_back(std::move(batch)); - return true; - } - } - - Status OnRightSideFinished(size_t thread_index) { - if (pushdown_target_ == nullptr) { - return BuildHashTable(thread_index); - } else { - return BuildBloomFilter(thread_index); - } - } - - Status OnLeftSideAndQueueFinished(size_t thread_index) { + Status ProbingFinished(size_t thread_index) override { return ScanHashTable(thread_index); } @@ -1105,16 +745,14 @@ class HashJoinBasicImpl : public HashJoinImpl { HashJoinSchema* schema_mgr_; std::vector key_cmp_; Expression filter_; - std::unique_ptr scheduler_; - int task_group_bloom_; + TaskScheduler* scheduler_; int task_group_build_; - int task_group_bloom_filter_queued_; - int task_group_queued_; int task_group_scan_; // Callbacks // OutputBatchCallback output_batch_callback_; + BuildFinishedCallback build_finished_callback_; FinishedCallback finished_callback_; // Thread local runtime state @@ -1129,7 +767,6 @@ class HashJoinBasicImpl : public HashJoinImpl { std::vector match_right; bool is_has_match_initialized; std::vector has_match; - util::TempVectorStack temp_stack; }; std::vector local_states_; @@ -1146,34 +783,12 @@ class HashJoinBasicImpl : public HashJoinImpl { HashJoinDictBuildMulti dict_build_; HashJoinDictProbeMulti dict_probe_; - std::vector left_batches_; bool has_hash_table_; - std::mutex left_batches_mutex_; - - 
size_t right_input_row_count_; // Sum of the lengths of ExecBatches in right_batches_ - std::vector right_batches_; - std::mutex right_batches_mutex_; - // Bloom filter stuff - // - std::unique_ptr bloom_filter_builder_; - std::unique_ptr bloom_filter_; - std::vector column_map_; - std::vector> pushed_bloom_filters_; - std::vector> bloom_filter_column_maps_; - std::mutex bloom_filters_mutex_; - size_t num_expected_bloom_filters_; - HashJoinImpl* pushdown_target_; + AccumulationQueue build_batches_; std::atomic num_batches_produced_; bool cancelled_; - - bool bloom_filters_ready_; - bool right_side_finished_; - bool left_side_finished_; - bool left_queue_bloom_finished_; - bool left_queue_probe_finished_; - std::mutex finished_mutex_; }; Result> HashJoinImpl::MakeBasic() { diff --git a/cpp/src/arrow/compute/exec/hash_join.h b/cpp/src/arrow/compute/exec/hash_join.h index 2b3bef0ead2..97bdf166a01 100644 --- a/cpp/src/arrow/compute/exec/hash_join.h +++ b/cpp/src/arrow/compute/exec/hash_join.h @@ -21,6 +21,7 @@ #include #include +#include "arrow/compute/exec/accumulation_queue.h" #include "arrow/compute/exec/bloom_filter.h" #include "arrow/compute/exec/options.h" #include "arrow/compute/exec/schema_util.h" @@ -33,6 +34,8 @@ namespace arrow { namespace compute { +using arrow::util::AccumulationQueue; + class ARROW_EXPORT HashJoinSchema { public: Status Init(JoinType join_type, const Schema& left_schema, @@ -100,22 +103,20 @@ class ARROW_EXPORT HashJoinSchema { class HashJoinImpl { public: using OutputBatchCallback = std::function; + using BuildFinishedCallback = std::function; + using ProbeFinishedCallback = std::function; using FinishedCallback = std::function; virtual ~HashJoinImpl() = default; - virtual Status Init(ExecContext* ctx, JoinType join_type, bool use_sync_execution, - size_t num_threads, HashJoinSchema* schema_mgr, - std::vector key_cmp, Expression filter, - OutputBatchCallback output_batch_callback, - FinishedCallback finished_callback, - 
TaskScheduler::ScheduleImpl schedule_task_callback, - HashJoinImpl* pushdown_target, std::vector column_map) = 0; - virtual void ExpectBloomFilter() = 0; - virtual Status PushBloomFilter(size_t thread_index, - std::unique_ptr filter, - std::vector column_map) = 0; - virtual Status InputReceived(size_t thread_index, int side, ExecBatch batch) = 0; - virtual Status InputFinished(size_t thread_index, int side) = 0; + virtual Status Init(ExecContext* ctx, JoinType join_type, size_t num_threads, + HashJoinSchema* schema_mgr, std::vector key_cmp, + Expression filter, OutputBatchCallback output_batch_callback, + FinishedCallback finished_callback, TaskScheduler* scheduler) = 0; + + virtual Status BuildHashTable(size_t thread_index, AccumulationQueue batches, + BuildFinishedCallback on_finished) = 0; + virtual Status ProbeSingleBatch(size_t thread_index, ExecBatch batch) = 0; + virtual Status ProbingFinished(size_t thread_index) = 0; virtual void Abort(TaskScheduler::AbortContinuationImpl pos_abort_callback) = 0; static Result> MakeBasic(); diff --git a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc index 8d8be7f904f..0786071f997 100644 --- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc +++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc @@ -36,7 +36,6 @@ namespace arrow { namespace compute { struct BenchmarkSettings { - bool bloom_filter = false; int num_threads = 1; JoinType join_type = JoinType::INNER; int batch_size = 1024; @@ -111,11 +110,16 @@ class JoinBenchmark { auto l_schema = *l_schema_builder.Finish(); auto r_schema = *r_schema_builder.Finish(); - l_batches_ = + BatchesWithSchema l_batches_with_schema = MakeRandomBatches(l_schema, settings.num_probe_batches, settings.batch_size); - r_batches_ = + BatchesWithSchema r_batches_with_schema = MakeRandomBatches(r_schema, settings.num_build_batches, settings.batch_size); + for (ExecBatch& batch : l_batches_with_schema.batches) + 
l_batches_.InsertBatch(std::move(batch)); + for (ExecBatch& batch : r_batches_with_schema.batches) + r_batches_.InsertBatch(std::move(batch)); + stats_.num_probe_rows = settings.num_probe_batches * settings.batch_size; ctx_ = arrow::internal::make_unique( @@ -124,27 +128,12 @@ class JoinBenchmark { schema_mgr_ = arrow::internal::make_unique(); Expression filter = literal(true); - DCHECK_OK(schema_mgr_->Init(settings.join_type, *l_batches_.schema, left_keys, - *r_batches_.schema, right_keys, filter, "l_", "r_")); + DCHECK_OK(schema_mgr_->Init(settings.join_type, *l_batches_with_schema.schema, + left_keys, *r_batches_with_schema.schema, right_keys, + filter, "l_", "r_")); join_ = *HashJoinImpl::MakeBasic(); - HashJoinImpl* bloom_filter_pushdown_target = nullptr; - std::vector key_input_map; - - bool bloom_filter_does_not_apply_to_join = - settings.join_type == JoinType::LEFT_ANTI || - settings.join_type == JoinType::LEFT_OUTER || - settings.join_type == JoinType::FULL_OUTER; - if (settings.bloom_filter && !bloom_filter_does_not_apply_to_join) { - bloom_filter_pushdown_target = join_.get(); - SchemaProjectionMap probe_key_to_input = schema_mgr_->proj_maps[0].map( - HashJoinProjection::KEY, HashJoinProjection::INPUT); - int num_keys = probe_key_to_input.num_cols; - for (int i = 0; i < num_keys; i++) - key_input_map.push_back(probe_key_to_input.get(i)); - } - omp_set_num_threads(settings.num_threads); auto schedule_callback = [](std::function func) -> Status { #pragma omp task @@ -152,39 +141,47 @@ class JoinBenchmark { return Status::OK(); }; + scheduler_ = TaskScheduler::Make(); DCHECK_OK(join_->Init( - ctx_.get(), settings.join_type, !is_parallel, settings.num_threads, - schema_mgr_.get(), std::move(key_cmp), std::move(filter), [](ExecBatch) {}, - [](int64_t x) {}, schedule_callback, bloom_filter_pushdown_target, - std::move(key_input_map))); + ctx_.get(), settings.join_type, settings.num_threads, schema_mgr_.get(), + std::move(key_cmp), std::move(filter), 
[](ExecBatch) {}, [](int64_t x) {}, + scheduler_.get())); + + task_group_probe_ = scheduler_->RegisterTaskGroup( + [this](size_t thread_index, int64_t task_id) -> Status { + return join_->ProbeSingleBatch(thread_index, std::move(l_batches_[task_id])); + }, + [this](size_t thread_index) -> Status { + return join_->ProbingFinished(thread_index); + }); + + scheduler_->RegisterEnd(); + + DCHECK_OK(scheduler_->StartScheduling( + 0 /*thread index*/, std::move(schedule_callback), + static_cast(2 * settings.num_threads) /*concurrent tasks*/, !is_parallel)); } void RunJoin() { #pragma omp parallel { int tid = omp_get_thread_num(); -#pragma omp for nowait - for (auto it = r_batches_.batches.begin(); it != r_batches_.batches.end(); ++it) - DCHECK_OK(join_->InputReceived(tid, /*side=*/1, *it)); -#pragma omp for nowait - for (auto it = l_batches_.batches.begin(); it != l_batches_.batches.end(); ++it) - DCHECK_OK(join_->InputReceived(tid, /*side=*/0, *it)); - -#pragma omp barrier - -#pragma omp single nowait - { DCHECK_OK(join_->InputFinished(tid, /*side=*/1)); } - -#pragma omp single nowait - { DCHECK_OK(join_->InputFinished(tid, /*side=*/0)); } +#pragma omp single + DCHECK_OK( + join_->BuildHashTable(tid, std::move(r_batches_), [this](size_t thread_index) { + return scheduler_->StartTaskGroup(thread_index, task_group_probe_, + l_batches_.batch_count()); + })); } } - BatchesWithSchema l_batches_; - BatchesWithSchema r_batches_; + std::unique_ptr scheduler_; + AccumulationQueue l_batches_; + AccumulationQueue r_batches_; std::unique_ptr schema_mgr_; std::unique_ptr join_; std::unique_ptr ctx_; + int task_group_probe_; struct { uint64_t num_probe_rows; @@ -193,11 +190,17 @@ class JoinBenchmark { static void HashJoinBasicBenchmarkImpl(benchmark::State& st, BenchmarkSettings& settings) { - JoinBenchmark bm(settings); uint64_t total_rows = 0; for (auto _ : st) { - bm.RunJoin(); - total_rows += bm.stats_.num_probe_rows; + st.PauseTiming(); + { + JoinBenchmark bm(settings); + 
st.ResumeTiming(); + bm.RunJoin(); + st.PauseTiming(); + total_rows += bm.stats_.num_probe_rows; + } + st.ResumeTiming(); } st.counters["rows/sec"] = benchmark::Counter(total_rows, benchmark::Counter::kIsRate); } @@ -288,16 +291,6 @@ static void BM_HashJoinBasic_NullPercentage(benchmark::State& st) { HashJoinBasicBenchmarkImpl(st, settings); } - -static void BM_HashJoinBasic_BloomFilter(benchmark::State& st, bool bloom_filter) { - BenchmarkSettings settings; - settings.bloom_filter = bloom_filter; - settings.selectivity = static_cast(st.range(0)) / 100.0; - settings.num_build_batches = static_cast(st.range(1)); - settings.num_probe_batches = settings.num_build_batches; - - HashJoinBasicBenchmarkImpl(st, settings); -} #endif std::vector hashtable_krows = benchmark::CreateRange(1, 4096, 8); @@ -425,16 +418,6 @@ BENCHMARK(BM_HashJoinBasic_BuildParallelism) BENCHMARK(BM_HashJoinBasic_NullPercentage) ->ArgNames({"Null Percentage"}) ->DenseRange(0, 100, 10); - -std::vector bloomfilter_argnames = {"Selectivity", "HashTable krows"}; -std::vector> bloomfilter_args = { - benchmark::CreateDenseRange(0, 100, 10), hashtable_krows}; -BENCHMARK_CAPTURE(BM_HashJoinBasic_BloomFilter, "Bloom Filter", true) - ->ArgNames(bloomfilter_argnames) - ->ArgsProduct(selectivity_args); -BENCHMARK_CAPTURE(BM_HashJoinBasic_BloomFilter, "No Bloom Filter", false) - ->ArgNames(bloomfilter_argnames) - ->ArgsProduct(selectivity_args); #else BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int32}", {int32()}) diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc index 60ad75228ab..baa82671259 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node.cc @@ -15,12 +15,14 @@ // specific language governing permissions and limitations // under the License. 
+#include #include #include #include "arrow/compute/exec/exec_plan.h" #include "arrow/compute/exec/hash_join.h" #include "arrow/compute/exec/hash_join_dict.h" +#include "arrow/compute/exec/key_hash.h" #include "arrow/compute/exec/options.h" #include "arrow/compute/exec/schema_util.h" #include "arrow/compute/exec/util.h" @@ -470,6 +472,189 @@ Status ValidateHashJoinNodeOptions(const HashJoinNodeOptions& join_options) { return Status::OK(); } +class HashJoinNode; + +// This is a struct encapsulating things related to Bloom filters and pushing them around +// between HashJoinNodes. The general strategy is to notify other joins at plan-creation +// time for that join to expect a Bloom filter. Once the full build side has been +// accumulated for a given join, it will build the Bloom filter and push it to its +// pushdown target. Once a join has received all of its Bloom filters, it will evaluate it +// on every batch that has been queued so far as well as any new probe-side batch that +// comes in. +struct BloomFilterPushdownContext { + using BuildFinishedCallback = std::function; + using FiltersReceivedCallback = std::function; + using FilterFinishedCallback = std::function; + void Init(HashJoinNode* owner, size_t num_threads, TaskScheduler* scheduler, + FiltersReceivedCallback on_bloom_filters_received, bool disable_bloom_filter, + bool use_sync_execution); + + Status StartProducing(); + + void ExpectBloomFilter() { eval_.num_expected_bloom_filters_ += 1; } + + // Builds the Bloom filter, taking ownership of the batches until the build + // is done. + Status BuildBloomFilter(size_t thread_index, AccumulationQueue batches, + BuildFinishedCallback on_finished); + + // Sends the Bloom filter to the pushdown target. + Status PushBloomFilter(); + + // Receives a Bloom filter and its associated column map. 
+  Status ReceiveBloomFilter(std::unique_ptr filter,    // filter to record on this context
+                            std::vector column_map) {  // stored at the same index as `filter`
+    bool proceed;  // written inside the lock scope below, read only after that scope ends
+    {
+      std::lock_guard guard(eval_.receive_mutex_);  // held while both received_* vectors grow
+      eval_.received_filters_.emplace_back(std::move(filter));
+      eval_.received_maps_.emplace_back(std::move(column_map));
+      proceed = eval_.num_expected_bloom_filters_ == eval_.received_filters_.size();
+      // Debug checks: filters and maps stay paired, and never exceed the expected count.
+      ARROW_DCHECK_EQ(eval_.received_filters_.size(), eval_.received_maps_.size());
+      ARROW_DCHECK_LE(eval_.received_filters_.size(), eval_.num_expected_bloom_filters_);
+    }
+    if (proceed) {  // this call brought the received count up to the expected count
+      return eval_.all_received_callback_();  // runs after the lock scope has closed
+    }
+    return Status::OK();  // received count differs from the expected count; nothing to fire
+  }
+
+  // Evaluates the Bloom filter on a group of batches, taking ownership of them
+  // until the whole filtering process is complete.
+  Status FilterBatches(size_t thread_index, AccumulationQueue batches,
+                       FilterFinishedCallback on_finished) {
+    eval_.batches_ = std::move(batches);  // stored on eval_ for the duration of the filtering
+    eval_.on_finished_ = std::move(on_finished);
+    // No filters expected: skip the task group and hand the batches straight to the callback.
+    if (eval_.num_expected_bloom_filters_ == 0)
+      return eval_.on_finished_(thread_index, std::move(eval_.batches_));
+    // Otherwise start the eval_ task group with one task per stored batch.
+    return scheduler_->StartTaskGroup(thread_index, eval_.task_id_,
+                                      /*num_tasks=*/eval_.batches_.batch_count());
+  }
+
+  // Applies all Bloom filters on the input batch.
+  Status FilterSingleBatch(size_t thread_index, ExecBatch* batch_ptr) {  // rewrites *batch_ptr in place
+    ExecBatch& batch = *batch_ptr;
+    if (eval_.num_expected_bloom_filters_ == 0 || batch.length == 0) return Status::OK();
+    // (Early return above: no Bloom filter is expected, or the batch has no rows.)
+    int64_t bit_vector_bytes = bit_util::BytesForBits(batch.length);
+    std::vector selected(bit_vector_bytes);  // selection bitmap, sized for batch.length bits
+    std::vector hashes(batch.length);        // one hash slot per row
+    std::vector bv(bit_vector_bytes);        // per-filter result bitmap, same size as `selected`
+
+    ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack, GetStack(thread_index));
+
+    // Start with full selection for the current batch
+    memset(selected.data(), 0xff, bit_vector_bytes);
+    for (size_t ifilter = 0; ifilter < eval_.num_expected_bloom_filters_; ifilter++) {
+      std::vector keys(eval_.received_maps_[ifilter].size());  // key columns for this filter
+      for (size_t i = 0; i < keys.size(); i++) {
+        int input_idx = eval_.received_maps_[ifilter][i];  // batch column used as key i
+        keys[i] = batch[input_idx];
+        if (keys[i].is_scalar()) {  // replace a scalar key with an array of batch.length values
+          ARROW_ASSIGN_OR_RAISE(
+              keys[i],
+              MakeArrayFromScalar(*keys[i].scalar(), batch.length, ctx_->memory_pool()));
+        }
+      }
+      ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(keys)));
+      RETURN_NOT_OK(Hashing32::HashBatch(key_batch, hashes.data(),
+                                         ctx_->cpu_info()->hardware_flags(), stack, 0,
+                                         key_batch.length));
+      // NOTE(review): Find presumably writes one match bit per hash into bv -- confirm.
+      eval_.received_filters_[ifilter]->Find(ctx_->cpu_info()->hardware_flags(),
+                                             key_batch.length, hashes.data(), bv.data());
+      arrow::internal::BitmapAnd(bv.data(), 0, selected.data(), 0, key_batch.length, 0,
+                                 selected.data());  // output pointer is `selected` itself
+    }
+    auto selected_buffer =  // NOTE(review): looks like a view over `selected`, not a copy -- confirm
+        arrow::internal::make_unique(selected.data(), bit_vector_bytes);
+    ArrayData selected_arraydata(boolean(), batch.length,
+                                 {nullptr, std::move(selected_buffer)});  // boolean mask, batch.length long
+    Datum selected_datum(selected_arraydata);
+    FilterOptions options;
+    size_t first_nonscalar = batch.values.size();  // sentinel: no non-scalar column seen yet
+    for (size_t i = 0; i < batch.values.size(); i++) {
+      if (!batch.values[i].is_scalar()) {  // scalar columns are left untouched
+        ARROW_ASSIGN_OR_RAISE(batch.values[i],
+                              Filter(batch.values[i], selected_datum, options, ctx_));
+        first_nonscalar = std::min(first_nonscalar, i);
+        ARROW_DCHECK_EQ(batch.values[i].length(), batch.values[first_nonscalar].length());
+      }
+    }
+    // If they're all Scalar, then the length of the batch is the number of set bits
+    if (first_nonscalar == batch.values.size())
+      batch.length = arrow::internal::CountSetBits(selected.data(), 0, batch.length);
+    else
+      batch.length = batch.values[first_nonscalar].length();  // length of the filtered columns
+    return Status::OK();
+  }
+
+ private:
+  Status BuildBloomFilter_exec_task(size_t thread_index, int64_t task_id);
+  // Passes the batches held in build_ to the stored build_.on_finished_ callback.
+  Status BuildBloomFilter_on_finished(size_t thread_index) {
+    return build_.on_finished_(thread_index, std::move(build_.batches_));
+  }
+
+  // The Bloom filter is built on the build side of some upstream join. For a join to
+  // evaluate the Bloom filter on its input columns, it has to rearrange its input columns
+  // to match the column order of the Bloom filter.
+  //
+  // The first part of the pair is the HashJoin to actually perform the pushdown into.
+  // The second part is a mapping such that column_map[i] is the index of key i in
+  // the first part's input.
+  // If we should disable Bloom filter, returns nullptr and an empty vector, and sets
+  // the disable_bloom_filter_ flag.
+ std::pair> GetPushdownTarget(HashJoinNode* start); + + Result GetStack(size_t thread_index) { + if (!tld_[thread_index].is_init) { + RETURN_NOT_OK(tld_[thread_index].stack.Init( + ctx_->memory_pool(), 4 * util::MiniBatch::kMiniBatchLength * sizeof(uint32_t))); + tld_[thread_index].is_init = true; + } + return &tld_[thread_index].stack; + } + + bool disable_bloom_filter_; + HashJoinSchema* schema_mgr_; + ExecContext* ctx_; + TaskScheduler* scheduler_; + + struct ThreadLocalData { + bool is_init = false; + util::TempVectorStack stack; + }; + std::vector tld_; + + struct { + int task_id_; + std::unique_ptr builder_; + AccumulationQueue batches_; + BuildFinishedCallback on_finished_; + } build_; + + struct { + std::unique_ptr bloom_filter_; + HashJoinNode* pushdown_target_; + std::vector column_map_; + } push_; + + struct { + int task_id_; + size_t num_expected_bloom_filters_ = 0; + std::mutex receive_mutex_; + std::vector> received_filters_; + std::vector> received_maps_; + AccumulationQueue batches_; + FiltersReceivedCallback all_received_callback_; + FilterFinishedCallback on_finished_; + } eval_; +}; + class HashJoinNode : public ExecNode { public: HashJoinNode(ExecPlan* plan, NodeVector inputs, const HashJoinNodeOptions& join_options, @@ -533,6 +718,120 @@ class HashJoinNode : public ExecNode { const char* kind_name() const override { return "HashJoinNode"; } + Status OnBuildSideBatch(size_t thread_index, ExecBatch batch) { + std::lock_guard guard(build_side_mutex_); + build_accumulator_.InsertBatch(std::move(batch)); + return Status::OK(); + } + + Status OnBuildSideFinished(size_t thread_index) { + return pushdown_context_.BuildBloomFilter( + thread_index, std::move(build_accumulator_), + [this](size_t thread_index, AccumulationQueue batches) { + return OnBloomFilterFinished(thread_index, std::move(batches)); + }); + } + + Status OnBloomFilterFinished(size_t thread_index, AccumulationQueue batches) { + RETURN_NOT_OK(pushdown_context_.PushBloomFilter()); + 
return impl_->BuildHashTable( + thread_index, std::move(batches), + [this](size_t thread_index) { return OnHashTableFinished(thread_index); }); + } + + Status OnHashTableFinished(size_t thread_index) { + bool should_probe; + { + std::lock_guard guard(probe_side_mutex_); + should_probe = queued_batches_filtered_ && !hash_table_ready_; + hash_table_ready_ = true; + } + if (should_probe) { + return ProbeQueuedBatches(thread_index); + } + return Status::OK(); + } + + Status OnProbeSideBatch(size_t thread_index, ExecBatch batch) { + { + std::lock_guard guard(probe_side_mutex_); + if (!bloom_filters_ready_) { + probe_accumulator_.InsertBatch(std::move(batch)); + return Status::OK(); + } + } + RETURN_NOT_OK(pushdown_context_.FilterSingleBatch(thread_index, &batch)); + + { + std::lock_guard guard(probe_side_mutex_); + if (!hash_table_ready_) { + probe_accumulator_.InsertBatch(std::move(batch)); + return Status::OK(); + } + } + RETURN_NOT_OK(impl_->ProbeSingleBatch(thread_index, std::move(batch))); + return Status::OK(); + } + + Status OnProbeSideFinished(size_t thread_index) { + bool probing_finished; + { + std::lock_guard guard(probe_side_mutex_); + probing_finished = queued_batches_probed_ && !probe_side_finished_; + probe_side_finished_ = true; + } + if (probing_finished) return impl_->ProbingFinished(thread_index); + return Status::OK(); + } + + Status OnFiltersReceived() { + std::unique_lock guard(probe_side_mutex_); + bloom_filters_ready_ = true; + size_t thread_index = thread_indexer_(); + AccumulationQueue batches = std::move(probe_accumulator_); + guard.unlock(); + return pushdown_context_.FilterBatches( + thread_index, std::move(batches), + [this](size_t thread_index, AccumulationQueue batches) { + return OnQueuedBatchesFiltered(thread_index, std::move(batches)); + }); + } + + Status OnQueuedBatchesFiltered(size_t thread_index, AccumulationQueue batches) { + bool should_probe; + { + std::lock_guard guard(probe_side_mutex_); + 
probe_accumulator_.Concatenate(std::move(batches)); + should_probe = !queued_batches_filtered_ && hash_table_ready_; + queued_batches_filtered_ = true; + } + if (should_probe) { + return ProbeQueuedBatches(thread_index); + } + return Status::OK(); + } + + Status ProbeQueuedBatches(size_t thread_index) { + { + std::lock_guard guard(probe_side_mutex_); + queued_batches_to_probe_ = std::move(probe_accumulator_); + } + return scheduler_->StartTaskGroup(thread_index, task_group_probe_, + queued_batches_to_probe_.batch_count()); + } + + Status OnQueuedBatchesProbed(size_t thread_index) { + queued_batches_to_probe_.Clear(); + bool probing_finished; + { + std::lock_guard guard(probe_side_mutex_); + probing_finished = !queued_batches_probed_ && probe_side_finished_; + queued_batches_probed_ = true; + } + if (probing_finished) return impl_->ProbingFinished(thread_index); + return Status::OK(); + } + void InputReceived(ExecNode* input, ExecBatch batch) override { ARROW_DCHECK(std::find(inputs_.begin(), inputs_.end(), input) != inputs_.end()); if (complete_.load()) { @@ -547,16 +846,19 @@ class HashJoinNode : public ExecNode { START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", {{"batch.length", batch.length}}); - { - Status status = impl_->InputReceived(thread_index, side, std::move(batch)); - if (!status.ok()) { - StopProducing(); - ErrorIfNotOk(status); - return; - } + Status status = side == 0 ? OnProbeSideBatch(thread_index, std::move(batch)) + : OnBuildSideBatch(thread_index, std::move(batch)); + + if (!status.ok()) { + StopProducing(); + ErrorIfNotOk(status); + return; } + if (batch_count_[side].Increment()) { - Status status = impl_->InputFinished(thread_index, side); + status = side == 0 ? 
OnProbeSideFinished(thread_index) + : OnBuildSideFinished(thread_index); + if (!status.ok()) { StopProducing(); ErrorIfNotOk(status); @@ -581,7 +883,9 @@ class HashJoinNode : public ExecNode { EVENT(span_, "InputFinished", {{"side", side}, {"batches.length", total_batches}}); if (batch_count_[side].SetTotal(total_batches)) { - Status status = impl_->InputFinished(thread_index, side); + Status status = side == 0 ? OnProbeSideFinished(thread_index) + : OnBuildSideFinished(thread_index); + if (!status.ok()) { StopProducing(); ErrorIfNotOk(status); @@ -590,131 +894,42 @@ class HashJoinNode : public ExecNode { } } - // The Bloom filter is built on the build side of some upstream join. For a join to - // evaluate the Bloom filter on its input columns, it has to rearrange its input columns - // to match the column order of the Bloom filter. - // - // The first part of the pair is the HashJoin to actually perform the pushdown into. - // The second part is a mapping such that column_map[i] is the index of key i in - // the first part's input. - // If we should disable Bloom filter, returns nullptr and an empty vector, and sets - // the disable_bloom_filter_ flag. - std::pair> GetPushdownTarget() { -#if !ARROW_LITTLE_ENDIAN - // TODO (ARROW-16591): Debug bloom_filter.cc to enable on Big endian. It probably just - // needs a few byte swaps in the proper spots. - disable_bloom_filter_ = true; - return {nullptr, {}}; -#else - // A build-side Bloom filter tells us if a row is definitely not in the build side. - // This allows us to early-eliminate rows or early-accept rows depending on the type - // of join. Left Outer Join and Full Outer Join output all rows, so a build-side Bloom - // filter would only allow us to early-output. Left Antijoin outputs only if there is - // no match, so again early output. We don't implement early output for now, so we - // must disallow these types of joins. 
- bool bloom_filter_does_not_apply_to_join = join_type_ == JoinType::LEFT_ANTI || - join_type_ == JoinType::LEFT_OUTER || - join_type_ == JoinType::FULL_OUTER; - disable_bloom_filter_ = disable_bloom_filter_ || bloom_filter_does_not_apply_to_join; - - for (int side = 0; side <= 1 && !disable_bloom_filter_; side++) { - SchemaProjectionMap keys_to_input = schema_mgr_->proj_maps[side].map( - HashJoinProjection::KEY, HashJoinProjection::INPUT); - // Bloom filter currently doesn't support dictionaries. - for (int i = 0; i < keys_to_input.num_cols; i++) { - int idx = keys_to_input.get(i); - bool is_dict = - inputs_[side]->output_schema()->field(idx)->type()->id() == Type::DICTIONARY; - if (is_dict) { - disable_bloom_filter_ = true; - break; - } - } - } - - bool all_comparisons_is = true; - for (JoinKeyCmp cmp : key_cmp_) all_comparisons_is &= (cmp == JoinKeyCmp::IS); - - if ((join_type_ == JoinType::RIGHT_OUTER || join_type_ == JoinType::FULL_OUTER) && - all_comparisons_is) - disable_bloom_filter_ = true; - - if (disable_bloom_filter_) return {nullptr, {}}; - - // We currently only push Bloom filters on the probe side, and only if that input is - // also a join. 
- SchemaProjectionMap probe_key_to_input = - schema_mgr_->proj_maps[0].map(HashJoinProjection::KEY, HashJoinProjection::INPUT); - int num_keys = probe_key_to_input.num_cols; - - // A mapping such that bloom_to_target[i] is the index of key i in the pushdown - // target's input - std::vector bloom_to_target(num_keys); - HashJoinNode* pushdown_target = this; - for (int i = 0; i < num_keys; i++) bloom_to_target[i] = probe_key_to_input.get(i); - - for (ExecNode* candidate = inputs()[0]; candidate->kind_name() == this->kind_name(); - candidate = candidate->inputs()[0]) { - auto* candidate_as_join = checked_cast(candidate); - SchemaProjectionMap candidate_output_to_input = - candidate_as_join->schema_mgr_->proj_maps[0].map(HashJoinProjection::OUTPUT, - HashJoinProjection::INPUT); - - // Check if any of the keys are missing, if they are, break - bool break_outer = false; - for (int i = 0; i < num_keys; i++) { - // Since all of the probe side columns are before the build side columns, - // if the index of an output is greater than the number of probe-side input - // columns, it must have come from the candidate's build side. - if (bloom_to_target[i] >= candidate_output_to_input.num_cols) { - break_outer = true; - break; - } - int candidate_input_idx = candidate_output_to_input.get(bloom_to_target[i]); - // The output column has to have come from somewhere... - ARROW_DCHECK_NE(candidate_input_idx, schema_mgr_->kMissingField()); - } - if (break_outer) break; - - // The Bloom filter will filter out nulls, which may cause a Right/Full Outer Join - // to incorrectly output some rows with nulls padding the probe-side rows. This may - // cause a row with all null keys to be emitted. This is normally not an issue - // with EQ, but if all comparisons are IS (i.e. all-null is accepted), this could - // produce incorrect rows. 
- bool can_produce_build_side_nulls = - candidate_as_join->join_type_ == JoinType::RIGHT_OUTER || - candidate_as_join->join_type_ == JoinType::FULL_OUTER; - - if (all_comparisons_is || can_produce_build_side_nulls) break; - - // All keys are present, we can update the mapping - for (int i = 0; i < num_keys; i++) { - int candidate_input_idx = candidate_output_to_input.get(bloom_to_target[i]); - bloom_to_target[i] = candidate_input_idx; - } - pushdown_target = candidate_as_join; - } - return std::make_pair(pushdown_target->impl_.get(), std::move(bloom_to_target)); -#endif // ARROW_LITTLE_ENDIAN - } - Status PrepareToProduce() override { bool use_sync_execution = !(plan_->exec_context()->executor()); - size_t num_threads = use_sync_execution ? 1 : thread_indexer_.Capacity(); + // TODO(ARROW-15732) + // Each side of join might have an IO thread being called from. Once this is fixed + // we will change it back to just the CPU's thread pool capacity. + size_t num_threads = (GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1); + + scheduler_ = TaskScheduler::Make(); + pushdown_context_.Init( + this, num_threads, scheduler_.get(), [this]() { return OnFiltersReceived(); }, + disable_bloom_filter_, use_sync_execution); + + RETURN_NOT_OK(impl_->Init( + plan_->exec_context(), join_type_, num_threads, schema_mgr_.get(), key_cmp_, + filter_, [this](ExecBatch batch) { this->OutputBatchCallback(batch); }, + [this](int64_t total_num_batches) { this->FinishedCallback(total_num_batches); }, + scheduler_.get())); - HashJoinImpl* pushdown_target = nullptr; - std::vector column_map; - std::tie(pushdown_target, column_map) = GetPushdownTarget(); + task_group_probe_ = scheduler_->RegisterTaskGroup( + [this](size_t thread_index, int64_t task_id) -> Status { + return impl_->ProbeSingleBatch(thread_index, + std::move(queued_batches_to_probe_[task_id])); + }, + [this](size_t thread_index) -> Status { + return OnQueuedBatchesProbed(thread_index); + }); - return impl_->Init( - 
plan_->exec_context(), join_type_, use_sync_execution, num_threads, - schema_mgr_.get(), key_cmp_, filter_, - [this](ExecBatch batch) { this->OutputBatchCallback(batch); }, - [this](int64_t total_num_batches) { this->FinishedCallback(total_num_batches); }, + scheduler_->RegisterEnd(); + + RETURN_NOT_OK(scheduler_->StartScheduling( + 0 /*thread index*/, [this](std::function func) -> Status { return this->ScheduleTaskCallback(std::move(func)); }, - pushdown_target, std::move(column_map)); + static_cast(2 * num_threads) /*concurrent tasks*/, use_sync_execution)); + return Status::OK(); } Status StartProducing() override { @@ -723,7 +938,7 @@ class HashJoinNode : public ExecNode { {"node.detail", ToString()}, {"node.kind", kind_name()}}); END_SPAN_ON_FUTURE_COMPLETION(span_, finished(), this); - + RETURN_NOT_OK(pushdown_context_.StartProducing()); return Status::OK(); } @@ -797,9 +1012,229 @@ class HashJoinNode : public ExecNode { std::unique_ptr schema_mgr_; std::unique_ptr impl_; util::AsyncTaskGroup task_group_; + std::unique_ptr scheduler_; + util::AccumulationQueue build_accumulator_; + util::AccumulationQueue probe_accumulator_; + util::AccumulationQueue queued_batches_to_probe_; + + std::mutex build_side_mutex_; + std::mutex probe_side_mutex_; + + int task_group_probe_; + bool bloom_filters_ready_ = false; + bool hash_table_ready_ = false; + bool queued_batches_filtered_ = false; + bool queued_batches_probed_ = false; + bool probe_side_finished_ = false; + + friend struct BloomFilterPushdownContext; bool disable_bloom_filter_; + BloomFilterPushdownContext pushdown_context_; }; +void BloomFilterPushdownContext::Init(HashJoinNode* owner, size_t num_threads, + TaskScheduler* scheduler, + FiltersReceivedCallback on_bloom_filters_received, + bool disable_bloom_filter, + bool use_sync_execution) { + schema_mgr_ = owner->schema_mgr_.get(); + ctx_ = owner->plan_->exec_context(); + scheduler_ = scheduler; + tld_.resize(num_threads); + disable_bloom_filter_ = 
disable_bloom_filter; + std::tie(push_.pushdown_target_, push_.column_map_) = GetPushdownTarget(owner); + eval_.all_received_callback_ = std::move(on_bloom_filters_received); + if (!disable_bloom_filter_) { + ARROW_CHECK(push_.pushdown_target_); + push_.bloom_filter_ = arrow::internal::make_unique(); + push_.pushdown_target_->pushdown_context_.ExpectBloomFilter(); + + build_.builder_ = BloomFilterBuilder::Make( + use_sync_execution ? BloomFilterBuildStrategy::SINGLE_THREADED + : BloomFilterBuildStrategy::PARALLEL); + + build_.task_id_ = scheduler_->RegisterTaskGroup( + [this](size_t thread_index, int64_t task_id) { + return BuildBloomFilter_exec_task(thread_index, task_id); + }, + [this](size_t thread_index) { + return BuildBloomFilter_on_finished(thread_index); + }); + } + + eval_.task_id_ = scheduler_->RegisterTaskGroup( + [this](size_t thread_index, int64_t task_id) { + return FilterSingleBatch(thread_index, &eval_.batches_[task_id]); + }, + [this](size_t thread_index) { + return eval_.on_finished_(thread_index, std::move(eval_.batches_)); + }); +} + +Status BloomFilterPushdownContext::StartProducing() { + if (eval_.num_expected_bloom_filters_ == 0) return eval_.all_received_callback_(); + return Status::OK(); +} + +Status BloomFilterPushdownContext::BuildBloomFilter(size_t thread_index, + AccumulationQueue batches, + BuildFinishedCallback on_finished) { + build_.batches_ = std::move(batches); + build_.on_finished_ = std::move(on_finished); + + if (disable_bloom_filter_) + return build_.on_finished_(thread_index, std::move(build_.batches_)); + + RETURN_NOT_OK(build_.builder_->Begin( + /*num_threads=*/tld_.size(), ctx_->cpu_info()->hardware_flags(), + ctx_->memory_pool(), build_.batches_.row_count(), build_.batches_.batch_count(), + push_.bloom_filter_.get())); + + return scheduler_->StartTaskGroup(thread_index, build_.task_id_, + /*num_tasks=*/build_.batches_.batch_count()); +} + +Status BloomFilterPushdownContext::PushBloomFilter() { + if 
(!disable_bloom_filter_) + return push_.pushdown_target_->pushdown_context_.ReceiveBloomFilter( + std::move(push_.bloom_filter_), std::move(push_.column_map_)); + return Status::OK(); +} + +Status BloomFilterPushdownContext::BuildBloomFilter_exec_task(size_t thread_index, + int64_t task_id) { + const ExecBatch& input_batch = build_.batches_[task_id]; + SchemaProjectionMap key_to_in = + schema_mgr_->proj_maps[1].map(HashJoinProjection::KEY, HashJoinProjection::INPUT); + std::vector key_columns(key_to_in.num_cols); + for (size_t i = 0; i < key_columns.size(); i++) { + int input_idx = key_to_in.get(static_cast(i)); + key_columns[i] = input_batch[input_idx]; + if (key_columns[i].is_scalar()) { + ARROW_ASSIGN_OR_RAISE(key_columns[i], + MakeArrayFromScalar(*key_columns[i].scalar(), + input_batch.length, ctx_->memory_pool())); + } + } + ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(key_columns))); + + ARROW_ASSIGN_OR_RAISE(util::TempVectorStack * stack, GetStack(thread_index)); + util::TempVectorHolder hash_holder(stack, util::MiniBatch::kMiniBatchLength); + uint32_t* hashes = hash_holder.mutable_data(); + for (int64_t i = 0; i < key_batch.length; i += util::MiniBatch::kMiniBatchLength) { + int64_t length = std::min(static_cast(key_batch.length - i), + static_cast(util::MiniBatch::kMiniBatchLength)); + RETURN_NOT_OK(Hashing32::HashBatch( + key_batch, hashes, ctx_->cpu_info()->hardware_flags(), stack, i, length)); + RETURN_NOT_OK(build_.builder_->PushNextBatch(thread_index, length, hashes)); + } + return Status::OK(); +} + +std::pair> BloomFilterPushdownContext::GetPushdownTarget( + HashJoinNode* start) { +#if !ARROW_LITTLE_ENDIAN + // TODO (ARROW-16591): Debug bloom_filter.cc to enable on Big endian. It probably just + // needs a few byte swaps in the proper spots. 
+ disable_bloom_filter_ = true; + return {nullptr, {}}; +#else + if (disable_bloom_filter_) return {nullptr, {}}; + JoinType join_type = start->join_type_; + + // A build-side Bloom filter tells us if a row is definitely not in the build side. + // This allows us to early-eliminate rows or early-accept rows depending on the type + // of join. Left Outer Join and Full Outer Join output all rows, so a build-side Bloom + // filter would only allow us to early-output. Left Antijoin outputs only if there is + // no match, so again early output. We don't implement early output for now, so we + // must disallow these types of joins. + bool bloom_filter_does_not_apply_to_join = join_type == JoinType::LEFT_ANTI || + join_type == JoinType::LEFT_OUTER || + join_type == JoinType::FULL_OUTER; + disable_bloom_filter_ = disable_bloom_filter_ || bloom_filter_does_not_apply_to_join; + + // Bloom filter currently doesn't support dictionaries. + for (int side = 0; side <= 1 && !disable_bloom_filter_; side++) { + SchemaProjectionMap keys_to_input = start->schema_mgr_->proj_maps[side].map( + HashJoinProjection::KEY, HashJoinProjection::INPUT); + // Bloom filter currently doesn't support dictionaries. + for (int i = 0; i < keys_to_input.num_cols; i++) { + int idx = keys_to_input.get(i); + bool is_dict = start->inputs_[side]->output_schema()->field(idx)->type()->id() == + Type::DICTIONARY; + if (is_dict) { + disable_bloom_filter_ = true; + break; + } + } + } + + bool all_comparisons_is = true; + for (JoinKeyCmp cmp : start->key_cmp_) all_comparisons_is &= (cmp == JoinKeyCmp::IS); + + if ((join_type == JoinType::RIGHT_OUTER || join_type == JoinType::FULL_OUTER) && + all_comparisons_is) + disable_bloom_filter_ = true; + + if (disable_bloom_filter_) return {nullptr, {}}; + + // We currently only push Bloom filters on the probe side, and only if that input is + // also a join. 
+ SchemaProjectionMap probe_key_to_input = start->schema_mgr_->proj_maps[0].map( + HashJoinProjection::KEY, HashJoinProjection::INPUT); + int num_keys = probe_key_to_input.num_cols; + + // A mapping such that bloom_to_target[i] is the index of key i in the pushdown + // target's input + std::vector bloom_to_target(num_keys); + HashJoinNode* pushdown_target = start; + for (int i = 0; i < num_keys; i++) bloom_to_target[i] = probe_key_to_input.get(i); + + for (ExecNode* candidate = start->inputs()[0]; + candidate->kind_name() == start->kind_name(); candidate = candidate->inputs()[0]) { + auto* candidate_as_join = checked_cast(candidate); + SchemaProjectionMap candidate_output_to_input = + candidate_as_join->schema_mgr_->proj_maps[0].map(HashJoinProjection::OUTPUT, + HashJoinProjection::INPUT); + + // Check if any of the keys are missing, if they are, break + bool break_outer = false; + for (int i = 0; i < num_keys; i++) { + // Since all of the probe side columns are before the build side columns, + // if the index of an output is greater than the number of probe-side input + // columns, it must have come from the candidate's build side. + if (bloom_to_target[i] >= candidate_output_to_input.num_cols) { + break_outer = true; + break; + } + int candidate_input_idx = candidate_output_to_input.get(bloom_to_target[i]); + // The output column has to have come from somewhere... + ARROW_DCHECK_NE(candidate_input_idx, start->schema_mgr_->kMissingField()); + } + if (break_outer) break; + + // The Bloom filter will filter out nulls, which may cause a Right/Full Outer Join + // to incorrectly output some rows with nulls padding the probe-side rows. This may + // cause a row with all null keys to be emitted. This is normally not an issue + // with EQ, but if all comparisons are IS (i.e. all-null is accepted), this could + // produce incorrect rows. 
+ bool can_produce_build_side_nulls = + candidate_as_join->join_type_ == JoinType::RIGHT_OUTER || + candidate_as_join->join_type_ == JoinType::FULL_OUTER; + + if (all_comparisons_is || can_produce_build_side_nulls) break; + + // All keys are present, we can update the mapping + for (int i = 0; i < num_keys; i++) { + int candidate_input_idx = candidate_output_to_input.get(bloom_to_target[i]); + bloom_to_target[i] = candidate_input_idx; + } + pushdown_target = candidate_as_join; + } + return std::make_pair(pushdown_target, std::move(bloom_to_target)); +#endif // ARROW_LITTLE_ENDIAN +} + namespace internal { void RegisterHashJoinNode(ExecFactoryRegistry* registry) { DCHECK_OK(registry->AddFactory("hashjoin", HashJoinNode::Make));