diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 8a469e3fe12..63f8d39f551 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -141,6 +141,22 @@ Result ExecBatch::Make(std::vector values) { return ExecBatch(std::move(values), length); } +Result> ExecBatch::ToRecordBatch( + std::shared_ptr schema, MemoryPool* pool) const { + ArrayVector columns(schema->num_fields()); + + for (size_t i = 0; i < columns.size(); ++i) { + const Datum& value = values[i]; + if (value.is_array()) { + columns[i] = value.make_array(); + continue; + } + ARROW_ASSIGN_OR_RAISE(columns[i], MakeArrayFromScalar(*value.scalar(), length, pool)); + } + + return RecordBatch::Make(std::move(schema), length, std::move(columns)); +} + namespace { Result> AllocateDataBuffer(KernelContext* ctx, int64_t length, diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index 77d04b86ceb..de1b695de48 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -182,6 +182,9 @@ struct ARROW_EXPORT ExecBatch { static Result Make(std::vector values); + Result> ToRecordBatch( + std::shared_ptr schema, MemoryPool* pool = default_memory_pool()) const; + /// The values representing positional arguments to be passed to a kernel's /// exec function for processing. std::vector values; diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index d0d50af1ac7..35e4af3889a 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -18,12 +18,17 @@ #include "arrow/compute/exec/exec_plan.h" #include +#include +#include #include +#include "arrow/array/util.h" #include "arrow/compute/api_vector.h" #include "arrow/compute/exec.h" #include "arrow/compute/exec/expression.h" +#include "arrow/compute/registry.h" #include "arrow/datum.h" +#include "arrow/record_batch.h" #include "arrow/result.h" #include "arrow/util/async_generator.h" #include "arrow/util/checked_cast.h" @@ -33,6 +38,7 @@ namespace arrow { using internal::checked_cast; +using internal::checked_pointer_cast; namespace compute { @@ -489,15 +495,23 @@ struct ProjectNode : ExecNode { }; Result MakeProjectNode(ExecNode* input, std::string label, - std::vector exprs) { + std::vector exprs, + std::vector names) { FieldVector fields(exprs.size()); + if (names.size() == 0) { + names.resize(exprs.size()); + for (size_t i = 0; i < exprs.size(); ++i) { + names[i] = exprs[i].ToString(); + } + } + int i = 0; for (auto& expr : exprs) { if (!expr.IsBound()) { ARROW_ASSIGN_OR_RAISE(expr, expr.Bind(*input->output_schema())); } - fields[i] = field(expr.ToString(), expr.type()); + fields[i] = field(std::move(names[i]), expr.type()); ++i; } @@ -552,15 +566,16 @@ struct SinkNode : ExecNode { ++num_received_; if (num_received_ == emit_stop_) { lock.unlock(); + producer_.Push(std::move(batch)); Finish(); - lock.lock(); + return; } if (emit_stop_ != -1) { DCHECK_LE(seq_num, emit_stop_); } - lock.unlock(); + lock.unlock(); producer_.Push(std::move(batch)); } @@ -574,8 +589,10 @@ struct SinkNode : ExecNode { void InputFinished(ExecNode* input, int seq_stop) override { std::unique_lock lock(mutex_); emit_stop_ = seq_stop; - lock.unlock(); - Finish(); + if (num_received_ == emit_stop_) { + lock.unlock(); + Finish(); + } } private: @@ -601,5 +618,205 @@ AsyncGenerator> MakeSinkNode(ExecNode* input, return out; } +std::shared_ptr MakeGeneratorReader( + std::shared_ptr schema, + std::function>()> gen, MemoryPool* pool) { + struct Impl : 
RecordBatchReader { + std::shared_ptr schema() const override { return schema_; } + + Status ReadNext(std::shared_ptr* record_batch) override { + ARROW_ASSIGN_OR_RAISE(auto batch, iterator_.Next()); + if (batch) { + ARROW_ASSIGN_OR_RAISE(*record_batch, batch->ToRecordBatch(schema_, pool_)); + } else { + *record_batch = IterationEnd>(); + } + return Status::OK(); + } + + MemoryPool* pool_; + std::shared_ptr schema_; + Iterator> iterator_; + }; + + auto out = std::make_shared(); + out->pool_ = pool; + out->schema_ = std::move(schema); + out->iterator_ = MakeGeneratorIterator(std::move(gen)); + return out; +} + +struct ScalarAggregateNode : ExecNode { + ScalarAggregateNode(ExecNode* input, std::string label, + std::shared_ptr output_schema, + std::vector kernels, + std::vector>> states) + : ExecNode(input->plan(), std::move(label), {input}, {"target"}, + /*output_schema=*/std::move(output_schema), + /*num_outputs=*/1), + kernels_(std::move(kernels)), + states_(std::move(states)) {} + + const char* kind_name() override { return "ScalarAggregateNode"; } + + Status DoConsume(const ExecBatch& batch, size_t thread_index) { + for (size_t i = 0; i < kernels_.size(); ++i) { + KernelContext batch_ctx{plan()->exec_context()}; + batch_ctx.SetState(states_[i][thread_index].get()); + ExecBatch single_column_batch{{batch.values[i]}, batch.length}; + RETURN_NOT_OK(kernels_[i]->consume(&batch_ctx, single_column_batch)); + } + return Status::OK(); + } + + void InputReceived(ExecNode* input, int seq, ExecBatch batch) override { + DCHECK_EQ(input, inputs_[0]); + + std::unique_lock lock(mutex_); + auto it = + thread_indices_.emplace(std::this_thread::get_id(), thread_indices_.size()).first; + ++num_received_; + auto thread_index = it->second; + + lock.unlock(); + + Status st = DoConsume(std::move(batch), thread_index); + if (!st.ok()) { + outputs_[0]->ErrorReceived(this, std::move(st)); + return; + } + + lock.lock(); + st = MaybeFinish(&lock); + if (!st.ok()) { + outputs_[0]->ErrorReceived(this, std::move(st)); + } + } + + void ErrorReceived(ExecNode* input, Status error) override { + DCHECK_EQ(input, inputs_[0]); + outputs_[0]->ErrorReceived(this, std::move(error)); + } + + void InputFinished(ExecNode* input, int seq) override { + DCHECK_EQ(input, inputs_[0]); + std::unique_lock lock(mutex_); + num_total_ = seq; + Status st = MaybeFinish(&lock); + + if (!st.ok()) { + outputs_[0]->ErrorReceived(this, std::move(st)); + } + } + + Status StartProducing() override { + finished_ = Future<>::Make(); + // Scalar aggregates will only output a single batch + outputs_[0]->InputFinished(this, 1); + return Status::OK(); + } + + void PauseProducing(ExecNode* output) override {} + + void ResumeProducing(ExecNode* output) override {} + + void StopProducing(ExecNode* output) override { + DCHECK_EQ(output, outputs_[0]); + StopProducing(); + } + + void StopProducing() override { + inputs_[0]->StopProducing(this); + finished_.MarkFinished(); + } + + Future<> finished() override { return finished_; } + + private: + Status MaybeFinish(std::unique_lock* lock) { + if (num_received_ != num_total_) return Status::OK(); + + if (finished_.is_finished()) return Status::OK(); + + ExecBatch batch{{}, 1}; + batch.values.resize(kernels_.size()); + + for (size_t i = 0; i < kernels_.size(); ++i) { + KernelContext ctx{plan()->exec_context()}; + ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll( + kernels_[i], &ctx, std::move(states_[i]))); + RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[i])); + } + lock->unlock(); + + 
outputs_[0]->InputReceived(this, 0, batch); + + finished_.MarkFinished(); + return Status::OK(); + } + + Future<> finished_ = Future<>::MakeFinished(); + std::vector kernels_; + std::vector>> states_; + std::unordered_map thread_indices_; + std::mutex mutex_; + int num_received_ = 0, num_total_; +}; + +Result MakeScalarAggregateNode(ExecNode* input, std::string label, + std::vector aggregates) { + if (input->output_schema()->num_fields() != static_cast(aggregates.size())) { + return Status::Invalid("Provided ", aggregates.size(), + " aggregates, expected one for each field of ", + input->output_schema()->ToString()); + } + + auto exec_ctx = input->plan()->exec_context(); + + std::vector kernels(aggregates.size()); + std::vector>> states(kernels.size()); + FieldVector fields(kernels.size()); + + for (size_t i = 0; i < kernels.size(); ++i) { + ARROW_ASSIGN_OR_RAISE(auto function, + exec_ctx->func_registry()->GetFunction(aggregates[i].function)); + + if (function->kind() != Function::SCALAR_AGGREGATE) { + return Status::Invalid("Provided non ScalarAggregateFunction ", + aggregates[i].function); + } + + auto in_type = ValueDescr::Array(input->output_schema()->fields()[i]->type()); + + ARROW_ASSIGN_OR_RAISE(const Kernel* kernel, function->DispatchExact({in_type})); + kernels[i] = static_cast(kernel); + + if (aggregates[i].options == nullptr) { + aggregates[i].options = function->default_options(); + } + + KernelContext kernel_ctx{exec_ctx}; + states[i].resize(exec_ctx->executor() ? exec_ctx->executor()->GetCapacity() : 1); + RETURN_NOT_OK(Kernel::InitAll(&kernel_ctx, + KernelInitArgs{kernels[i], + { + in_type, + }, + aggregates[i].options}, + &states[i])); + + // pick one to resolve the kernel signature + kernel_ctx.SetState(states[i][0].get()); + ARROW_ASSIGN_OR_RAISE( + auto descr, kernels[i]->signature->out_type().Resolve(&kernel_ctx, {in_type})); + + fields[i] = field(aggregates[i].function, std::move(descr.type)); + } + + return input->plan()->EmplaceNode( + input, std::move(label), schema(std::move(fields)), std::move(kernels), + std::move(states)); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index 6c29ddfa7a6..c36c174af05 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -22,6 +22,7 @@ #include #include +#include "arrow/compute/api_aggregate.h" #include "arrow/compute/exec.h" #include "arrow/compute/type_fwd.h" #include "arrow/type_fwd.h" @@ -243,12 +244,19 @@ ExecNode* MakeSourceNode(ExecPlan* plan, std::string label, /// \brief Add a sink node which forwards to an AsyncGenerator /// -/// Emitted batches will not be ordered; instead they will be tagged with the `seq` at -/// which they were received. +/// Emitted batches will not be ordered. ARROW_EXPORT std::function>()> MakeSinkNode(ExecNode* input, std::string label); +/// \brief Wrap an ExecBatch generator in a RecordBatchReader. +/// +/// The RecordBatchReader does not impose any ordering on emitted batches. +ARROW_EXPORT +std::shared_ptr MakeGeneratorReader( + std::shared_ptr, std::function>()>, + MemoryPool*); + /// \brief Make a node which excludes some rows from batches passed through it /// /// The filter Expression will be evaluated against each batch which is pushed to @@ -265,9 +273,15 @@ Result MakeFilterNode(ExecNode* input, std::string label, Expression /// this node to produce a corresponding output column. 
/// /// If exprs are not already bound, they will be bound against the input's schema. +/// If names are not provided, the string representations of exprs will be used. ARROW_EXPORT Result MakeProjectNode(ExecNode* input, std::string label, - std::vector exprs); + std::vector exprs, + std::vector names = {}); + +ARROW_EXPORT +Result MakeScalarAggregateNode(ExecNode* input, std::string label, + std::vector aggregates); } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 9ebafc42668..7d412e67c5c 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -206,7 +206,7 @@ Result MakeTestSourceNode(ExecPlan* plan, std::string label, bool slow) { DCHECK_GT(batches_with_schema.batches.size(), 0); - auto opt_batches = internal::MapVector( + auto opt_batches = ::arrow::internal::MapVector( [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, std::move(batches_with_schema.batches)); @@ -216,10 +216,10 @@ Result MakeTestSourceNode(ExecPlan* plan, std::string label, // emulate batches completing initial decode-after-scan on a cpu thread ARROW_ASSIGN_OR_RAISE( gen, MakeBackgroundGenerator(MakeVectorIterator(std::move(opt_batches)), - internal::GetCpuThreadPool())); + ::arrow::internal::GetCpuThreadPool())); // ensure that callbacks are not executed immediately on a background thread - gen = MakeTransferredGenerator(std::move(gen), internal::GetCpuThreadPool()); + gen = MakeTransferredGenerator(std::move(gen), ::arrow::internal::GetCpuThreadPool()); } else { gen = MakeVectorGenerator(std::move(opt_batches)); } @@ -245,7 +245,7 @@ Future> StartAndCollect( return AllComplete({plan->finished(), Future<>(collected_fut)}) .Then([collected_fut]() -> Result> { ARROW_ASSIGN_OR_RAISE(auto collected, collected_fut.result()); - return internal::MapVector( + return ::arrow::internal::MapVector( [](util::optional batch) { return std::move(*batch); }, std::move(collected)); }); @@ -412,7 +412,8 @@ TEST(ExecPlanExecution, SourceProjectSink) { ASSERT_OK_AND_ASSIGN(expr, expr.Bind(*basic_data.schema)); } - ASSERT_OK_AND_ASSIGN(auto projection, MakeProjectNode(source, "project", exprs)); + ASSERT_OK_AND_ASSIGN(auto projection, + MakeProjectNode(source, "project", exprs, {"!bool", "i32 + 1"})); auto sink_gen = MakeSinkNode(projection, "sink"); @@ -423,5 +424,28 @@ TEST(ExecPlanExecution, SourceProjectSink) { "[[null, 6], [true, 7], [true, 8]]")})))); } +TEST(ExecPlanExecution, SourceScalarAggSink) { + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + + auto basic_data = MakeBasicBatches(); + + ASSERT_OK_AND_ASSIGN(auto source, + MakeTestSourceNode(plan.get(), "source", basic_data, + /*parallel=*/false, /*slow=*/false)); + + ASSERT_OK_AND_ASSIGN(auto scalar_agg, + MakeScalarAggregateNode(source, "scalar_agg", + {{"sum", nullptr}, {"any", nullptr}})); + + auto sink_gen = MakeSinkNode(scalar_agg, "sink"); + + ASSERT_THAT( + StartAndCollect(plan.get(), sink_gen), + Finishes(ResultWith(UnorderedElementsAreArray({ + ExecBatchFromJSON({ValueDescr::Scalar(int64()), ValueDescr::Scalar(boolean())}, + "[[22, true]]"), + })))); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/test_util.cc b/cpp/src/arrow/compute/exec/test_util.cc index bd203b354f0..8cc6200ea40 100644 --- a/cpp/src/arrow/compute/exec/test_util.cc +++ b/cpp/src/arrow/compute/exec/test_util.cc @@ -134,7 +134,7 @@ ExecNode* MakeDummyNode(ExecPlan* plan, std::string label, 
std::vector& descrs, util::string_view json) { - auto fields = internal::MapVector( + auto fields = ::arrow::internal::MapVector( [](const ValueDescr& descr) { return field("", descr.type); }, descrs); ExecBatch batch{*RecordBatchFromJSON(schema(std::move(fields)), json)}; diff --git a/cpp/src/arrow/compute/kernel.cc b/cpp/src/arrow/compute/kernel.cc index 8fa740ed247..6cdd17adcc9 100644 --- a/cpp/src/arrow/compute/kernel.cc +++ b/cpp/src/arrow/compute/kernel.cc @@ -59,6 +59,26 @@ Result> KernelContext::AllocateBitmap(int64_t n return result; } +Status Kernel::InitAll(KernelContext* ctx, const KernelInitArgs& args, + std::vector>* states) { + for (auto& state : *states) { + ARROW_ASSIGN_OR_RAISE(state, args.kernel->init(ctx, args)); + } + return Status::OK(); +} + +Result> ScalarAggregateKernel::MergeAll( + const ScalarAggregateKernel* kernel, KernelContext* ctx, + std::vector> states) { + auto out = std::move(states.back()); + states.pop_back(); + ctx->SetState(out.get()); + for (auto& state : states) { + RETURN_NOT_OK(kernel->merge(ctx, std::move(*state), out.get())); + } + return std::move(out); +} + // ---------------------------------------------------------------------- // Some basic TypeMatcher implementations diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index c88c924817c..50b1dd8e55e 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -522,6 +522,10 @@ struct Kernel { /// set up any options or state relevant for execution. KernelInit init; + /// \brief Create a vector of new KernelState for invocations of this kernel. + static Status InitAll(KernelContext*, const KernelInitArgs&, + std::vector>*); + /// \brief Indicates whether execution can benefit from parallelization /// (splitting large chunks into smaller chunks and using multiple /// threads). Some kernels may not support parallel execution at @@ -673,6 +677,12 @@ struct ScalarAggregateKernel : public Kernel { KernelSignature::Make(std::move(in_types), std::move(out_type)), std::move(init), std::move(consume), std::move(merge), std::move(finalize)) {} + /// \brief Merge a vector of KernelStates into a single KernelState. + /// The merged state will be returned and will be set on the KernelContext. 
+ static Result> MergeAll( + const ScalarAggregateKernel* kernel, KernelContext* ctx, + std::vector> states); + ScalarAggregateConsume consume; ScalarAggregateMerge merge; ScalarAggregateFinalize finalize; diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h index 86e321ba522..e6755c05f5d 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h @@ -62,9 +62,9 @@ struct SumImpl : public ScalarAggregator { const auto& data = batch[0].array(); this->count = data->length - data->GetNullCount(); if (is_boolean_type::value) { - this->sum = static_cast(BooleanArray(data).true_count()); + this->sum += static_cast(BooleanArray(data).true_count()); } else { - this->sum = + this->sum += arrow::compute::detail::SumArray(*data); } return Status::OK(); diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index cc2e5bcda66..2f7a115bb4b 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -591,27 +591,15 @@ Result ToEnumeratedRecordBatch( const FragmentVector& fragments) { int num_fields = options.projected_schema->num_fields(); - ArrayVector columns(num_fields); - for (size_t i = 0; i < columns.size(); ++i) { - const Datum& value = batch->values[i]; - if (value.is_array()) { - columns[i] = value.make_array(); - continue; - } - ARROW_ASSIGN_OR_RAISE( - columns[i], MakeArrayFromScalar(*value.scalar(), batch->length, options.pool)); - } - EnumeratedRecordBatch out; out.fragment.index = batch->values[num_fields].scalar_as().value; - out.fragment.value = fragments[out.fragment.index]; out.fragment.last = false; // ignored during reordering + out.fragment.value = fragments[out.fragment.index]; out.record_batch.index = batch->values[num_fields + 1].scalar_as().value; - out.record_batch.value = - RecordBatch::Make(options.projected_schema, batch->length, std::move(columns)); out.record_batch.last = batch->values[num_fields + 2].scalar_as().value; - + ARROW_ASSIGN_OR_RAISE(out.record_batch.value, + batch->ToRecordBatch(options.projected_schema, options.pool)); return out; } } // namespace @@ -633,11 +621,12 @@ Result AsyncScanner::ScanBatchesUnorderedAsync( compute::MakeFilterNode(scan, "filter", scan_options_->filter)); auto exprs = scan_options_->projection.call()->arguments; - exprs.push_back(compute::field_ref("__fragment_index")); - exprs.push_back(compute::field_ref("__batch_index")); - exprs.push_back(compute::field_ref("__last_in_fragment")); - ARROW_ASSIGN_OR_RAISE(auto project, - compute::MakeProjectNode(filter, "project", std::move(exprs))); + auto names = checked_cast( + scan_options_->projection.call()->options.get()) + ->field_names; + ARROW_ASSIGN_OR_RAISE( + auto project, + MakeAugmentedProjectNode(filter, "project", std::move(exprs), std::move(names))); AsyncGenerator> sink_gen = compute::MakeSinkNode(project, "sink"); @@ -1176,5 +1165,89 @@ Result MakeScanNode(compute::ExecPlan* plan, return MakeScanNode(plan, std::move(fragments_gen), std::move(scan_options)); } +Result MakeAugmentedProjectNode( + compute::ExecNode* input, std::string label, std::vector exprs, + std::vector names) { + if (names.size() == 0) { + names.resize(exprs.size()); + for (size_t i = 0; i < exprs.size(); ++i) { + names[i] = exprs[i].ToString(); + } + } + + for (auto aug_name : {"__fragment_index", "__batch_index", "__last_in_fragment"}) { + exprs.push_back(compute::field_ref(aug_name)); + 
names.emplace_back(aug_name); + } + return compute::MakeProjectNode(input, std::move(label), std::move(exprs), + std::move(names)); +} + +Result>> MakeOrderedSinkNode( + compute::ExecNode* input, std::string label) { + auto unordered = compute::MakeSinkNode(input, std::move(label)); + + const Schema& schema = *input->output_schema(); + ARROW_ASSIGN_OR_RAISE(FieldPath match, FieldRef("__fragment_index").FindOne(schema)); + int i = match[0]; + auto fragment_index = [i](const compute::ExecBatch& batch) { + return batch.values[i].scalar_as().value; + }; + compute::ExecBatch before_any{{}, 0}; + before_any.values.resize(i + 1); + before_any.values.back() = Datum(-1); + + ARROW_ASSIGN_OR_RAISE(match, FieldRef("__batch_index").FindOne(schema)); + i = match[0]; + auto batch_index = [i](const compute::ExecBatch& batch) { + return batch.values[i].scalar_as().value; + }; + + ARROW_ASSIGN_OR_RAISE(match, FieldRef("__last_in_fragment").FindOne(schema)); + i = match[0]; + auto last_in_fragment = [i](const compute::ExecBatch& batch) { + return batch.values[i].scalar_as().value; + }; + + auto is_before_any = [=](const compute::ExecBatch& batch) { + return fragment_index(batch) < 0; + }; + + auto left_after_right = [=](const util::optional& left, + const util::optional& right) { + // Before any comes first + if (is_before_any(*left)) { + return false; + } + if (is_before_any(*right)) { + return true; + } + // Compare batches if fragment is the same + if (fragment_index(*left) == fragment_index(*right)) { + return batch_index(*left) > batch_index(*right); + } + // Otherwise compare fragment + return fragment_index(*left) > fragment_index(*right); + }; + + auto is_next = [=](const util::optional& prev, + const util::optional& next) { + // Only true if next is the first batch + if (is_before_any(*prev)) { + return fragment_index(*next) == 0 && batch_index(*next) == 0; + } + // If same fragment, compare batch index + if (fragment_index(*next) == fragment_index(*prev)) { + return batch_index(*next) == batch_index(*prev) + 1; + } + // Else only if next first batch of next fragment and prev is last batch of previous + return fragment_index(*next) == fragment_index(*prev) + 1 && + last_in_fragment(*prev) && batch_index(*next) == 0; + }; + + return MakeSequencingGenerator(std::move(unordered), left_after_right, is_next, + util::make_optional(std::move(before_any))); +} + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index c803cde1978..fc715206d7d 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -409,11 +409,25 @@ class ARROW_DS_EXPORT ScannerBuilder { /// \brief Construct a source ExecNode which yields batches from a dataset scan. /// -/// Does not construct associated filter or project nodes +/// Does not construct associated filter or project nodes. +/// Yielded batches will be augmented with fragment/batch indices to enable stable +/// ordering for simple ExecPlans. ARROW_DS_EXPORT Result MakeScanNode(compute::ExecPlan*, std::shared_ptr, std::shared_ptr); +/// \brief Construct a ProjectNode which preserves fragment/batch indices. +ARROW_DS_EXPORT Result MakeAugmentedProjectNode( + compute::ExecNode* input, std::string label, std::vector exprs, + std::vector names = {}); + +/// \brief Add a sink node which forwards to an AsyncGenerator +/// +/// Emitted batches will be ordered by fragment and batch indices, or an error +/// will be raised if those fields are not available in the input. 
+ARROW_DS_EXPORT Result>> +MakeOrderedSinkNode(compute::ExecNode*, std::string label); + /// @} /// \brief A trivial ScanTask that yields the RecordBatch of an array. diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index f567054bf91..74f558d1738 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -1323,5 +1323,86 @@ TEST(ScanNode, MaterializationOfVirtualColumn) { Finishes(ResultWith(UnorderedElementsAreArray(expected)))); } +TEST(ScanNode, MinimalEndToEnd) { + // NB: This test is here for didactic purposes + + // Specify a MemoryPool and ThreadPool for the ExecPlan + compute::ExecContext exec_context(default_memory_pool(), internal::GetCpuThreadPool()); + + // A ScanNode is constructed from an ExecPlan (into which it is inserted), + // a Dataset (whose batches will be scanned), and ScanOptions (to specify a filter for + // predicate pushdown, a projection to skip materialization of unnecessary columns, ...) + ASSERT_OK_AND_ASSIGN(std::shared_ptr plan, + compute::ExecPlan::Make(&exec_context)); + + std::shared_ptr dataset = std::make_shared( + TableFromJSON(schema({field("a", int32()), field("b", boolean())}), + { + R"([{"a": 1, "b": null}, + {"a": 2, "b": true}])", + R"([{"a": null, "b": true}, + {"a": 3, "b": false}])", + R"([{"a": null, "b": true}, + {"a": 4, "b": false}])", + R"([{"a": 5, "b": null}, + {"a": 6, "b": false}, + {"a": 7, "b": false}])", + })); + + auto options = std::make_shared(); + // sync scanning is not supported by ScanNode + options->use_async = true; + // for now, we must replicate the dataset schema here + options->dataset_schema = dataset->schema(); + // specify the filter + compute::Expression b_is_true = field_ref("b"); + ASSERT_OK_AND_ASSIGN(b_is_true, b_is_true.Bind(*dataset->schema())); + options->filter = b_is_true; + // for now, specify the projection as the full project expression (eventually this can + // just be a list of materialized field names) + compute::Expression a_times_2 = call("multiply", {field_ref("a"), literal(2)}); + ASSERT_OK_AND_ASSIGN(a_times_2, a_times_2.Bind(*dataset->schema())); + options->projection = call("project", {a_times_2}, compute::ProjectOptions{{"a * 2"}}); + + // construct the scan node + ASSERT_OK_AND_ASSIGN(compute::ExecNode * scan, + dataset::MakeScanNode(plan.get(), dataset, options)); + + // pipe the scan node into a filter node + ASSERT_OK_AND_ASSIGN(compute::ExecNode * filter, + compute::MakeFilterNode(scan, "filter", b_is_true)); + + // pipe the filter node into a project node + // NB: we're using the project node factory which preserves fragment/batch index + // tagging, so we *can* reorder later if we choose. The tags will not appear in + // our output. 
+ ASSERT_OK_AND_ASSIGN(compute::ExecNode * project, + dataset::MakeAugmentedProjectNode(filter, "project", {a_times_2})); + + // finally, pipe the project node into a sink node + // NB: if we don't need ordering, we could use compute::MakeSinkNode instead + ASSERT_OK_AND_ASSIGN(auto sink_gen, dataset::MakeOrderedSinkNode(project, "sink")); + + // translate sink_gen (async) to sink_reader (sync) + std::shared_ptr sink_reader = compute::MakeGeneratorReader( + schema({field("a * 2", int32())}), std::move(sink_gen), exec_context.memory_pool()); + + // start the ExecPlan then wait 1s for completion + ASSERT_OK(plan->StartProducing()); + ASSERT_TRUE(plan->finished().Wait(/*seconds=*/1)) << "ExecPlan didn't finish within 1s"; + + // collect sink_reader into a Table + ASSERT_OK_AND_ASSIGN(auto collected, Table::FromRecordBatchReader(sink_reader.get())); + + auto expected = TableFromJSON(schema({field("a * 2", int32())}), { + R"([ + {"a * 2": 4}, + {"a * 2": null}, + {"a * 2": null} + ])"}); + + AssertTablesEqual(*expected, *collected, /*same_chunk_layout=*/false); +} + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index 201fc7e55b2..2ce99dc0791 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -544,7 +544,7 @@ class FileFormatScanMixin : public FileFormatFixtureMixin, opts_->use_threads = GetParam().use_threads; if (GetParam().use_async) { EXPECT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(opts_)); - EXPECT_OK_AND_ASSIGN(auto batch_it, MakeGeneratorIterator(std::move(batch_gen))); + auto batch_it = MakeGeneratorIterator(std::move(batch_gen)); return batch_it; } EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_)); diff --git a/cpp/src/arrow/testing/matchers.h b/cpp/src/arrow/testing/matchers.h index f76c25dc096..b64269ea7a1 100644 --- a/cpp/src/arrow/testing/matchers.h +++ b/cpp/src/arrow/testing/matchers.h @@ -57,10 +57,7 @@ class FutureMatcher { *listener << "which didn't finish within " << wait_seconds_ << " seconds"; return false; } - - const Result& maybe_value = fut.result(); - testing::StringMatchResultListener value_listener; - return result_matcher_.MatchAndExplain(maybe_value, &value_listener); + return result_matcher_.MatchAndExplain(fut.result(), listener); } const testing::Matcher> result_matcher_; diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index 18149884204..c2aad6cd680 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -1492,7 +1492,7 @@ class GeneratorIterator { /// \brief Converts an AsyncGenerator to an Iterator by blocking until each future /// is finished template -Result> MakeGeneratorIterator(AsyncGenerator source) { +Iterator MakeGeneratorIterator(AsyncGenerator source) { return Iterator(GeneratorIterator(std::move(source))); } diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc index 361ce3eacf0..343eb9b6c4b 100644 --- a/cpp/src/arrow/util/async_generator_test.cc +++ b/cpp/src/arrow/util/async_generator_test.cc @@ -618,7 +618,7 @@ TEST(TestAsyncUtil, SynchronousFinish) { TEST(TestAsyncUtil, GeneratorIterator) { auto generator = BackgroundAsyncVectorIt({1, 2, 3}); - ASSERT_OK_AND_ASSIGN(auto iterator, MakeGeneratorIterator(std::move(generator))); + auto iterator = MakeGeneratorIterator(std::move(generator)); ASSERT_OK_AND_EQ(TestInt(1), iterator.Next()); ASSERT_OK_AND_EQ(TestInt(2), 
iterator.Next()); ASSERT_OK_AND_EQ(TestInt(3), iterator.Next());
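
Note on ExecBatch::ToRecordBatch: the patch adds this method to ExecBatch and reuses it from the dataset scanner, replacing the inline column-building loop that ToEnumeratedRecordBatch previously carried. The sketch below restates that conversion as a standalone helper over public Arrow APIs; the name ValuesToRecordBatch is invented for illustration (in the patch the logic lives on ExecBatch itself).

#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

#include "arrow/array/util.h"  // MakeArrayFromScalar
#include "arrow/datum.h"
#include "arrow/memory_pool.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/type.h"

// Broadcast a mix of array and scalar Datums to equal-length columns and
// assemble them into a RecordBatch: array values are used as-is, scalar values
// (e.g. the output of a scalar aggregate) are repeated `length` times.
arrow::Result<std::shared_ptr<arrow::RecordBatch>> ValuesToRecordBatch(
    const std::vector<arrow::Datum>& values, int64_t length,
    std::shared_ptr<arrow::Schema> schema,
    arrow::MemoryPool* pool = arrow::default_memory_pool()) {
  arrow::ArrayVector columns(schema->num_fields());
  for (size_t i = 0; i < columns.size(); ++i) {
    const arrow::Datum& value = values[i];
    if (value.is_array()) {
      columns[i] = value.make_array();
      continue;
    }
    // Scalars become constant columns of the requested length.
    ARROW_ASSIGN_OR_RAISE(columns[i],
                          arrow::MakeArrayFromScalar(*value.scalar(), length, pool));
  }
  return arrow::RecordBatch::Make(std::move(schema), length, std::move(columns));
}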
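
Note on MakeGeneratorReader: it adapts the asynchronous ExecBatch generator produced by a sink node to the synchronous RecordBatchReader interface, blocking on each pulled future and converting every ExecBatch with ToRecordBatch. The sketch below shows the same adaptation for a plain pull callable so it stays independent of the async-generator utilities; CallableBatchReader and its Next alias are illustrative names, not part of the patch.

#include <functional>
#include <memory>
#include <utility>

#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/type.h"

// Adapts a pull-style "give me the next batch" callable to RecordBatchReader.
// End-of-stream is signalled by the callable returning a null RecordBatch,
// which mirrors how the generator-backed reader returns IterationEnd.
class CallableBatchReader : public arrow::RecordBatchReader {
 public:
  using Next = std::function<arrow::Result<std::shared_ptr<arrow::RecordBatch>>()>;

  CallableBatchReader(std::shared_ptr<arrow::Schema> schema, Next next)
      : schema_(std::move(schema)), next_(std::move(next)) {}

  std::shared_ptr<arrow::Schema> schema() const override { return schema_; }

  arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override {
    ARROW_ASSIGN_OR_RAISE(*batch, next_());
    return arrow::Status::OK();
  }

 private:
  std::shared_ptr<arrow::Schema> schema_;
  Next next_;
};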
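
Note on ScalarAggregateNode: it keeps one KernelState per thread (selected through a thread-id map built lazily in InputReceived), consumes each input batch into the state owned by the receiving thread without holding the lock, and only merges and finalizes once InputFinished reports that every batch has arrived. The same consume/merge/finalize split also explains the aggregate_basic_internal.h change: SumImpl::Consume can now run several times against one state, so it must accumulate with += rather than overwrite the running sum. Below is a minimal, Arrow-free sketch of that pattern; SumState and ParallelSum are invented for illustration.

#include <cstdint>
#include <thread>
#include <vector>

// Each thread owns one partial state; Consume folds batches into it lock-free,
// and the final result is produced by merging all partial states at the end.
struct SumState {
  int64_t sum = 0;
  void Consume(const std::vector<int64_t>& batch) {
    for (int64_t v : batch) sum += v;  // accumulate, never overwrite
  }
  void MergeFrom(const SumState& other) { sum += other.sum; }
};

int64_t ParallelSum(const std::vector<std::vector<int64_t>>& batches,
                    int num_threads) {
  std::vector<SumState> states(num_threads);
  std::vector<std::thread> workers;
  for (int t = 0; t < num_threads; ++t) {
    workers.emplace_back([&, t] {
      // Each worker consumes a strided subset of the batches into its own state.
      for (size_t i = t; i < batches.size(); i += num_threads) {
        states[t].Consume(batches[i]);
      }
    });
  }
  for (auto& w : workers) w.join();

  // "MergeAll": fold every partial state into one, then finalize.
  SumState merged;
  for (const auto& s : states) merged.MergeFrom(s);
  return merged.sum;
}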
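
Note on MakeOrderedSinkNode: it restores scan order by handing a sequencing generator two predicates over the __fragment_index, __batch_index and __last_in_fragment columns that MakeScanNode emits and MakeAugmentedProjectNode preserves. The sketch below restates those predicates over a plain struct; BatchTag, LeftAfterRight and IsNext are illustrative names, and in the patch the same comparisons read scalar columns out of each ExecBatch, with a fragment index of -1 serving as the "before any batch" sentinel.

#include <cstdint>

// The ordering tags carried alongside each batch.
struct BatchTag {
  int32_t fragment_index;
  int32_t batch_index;
  bool last_in_fragment;
};

// "Comes strictly later": order by fragment, then by batch within a fragment.
// The sentinel (fragment_index < 0) sorts before everything else.
bool LeftAfterRight(const BatchTag& left, const BatchTag& right) {
  if (left.fragment_index < 0) return false;
  if (right.fragment_index < 0) return true;
  if (left.fragment_index == right.fragment_index) {
    return left.batch_index > right.batch_index;
  }
  return left.fragment_index > right.fragment_index;
}

// "Immediately follows": either the next batch within the same fragment, or
// the first batch of the next fragment once the previous fragment has emitted
// its last batch. This decides when a buffered batch may be released in order.
bool IsNext(const BatchTag& prev, const BatchTag& next) {
  if (prev.fragment_index < 0) {
    return next.fragment_index == 0 && next.batch_index == 0;
  }
  if (next.fragment_index == prev.fragment_index) {
    return next.batch_index == prev.batch_index + 1;
  }
  return next.fragment_index == prev.fragment_index + 1 && prev.last_in_fragment &&
         next.batch_index == 0;
}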