diff --git a/cpp/examples/arrow/compute_register_example.cc b/cpp/examples/arrow/compute_register_example.cc index d1a1372b82a..3c20a3d2a87 100644 --- a/cpp/examples/arrow/compute_register_example.cc +++ b/cpp/examples/arrow/compute_register_example.cc @@ -46,7 +46,7 @@ class ExampleFunctionOptionsType : public cp::FunctionOptionsType { } // optional: support for serialization // Result> Serialize(const FunctionOptions&) const override; - // Result> Deserialize(const Buffer& buffer) const override; + // Result> Deserialize(const Buffer&) const override; }; cp::FunctionOptionsType* GetExampleFunctionOptionsType() { @@ -74,8 +74,8 @@ const cp::FunctionDoc func_doc{ int main(int argc, char** argv) { const std::string name = "compute_register_example"; auto func = std::make_shared(name, cp::Arity::Unary(), &func_doc); - func->AddKernel({cp::InputType::Array(arrow::int64())}, arrow::int64(), - ExampleFunctionImpl); + ABORT_ON_FAILURE(func->AddKernel({cp::InputType::Array(arrow::int64())}, arrow::int64(), + ExampleFunctionImpl)); auto registry = cp::GetFunctionRegistry(); ABORT_ON_FAILURE(registry->AddFunction(std::move(func))); @@ -90,8 +90,8 @@ int main(int argc, char** argv) { std::cout << maybe_result->make_array()->ToString() << std::endl; - // Expression serialization will raise NotImplemented if an expression includes FunctionOptions - // for which serialization is not supported. + // Expression serialization will raise NotImplemented if an expression includes + // FunctionOptions for which serialization is not supported. auto expr = cp::call(name, {}, options); auto maybe_serialized = cp::Serialize(expr); std::cerr << maybe_serialized.status().ToString() << std::endl; diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index d2f80ce7213..cb6e91bd40e 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -218,6 +218,7 @@ set(ARROW_SRCS util/thread_pool.cc util/time.cc util/trie.cc + util/unreachable.cc util/uri.cc util/utf8.cc util/value_parsing.cc diff --git a/cpp/src/arrow/array/builder_base.h b/cpp/src/arrow/array/builder_base.h index 905b3c1b491..c2aba4e959f 100644 --- a/cpp/src/arrow/array/builder_base.h +++ b/cpp/src/arrow/array/builder_base.h @@ -50,9 +50,10 @@ class ARROW_EXPORT ArrayBuilder { public: explicit ArrayBuilder(MemoryPool* pool) : pool_(pool), null_bitmap_builder_(pool) {} - virtual ~ArrayBuilder() = default; ARROW_DEFAULT_MOVE_AND_ASSIGN(ArrayBuilder); + virtual ~ArrayBuilder() = default; + /// For nested types. Since the objects are owned by this class instance, we /// skip shared pointers and just return a raw pointer ArrayBuilder* child(int i) { return children_[i].get(); } diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h index 6c47a464b1d..cfd525ab2d6 100644 --- a/cpp/src/arrow/buffer.h +++ b/cpp/src/arrow/buffer.h @@ -416,7 +416,10 @@ class ARROW_EXPORT ResizableBuffer : public MutableBuffer { /// /// @param new_size The new size for the buffer. /// @param shrink_to_fit Whether to shrink the capacity if new size < current size - virtual Status Resize(const int64_t new_size, bool shrink_to_fit = true) = 0; + virtual Status Resize(const int64_t new_size, bool shrink_to_fit) = 0; + Status Resize(const int64_t new_size) { + return Resize(new_size, /*shrink_to_fit=*/true); + } /// Ensure that buffer has enough memory allocated to fit the indicated /// capacity (and meets the 64 byte padding requirement in Layout.md). 
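Note on the buffer.h hunk above: a default argument on a virtual function is bound to the static type at the call site, so the defaulted shrink_to_fit is moved out of the pure virtual Resize and into a non-virtual forwarding overload. A minimal sketch of the pattern follows; it is illustrative only (ResizableLike and the placeholder Status are not Arrow code):

    #include <cstdint>

    struct Status { /* placeholder for arrow::Status */ };

    class ResizableLike {
     public:
      virtual ~ResizableLike() = default;
      // implementations override only the explicit two-argument form
      virtual Status Resize(const int64_t new_size, bool shrink_to_fit) = 0;
      // the old default now lives in exactly one place, a non-virtual overload
      Status Resize(const int64_t new_size) {
        return Resize(new_size, /*shrink_to_fit=*/true);
      }
    };

With this shape, every caller that relies on the default goes through the overload, and subclasses cannot accidentally redeclare a different default.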
diff --git a/cpp/src/arrow/compute/api_aggregate.h b/cpp/src/arrow/compute/api_aggregate.h index 7a6c44bd923..d66d4f1517c 100644 --- a/cpp/src/arrow/compute/api_aggregate.h +++ b/cpp/src/arrow/compute/api_aggregate.h @@ -425,7 +425,7 @@ struct ARROW_EXPORT Aggregate { /// This will be replaced by streaming execution operators. ARROW_EXPORT Result GroupBy(const std::vector& arguments, const std::vector& keys, - const std::vector& aggregates, + const std::vector& aggregates, bool use_threads = false, ExecContext* ctx = default_exec_context()); } // namespace internal diff --git a/cpp/src/arrow/compute/api_vector.h b/cpp/src/arrow/compute/api_vector.h index 9d8d4271db8..32439980f54 100644 --- a/cpp/src/arrow/compute/api_vector.h +++ b/cpp/src/arrow/compute/api_vector.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include "arrow/compute/function.h" #include "arrow/datum.h" @@ -87,7 +88,7 @@ enum class SortOrder { class ARROW_EXPORT SortKey : public util::EqualityComparable { public: explicit SortKey(std::string name, SortOrder order = SortOrder::Ascending) - : name(name), order(order) {} + : name(std::move(name)), order(order) {} using util::EqualityComparable::Equals; using util::EqualityComparable::operator==; diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 63f8d39f551..2a32c96ed3b 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -102,6 +103,12 @@ void PrintTo(const ExecBatch& batch, std::ostream* os) { } } +std::string ExecBatch::ToString() const { + std::stringstream ss; + PrintTo(*this, &ss); + return ss.str(); +} + ExecBatch ExecBatch::Slice(int64_t offset, int64_t length) const { ExecBatch out = *this; for (auto& value : out.values) { diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index de1b695de48..1b70ee244cb 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -233,6 +233,8 @@ struct ARROW_EXPORT ExecBatch { return result; } + std::string ToString() const; + ARROW_EXPORT friend void PrintTo(const ExecBatch&, std::ostream*); }; diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 433e895c243..20c8c347cc1 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -22,21 +22,29 @@ #include #include +#include "arrow/array/concatenate.h" #include "arrow/array/util.h" #include "arrow/compute/api_vector.h" #include "arrow/compute/exec.h" #include "arrow/compute/exec/expression.h" +#include "arrow/compute/exec_internal.h" #include "arrow/compute/registry.h" #include "arrow/datum.h" #include "arrow/record_batch.h" #include "arrow/result.h" #include "arrow/util/async_generator.h" +#include "arrow/util/bit_util.h" #include "arrow/util/checked_cast.h" #include "arrow/util/logging.h" #include "arrow/util/optional.h" +#include "arrow/util/task_group.h" +#include "arrow/util/thread_pool.h" +#include "arrow/util/unreachable.h" +#include "arrow/util/vector.h" namespace arrow { +using BitUtil::CountLeadingZeros; using internal::checked_cast; using internal::checked_pointer_cast; @@ -237,6 +245,15 @@ Status ExecNode::Validate() const { return Status::OK(); } +bool ExecNode::ErrorIfNotOk(Status status) { + if (status.ok()) return false; + + for (auto out : outputs_) { + out->ErrorReceived(this, out == outputs_.back() ? 
std::move(status) : status); + } + return true; +} + struct SourceNode : ExecNode { SourceNode(ExecPlan* plan, std::string label, std::shared_ptr output_schema, AsyncGenerator> generator) @@ -247,8 +264,7 @@ struct SourceNode : ExecNode { const char* kind_name() override { return "SourceNode"; } [[noreturn]] static void NoInputs() { - DCHECK(false) << "no inputs; this should never be called"; - std::abort(); + Unreachable("no inputs; this should never be called"); } [[noreturn]] void InputReceived(ExecNode*, int, ExecBatch) override { NoInputs(); } [[noreturn]] void ErrorReceived(ExecNode*, Status) override { NoInputs(); } @@ -377,10 +393,7 @@ struct FilterNode : ExecNode { DCHECK_EQ(input, inputs_[0]); auto maybe_filtered = DoFilter(std::move(batch)); - if (!maybe_filtered.ok()) { - outputs_[0]->ErrorReceived(this, maybe_filtered.status()); - return; - } + if (ErrorIfNotOk(maybe_filtered.status())) return; maybe_filtered->guarantee = batch.guarantee; outputs_[0]->InputReceived(this, seq, maybe_filtered.MoveValueUnsafe()); @@ -456,10 +469,7 @@ struct ProjectNode : ExecNode { DCHECK_EQ(input, inputs_[0]); auto maybe_projected = DoProject(std::move(batch)); - if (!maybe_projected.ok()) { - outputs_[0]->ErrorReceived(this, maybe_projected.status()); - return; - } + if (ErrorIfNotOk(maybe_projected.status())) return; maybe_projected->guarantee = batch.guarantee; outputs_[0]->InputReceived(this, seq, maybe_projected.MoveValueUnsafe()); @@ -519,6 +529,47 @@ Result MakeProjectNode(ExecNode* input, std::string label, input, std::move(label), schema(std::move(fields)), std::move(exprs)); } +class AtomicCounter { + public: + AtomicCounter() = default; + + int count() const { return count_.load(); } + + util::optional total() const { + int total = total_.load(); + if (total == -1) return {}; + return total; + } + + // return true if the counter is complete + bool Increment() { + DCHECK_NE(count_.load(), total_.load()); + int count = count_.fetch_add(1) + 1; + if (count != total_.load()) return false; + return DoneOnce(); + } + + // return true if the counter is complete + bool SetTotal(int total) { + total_.store(total); + if (count_.load() != total) return false; + return DoneOnce(); + } + + // return true if the counter has not already been completed + bool Cancel() { return DoneOnce(); } + + private: + // ensure there is only one true return from Increment(), SetTotal(), or Cancel() + bool DoneOnce() { + bool expected = false; + return complete_.compare_exchange_strong(expected, true); + } + + std::atomic count_{0}, total_{-1}; + std::atomic complete_{false}; +}; + struct SinkNode : ExecNode { SinkNode(ExecNode* input, std::string label, AsyncGenerator>* generator) @@ -543,8 +594,7 @@ struct SinkNode : ExecNode { // sink nodes have no outputs from which to feel backpressure [[noreturn]] static void NoOutputs() { - DCHECK(false) << "no outputs; this should never be called"; - std::abort(); + Unreachable("no outputs; this should never be called"); } [[noreturn]] void ResumeProducing(ExecNode* output) override { NoOutputs(); } [[noreturn]] void PauseProducing(ExecNode* output) override { NoOutputs(); } @@ -560,37 +610,31 @@ struct SinkNode : ExecNode { void InputReceived(ExecNode* input, int seq_num, ExecBatch batch) override { DCHECK_EQ(input, inputs_[0]); - std::unique_lock lock(mutex_); - if (finished_.is_finished()) return; + bool did_push = producer_.Push(std::move(batch)); + if (!did_push) return; // producer_ was Closed already - ++num_received_; - if (num_received_ == emit_stop_) { - 
lock.unlock(); - producer_.Push(std::move(batch)); - Finish(); - return; + if (auto total = input_counter_.total()) { + DCHECK_LE(seq_num, *total); } - if (emit_stop_ != -1) { - DCHECK_LE(seq_num, emit_stop_); + if (input_counter_.Increment()) { + Finish(); } - - lock.unlock(); - producer_.Push(std::move(batch)); } void ErrorReceived(ExecNode* input, Status error) override { DCHECK_EQ(input, inputs_[0]); + producer_.Push(std::move(error)); - Finish(); + + if (input_counter_.Cancel()) { + Finish(); + } inputs_[0]->StopProducing(this); } void InputFinished(ExecNode* input, int seq_stop) override { - std::unique_lock lock(mutex_); - emit_stop_ = seq_stop; - if (num_received_ == emit_stop_) { - lock.unlock(); + if (input_counter_.SetTotal(seq_stop)) { Finish(); } } @@ -602,10 +646,7 @@ struct SinkNode : ExecNode { } } - std::mutex mutex_; - - int num_received_ = 0; - int emit_stop_ = -1; + AtomicCounter input_counter_; Future<> finished_ = Future<>::MakeFinished(); PushGenerator>::Producer producer_; @@ -646,6 +687,34 @@ std::shared_ptr MakeGeneratorReader( return out; } +class ThreadIndexer { + public: + size_t operator()() { + auto id = std::this_thread::get_id(); + + std::unique_lock lock(mutex_); + const auto& id_index = *id_to_index_.emplace(id, id_to_index_.size()).first; + + return Check(id_index.second); + } + + static size_t Capacity() { + static size_t max_size = arrow::internal::ThreadPool::DefaultCapacity(); + return max_size; + } + + private: + size_t Check(size_t thread_index) { + DCHECK_LT(thread_index, Capacity()) << "thread index " << thread_index + << " is out of range [0, " << Capacity() << ")"; + + return thread_index; + } + + std::mutex mutex_; + std::unordered_map id_to_index_; +}; + struct ScalarAggregateNode : ExecNode { ScalarAggregateNode(ExecNode* input, std::string label, std::shared_ptr output_schema, @@ -663,6 +732,7 @@ struct ScalarAggregateNode : ExecNode { for (size_t i = 0; i < kernels_.size(); ++i) { KernelContext batch_ctx{plan()->exec_context()}; batch_ctx.SetState(states_[i][thread_index].get()); + ExecBatch single_column_batch{{batch.values[i]}, batch.length}; RETURN_NOT_OK(kernels_[i]->consume(&batch_ctx, single_column_batch)); } @@ -672,24 +742,12 @@ struct ScalarAggregateNode : ExecNode { void InputReceived(ExecNode* input, int seq, ExecBatch batch) override { DCHECK_EQ(input, inputs_[0]); - std::unique_lock lock(mutex_); - auto it = - thread_indices_.emplace(std::this_thread::get_id(), thread_indices_.size()).first; - auto thread_index = it->second; - - lock.unlock(); + auto thread_index = get_thread_index_(); - Status st = DoConsume(std::move(batch), thread_index); - if (!st.ok()) { - outputs_[0]->ErrorReceived(this, std::move(st)); - return; - } + if (ErrorIfNotOk(DoConsume(std::move(batch), thread_index))) return; - lock.lock(); - ++num_received_; - st = MaybeFinish(&lock); - if (!st.ok()) { - outputs_[0]->ErrorReceived(this, std::move(st)); + if (input_counter_.Increment()) { + ErrorIfNotOk(Finish()); } } @@ -698,14 +756,11 @@ struct ScalarAggregateNode : ExecNode { outputs_[0]->ErrorReceived(this, std::move(error)); } - void InputFinished(ExecNode* input, int seq) override { + void InputFinished(ExecNode* input, int num_total) override { DCHECK_EQ(input, inputs_[0]); - std::unique_lock lock(mutex_); - num_total_ = seq; - Status st = MaybeFinish(&lock); - if (!st.ok()) { - outputs_[0]->ErrorReceived(this, std::move(st)); + if (input_counter_.SetTotal(num_total)) { + ErrorIfNotOk(Finish()); } } @@ -726,18 +781,16 @@ struct ScalarAggregateNode : 
ExecNode { } void StopProducing() override { + if (input_counter_.Cancel()) { + finished_.MarkFinished(); + } inputs_[0]->StopProducing(this); - finished_.MarkFinished(); } Future<> finished() override { return finished_; } private: - Status MaybeFinish(std::unique_lock* lock) { - if (num_received_ != num_total_) return Status::OK(); - - if (states_.empty()) return Status::OK(); - + Status Finish() { ExecBatch batch{{}, 1}; batch.values.resize(kernels_.size()); @@ -747,21 +800,19 @@ struct ScalarAggregateNode : ExecNode { kernels_[i], &ctx, std::move(states_[i]))); RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[i])); } - states_.clear(); - lock->unlock(); - - outputs_[0]->InputReceived(this, 0, batch); + outputs_[0]->InputReceived(this, 0, std::move(batch)); finished_.MarkFinished(); return Status::OK(); } Future<> finished_ = Future<>::MakeFinished(); std::vector kernels_; + std::vector>> states_; - std::unordered_map thread_indices_; - std::mutex mutex_; - int num_received_ = 0, num_total_ = -1; + + ThreadIndexer get_thread_index_; + AtomicCounter input_counter_; }; Result MakeScalarAggregateNode(ExecNode* input, std::string label, @@ -797,7 +848,7 @@ Result MakeScalarAggregateNode(ExecNode* input, std::string label, } KernelContext kernel_ctx{exec_ctx}; - states[i].resize(exec_ctx->executor() ? exec_ctx->executor()->GetCapacity() : 1); + states[i].resize(ThreadIndexer::Capacity()); RETURN_NOT_OK(Kernel::InitAll(&kernel_ctx, KernelInitArgs{kernels[i], { @@ -819,5 +870,426 @@ Result MakeScalarAggregateNode(ExecNode* input, std::string label, std::move(states)); } +namespace internal { + +Result> GetKernels( + ExecContext* ctx, const std::vector& aggregates, + const std::vector& in_descrs); + +Result>> InitKernels( + const std::vector& kernels, ExecContext* ctx, + const std::vector& aggregates, + const std::vector& in_descrs); + +Result ResolveKernels( + const std::vector& aggregates, + const std::vector& kernels, + const std::vector>& states, ExecContext* ctx, + const std::vector& descrs); + +} // namespace internal + +struct GroupByNode : ExecNode { + GroupByNode(ExecNode* input, std::string label, std::shared_ptr output_schema, + ExecContext* ctx, const std::vector&& key_field_ids, + const std::vector&& agg_src_field_ids, + const std::vector&& aggs, + const std::vector&& agg_kernels) + : ExecNode(input->plan(), std::move(label), {input}, {"groupby"}, + std::move(output_schema), /*num_outputs=*/1), + ctx_(ctx), + key_field_ids_(std::move(key_field_ids)), + agg_src_field_ids_(std::move(agg_src_field_ids)), + aggs_(std::move(aggs)), + agg_kernels_(std::move(agg_kernels)) {} + + const char* kind_name() override { return "GroupByNode"; } + + Status Consume(ExecBatch batch) { + size_t thread_index = get_thread_index_(); + if (thread_index >= local_states_.size()) { + return Status::IndexError("thread index ", thread_index, " is out of range [0, ", + local_states_.size(), ")"); + } + + auto state = &local_states_[thread_index]; + RETURN_NOT_OK(InitLocalStateIfNeeded(state)); + + // Create a batch with key columns + std::vector keys(key_field_ids_.size()); + for (size_t i = 0; i < key_field_ids_.size(); ++i) { + keys[i] = batch.values[key_field_ids_[i]]; + } + ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(keys)); + + // Create a batch with group ids + ARROW_ASSIGN_OR_RAISE(Datum id_batch, state->grouper->Consume(key_batch)); + + // Execute aggregate kernels + for (size_t i = 0; i < agg_kernels_.size(); ++i) { + KernelContext kernel_ctx{ctx_}; + 
kernel_ctx.SetState(state->agg_states[i].get()); + + ARROW_ASSIGN_OR_RAISE( + auto agg_batch, + ExecBatch::Make({batch.values[agg_src_field_ids_[i]], id_batch})); + + RETURN_NOT_OK(agg_kernels_[i]->resize(&kernel_ctx, state->grouper->num_groups())); + RETURN_NOT_OK(agg_kernels_[i]->consume(&kernel_ctx, agg_batch)); + } + + return Status::OK(); + } + + Status Merge() { + ThreadLocalState* state0 = &local_states_[0]; + for (size_t i = 1; i < local_states_.size(); ++i) { + ThreadLocalState* state = &local_states_[i]; + if (!state->grouper) { + continue; + } + + ARROW_ASSIGN_OR_RAISE(ExecBatch other_keys, state->grouper->GetUniques()); + ARROW_ASSIGN_OR_RAISE(Datum transposition, state0->grouper->Consume(other_keys)); + state->grouper.reset(); + + for (size_t i = 0; i < agg_kernels_.size(); ++i) { + KernelContext batch_ctx{ctx_}; + DCHECK(state0->agg_states[i]); + batch_ctx.SetState(state0->agg_states[i].get()); + + RETURN_NOT_OK(agg_kernels_[i]->resize(&batch_ctx, state0->grouper->num_groups())); + RETURN_NOT_OK(agg_kernels_[i]->merge(&batch_ctx, std::move(*state->agg_states[i]), + *transposition.array())); + state->agg_states[i].reset(); + } + } + return Status::OK(); + } + + Result Finalize() { + ThreadLocalState* state = &local_states_[0]; + + ExecBatch out_data{{}, state->grouper->num_groups()}; + out_data.values.resize(agg_kernels_.size() + key_field_ids_.size()); + + // Aggregate fields come before key fields to match the behavior of GroupBy function + for (size_t i = 0; i < agg_kernels_.size(); ++i) { + KernelContext batch_ctx{ctx_}; + batch_ctx.SetState(state->agg_states[i].get()); + RETURN_NOT_OK(agg_kernels_[i]->finalize(&batch_ctx, &out_data.values[i])); + state->agg_states[i].reset(); + } + + ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, state->grouper->GetUniques()); + std::move(out_keys.values.begin(), out_keys.values.end(), + out_data.values.begin() + agg_kernels_.size()); + state->grouper.reset(); + + if (output_counter_.SetTotal( + static_cast(BitUtil::CeilDiv(out_data.length, output_batch_size())))) { + // this will be hit if out_data.length == 0 + finished_.MarkFinished(); + } + return out_data; + } + + void OutputNthBatch(int n) { + // bail if StopProducing was called + if (finished_.is_finished()) return; + + int64_t batch_size = output_batch_size(); + outputs_[0]->InputReceived(this, n, out_data_.Slice(batch_size * n, batch_size)); + + if (output_counter_.Increment()) { + finished_.MarkFinished(); + } + } + + Status OutputResult() { + RETURN_NOT_OK(Merge()); + ARROW_ASSIGN_OR_RAISE(out_data_, Finalize()); + + int num_output_batches = *output_counter_.total(); + outputs_[0]->InputFinished(this, num_output_batches); + + auto executor = ctx_->executor(); + for (int i = 0; i < num_output_batches; ++i) { + if (executor) { + // bail if StopProducing was called + if (finished_.is_finished()) break; + + RETURN_NOT_OK(executor->Spawn([this, i] { OutputNthBatch(i); })); + } else { + OutputNthBatch(i); + } + } + + return Status::OK(); + } + + void InputReceived(ExecNode* input, int seq, ExecBatch batch) override { + // bail if StopProducing was called + if (finished_.is_finished()) return; + + DCHECK_EQ(input, inputs_[0]); + + if (ErrorIfNotOk(Consume(std::move(batch)))) return; + + if (input_counter_.Increment()) { + ErrorIfNotOk(OutputResult()); + } + } + + void ErrorReceived(ExecNode* input, Status error) override { + DCHECK_EQ(input, inputs_[0]); + + outputs_[0]->ErrorReceived(this, std::move(error)); + } + + void InputFinished(ExecNode* input, int num_total) override { + // bail 
if StopProducing was called + if (finished_.is_finished()) return; + + DCHECK_EQ(input, inputs_[0]); + + if (input_counter_.SetTotal(num_total)) { + ErrorIfNotOk(OutputResult()); + } + } + + Status StartProducing() override { + finished_ = Future<>::Make(); + + local_states_.resize(ThreadIndexer::Capacity()); + return Status::OK(); + } + + void PauseProducing(ExecNode* output) override {} + + void ResumeProducing(ExecNode* output) override {} + + void StopProducing(ExecNode* output) override { + DCHECK_EQ(output, outputs_[0]); + + if (input_counter_.Cancel()) { + finished_.MarkFinished(); + } else if (output_counter_.Cancel()) { + finished_.MarkFinished(); + } + inputs_[0]->StopProducing(this); + } + + void StopProducing() override { StopProducing(outputs_[0]); } + + Future<> finished() override { return finished_; } + + private: + struct ThreadLocalState { + std::unique_ptr grouper; + std::vector> agg_states; + }; + + ThreadLocalState* GetLocalState() { + size_t thread_index = get_thread_index_(); + return &local_states_[thread_index]; + } + + Status InitLocalStateIfNeeded(ThreadLocalState* state) { + // Get input schema + auto input_schema = inputs_[0]->output_schema(); + + if (state->grouper != nullptr) return Status::OK(); + + // Build vector of key field data types + std::vector key_descrs(key_field_ids_.size()); + for (size_t i = 0; i < key_field_ids_.size(); ++i) { + auto key_field_id = key_field_ids_[i]; + key_descrs[i] = ValueDescr(input_schema->field(key_field_id)->type()); + } + + // Construct grouper + ARROW_ASSIGN_OR_RAISE(state->grouper, internal::Grouper::Make(key_descrs, ctx_)); + + // Build vector of aggregate source field data types + std::vector agg_src_descrs(agg_kernels_.size()); + for (size_t i = 0; i < agg_kernels_.size(); ++i) { + auto agg_src_field_id = agg_src_field_ids_[i]; + agg_src_descrs[i] = + ValueDescr(input_schema->field(agg_src_field_id)->type(), ValueDescr::ARRAY); + } + + ARROW_ASSIGN_OR_RAISE( + state->agg_states, + internal::InitKernels(agg_kernels_, ctx_, aggs_, agg_src_descrs)); + + return Status::OK(); + } + + int output_batch_size() const { + int result = static_cast(ctx_->exec_chunksize()); + if (result < 0) { + result = 32 * 1024; + } + return result; + } + + ExecContext* ctx_; + Future<> finished_ = Future<>::MakeFinished(); + + const std::vector key_field_ids_; + const std::vector agg_src_field_ids_; + const std::vector aggs_; + const std::vector agg_kernels_; + + ThreadIndexer get_thread_index_; + AtomicCounter input_counter_, output_counter_; + + std::vector local_states_; + ExecBatch out_data_; +}; + +Result MakeGroupByNode(ExecNode* input, std::string label, + std::vector keys, + std::vector agg_srcs, + std::vector aggs) { + // Get input schema + auto input_schema = input->output_schema(); + + // Find input field indices for key fields + std::vector key_field_ids(keys.size()); + for (size_t i = 0; i < keys.size(); ++i) { + ARROW_ASSIGN_OR_RAISE(auto match, FieldRef(keys[i]).FindOne(*input_schema)); + key_field_ids[i] = match[0]; + } + + // Find input field indices for aggregates + std::vector agg_src_field_ids(aggs.size()); + for (size_t i = 0; i < aggs.size(); ++i) { + ARROW_ASSIGN_OR_RAISE(auto match, FieldRef(agg_srcs[i]).FindOne(*input_schema)); + agg_src_field_ids[i] = match[0]; + } + + // Build vector of aggregate source field data types + DCHECK_EQ(agg_srcs.size(), aggs.size()); + std::vector agg_src_descrs(aggs.size()); + for (size_t i = 0; i < aggs.size(); ++i) { + auto agg_src_field_id = agg_src_field_ids[i]; + agg_src_descrs[i] 
= + ValueDescr(input_schema->field(agg_src_field_id)->type(), ValueDescr::ARRAY); + } + + auto ctx = input->plan()->exec_context(); + + // Construct aggregates + ARROW_ASSIGN_OR_RAISE(auto agg_kernels, + internal::GetKernels(ctx, aggs, agg_src_descrs)); + + ARROW_ASSIGN_OR_RAISE(auto agg_states, + internal::InitKernels(agg_kernels, ctx, aggs, agg_src_descrs)); + + ARROW_ASSIGN_OR_RAISE( + FieldVector agg_result_fields, + internal::ResolveKernels(aggs, agg_kernels, agg_states, ctx, agg_src_descrs)); + + // Build field vector for output schema + FieldVector output_fields{keys.size() + aggs.size()}; + + // Aggregate fields come before key fields to match the behavior of GroupBy function + for (size_t i = 0; i < aggs.size(); ++i) { + output_fields[i] = agg_result_fields[i]; + } + size_t base = aggs.size(); + for (size_t i = 0; i < keys.size(); ++i) { + int key_field_id = key_field_ids[i]; + output_fields[base + i] = input_schema->field(key_field_id); + } + + auto aggs_copy = aggs; + + return input->plan()->EmplaceNode( + input, std::move(label), schema(std::move(output_fields)), ctx, + std::move(key_field_ids), std::move(agg_src_field_ids), std::move(aggs), + std::move(agg_kernels)); +} + +Result GroupByUsingExecPlan(const std::vector& arguments, + const std::vector& keys, + const std::vector& aggregates, + bool use_threads, ExecContext* ctx) { + using arrow::compute::detail::ExecBatchIterator; + + FieldVector scan_fields(arguments.size() + keys.size()); + std::vector keys_str(keys.size()); + std::vector arguments_str(arguments.size()); + for (size_t i = 0; i < arguments.size(); ++i) { + arguments_str[i] = std::string("agg_") + std::to_string(i); + scan_fields[i] = field(arguments_str[i], arguments[i].type()); + } + for (size_t i = 0; i < keys.size(); ++i) { + keys_str[i] = std::string("key_") + std::to_string(i); + scan_fields[arguments.size() + i] = field(keys_str[i], keys[i].type()); + } + + std::vector scan_batches; + std::vector inputs; + for (const auto& argument : arguments) { + inputs.push_back(argument); + } + for (const auto& key : keys) { + inputs.push_back(key); + } + ARROW_ASSIGN_OR_RAISE(auto batch_iterator, + ExecBatchIterator::Make(inputs, ctx->exec_chunksize())); + ExecBatch batch; + while (batch_iterator->Next(&batch)) { + if (batch.length == 0) continue; + scan_batches.push_back(batch); + } + + ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make(ctx)); + auto source = MakeSourceNode( + plan.get(), "source", schema(std::move(scan_fields)), + MakeVectorGenerator(arrow::internal::MapVector( + [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, + std::move(scan_batches)))); + + ARROW_ASSIGN_OR_RAISE( + auto gby, MakeGroupByNode(source, "gby", keys_str, arguments_str, aggregates)); + auto sink_gen = MakeSinkNode(gby, "sink"); + + RETURN_NOT_OK(plan->Validate()); + RETURN_NOT_OK(plan->StartProducing()); + + auto collected_fut = CollectAsyncGenerator(sink_gen); + + auto start_and_collect = + AllComplete({plan->finished(), Future<>(collected_fut)}) + .Then([collected_fut]() -> Result> { + ARROW_ASSIGN_OR_RAISE(auto collected, collected_fut.result()); + return ::arrow::internal::MapVector( + [](util::optional batch) { return std::move(*batch); }, + std::move(collected)); + }); + + std::vector output_batches = + start_and_collect.MoveResult().MoveValueUnsafe(); + + ArrayDataVector out_data(arguments.size() + keys.size()); + for (size_t i = 0; i < arguments.size() + keys.size(); ++i) { + std::vector> arrays(output_batches.size()); + for (size_t j = 0; j < 
output_batches.size(); ++j) { + arrays[j] = output_batches[j].values[i].make_array(); + } + ARROW_ASSIGN_OR_RAISE(auto concatenated_array, Concatenate(arrays)); + out_data[i] = concatenated_array->data(); + } + + int64_t length = out_data[0]->length; + return ArrayData::Make(struct_(gby->output_schema()->fields()), length, + {/*null_bitmap=*/nullptr}, std::move(out_data), + /*null_count=*/0); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index c36c174af05..07bb365bbc7 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -222,6 +222,10 @@ class ARROW_EXPORT ExecNode { std::vector input_labels, std::shared_ptr output_schema, int num_outputs); + // A helper method to send an error status to all outputs. + // Returns true if the status was an error. + bool ErrorIfNotOk(Status status); + ExecPlan* plan_; std::string label_; @@ -283,5 +287,19 @@ ARROW_EXPORT Result MakeScalarAggregateNode(ExecNode* input, std::string label, std::vector aggregates); +/// \brief Make a node which groups input rows based on key fields and computes +/// aggregates for each group +ARROW_EXPORT +Result MakeGroupByNode(ExecNode* input, std::string label, + std::vector keys, + std::vector agg_srcs, + std::vector aggs); + +ARROW_EXPORT +Result GroupByUsingExecPlan(const std::vector& arguments, + const std::vector& keys, + const std::vector& aggregates, + bool use_threads, ExecContext* ctx); + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index bcb63c25b3a..aa807468bcb 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. +#include + #include #include -#include - #include "arrow/compute/exec.h" #include "arrow/compute/exec/exec_plan.h" #include "arrow/compute/exec/expression.h" @@ -382,10 +382,8 @@ TEST(ExecPlanExecution, SourceFilterSink) { MakeTestSourceNode(plan.get(), "source", basic_data, /*parallel=*/false, /*slow=*/false)); - ASSERT_OK_AND_ASSIGN(auto predicate, - equal(field_ref("i32"), literal(6)).Bind(*basic_data.schema)); - - ASSERT_OK_AND_ASSIGN(auto filter, MakeFilterNode(source, "filter", predicate)); + ASSERT_OK_AND_ASSIGN( + auto filter, MakeFilterNode(source, "filter", equal(field_ref("i32"), literal(6)))); auto sink_gen = MakeSinkNode(filter, "sink"); @@ -424,6 +422,106 @@ TEST(ExecPlanExecution, SourceProjectSink) { "[[null, 6], [true, 7], [true, 8]]")})))); } +namespace { + +BatchesWithSchema MakeGroupableBatches(int multiplicity = 1) { + BatchesWithSchema out; + + out.batches = {ExecBatchFromJSON({int32(), utf8()}, R"([ + [12, "alfa"], + [7, "beta"], + [3, "alfa"] + ])"), + ExecBatchFromJSON({int32(), utf8()}, R"([ + [-2, "alfa"], + [-1, "gama"], + [3, "alfa"] + ])"), + ExecBatchFromJSON({int32(), utf8()}, R"([ + [5, "gama"], + [3, "beta"], + [-8, "alfa"] + ])")}; + + size_t batch_count = out.batches.size(); + for (int repeat = 1; repeat < multiplicity; ++repeat) { + for (size_t i = 0; i < batch_count; ++i) { + out.batches.push_back(out.batches[i]); + } + } + + out.schema = schema({field("i32", int32()), field("str", utf8())}); + + return out; +} +} // namespace + +TEST(ExecPlanExecution, SourceGroupedSum) { + for (bool parallel : {false, true}) { + SCOPED_TRACE(parallel ? 
"parallel/merged" : "serial"); + + auto input = MakeGroupableBatches(/*multiplicity=*/parallel ? 100 : 1); + + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + + ASSERT_OK_AND_ASSIGN(auto source, + MakeTestSourceNode(plan.get(), "source", input, + /*parallel=*/parallel, /*slow=*/false)); + ASSERT_OK_AND_ASSIGN( + auto gby, MakeGroupByNode(source, "gby", /*keys=*/{"str"}, /*targets=*/{"i32"}, + {{"hash_sum", nullptr}})); + auto sink_gen = MakeSinkNode(gby, "sink"); + + ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), + Finishes(ResultWith(UnorderedElementsAreArray({ExecBatchFromJSON( + {int64(), utf8()}, + parallel ? R"([[800, "alfa"], [1000, "beta"], [400, "gama"]])" + : R"([[8, "alfa"], [10, "beta"], [4, "gama"]])")})))); + } +} + +TEST(ExecPlanExecution, SourceFilterProjectGroupedSumFilter) { + for (bool parallel : {false, true}) { + SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); + + int batch_multiplicity = parallel ? 100 : 1; + auto input = MakeGroupableBatches(/*multiplicity=*/batch_multiplicity); + + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + + ASSERT_OK_AND_ASSIGN(auto source, + MakeTestSourceNode(plan.get(), "source", input, + /*parallel=*/parallel, /*slow=*/false)); + ASSERT_OK_AND_ASSIGN( + auto filter, + MakeFilterNode(source, "filter", greater_equal(field_ref("i32"), literal(0)))); + + ASSERT_OK_AND_ASSIGN( + auto projection, + MakeProjectNode(filter, "project", + { + field_ref("str"), + call("multiply", {field_ref("i32"), literal(2)}), + })); + + ASSERT_OK_AND_ASSIGN(auto gby, MakeGroupByNode(projection, "gby", /*keys=*/{"str"}, + /*targets=*/{"multiply(i32, 2)"}, + {{"hash_sum", nullptr}})); + + ASSERT_OK_AND_ASSIGN( + auto having, + MakeFilterNode(gby, "having", + greater(field_ref("hash_sum"), literal(10 * batch_multiplicity)))); + + auto sink_gen = MakeSinkNode(having, "sink"); + + ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), + Finishes(ResultWith(UnorderedElementsAreArray({ExecBatchFromJSON( + {int64(), utf8()}, parallel ? R"([[3600, "alfa"], [2000, "beta"]])" + : R"([[36, "alfa"], [20, "beta"]])")})))); + } +} + TEST(ExecPlanExecution, SourceScalarAggSink) { ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); diff --git a/cpp/src/arrow/compute/exec/test_util.cc b/cpp/src/arrow/compute/exec/test_util.cc index 8cc6200ea40..b47d6087c0b 100644 --- a/cpp/src/arrow/compute/exec/test_util.cc +++ b/cpp/src/arrow/compute/exec/test_util.cc @@ -17,6 +17,9 @@ #include "arrow/compute/exec/test_util.h" +#include +#include + #include #include #include @@ -27,9 +30,6 @@ #include #include -#include -#include - #include "arrow/compute/exec.h" #include "arrow/compute/exec/exec_plan.h" #include "arrow/datum.h" diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index 36d20c7289e..099bd95bbf2 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -52,7 +52,7 @@ struct ARROW_EXPORT KernelState { /// \brief Context/state for the execution of a particular kernel. class ARROW_EXPORT KernelContext { public: - explicit KernelContext(ExecContext* exec_ctx) : exec_ctx_(exec_ctx), state_() {} + explicit KernelContext(ExecContext* exec_ctx) : exec_ctx_(exec_ctx) {} /// \brief Allocate buffer from the context's memory pool. The contents are /// not initialized. 
@@ -80,7 +80,7 @@ class ARROW_EXPORT KernelContext { private: ExecContext* exec_ctx_; - KernelState* state_; + KernelState* state_ = NULLPTR; }; /// \brief The standard kernel execution API that must be implemented for @@ -693,10 +693,12 @@ struct ScalarAggregateKernel : public Kernel { // ---------------------------------------------------------------------- // HashAggregateKernel (for HashAggregateFunction) +using HashAggregateResize = std::function; + using HashAggregateConsume = std::function; using HashAggregateMerge = - std::function; + std::function; // Finalize returns Datum to permit multiple return values using HashAggregateFinalize = std::function; @@ -706,6 +708,7 @@ using HashAggregateFinalize = std::function; /// kernel are the init, consume, merge, and finalize functions. /// /// * init: creates a new KernelState for a kernel. +/// * resize: ensure that the KernelState can accommodate the specified number of groups. /// * consume: processes an ExecBatch (which includes the argument as well /// as an array of group identifiers) and updates the KernelState found in the /// KernelContext. @@ -716,20 +719,24 @@ struct HashAggregateKernel : public Kernel { HashAggregateKernel() = default; HashAggregateKernel(std::shared_ptr sig, KernelInit init, - HashAggregateConsume consume, HashAggregateMerge merge, - HashAggregateFinalize finalize) + HashAggregateResize resize, HashAggregateConsume consume, + HashAggregateMerge merge, HashAggregateFinalize finalize) : Kernel(std::move(sig), std::move(init)), + resize(std::move(resize)), consume(std::move(consume)), merge(std::move(merge)), finalize(std::move(finalize)) {} HashAggregateKernel(std::vector in_types, OutputType out_type, - KernelInit init, HashAggregateMerge merge, - HashAggregateConsume consume, HashAggregateFinalize finalize) + KernelInit init, HashAggregateConsume consume, + HashAggregateResize resize, HashAggregateMerge merge, + HashAggregateFinalize finalize) : HashAggregateKernel( KernelSignature::Make(std::move(in_types), std::move(out_type)), - std::move(init), std::move(consume), std::move(merge), std::move(finalize)) {} + std::move(init), std::move(resize), std::move(consume), std::move(merge), + std::move(finalize)) {} + HashAggregateResize resize; HashAggregateConsume consume; HashAggregateMerge merge; HashAggregateFinalize finalize; diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h index 5163d3fd03d..3d02b273066 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include "arrow/compute/api_aggregate.h" #include "arrow/compute/kernels/aggregate_internal.h" @@ -233,9 +234,8 @@ struct MinMaxImpl : public ScalarAggregator { using ThisType = MinMaxImpl; using StateType = MinMaxState; - MinMaxImpl(const std::shared_ptr& out_type, - const ScalarAggregateOptions& options) - : out_type(out_type), options(options) {} + MinMaxImpl(std::shared_ptr out_type, ScalarAggregateOptions options) + : out_type(std::move(out_type)), options(std::move(options)) {} Status Consume(KernelContext*, const ExecBatch& batch) override { if (batch[0].is_array()) { diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index ed40a6b1b8c..79213b93b37 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -17,7 +17,9 @@ 
#include #include +#include #include +#include #include #include @@ -39,6 +41,8 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/cpu_info.h" #include "arrow/util/make_unique.h" +#include "arrow/util/task_group.h" +#include "arrow/util/thread_pool.h" #include "arrow/visitor_inline.h" namespace arrow { @@ -748,67 +752,128 @@ struct GrouperFastImpl : Grouper { /// Implementations should be default constructible and perform initialization in /// Init(). struct GroupedAggregator : KernelState { - virtual Status Init(ExecContext*, const FunctionOptions*, - const std::shared_ptr&) = 0; + virtual Status Init(ExecContext*, const FunctionOptions*) = 0; + + virtual Status Resize(int64_t new_num_groups) = 0; virtual Status Consume(const ExecBatch& batch) = 0; - virtual Result Finalize() = 0; + virtual Status Merge(GroupedAggregator&& other, const ArrayData& group_id_mapping) = 0; - template - Status MaybeReserve(int64_t old_num_groups, const ExecBatch& batch, - const Reserve& reserve) { - int64_t new_num_groups = batch[2].scalar_as().value; - if (new_num_groups <= old_num_groups) { - return Status::OK(); - } - return reserve(new_num_groups - old_num_groups); - } + virtual Result Finalize() = 0; virtual std::shared_ptr out_type() const = 0; }; +template +Result> HashAggregateInit(KernelContext* ctx, + const KernelInitArgs& args) { + auto impl = ::arrow::internal::make_unique(); + RETURN_NOT_OK(impl->Init(ctx->exec_context(), args.options)); + return std::move(impl); +} + +HashAggregateKernel MakeKernel(InputType argument_type, KernelInit init) { + HashAggregateKernel kernel; + + kernel.init = std::move(init); + + kernel.signature = KernelSignature::Make( + {std::move(argument_type), InputType::Array(Type::UINT32)}, + OutputType( + [](KernelContext* ctx, const std::vector&) -> Result { + return checked_cast(ctx->state())->out_type(); + })); + + kernel.resize = [](KernelContext* ctx, int64_t num_groups) { + return checked_cast(ctx->state())->Resize(num_groups); + }; + + kernel.consume = [](KernelContext* ctx, const ExecBatch& batch) { + return checked_cast(ctx->state())->Consume(batch); + }; + + kernel.merge = [](KernelContext* ctx, KernelState&& other, + const ArrayData& group_id_mapping) { + return checked_cast(ctx->state()) + ->Merge(checked_cast(other), group_id_mapping); + }; + + kernel.finalize = [](KernelContext* ctx, Datum* out) { + ARROW_ASSIGN_OR_RAISE(*out, + checked_cast(ctx->state())->Finalize()); + return Status::OK(); + }; + + return kernel; +} + +Status AddHashAggKernels( + const std::vector>& types, + Result make_kernel(const std::shared_ptr&), + HashAggregateFunction* function) { + for (const auto& ty : types) { + ARROW_ASSIGN_OR_RAISE(auto kernel, make_kernel(ty)); + RETURN_NOT_OK(function->AddKernel(std::move(kernel))); + } + return Status::OK(); +} + // ---------------------------------------------------------------------- // Count implementation struct GroupedCountImpl : public GroupedAggregator { - Status Init(ExecContext* ctx, const FunctionOptions* options, - const std::shared_ptr&) override { + Status Init(ExecContext* ctx, const FunctionOptions* options) override { options_ = checked_cast(*options); counts_ = BufferBuilder(ctx->memory_pool()); return Status::OK(); } - Status Consume(const ExecBatch& batch) override { - RETURN_NOT_OK(MaybeReserve(num_groups_, batch, [&](int64_t added_groups) { - num_groups_ += added_groups; - return counts_.Append(added_groups * sizeof(int64_t), 0); - })); + Status Resize(int64_t new_num_groups) override { + auto added_groups = 
new_num_groups - num_groups_; + num_groups_ = new_num_groups; + return counts_.Append(added_groups * sizeof(int64_t), 0); + } + + Status Merge(GroupedAggregator&& raw_other, + const ArrayData& group_id_mapping) override { + auto other = checked_cast(&raw_other); - auto group_ids = batch[1].array()->GetValues(1); - auto raw_counts = reinterpret_cast(counts_.mutable_data()); + auto counts = reinterpret_cast(counts_.mutable_data()); + auto other_counts = reinterpret_cast(other->counts_.mutable_data()); + + auto g = group_id_mapping.GetValues(1); + for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) { + counts[*g] += other_counts[other_g]; + } + return Status::OK(); + } + + Status Consume(const ExecBatch& batch) override { + auto counts = reinterpret_cast(counts_.mutable_data()); const auto& input = batch[0].array(); - if (!options_.skip_nulls) { - if (input->GetNullCount() != 0) { - for (int64_t i = 0, input_i = input->offset; i < input->length; ++i, ++input_i) { - auto g = group_ids[i]; - raw_counts[g] += !BitUtil::GetBit(input->buffers[0]->data(), input_i); - } + if (options_.skip_nulls) { + auto g_begin = + reinterpret_cast(batch[1].array()->buffers[1]->data()); + + arrow::internal::VisitSetBitRunsVoid(input->buffers[0], input->offset, + input->length, + [&](int64_t offset, int64_t length) { + auto g = g_begin + offset; + for (int64_t i = 0; i < length; ++i, ++g) { + counts[*g] += 1; + } + }); + } else if (input->MayHaveNulls()) { + auto g = batch[1].array()->GetValues(1); + + auto end = input->offset + input->length; + for (int64_t i = input->offset; i < end; ++i, ++g) { + counts[*g] += !BitUtil::GetBit(input->buffers[0]->data(), i); } - return Status::OK(); } - - arrow::internal::VisitSetBitRunsVoid( - input->buffers[0], input->offset, input->length, - [&](int64_t begin, int64_t length) { - for (int64_t input_i = begin, i = begin - input->offset; - input_i < begin + length; ++input_i, ++i) { - auto g = group_ids[i]; - raw_counts[g] += 1; - } - }); return Status::OK(); } @@ -827,72 +892,58 @@ struct GroupedCountImpl : public GroupedAggregator { // ---------------------------------------------------------------------- // Sum implementation +template struct GroupedSumImpl : public GroupedAggregator { - // NB: whether we are accumulating into double, int64_t, or uint64_t - // we always have 64 bits per group in the sums buffer. 
- static constexpr size_t kSumSize = sizeof(int64_t); - - using ConsumeImpl = std::function&, - const uint32_t*, void*, int64_t*)>; - - struct GetConsumeImpl { - template ::Type> - Status Visit(const T&) { - consume_impl = [](const std::shared_ptr& input, const uint32_t* group, - void* boxed_sums, int64_t* counts) { - auto sums = reinterpret_cast::CType*>(boxed_sums); - - VisitArrayDataInline( - *input, - [&](typename TypeTraits::CType value) { - sums[*group] += value; - counts[*group] += 1; - ++group; - }, - [&] { ++group; }); - }; - out_type = TypeTraits::type_singleton(); - return Status::OK(); - } - - Status Visit(const HalfFloatType& type) { - return Status::NotImplemented("Summing data of type ", type); - } + using AccType = typename FindAccumulatorType::Type; + using SumType = typename TypeTraits::CType; - Status Visit(const DataType& type) { - return Status::NotImplemented("Summing data of type ", type); - } - - ConsumeImpl consume_impl; - std::shared_ptr out_type; - }; - - Status Init(ExecContext* ctx, const FunctionOptions*, - const std::shared_ptr& input_type) override { + Status Init(ExecContext* ctx, const FunctionOptions*) override { pool_ = ctx->memory_pool(); sums_ = BufferBuilder(pool_); counts_ = BufferBuilder(pool_); + out_type_ = TypeTraits::type_singleton(); + return Status::OK(); + } - GetConsumeImpl get_consume_impl; - RETURN_NOT_OK(VisitTypeInline(*input_type, &get_consume_impl)); - - consume_impl_ = std::move(get_consume_impl.consume_impl); - out_type_ = std::move(get_consume_impl.out_type); - + Status Resize(int64_t new_num_groups) override { + auto added_groups = new_num_groups - num_groups_; + num_groups_ = new_num_groups; + RETURN_NOT_OK(sums_.Append(added_groups * sizeof(AccType), 0)); + RETURN_NOT_OK(counts_.Append(added_groups * sizeof(int64_t), 0)); return Status::OK(); } Status Consume(const ExecBatch& batch) override { - RETURN_NOT_OK(MaybeReserve(num_groups_, batch, [&](int64_t added_groups) { - num_groups_ += added_groups; - RETURN_NOT_OK(sums_.Append(added_groups * kSumSize, 0)); - RETURN_NOT_OK(counts_.Append(added_groups * sizeof(int64_t), 0)); - return Status::OK(); - })); + auto sums = reinterpret_cast(sums_.mutable_data()); + auto counts = reinterpret_cast(counts_.mutable_data()); + + auto g = batch[1].array()->GetValues(1); + VisitArrayDataInline( + *batch[0].array(), + [&](typename TypeTraits::CType value) { + sums[*g] += value; + counts[*g] += 1; + ++g; + }, + [&] { ++g; }); + return Status::OK(); + } + + Status Merge(GroupedAggregator&& raw_other, + const ArrayData& group_id_mapping) override { + auto other = checked_cast(&raw_other); + + auto counts = reinterpret_cast(counts_.mutable_data()); + auto sums = reinterpret_cast(sums_.mutable_data()); + + auto other_counts = reinterpret_cast(other->counts_.mutable_data()); + auto other_sums = reinterpret_cast(other->sums_.mutable_data()); - auto group_ids = batch[1].array()->GetValues(1); - consume_impl_(batch[0].array(), group_ids, sums_.mutable_data(), - reinterpret_cast(counts_.mutable_data())); + auto g = group_id_mapping.GetValues(1); + for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) { + counts[*g] += other_counts[other_g]; + sums[*g] += other_sums[other_g]; + } return Status::OK(); } @@ -925,120 +976,118 @@ struct GroupedSumImpl : public GroupedAggregator { int64_t num_groups_ = 0; BufferBuilder sums_, counts_; std::shared_ptr out_type_; - ConsumeImpl consume_impl_; MemoryPool* pool_; }; +struct GroupedSumFactory { + template ::Type> + Status Visit(const T&) { + 
kernel = MakeKernel(std::move(argument_type), HashAggregateInit>); + return Status::OK(); + } + + Status Visit(const HalfFloatType& type) { + return Status::NotImplemented("Summing data of type ", type); + } + + Status Visit(const DataType& type) { + return Status::NotImplemented("Summing data of type ", type); + } + + static Result Make(const std::shared_ptr& type) { + GroupedSumFactory factory; + factory.argument_type = InputType::Array(type); + RETURN_NOT_OK(VisitTypeInline(*type, &factory)); + return std::move(factory.kernel); + } + + HashAggregateKernel kernel; + InputType argument_type; +}; + // ---------------------------------------------------------------------- // MinMax implementation template -struct Extrema : std::numeric_limits {}; +struct AntiExtrema { + static constexpr CType anti_min() { return std::numeric_limits::max(); } + static constexpr CType anti_max() { return std::numeric_limits::min(); } +}; template <> -struct Extrema { - static constexpr float min() { return -std::numeric_limits::infinity(); } - static constexpr float max() { return std::numeric_limits::infinity(); } +struct AntiExtrema { + static constexpr float anti_min() { return std::numeric_limits::infinity(); } + static constexpr float anti_max() { return -std::numeric_limits::infinity(); } }; template <> -struct Extrema { - static constexpr double min() { return -std::numeric_limits::infinity(); } - static constexpr double max() { return std::numeric_limits::infinity(); } +struct AntiExtrema { + static constexpr double anti_min() { return std::numeric_limits::infinity(); } + static constexpr double anti_max() { return -std::numeric_limits::infinity(); } }; +template struct GroupedMinMaxImpl : public GroupedAggregator { - using ConsumeImpl = - std::function&, const uint32_t*, void*, void*, - uint8_t*, uint8_t*)>; - - using ResizeImpl = std::function; - - template - static ResizeImpl MakeResizeImpl(CType anti_extreme) { - // resize a min or max buffer, storing the correct anti extreme - return [anti_extreme](BufferBuilder* builder, int64_t added_groups) { - TypedBufferBuilder typed_builder(std::move(*builder)); - RETURN_NOT_OK(typed_builder.Append(added_groups, anti_extreme)); - *builder = std::move(*typed_builder.bytes_builder()); - return Status::OK(); - }; - } + using CType = typename TypeTraits::CType; - struct GetImpl { - template ::CType> - enable_if_number Visit(const T&) { - consume_impl = [](const std::shared_ptr& input, const uint32_t* group, - void* mins, void* maxes, uint8_t* has_values, - uint8_t* has_nulls) { - auto raw_mins = reinterpret_cast(mins); - auto raw_maxes = reinterpret_cast(maxes); - - VisitArrayDataInline( - *input, - [&](CType val) { - raw_maxes[*group] = std::max(raw_maxes[*group], val); - raw_mins[*group] = std::min(raw_mins[*group], val); - BitUtil::SetBit(has_values, *group++); - }, - [&] { BitUtil::SetBit(has_nulls, *group++); }); - }; - - resize_min_impl = MakeResizeImpl(Extrema::max()); - resize_max_impl = MakeResizeImpl(Extrema::min()); - return Status::OK(); - } - - Status Visit(const BooleanType& type) { - return Status::NotImplemented("Grouped MinMax data of type ", type); - } - - Status Visit(const HalfFloatType& type) { - return Status::NotImplemented("Grouped MinMax data of type ", type); - } - - Status Visit(const DataType& type) { - return Status::NotImplemented("Grouped MinMax data of type ", type); - } - - ConsumeImpl consume_impl; - ResizeImpl resize_min_impl, resize_max_impl; - }; - - Status Init(ExecContext* ctx, const FunctionOptions* options, - const 
std::shared_ptr& input_type) override { + Status Init(ExecContext* ctx, const FunctionOptions* options) override { options_ = *checked_cast(options); - type_ = input_type; - - mins_ = BufferBuilder(ctx->memory_pool()); - maxes_ = BufferBuilder(ctx->memory_pool()); + type_ = TypeTraits::type_singleton(); + mins_ = TypedBufferBuilder(ctx->memory_pool()); + maxes_ = TypedBufferBuilder(ctx->memory_pool()); has_values_ = TypedBufferBuilder(ctx->memory_pool()); has_nulls_ = TypedBufferBuilder(ctx->memory_pool()); + return Status::OK(); + } - GetImpl get_impl; - RETURN_NOT_OK(VisitTypeInline(*input_type, &get_impl)); - - consume_impl_ = std::move(get_impl.consume_impl); - resize_min_impl_ = std::move(get_impl.resize_min_impl); - resize_max_impl_ = std::move(get_impl.resize_max_impl); - + Status Resize(int64_t new_num_groups) override { + auto added_groups = new_num_groups - num_groups_; + num_groups_ = new_num_groups; + RETURN_NOT_OK(mins_.Append(added_groups, AntiExtrema::anti_min())); + RETURN_NOT_OK(maxes_.Append(added_groups, AntiExtrema::anti_max())); + RETURN_NOT_OK(has_values_.Append(added_groups, false)); + RETURN_NOT_OK(has_nulls_.Append(added_groups, false)); return Status::OK(); } Status Consume(const ExecBatch& batch) override { - RETURN_NOT_OK(MaybeReserve(num_groups_, batch, [&](int64_t added_groups) { - num_groups_ += added_groups; - RETURN_NOT_OK(resize_min_impl_(&mins_, added_groups)); - RETURN_NOT_OK(resize_max_impl_(&maxes_, added_groups)); - RETURN_NOT_OK(has_values_.Append(added_groups, false)); - RETURN_NOT_OK(has_nulls_.Append(added_groups, false)); - return Status::OK(); - })); + auto g = batch[1].array()->GetValues(1); + auto raw_mins = reinterpret_cast(mins_.mutable_data()); + auto raw_maxes = reinterpret_cast(maxes_.mutable_data()); + + VisitArrayDataInline( + *batch[0].array(), + [&](CType val) { + raw_maxes[*g] = std::max(raw_maxes[*g], val); + raw_mins[*g] = std::min(raw_mins[*g], val); + BitUtil::SetBit(has_values_.mutable_data(), *g++); + }, + [&] { BitUtil::SetBit(has_nulls_.mutable_data(), *g++); }); + return Status::OK(); + } + + Status Merge(GroupedAggregator&& raw_other, + const ArrayData& group_id_mapping) override { + auto other = checked_cast(&raw_other); + + auto raw_mins = reinterpret_cast(mins_.mutable_data()); + auto raw_maxes = reinterpret_cast(maxes_.mutable_data()); + + auto other_raw_mins = reinterpret_cast(other->mins_.mutable_data()); + auto other_raw_maxes = reinterpret_cast(other->maxes_.mutable_data()); + + auto g = group_id_mapping.GetValues(1); + for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) { + raw_mins[*g] = std::min(raw_mins[*g], other_raw_mins[other_g]); + raw_maxes[*g] = std::max(raw_maxes[*g], other_raw_maxes[other_g]); - auto group_ids = batch[1].array()->GetValues(1); - consume_impl_(batch[0].array(), group_ids, mins_.mutable_data(), - maxes_.mutable_data(), has_values_.mutable_data(), - has_nulls_.mutable_data()); + if (BitUtil::GetBit(other->has_values_.data(), other_g)) { + BitUtil::SetBit(has_values_.mutable_data(), *g); + } + if (BitUtil::GetBit(other->has_nulls_.data(), other_g)) { + BitUtil::SetBit(has_nulls_.mutable_data(), *g); + } + } return Status::OK(); } @@ -1067,52 +1116,40 @@ struct GroupedMinMaxImpl : public GroupedAggregator { } int64_t num_groups_; - BufferBuilder mins_, maxes_; + TypedBufferBuilder mins_, maxes_; TypedBufferBuilder has_values_, has_nulls_; std::shared_ptr type_; - ConsumeImpl consume_impl_; - ResizeImpl resize_min_impl_, resize_max_impl_; ScalarAggregateOptions 
options_; }; -template -HashAggregateKernel MakeKernel(InputType argument_type) { - HashAggregateKernel kernel; - - kernel.init = [](KernelContext* ctx, - const KernelInitArgs& args) -> Result> { - auto impl = ::arrow::internal::make_unique(); - // FIXME(bkietz) Init should not take a type. That should be an unboxed template arg - // for the Impl. Otherwise we're not exposing dispatch as well as we should. - RETURN_NOT_OK(impl->Init(ctx->exec_context(), args.options, args.inputs[0].type)); - return std::move(impl); - }; +struct GroupedMinMaxFactory { + template + enable_if_number Visit(const T&) { + kernel = + MakeKernel(std::move(argument_type), HashAggregateInit>); + return Status::OK(); + } - kernel.signature = KernelSignature::Make( - {std::move(argument_type), InputType::Array(Type::UINT32), - InputType::Scalar(Type::UINT32)}, - OutputType( - [](KernelContext* ctx, const std::vector&) -> Result { - return checked_cast(ctx->state())->out_type(); - })); + Status Visit(const HalfFloatType& type) { + return Status::NotImplemented("Summing data of type ", type); + } - kernel.consume = [](KernelContext* ctx, const ExecBatch& batch) { - return checked_cast(ctx->state())->Consume(batch); - }; + Status Visit(const DataType& type) { + return Status::NotImplemented("Summing data of type ", type); + } - kernel.merge = [](KernelContext* ctx, KernelState&&, KernelState*) { - // TODO(ARROW-11840) merge two hash tables - return Status::NotImplemented("Merge hashed aggregations"); - }; + static Result Make(const std::shared_ptr& type) { + GroupedMinMaxFactory factory; + factory.argument_type = InputType::Array(type); + RETURN_NOT_OK(VisitTypeInline(*type, &factory)); + return std::move(factory.kernel); + } - kernel.finalize = [](KernelContext* ctx, Datum* out) { - ARROW_ASSIGN_OR_RAISE(*out, - checked_cast(ctx->state())->Finalize()); - return Status::OK(); - }; + HashAggregateKernel kernel; + InputType argument_type; +}; - return kernel; -} +} // namespace Result> GetKernels( ExecContext* ctx, const std::vector& aggregates, @@ -1129,8 +1166,7 @@ Result> GetKernels( ctx->func_registry()->GetFunction(aggregates[i].function)); ARROW_ASSIGN_OR_RAISE( const Kernel* kernel, - function->DispatchExact( - {in_descrs[i], ValueDescr::Array(uint32()), ValueDescr::Scalar(uint32())})); + function->DispatchExact({in_descrs[i], ValueDescr::Array(uint32())})); kernels[i] = static_cast(kernel); } return kernels; @@ -1154,13 +1190,13 @@ Result>> InitKernels( KernelContext kernel_ctx{ctx}; ARROW_ASSIGN_OR_RAISE( - states[i], kernels[i]->init(&kernel_ctx, KernelInitArgs{kernels[i], - { - in_descrs[i].type, - uint32(), - uint32(), - }, - options})); + states[i], + kernels[i]->init(&kernel_ctx, KernelInitArgs{kernels[i], + { + in_descrs[i], + ValueDescr::Array(uint32()), + }, + options})); } return std::move(states); @@ -1179,17 +1215,14 @@ Result ResolveKernels( ARROW_ASSIGN_OR_RAISE(auto descr, kernels[i]->signature->out_type().Resolve( &kernel_ctx, { - descrs[i].type, - uint32(), - uint32(), + descrs[i], + ValueDescr::Array(uint32()), })); fields[i] = field(aggregates[i].function, std::move(descr.type)); } return fields; } -} // namespace - Result> Grouper::Make(const std::vector& descrs, ExecContext* ctx) { if (GrouperFastImpl::CanUse(descrs)) { @@ -1199,7 +1232,13 @@ Result> Grouper::Make(const std::vector& de } Result GroupBy(const std::vector& arguments, const std::vector& keys, - const std::vector& aggregates, ExecContext* ctx) { + const std::vector& aggregates, bool use_threads, + ExecContext* ctx) { + auto 
task_group = + use_threads + ? arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool()) + : arrow::internal::TaskGroup::MakeSerial(); + // Construct and initialize HashAggregateKernels ARROW_ASSIGN_OR_RAISE(auto argument_descrs, ExecBatch::Make(arguments).Map( @@ -1207,24 +1246,33 @@ Result GroupBy(const std::vector& arguments, const std::vector>> states( + task_group->parallelism()); + for (auto& state : states) { + ARROW_ASSIGN_OR_RAISE(state, InitKernels(kernels, ctx, aggregates, argument_descrs)); + } ARROW_ASSIGN_OR_RAISE( FieldVector out_fields, - ResolveKernels(aggregates, kernels, states, ctx, argument_descrs)); + ResolveKernels(aggregates, kernels, states[0], ctx, argument_descrs)); using arrow::compute::detail::ExecBatchIterator; ARROW_ASSIGN_OR_RAISE(auto argument_batch_iterator, ExecBatchIterator::Make(arguments, ctx->exec_chunksize())); - // Construct Grouper + // Construct Groupers ARROW_ASSIGN_OR_RAISE(auto key_descrs, ExecBatch::Make(keys).Map([](ExecBatch batch) { return batch.GetDescriptors(); })); - ARROW_ASSIGN_OR_RAISE(auto grouper, Grouper::Make(key_descrs, ctx)); + std::vector> groupers(task_group->parallelism()); + for (auto& grouper : groupers) { + ARROW_ASSIGN_OR_RAISE(grouper, Grouper::Make(key_descrs, ctx)); + } + + std::mutex mutex; + std::unordered_map thread_ids; int i = 0; for (ValueDescr& key_descr : key_descrs) { @@ -1240,16 +1288,49 @@ Result GroupBy(const std::vector& arguments, const std::vectorNext(&key_batch)) { if (key_batch.length == 0) continue; - // compute a batch of group ids - ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch)); + task_group->Append([&, key_batch, argument_batch] { + size_t thread_index; + { + std::unique_lock lock(mutex); + auto it = thread_ids.emplace(std::this_thread::get_id(), thread_ids.size()).first; + thread_index = it->second; + DCHECK_LT(static_cast(thread_index), task_group->parallelism()); + } + + auto grouper = groupers[thread_index].get(); + + // compute a batch of group ids + ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch)); + + // consume group ids with HashAggregateKernels + for (size_t i = 0; i < kernels.size(); ++i) { + KernelContext batch_ctx{ctx}; + batch_ctx.SetState(states[thread_index][i].get()); + ARROW_ASSIGN_OR_RAISE(auto batch, ExecBatch::Make({argument_batch[i], id_batch})); + RETURN_NOT_OK(kernels[i]->resize(&batch_ctx, grouper->num_groups())); + RETURN_NOT_OK(kernels[i]->consume(&batch_ctx, batch)); + } + + return Status::OK(); + }); + } + + RETURN_NOT_OK(task_group->Finish()); + + // Merge if necessary + for (size_t thread_index = 1; thread_index < thread_ids.size(); ++thread_index) { + ARROW_ASSIGN_OR_RAISE(ExecBatch other_keys, groupers[thread_index]->GetUniques()); + ARROW_ASSIGN_OR_RAISE(Datum transposition, groupers[0]->Consume(other_keys)); + groupers[thread_index].reset(); - // consume group ids with HashAggregateKernels for (size_t i = 0; i < kernels.size(); ++i) { KernelContext batch_ctx{ctx}; - batch_ctx.SetState(states[i].get()); - ARROW_ASSIGN_OR_RAISE(auto batch, ExecBatch::Make({argument_batch[i], id_batch, - Datum(grouper->num_groups())})); - RETURN_NOT_OK(kernels[i]->consume(&batch_ctx, batch)); + batch_ctx.SetState(states[0][i].get()); + + RETURN_NOT_OK(kernels[i]->resize(&batch_ctx, groupers[0]->num_groups())); + RETURN_NOT_OK(kernels[i]->merge(&batch_ctx, std::move(*states[thread_index][i]), + *transposition.array())); + states[thread_index][i].reset(); } } @@ -1259,13 +1340,13 @@ Result GroupBy(const std::vector& arguments, 
const std::vectorfinalize(&batch_ctx, &out)); *it++ = out.array(); } - ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, grouper->GetUniques()); + ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, groupers[0]->GetUniques()); for (const auto& key : out_keys.values) { *it++ = key.array(); } @@ -1332,18 +1413,18 @@ namespace { const FunctionDoc hash_count_doc{"Count the number of null / non-null values", ("By default, non-null values are counted.\n" "This can be changed through ScalarAggregateOptions."), - {"array", "group_id_array", "group_count"}, + {"array", "group_id_array"}, "ScalarAggregateOptions"}; const FunctionDoc hash_sum_doc{"Sum values of a numeric array", ("Null values are ignored."), - {"array", "group_id_array", "group_count"}}; + {"array", "group_id_array"}}; const FunctionDoc hash_min_max_doc{ "Compute the minimum and maximum values of a numeric array", ("Null values are ignored by default.\n" "This can be changed through ScalarAggregateOptions."), - {"array", "group_id_array", "group_count"}, + {"array", "group_id_array"}, "ScalarAggregateOptions"}; } // namespace @@ -1351,25 +1432,32 @@ void RegisterHashAggregateBasic(FunctionRegistry* registry) { { static auto default_scalar_aggregate_options = ScalarAggregateOptions::Defaults(); auto func = std::make_shared( - "hash_count", Arity::Ternary(), &hash_count_doc, + "hash_count", Arity::Binary(), &hash_count_doc, &default_scalar_aggregate_options); - DCHECK_OK(func->AddKernel(MakeKernel(ValueDescr::ARRAY))); + + DCHECK_OK(func->AddKernel( + MakeKernel(ValueDescr::ARRAY, HashAggregateInit))); DCHECK_OK(registry->AddFunction(std::move(func))); } { - auto func = std::make_shared("hash_sum", Arity::Ternary(), + auto func = std::make_shared("hash_sum", Arity::Binary(), &hash_sum_doc); - DCHECK_OK(func->AddKernel(MakeKernel(ValueDescr::ARRAY))); + DCHECK_OK(AddHashAggKernels({boolean()}, GroupedSumFactory::Make, func.get())); + DCHECK_OK(AddHashAggKernels(SignedIntTypes(), GroupedSumFactory::Make, func.get())); + DCHECK_OK(AddHashAggKernels(UnsignedIntTypes(), GroupedSumFactory::Make, func.get())); + DCHECK_OK( + AddHashAggKernels(FloatingPointTypes(), GroupedSumFactory::Make, func.get())); DCHECK_OK(registry->AddFunction(std::move(func))); } { static auto default_scalar_aggregate_options = ScalarAggregateOptions::Defaults(); auto func = std::make_shared( - "hash_min_max", Arity::Ternary(), &hash_min_max_doc, + "hash_min_max", Arity::Binary(), &hash_min_max_doc, &default_scalar_aggregate_options); - DCHECK_OK(func->AddKernel(MakeKernel(ValueDescr::ARRAY))); + DCHECK_OK(AddHashAggKernels({boolean()}, GroupedSumFactory::Make, func.get())); + DCHECK_OK(AddHashAggKernels(NumericTypes(), GroupedMinMaxFactory::Make, func.get())); DCHECK_OK(registry->AddFunction(std::move(func))); } } diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc index 8c8a4b23932..b0327c7aa81 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc @@ -30,6 +30,9 @@ #include "arrow/compute/api_scalar.h" #include "arrow/compute/api_vector.h" #include "arrow/compute/cast.h" +#include "arrow/compute/exec.h" +#include "arrow/compute/exec/exec_plan.h" +#include "arrow/compute/exec/test_util.h" #include "arrow/compute/kernels/aggregate_internal.h" #include "arrow/compute/kernels/codegen_internal.h" #include "arrow/compute/kernels/test_util.h" @@ -38,6 +41,7 @@ #include "arrow/testing/generator.h" #include "arrow/testing/gtest_common.h" #include 
"arrow/testing/gtest_util.h" +#include "arrow/testing/matchers.h" #include "arrow/testing/random.h" #include "arrow/type.h" #include "arrow/type_traits.h" @@ -46,6 +50,7 @@ #include "arrow/util/int_util_internal.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/logging.h" +#include "arrow/util/thread_pool.h" using testing::HasSubstr; @@ -121,6 +126,27 @@ void ValidateGroupBy(const std::vector& aggregates, AssertDatumsEqual(expected, actual, /*verbose=*/true); } +ExecContext* small_chunksize_context(bool use_threads = false) { + static ExecContext ctx, + ctx_with_threads{default_memory_pool(), arrow::internal::GetCpuThreadPool()}; + ctx.set_exec_chunksize(2); + ctx_with_threads.set_exec_chunksize(2); + return use_threads ? &ctx_with_threads : &ctx; +} + +Result GroupByTest( + const std::vector& arguments, const std::vector& keys, + const std::vector<::arrow::compute::internal::Aggregate>& aggregates, + bool use_threads, bool use_exec_plan) { + if (use_exec_plan) { + return GroupByUsingExecPlan(arguments, keys, aggregates, use_threads, + small_chunksize_context(use_threads)); + } else { + return internal::GroupBy(arguments, keys, aggregates, use_threads, + default_exec_context()); + } +} + } // namespace TEST(Grouper, SupportedKeys) { @@ -175,12 +201,19 @@ struct TestGrouper { } void ExpectConsume(const std::string& key_json, const std::string& expected) { - ExpectConsume(ExecBatch(*RecordBatchFromJSON(key_schema_, key_json)), + ExpectConsume(ExecBatchFromJSON(descrs_, key_json), ArrayFromJSON(uint32(), expected)); } - void ExpectConsume(const std::vector& key_batch, Datum expected) { - ExpectConsume(*ExecBatch::Make(key_batch), expected); + void ExpectConsume(const std::vector& key_values, Datum expected) { + ASSERT_OK_AND_ASSIGN(auto key_batch, ExecBatch::Make(key_values)); + ExpectConsume(key_batch, expected); + } + + void ExpectConsume(const ExecBatch& key_batch, Datum expected) { + Datum ids; + ConsumeAndValidate(key_batch, &ids); + AssertEquivalentIds(expected, ids); } void AssertEquivalentIds(const Datum& expected, const Datum& actual) { @@ -190,10 +223,8 @@ struct TestGrouper { int64_t num_ids = left->length(); auto left_data = left->data(); auto right_data = right->data(); - const uint32_t* left_ids = - reinterpret_cast(left_data->buffers[1]->data()); - const uint32_t* right_ids = - reinterpret_cast(right_data->buffers[1]->data()); + auto left_ids = reinterpret_cast(left_data->buffers[1]->data()); + auto right_ids = reinterpret_cast(right_data->buffers[1]->data()); uint32_t max_left_id = 0; uint32_t max_right_id = 0; for (int64_t i = 0; i < num_ids; ++i) { @@ -224,13 +255,6 @@ struct TestGrouper { } } - void ExpectConsume(const ExecBatch& key_batch, Datum expected) { - Datum ids; - ConsumeAndValidate(key_batch, &ids); - AssertEquivalentIds(expected, ids); - // AssertDatumsEqual(expected, ids, /*verbose=*/true); - } - void ConsumeAndValidate(const ExecBatch& key_batch, Datum* ids = nullptr) { ASSERT_OK_AND_ASSIGN(Datum id_batch, grouper_->Consume(key_batch)); @@ -514,85 +538,171 @@ TEST(GroupBy, Errors) { [null, 3] ])"); - EXPECT_RAISES_WITH_MESSAGE_THAT( - NotImplemented, HasSubstr("Direct execution of HASH_AGGREGATE functions"), - CallFunction("hash_sum", {batch->GetColumnByName("argument"), - batch->GetColumnByName("group_id"), Datum(uint32_t(4))})); + EXPECT_THAT(CallFunction("hash_sum", {batch->GetColumnByName("argument"), + batch->GetColumnByName("group_id")}), + Raises(StatusCode::NotImplemented, + HasSubstr("Direct execution of HASH_AGGREGATE functions"))); } 
-TEST(GroupBy, SumOnly) { - auto batch = RecordBatchFromJSON( - schema({field("argument", float64()), field("key", int64())}), R"([ +namespace { +void SortBy(std::vector names, Datum* aggregated_and_grouped) { + SortOptions options{{SortKey("key_0", SortOrder::Ascending)}}; + + ASSERT_OK_AND_ASSIGN( + auto batch, RecordBatch::FromStructArray(aggregated_and_grouped->make_array())); + ASSERT_OK_AND_ASSIGN(Datum sort_indices, SortIndices(batch, options)); + + ASSERT_OK_AND_ASSIGN(*aggregated_and_grouped, + Take(*aggregated_and_grouped, sort_indices)); +} +} // namespace + +TEST(GroupBy, CountOnly) { + for (bool use_exec_plan : {false, true}) { + for (bool use_threads : {true, false}) { + SCOPED_TRACE(use_threads ? "parallel/merged" : "serial"); + + auto table = TableFromJSON( + schema({field("argument", float64()), field("key", int64())}), {R"([ [1.0, 1], - [null, 1], + [null, 1] + ])", + R"([ [0.0, 2], [null, 3], [4.0, null], [3.25, 1], - [0.125, 2], + [0.125, 2] + ])", + R"([ [-0.25, 2], [0.75, null], [null, 3] - ])"); + ])"}); - ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped, - internal::GroupBy({batch->GetColumnByName("argument")}, - {batch->GetColumnByName("key")}, - { - {"hash_sum", nullptr}, - })); + ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped, + GroupByTest({table->GetColumnByName("argument")}, + {table->GetColumnByName("key")}, + { + {"hash_count", nullptr}, + }, + use_threads, use_exec_plan)); + SortBy({"key_0"}, &aggregated_and_grouped); + + AssertDatumsEqual(ArrayFromJSON(struct_({ + field("hash_count", int64()), + field("key_0", int64()), + }), + R"([ + [2, 1], + [3, 2], + [0, 3], + [2, null] + ])"), + aggregated_and_grouped, + /*verbose=*/true); + } + } +} - AssertDatumsEqual(ArrayFromJSON(struct_({ - field("hash_sum", float64()), - field("key_0", int64()), - }), - R"([ +TEST(GroupBy, SumOnly) { + for (bool use_exec_plan : {false, true}) { + for (bool use_threads : {true, false}) { + SCOPED_TRACE(use_threads ? "parallel/merged" : "serial"); + + auto table = TableFromJSON( + schema({field("argument", float64()), field("key", int64())}), {R"([ + [1.0, 1], + [null, 1] + ])", + R"([ + [0.0, 2], + [null, 3], + [4.0, null], + [3.25, 1], + [0.125, 2] + ])", + R"([ + [-0.25, 2], + [0.75, null], + [null, 3] + ])"}); + + ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped, + GroupByTest({table->GetColumnByName("argument")}, + {table->GetColumnByName("key")}, + { + {"hash_sum", nullptr}, + }, + use_threads, use_exec_plan)); + SortBy({"key_0"}, &aggregated_and_grouped); + + AssertDatumsEqual(ArrayFromJSON(struct_({ + field("hash_sum", float64()), + field("key_0", int64()), + }), + R"([ [4.25, 1], [-0.125, 2], [null, 3], [4.75, null] ])"), - aggregated_and_grouped, - /*verbose=*/true); + aggregated_and_grouped, + /*verbose=*/true); + } + } } TEST(GroupBy, MinMaxOnly) { - auto batch = RecordBatchFromJSON( - schema({field("argument", float64()), field("key", int64())}), R"([ + for (bool use_exec_plan : {false, true}) { + for (bool use_threads : {true, false}) { + SCOPED_TRACE(use_threads ? 
"parallel/merged" : "serial"); + + auto table = TableFromJSON( + schema({field("argument", float64()), field("key", int64())}), {R"([ [1.0, 1], - [null, 1], + [null, 1] + ])", + R"([ [0.0, 2], [null, 3], [4.0, null], [3.25, 1], - [0.125, 2], + [0.125, 2] + ])", + R"([ [-0.25, 2], [0.75, null], [null, 3] - ])"); - - ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped, - internal::GroupBy({batch->GetColumnByName("argument")}, - {batch->GetColumnByName("key")}, - { - {"hash_min_max", nullptr}, - })); + ])"}); - AssertDatumsEqual(ArrayFromJSON(struct_({ - field("hash_min_max", struct_({ - field("min", float64()), - field("max", float64()), - })), - field("key_0", int64()), - }), - R"([ + ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped, + GroupByTest({table->GetColumnByName("argument")}, + {table->GetColumnByName("key")}, + { + {"hash_min_max", nullptr}, + }, + use_threads, use_exec_plan)); + SortBy({"key_0"}, &aggregated_and_grouped); + + AssertDatumsEqual( + ArrayFromJSON(struct_({ + field("hash_min_max", struct_({ + field("min", float64()), + field("max", float64()), + })), + field("key_0", int64()), + }), + R"([ [{"min": 1.0, "max": 3.25}, 1], [{"min": -0.25, "max": 0.125}, 2], [{"min": null, "max": null}, 3], [{"min": 0.75, "max": 4.0}, null] ])"), - aggregated_and_grouped, - /*verbose=*/true); + aggregated_and_grouped, + /*verbose=*/true); + } + } } TEST(GroupBy, CountAndSum) { @@ -845,12 +955,6 @@ TEST(GroupBy, MinMaxWithNewGroupsInChunkedArray) { /*verbose=*/true); } -ExecContext* small_chunksize_context() { - static ExecContext ctx; - ctx.set_exec_chunksize(2); - return &ctx; -} - TEST(GroupBy, SmallChunkSizeSumOnly) { auto batch = RecordBatchFromJSON( schema({field("argument", float64()), field("key", int64())}), R"([ diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 0a289985ca2..192f84f46df 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -456,6 +456,12 @@ Result> FragmentsToBatches( }); } +const FieldVector kAugmentedFields{ + field("__fragment_index", int32()), + field("__batch_index", int32()), + field("__last_in_fragment", boolean()), +}; + Result MakeScanNode(compute::ExecPlan* plan, FragmentGenerator fragment_gen, std::shared_ptr options) { @@ -496,12 +502,12 @@ Result MakeScanNode(compute::ExecPlan* plan, return batch; }); - auto augmented_fields = options->dataset_schema->fields(); - augmented_fields.push_back(field("__fragment_index", int32())); - augmented_fields.push_back(field("__batch_index", int32())); - augmented_fields.push_back(field("__last_in_fragment", boolean())); - return compute::MakeSourceNode(plan, "dataset_scan", - schema(std::move(augmented_fields)), std::move(gen)); + auto fields = options->dataset_schema->fields(); + for (const auto& aug_field : kAugmentedFields) { + fields.push_back(aug_field); + } + return compute::MakeSourceNode(plan, "dataset_scan", schema(std::move(fields)), + std::move(gen)); } class OneShotScanTask : public ScanTask { @@ -776,24 +782,13 @@ Future> AsyncScanner::ToTableAsync( }); } -namespace { -Result GetSelectionSize(const Datum& selection, int64_t length) { - if (length == 0) return 0; - - if (selection.is_scalar()) { - if (!selection.scalar()->is_valid) return 0; - if (!selection.scalar_as().value) return 0; - return length; - } - - ARROW_ASSIGN_OR_RAISE(auto count, compute::Sum(selection)); - return static_cast(count.scalar_as().value); -} -} // namespace - Result AsyncScanner::CountRows() { ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments()); - 
ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make()); + + auto cpu_executor = scan_options_->use_threads ? internal::GetCpuThreadPool() : nullptr; + compute::ExecContext exec_context(scan_options_->pool, cpu_executor); + + ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(&exec_context)); // Drop projection since we only need to count rows auto options = std::make_shared(*scan_options_); RETURN_NOT_OK(SetProjection(options.get(), std::vector())); @@ -802,7 +797,7 @@ Result AsyncScanner::CountRows() { fragment_gen = MakeMappedGenerator( std::move(fragment_gen), [&](const std::shared_ptr& fragment) { - return fragment->CountRows(scan_options_->filter, scan_options_) + return fragment->CountRows(options->filter, options) .Then([&, fragment](util::optional fast_count) mutable -> std::shared_ptr { if (fast_count) { @@ -825,25 +820,21 @@ Result AsyncScanner::CountRows() { auto get_selection, compute::MakeProjectNode(scan, "get_selection", {options->filter})); + ARROW_ASSIGN_OR_RAISE( + auto sum_selection, + compute::MakeScalarAggregateNode(get_selection, "sum_selection", + {compute::internal::Aggregate{"sum", nullptr}})); + AsyncGenerator> sink_gen = - compute::MakeSinkNode(get_selection, "sink"); + compute::MakeSinkNode(sum_selection, "sink"); RETURN_NOT_OK(plan->StartProducing()); - - RETURN_NOT_OK( - VisitAsyncGenerator(std::move(sink_gen), - [&](const util::optional& batch) { - // TODO replace with scalar aggregation node - ARROW_ASSIGN_OR_RAISE( - int64_t slow_count, - GetSelectionSize(batch->values[0], batch->length)); - total += slow_count; - return Status::OK(); - }) - .status()); - + auto maybe_slow_count = sink_gen().result(); plan->finished().Wait(); + ARROW_ASSIGN_OR_RAISE(auto slow_count, maybe_slow_count); + total += slow_count->values[0].scalar_as().value; + return total.load(); } @@ -1157,6 +1148,25 @@ Result SyncScanner::CountRows() { Result MakeScanNode(compute::ExecPlan* plan, std::shared_ptr dataset, std::shared_ptr scan_options) { + if (scan_options->dataset_schema == nullptr) { + scan_options->dataset_schema = dataset->schema(); + } + + if (!scan_options->filter.IsBound()) { + ARROW_ASSIGN_OR_RAISE(scan_options->filter, + scan_options->filter.Bind(*dataset->schema())); + } + + if (!scan_options->projection.IsBound()) { + auto fields = dataset->schema()->fields(); + for (const auto& aug_field : kAugmentedFields) { + fields.push_back(aug_field); + } + + ARROW_ASSIGN_OR_RAISE(scan_options->projection, + scan_options->projection.Bind(Schema(std::move(fields)))); + } + // using a generator for speculative forward compatibility with async fragment discovery ARROW_ASSIGN_OR_RAISE(auto fragments_it, dataset->GetFragments(scan_options->filter)); ARROW_ASSIGN_OR_RAISE(auto fragments_vec, fragments_it.ToVector()); @@ -1175,9 +1185,9 @@ Result MakeAugmentedProjectNode( } } - for (auto aug_name : {"__fragment_index", "__batch_index", "__last_in_fragment"}) { - exprs.push_back(compute::field_ref(aug_name)); - names.emplace_back(aug_name); + for (const auto& aug_field : kAugmentedFields) { + exprs.push_back(compute::field_ref(aug_field->name())); + names.push_back(aug_field->name()); } return compute::MakeProjectNode(input, std::move(label), std::move(exprs), std::move(names)); diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 5dc83c662de..de7f780183a 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -1175,6 +1175,22 @@ DatasetAndBatches MakeBasicDataset() { return {dataset, 
batches}; } + +compute::Expression Materialize(std::vector names, + bool include_aug_fields = false) { + if (include_aug_fields) { + for (auto aug_name : {"__fragment_index", "__batch_index", "__last_in_fragment"}) { + names.emplace_back(aug_name); + } + } + + std::vector exprs; + for (const auto& name : names) { + exprs.push_back(field_ref(name)); + } + + return project(exprs, names); +} } // namespace TEST(ScanNode, Schema) { @@ -1184,7 +1200,7 @@ TEST(ScanNode, Schema) { auto options = std::make_shared(); options->use_async = true; - options->dataset_schema = basic.dataset->schema(); + options->projection = Materialize({}); // set an empty projection ASSERT_OK_AND_ASSIGN(auto scan, MakeScanNode(plan.get(), basic.dataset, options)); @@ -1192,6 +1208,8 @@ TEST(ScanNode, Schema) { fields.push_back(field("__fragment_index", int32())); fields.push_back(field("__batch_index", int32())); fields.push_back(field("__last_in_fragment", boolean())); + // output_schema is *always* the full augmented dataset schema, regardless of projection + // (but some columns *may* be placeholder null Scalars if not projected) AssertSchemaEqual(Schema(fields), *scan->output_schema()); } @@ -1202,7 +1220,8 @@ TEST(ScanNode, Trivial) { auto options = std::make_shared(); options->use_async = true; - options->dataset_schema = basic.dataset->schema(); + // ensure all fields are materialized + options->projection = Materialize({"a", "b", "c"}, /*include_aug_fields=*/true); ASSERT_OK_AND_ASSIGN(auto scan, MakeScanNode(plan.get(), basic.dataset, options)); auto sink_gen = MakeSinkNode(scan, "sink"); @@ -1220,9 +1239,9 @@ TEST(ScanNode, FilteredOnVirtualColumn) { auto options = std::make_shared(); options->use_async = true; - options->dataset_schema = basic.dataset->schema(); - ASSERT_OK_AND_ASSIGN(options->filter, - less(field_ref("c"), literal(30)).Bind(*basic.dataset->schema())); + options->filter = less(field_ref("c"), literal(30)); + // ensure all fields are materialized + options->projection = Materialize({"a", "b", "c"}, /*include_aug_fields=*/true); ASSERT_OK_AND_ASSIGN(auto scan, MakeScanNode(plan.get(), basic.dataset, options)); @@ -1245,10 +1264,9 @@ TEST(ScanNode, DeferredFilterOnPhysicalColumn) { auto options = std::make_shared(); options->use_async = true; - options->dataset_schema = basic.dataset->schema(); - ASSERT_OK_AND_ASSIGN( - options->filter, - greater(field_ref("a"), literal(4)).Bind(*basic.dataset->schema())); + options->filter = greater(field_ref("a"), literal(4)); + // ensure all fields are materialized + options->projection = Materialize({"a", "b", "c"}, /*include_aug_fields=*/true); ASSERT_OK_AND_ASSIGN(auto scan, MakeScanNode(plan.get(), basic.dataset, options)); @@ -1270,8 +1288,7 @@ TEST(ScanNode, DISABLED_ProjectionPushdown) { auto options = std::make_shared(); options->use_async = true; - options->dataset_schema = basic.dataset->schema(); - ASSERT_OK(SetProjection(options.get(), {field_ref("b")}, {"b"})); + options->projection = Materialize({"b"}, /*include_aug_fields=*/true); ASSERT_OK_AND_ASSIGN(auto scan, MakeScanNode(plan.get(), basic.dataset, options)); @@ -1298,16 +1315,14 @@ TEST(ScanNode, MaterializationOfVirtualColumn) { auto options = std::make_shared(); options->use_async = true; - options->dataset_schema = basic.dataset->schema(); + options->projection = Materialize({"a", "b", "c"}, /*include_aug_fields=*/true); ASSERT_OK_AND_ASSIGN(auto scan, MakeScanNode(plan.get(), basic.dataset, options)); ASSERT_OK_AND_ASSIGN( auto project, - compute::MakeProjectNode( - scan, "project", - 
{field_ref("a"), field_ref("b"), field_ref("c"), field_ref("__fragment_index"), - field_ref("__batch_index"), field_ref("__last_in_fragment")})); + dataset::MakeAugmentedProjectNode( + scan, "project", {field_ref("a"), field_ref("b"), field_ref("c")})); auto sink_gen = MakeSinkNode(project, "sink"); @@ -1352,16 +1367,12 @@ TEST(ScanNode, MinimalEndToEnd) { auto options = std::make_shared(); // sync scanning is not supported by ScanNode options->use_async = true; - // for now, we must replicate the dataset schema here - options->dataset_schema = dataset->schema(); // specify the filter compute::Expression b_is_true = field_ref("b"); - ASSERT_OK_AND_ASSIGN(b_is_true, b_is_true.Bind(*dataset->schema())); options->filter = b_is_true; // for now, specify the projection as the full project expression (eventually this can // just be a list of materialized field names) compute::Expression a_times_2 = call("multiply", {field_ref("a"), literal(2)}); - ASSERT_OK_AND_ASSIGN(a_times_2, a_times_2.Bind(*dataset->schema())); options->projection = call("make_struct", {a_times_2}, compute::MakeStructOptions{{"a * 2"}}); @@ -1388,20 +1399,181 @@ TEST(ScanNode, MinimalEndToEnd) { std::shared_ptr sink_reader = compute::MakeGeneratorReader( schema({field("a * 2", int32())}), std::move(sink_gen), exec_context.memory_pool()); - // start the ExecPlan then wait 1s for completion + // start the ExecPlan ASSERT_OK(plan->StartProducing()); - ASSERT_TRUE(plan->finished().Wait(/*seconds=*/1)) << "ExecPlan didn't finish within 1s"; // collect sink_reader into a Table ASSERT_OK_AND_ASSIGN(auto collected, Table::FromRecordBatchReader(sink_reader.get())); + // wait 1s for completion + ASSERT_TRUE(plan->finished().Wait(/*seconds=*/1)) << "ExecPlan didn't finish within 1s"; + auto expected = TableFromJSON(schema({field("a * 2", int32())}), { R"([ {"a * 2": 4}, {"a * 2": null}, {"a * 2": null} ])"}); + AssertTablesEqual(*expected, *collected, /*same_chunk_layout=*/false); +} + +TEST(ScanNode, MinimalScalarAggEndToEnd) { + // NB: This test is here for didactic purposes + + // Specify a MemoryPool and ThreadPool for the ExecPlan + compute::ExecContext exec_context(default_memory_pool(), internal::GetCpuThreadPool()); + + // A ScanNode is constructed from an ExecPlan (into which it is inserted), + // a Dataset (whose batches will be scanned), and ScanOptions (to specify a filter for + // predicate pushdown, a projection to skip materialization of unnecessary columns, ...) 
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr plan, + compute::ExecPlan::Make(&exec_context)); + + std::shared_ptr dataset = std::make_shared( + TableFromJSON(schema({field("a", int32()), field("b", boolean())}), + { + R"([{"a": 1, "b": null}, + {"a": 2, "b": true}])", + R"([{"a": null, "b": true}, + {"a": 3, "b": false}])", + R"([{"a": null, "b": true}, + {"a": 4, "b": false}])", + R"([{"a": 5, "b": null}, + {"a": 6, "b": false}, + {"a": 7, "b": false}])", + })); + + auto options = std::make_shared(); + // sync scanning is not supported by ScanNode + options->use_async = true; + // specify the filter + compute::Expression b_is_true = field_ref("b"); + options->filter = b_is_true; + // for now, specify the projection as the full project expression (eventually this can + // just be a list of materialized field names) + compute::Expression a_times_2 = call("multiply", {field_ref("a"), literal(2)}); + options->projection = + call("make_struct", {a_times_2}, compute::MakeStructOptions{{"a * 2"}}); + + // construct the scan node + ASSERT_OK_AND_ASSIGN(compute::ExecNode * scan, + dataset::MakeScanNode(plan.get(), dataset, options)); + + // pipe the scan node into a filter node + ASSERT_OK_AND_ASSIGN(compute::ExecNode * filter, + compute::MakeFilterNode(scan, "filter", b_is_true)); + + // pipe the filter node into a project node + ASSERT_OK_AND_ASSIGN(compute::ExecNode * project, + compute::MakeProjectNode(filter, "project", {a_times_2})); + + // pipe the projection into a scalar aggregate node + ASSERT_OK_AND_ASSIGN( + compute::ExecNode * sum, + compute::MakeScalarAggregateNode(project, "scalar_agg", + {compute::internal::Aggregate{"sum", nullptr}})); + + // finally, pipe the project node into a sink node + auto sink_gen = compute::MakeSinkNode(sum, "sink"); + + // translate sink_gen (async) to sink_reader (sync) + std::shared_ptr sink_reader = compute::MakeGeneratorReader( + schema({field("sum", int64())}), std::move(sink_gen), exec_context.memory_pool()); + // start the ExecPlan + ASSERT_OK(plan->StartProducing()); + + // collect sink_reader into a Table + ASSERT_OK_AND_ASSIGN(auto collected, Table::FromRecordBatchReader(sink_reader.get())); + + // wait 1s for completion + ASSERT_TRUE(plan->finished().Wait(/*seconds=*/1)) << "ExecPlan didn't finish within 1s"; + + auto expected = TableFromJSON(schema({field("sum", int64())}), { + R"([ + {"sum": 4} + ])"}); + AssertTablesEqual(*expected, *collected, /*same_chunk_layout=*/false); +} + +TEST(ScanNode, MinimalGroupedAggEndToEnd) { + // NB: This test is here for didactic purposes + + // Specify a MemoryPool and ThreadPool for the ExecPlan + compute::ExecContext exec_context(default_memory_pool(), internal::GetCpuThreadPool()); + + // A ScanNode is constructed from an ExecPlan (into which it is inserted), + // a Dataset (whose batches will be scanned), and ScanOptions (to specify a filter for + // predicate pushdown, a projection to skip materialization of unnecessary columns, ...) 
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr plan, + compute::ExecPlan::Make(&exec_context)); + + std::shared_ptr dataset = std::make_shared( + TableFromJSON(schema({field("a", int32()), field("b", boolean())}), + { + R"([{"a": 1, "b": null}, + {"a": 2, "b": true}])", + R"([{"a": null, "b": true}, + {"a": 3, "b": false}])", + R"([{"a": null, "b": true}, + {"a": 4, "b": false}])", + R"([{"a": 5, "b": null}, + {"a": 6, "b": false}, + {"a": 7, "b": false}])", + })); + + auto options = std::make_shared(); + // sync scanning is not supported by ScanNode + options->use_async = true; + // specify the filter + compute::Expression b_is_true = field_ref("b"); + options->filter = b_is_true; + // for now, specify the projection as the full project expression (eventually this can + // just be a list of materialized field names) + compute::Expression a_times_2 = call("multiply", {field_ref("a"), literal(2)}); + compute::Expression b = field_ref("b"); + options->projection = + call("make_struct", {a_times_2, b}, compute::MakeStructOptions{{"a * 2", "b"}}); + + // construct the scan node + ASSERT_OK_AND_ASSIGN(compute::ExecNode * scan, + dataset::MakeScanNode(plan.get(), dataset, options)); + + // pipe the scan node into a project node + ASSERT_OK_AND_ASSIGN( + compute::ExecNode * project, + compute::MakeProjectNode(scan, "project", {a_times_2, b}, {"a * 2", "b"})); + + // pipe the projection into a grouped aggregate node + ASSERT_OK_AND_ASSIGN(compute::ExecNode * sum, + compute::MakeGroupByNode( + project, "grouped_agg", /*keys=*/{"b"}, /*targets=*/{"a * 2"}, + {compute::internal::Aggregate{"hash_sum", nullptr}})); + + // finally, pipe the project node into a sink node + auto sink_gen = compute::MakeSinkNode(sum, "sink"); + + // translate sink_gen (async) to sink_reader (sync) + std::shared_ptr sink_reader = compute::MakeGeneratorReader( + schema({field("hash_sum", int64()), field("b", boolean())}), std::move(sink_gen), + exec_context.memory_pool()); + + // start the ExecPlan + ASSERT_OK(plan->StartProducing()); + + // collect sink_reader into a Table + ASSERT_OK_AND_ASSIGN(auto collected, Table::FromRecordBatchReader(sink_reader.get())); + + // wait 1s for completion + ASSERT_TRUE(plan->finished().Wait(/*seconds=*/1)) << "ExecPlan didn't finish within 1s"; + + auto expected = + TableFromJSON(schema({field("hash_sum", int64()), field("b", boolean())}), { + R"([ + {"hash_sum": 12, "b": null}, + {"hash_sum": 4, "b": true}, + {"hash_sum": 40, "b": false} + ])"}); AssertTablesEqual(*expected, *collected, /*same_chunk_layout=*/false); } diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc index f288a15be3f..fc8022a95e4 100644 --- a/cpp/src/arrow/util/future.cc +++ b/cpp/src/arrow/util/future.cc @@ -245,7 +245,8 @@ class ConcreteFutureImpl : public FutureImpl { CallbackRecord callback_record{std::move(callback), opts}; if (IsFutureFinished(state_)) { lock.unlock(); - RunOrScheduleCallback(std::move(callback_record), /*in_add_callback=*/true); + RunOrScheduleCallback(shared_from_this(), std::move(callback_record), + /*in_add_callback=*/true); } else { callbacks_.push_back(std::move(callback_record)); } @@ -263,8 +264,8 @@ class ConcreteFutureImpl : public FutureImpl { } } - bool ShouldScheduleCallback(const CallbackRecord& callback_record, - bool in_add_callback) { + static bool ShouldScheduleCallback(const CallbackRecord& callback_record, + bool in_add_callback) { switch (callback_record.options.should_schedule) { case ShouldSchedule::Never: return false; @@ -280,7 +281,9 @@ class 
ConcreteFutureImpl : public FutureImpl { } } - void RunOrScheduleCallback(CallbackRecord&& callback_record, bool in_add_callback) { + static void RunOrScheduleCallback(const std::shared_ptr& self, + CallbackRecord&& callback_record, + bool in_add_callback) { if (ShouldScheduleCallback(callback_record, in_add_callback)) { struct CallbackTask { void operator()() { std::move(callback)(*self); } @@ -289,10 +292,10 @@ class ConcreteFutureImpl : public FutureImpl { std::shared_ptr self; }; // Need to keep `this` alive until the callback has a chance to be scheduled. - CallbackTask task{std::move(callback_record.callback), shared_from_this()}; + CallbackTask task{std::move(callback_record.callback), self}; DCHECK_OK(callback_record.options.executor->Spawn(std::move(task))); } else { - std::move(callback_record.callback)(*this); + std::move(callback_record.callback)(*self); } } @@ -311,16 +314,18 @@ class ConcreteFutureImpl : public FutureImpl { } cv_.notify_all(); + auto callbacks = std::move(callbacks_); + auto self = shared_from_this(); + // run callbacks, lock not needed since the future is finished by this // point so nothing else can modify the callbacks list and it is safe // to iterate. // // In fact, it is important not to hold the locks because the callback // may be slow or do its own locking on other resources - for (auto& callback_record : callbacks_) { - RunOrScheduleCallback(std::move(callback_record), /*in_add_callback=*/false); + for (auto& callback_record : callbacks) { + RunOrScheduleCallback(self, std::move(callback_record), /*in_add_callback=*/false); } - callbacks_.clear(); } void DoWait() { diff --git a/cpp/src/arrow/util/unreachable.cc b/cpp/src/arrow/util/unreachable.cc new file mode 100644 index 00000000000..4ffe3a8f787 --- /dev/null +++ b/cpp/src/arrow/util/unreachable.cc @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/util/unreachable.h" + +#include "arrow/util/logging.h" + +namespace arrow { + +[[noreturn]] void Unreachable(const char* message) { + DCHECK(false) << message; + std::abort(); +} + +} // namespace arrow diff --git a/cpp/src/arrow/util/unreachable.h b/cpp/src/arrow/util/unreachable.h new file mode 100644 index 00000000000..027f76e84d2 --- /dev/null +++ b/cpp/src/arrow/util/unreachable.h @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace arrow { + +[[noreturn]] void Unreachable(const char* message = "Unreachable"); + +} // namespace arrow
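Usage note for the new helper (a sketch; SortOrderSign is a hypothetical call site, not part of this patch): Unreachable() marks branches that must never be taken, reporting the message via DCHECK in debug builds and then aborting.

#include "arrow/compute/api_vector.h"
#include "arrow/util/unreachable.h"

int SortOrderSign(arrow::compute::SortOrder order) {
  switch (order) {
    case arrow::compute::SortOrder::Ascending:
      return 1;
    case arrow::compute::SortOrder::Descending:
      return -1;
  }
  arrow::Unreachable("unhandled SortOrder");  // [[noreturn]], so no return needed here
}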