diff --git a/cpp/examples/arrow/compute_register_example.cc b/cpp/examples/arrow/compute_register_example.cc index 6e5ff015387..0d09e2b6d61 100644 --- a/cpp/examples/arrow/compute_register_example.cc +++ b/cpp/examples/arrow/compute_register_example.cc @@ -95,8 +95,8 @@ class ExampleNode : public cp::ExecNode { void ResumeProducing(ExecNode* output) override {} void PauseProducing(ExecNode* output) override {} - void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); } - void StopProducing() override { inputs_[0]->StopProducing(); } + void StopProducing(ExecNode* output) override {} + void StopProducing() override {} void InputReceived(ExecNode* input, cp::ExecBatch batch) override {} void ErrorReceived(ExecNode* input, arrow::Status error) override {} diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index f8a522a2735..d717210e019 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -1044,6 +1044,8 @@ ExecContext::ExecContext(MemoryPool* pool, ::arrow::internal::Executor* executor FunctionRegistry* func_registry) : pool_(pool), executor_(executor) { this->func_registry_ = func_registry == nullptr ? GetFunctionRegistry() : func_registry; + DCHECK_NE(nullptr, executor_); + this->use_threads_ = executor_->GetCapacity() > 1; } CpuInfo* ExecContext::cpu_info() const { return CpuInfo::GetInstance(); } diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index faebddb7334..68d55dc40b2 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -34,6 +34,7 @@ #include "arrow/result.h" #include "arrow/type_fwd.h" #include "arrow/util/macros.h" +#include "arrow/util/thread_pool.h" #include "arrow/util/type_fwd.h" #include "arrow/util/visibility.h" @@ -60,9 +61,10 @@ static constexpr int64_t kDefaultExecChunksize = UINT16_MAX; class ARROW_EXPORT ExecContext { public: // If no function registry passed, the default is used. - explicit ExecContext(MemoryPool* pool = default_memory_pool(), - ::arrow::internal::Executor* executor = NULLPTR, - FunctionRegistry* func_registry = NULLPTR); + explicit ExecContext( + MemoryPool* pool = default_memory_pool(), + ::arrow::internal::Executor* executor = ::arrow::internal::GetCpuThreadPool(), + FunctionRegistry* func_registry = NULLPTR); /// \brief The MemoryPool used for allocations, default is /// default_memory_pool(). 
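With this change an ExecContext always carries a non-null executor: the default is now ::arrow::internal::GetCpuThreadPool(), and use_threads_ is derived from the executor's capacity. Callers that need strictly serial execution therefore pass a single-threaded pool instead of a null executor and must keep that pool alive as long as the context. A minimal sketch of that pattern, mirroring the SimpleExecContext helper added to the tests below (SerialExecContext and MakeSerialExecContext are illustrative names, not part of this patch):

    #include <memory>
    #include "arrow/compute/exec.h"
    #include "arrow/memory_pool.h"
    #include "arrow/result.h"
    #include "arrow/util/thread_pool.h"

    // The pool must outlive the ExecContext, so the two are bundled together.
    struct SerialExecContext {
      std::shared_ptr<::arrow::internal::ThreadPool> pool;
      std::unique_ptr<arrow::compute::ExecContext> ctx;
    };

    arrow::Result<SerialExecContext> MakeSerialExecContext() {
      SerialExecContext out;
      ARROW_ASSIGN_OR_RAISE(out.pool, ::arrow::internal::ThreadPool::Make(1));
      // GetCapacity() == 1, so the new constructor logic sets use_threads to false.
      out.ctx = std::make_unique<arrow::compute::ExecContext>(
          arrow::default_memory_pool(), out.pool.get());
      return out;
    }

Callers that want parallelism can simply default-construct an ExecContext, which now picks up the shared CPU thread pool.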
diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc index b4982ef111f..3f7d01b60fb 100644 --- a/cpp/src/arrow/compute/exec/aggregate_node.cc +++ b/cpp/src/arrow/compute/exec/aggregate_node.cc @@ -246,7 +246,6 @@ class ScalarAggregateNode : public ExecNode { if (input_counter_.Cancel()) { finished_.MarkFinished(); } - inputs_[0]->StopProducing(this); } Future<> finished() override { return finished_; } @@ -531,7 +530,7 @@ class GroupByNode : public ExecNode { auto executor = ctx_->executor(); for (int i = 0; i < num_output_batches; ++i) { - if (executor) { + if (ctx_->use_threads()) { // bail if StopProducing was called if (finished_.is_finished()) break; @@ -610,7 +609,6 @@ class GroupByNode : public ExecNode { if (output_counter_.Cancel()) { finished_.MarkFinished(); } - inputs_[0]->StopProducing(this); } void StopProducing() override { StopProducing(outputs_[0]); } diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index c383c6092af..55ab4d1cb4d 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -360,16 +360,11 @@ bool ExecNode::ErrorIfNotOk(Status status) { } MapNode::MapNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, bool async_mode) + std::shared_ptr output_schema, bool use_threads) : ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"}, std::move(output_schema), - /*num_outputs=*/1) { - if (async_mode) { - executor_ = plan_->exec_context()->executor(); - } else { - executor_ = nullptr; - } -} + /*num_outputs=*/1), + use_threads_(use_threads) {} void MapNode::ErrorReceived(ExecNode* input, Status error) { DCHECK_EQ(input, inputs_[0]); @@ -406,13 +401,14 @@ void MapNode::StopProducing(ExecNode* output) { void MapNode::StopProducing() { EVENT(span_, "StopProducing"); - if (executor_) { + if (use_threads_) { + // If we are using tasks we may have a bunch of queued tasks that we should + // cancel this->stop_source_.RequestStop(); } if (input_counter_.Cancel()) { this->Finish(); } - inputs_[0]->StopProducing(this); } Future<> MapNode::finished() { return finished_; } @@ -436,15 +432,16 @@ void MapNode::SubmitTask(std::function(ExecBatch)> map_fn, return Status::OK(); }; - if (executor_) { + if (use_threads_) { status = task_group_.AddTask([this, task]() -> Result> { - return this->executor_->Submit(this->stop_source_.token(), [this, task]() { - auto status = task(); - if (this->input_counter_.Increment()) { - this->Finish(status); - } - return status; - }); + return this->plan()->exec_context()->executor()->Submit( + this->stop_source_.token(), [this, task]() { + auto status = task(); + if (this->input_counter_.Increment()) { + this->Finish(status); + } + return status; + }); }); } else { status = task(); @@ -458,13 +455,12 @@ void MapNode::SubmitTask(std::function(ExecBatch)> map_fn, if (input_counter_.Cancel()) { this->Finish(status); } - inputs_[0]->StopProducing(this); return; } } void MapNode::Finish(Status finish_st /*= Status::OK()*/) { - if (executor_) { + if (use_threads_) { task_group_.End().AddCallback([this, finish_st](const Status& st) { Status final_status = finish_st & st; this->finished_.MarkFinished(final_status); diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index 85bfd5f756d..7fbc50c594f 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -273,7 +273,7 @@ class ARROW_EXPORT ExecNode { class 
MapNode : public ExecNode { public: MapNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, bool async_mode); + std::shared_ptr output_schema, bool use_threads); void ErrorReceived(ExecNode* input, Status error) override; @@ -303,7 +303,10 @@ class MapNode : public ExecNode { // The task group for the corresponding batches util::AsyncTaskGroup task_group_; - ::arrow::internal::Executor* executor_; + // If true then tasks will be spawned for each item + // + // If false the item will be processed immediately and synchronously + bool use_threads_; // Variable used to cancel remaining tasks in the executor StopSource stop_source_; diff --git a/cpp/src/arrow/compute/exec/hash_join_node_test.cc b/cpp/src/arrow/compute/exec/hash_join_node_test.cc index 96469a78ab2..5fa334378ec 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node_test.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node_test.cc @@ -63,13 +63,24 @@ BatchesWithSchema GenerateBatchesFromString( return out_batches; } +std::unique_ptr SimpleExecContext( + bool parallel, std::shared_ptr<::arrow::internal::ThreadPool>* owned_thread_pool) { + if (parallel) { + return arrow::internal::make_unique(); + } else { + EXPECT_OK_AND_ASSIGN(*owned_thread_pool, ::arrow::internal::ThreadPool::Make(1)); + return arrow::internal::make_unique(default_memory_pool(), + owned_thread_pool->get()); + } +} + void CheckRunOutput(JoinType type, const BatchesWithSchema& l_batches, const BatchesWithSchema& r_batches, const std::vector& left_keys, const std::vector& right_keys, const BatchesWithSchema& exp_batches, bool parallel = false) { - auto exec_ctx = arrow::internal::make_unique( - default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr); + std::shared_ptr<::arrow::internal::ThreadPool> owned_thread_pool; + auto exec_ctx = SimpleExecContext(parallel, &owned_thread_pool); ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get())); @@ -889,8 +900,8 @@ void HashJoinWithExecPlan(Random64Bit& rng, bool parallel, const std::vector>& l, const std::vector>& r, int num_batches_l, int num_batches_r, std::shared_ptr* output) { - auto exec_ctx = arrow::internal::make_unique( - default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr); + std::shared_ptr<::arrow::internal::ThreadPool> owned_thread_pool; + auto exec_ctx = SimpleExecContext(parallel, &owned_thread_pool); ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get())); @@ -998,8 +1009,8 @@ TEST(HashJoin, Random) { #endif for (int test_id = 0; test_id < num_tests; ++test_id) { bool parallel = (rng.from_range(0, 1) == 1); - auto exec_ctx = arrow::internal::make_unique( - default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr); + std::shared_ptr<::arrow::internal::ThreadPool> owned_thread_pool; + auto exec_ctx = SimpleExecContext(parallel, &owned_thread_pool); // Constraints RandomDataTypeConstraints type_constraints; @@ -1282,8 +1293,8 @@ void TestHashJoinDictionaryHelper( } } - auto exec_ctx = arrow::internal::make_unique( - default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr); + std::shared_ptr<::arrow::internal::ThreadPool> owned_thread_pool; + auto exec_ctx = SimpleExecContext(parallel, &owned_thread_pool); ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get())); ASSERT_OK_AND_ASSIGN( ExecNode * l_source, @@ -1684,8 +1695,8 @@ TEST(HashJoin, DictNegative) { ExecBatch::Make({i == 2 ? datumSecondB : datumSecondA, i == 3 ? 
datumSecondB : datumSecondA})); - auto exec_ctx = - arrow::internal::make_unique(default_memory_pool(), nullptr); + std::shared_ptr<::arrow::internal::ThreadPool> owned_thread_pool; + auto exec_ctx = SimpleExecContext(/*parallel=*/false, &owned_thread_pool); ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get())); ASSERT_OK_AND_ASSIGN( ExecNode * l_source, @@ -1713,6 +1724,7 @@ TEST(HashJoin, DictNegative) { EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT( NotImplemented, ::testing::HasSubstr("Unifying differing dictionaries"), StartAndCollect(plan.get(), sink_gen)); + ASSERT_FINISHES_OK(plan->finished()); } } @@ -1771,8 +1783,8 @@ TEST(HashJoin, ResidualFilter) { input_right.schema = schema({field("r1", int32()), field("r2", int32()), field("r_str", utf8())}); - auto exec_ctx = arrow::internal::make_unique( - default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr); + std::shared_ptr<::arrow::internal::ThreadPool> owned_thread_pool; + auto exec_ctx = SimpleExecContext(parallel, &owned_thread_pool); ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get())); AsyncGenerator> sink_gen; @@ -1848,9 +1860,8 @@ TEST(HashJoin, TrivialResidualFilter) { ])")}; input_right.schema = schema({field("r1", int32()), field("r_str", utf8())}); - auto exec_ctx = arrow::internal::make_unique( - default_memory_pool(), - parallel ? arrow::internal::GetCpuThreadPool() : nullptr); + std::shared_ptr<::arrow::internal::ThreadPool> owned_thread_pool; + auto exec_ctx = SimpleExecContext(parallel, &owned_thread_pool); ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get())); AsyncGenerator> sink_gen; diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index e176c701b65..773e42f2a7a 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -478,6 +478,11 @@ TEST(ExecPlanExecution, SourceSinkError) { ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), Finishes(Raises(StatusCode::Invalid, HasSubstr("Artificial")))); + // Note: the plan may or may not be finished at this point. When an error + // hits the sink node it starts to mark itself finished but before that it emits + // the error to the producer which will cause the above wait to finish (possibly + // before the plan has marked itself finished). So we wait for the plan to finish. + ASSERT_FINISHES_OK(plan->finished()); } TEST(ExecPlanExecution, SourceConsumingSink) { @@ -527,7 +532,7 @@ TEST(ExecPlanExecution, SourceConsumingSink) { } } -TEST(ExecPlanExecution, SourceTableConsumingSink) { +TEST(ExecPlanExecution, SourceTableSink) { for (bool slow : {false, true}) { SCOPED_TRACE(slow ? 
"slowed" : "unslowed"); @@ -549,11 +554,12 @@ TEST(ExecPlanExecution, SourceTableConsumingSink) { ASSERT_OK(plan->StartProducing()); // Source should finish fairly quickly ASSERT_FINISHES_OK(source->finished()); - SleepABit(); - ASSERT_OK_AND_ASSIGN(auto actual, + ASSERT_OK_AND_ASSIGN(auto expected, TableFromExecBatches(basic_data.schema, basic_data.batches)); ASSERT_EQ(5, out->num_rows()); - AssertTablesEqual(*actual, *out); + ASSERT_OK_AND_ASSIGN(auto expected_sorted, SortTableOnAllFields(expected)); + ASSERT_OK_AND_ASSIGN(auto out_sorted, SortTableOnAllFields(out)); + AssertTablesEqual(*expected_sorted, *out_sorted); ASSERT_FINISHES_OK(plan->finished()); } } @@ -856,11 +862,11 @@ TEST(ExecPlanExecution, SourceGroupedSum) { }) .AddToPlan(plan.get())); - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), + ASSERT_THAT(StartAndCollectSortedByField(plan.get(), sink_gen, 0), Finishes(ResultWith(UnorderedElementsAreArray({ExecBatchFromJSON( {int64(), utf8()}, - parallel ? R"([[800, "alfa"], [1000, "beta"], [400, "gama"]])" - : R"([[8, "alfa"], [10, "beta"], [4, "gama"]])")})))); + parallel ? R"([[400, "gama"], [800, "alfa"], [1000, "beta"]])" + : R"([[4, "gama"], [8, "alfa"], [10, "beta"]])")})))); } } @@ -900,9 +906,9 @@ TEST(ExecPlanExecution, NestedSourceProjectGroupedSum) { auto input = MakeNestedBatches(); auto expected = ExecBatchFromJSON({int64(), boolean()}, R"([ - [null, true], + [5, null], [17, false], - [5, null] + [null, true] ])"); ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); @@ -926,7 +932,7 @@ TEST(ExecPlanExecution, NestedSourceProjectGroupedSum) { }) .AddToPlan(plan.get())); - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), + ASSERT_THAT(StartAndCollectSortedByField(plan.get(), sink_gen, 0), Finishes(ResultWith(UnorderedElementsAreArray({expected})))); } } @@ -962,10 +968,10 @@ TEST(ExecPlanExecution, SourceFilterProjectGroupedSumFilter) { }) .AddToPlan(plan.get())); - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), + ASSERT_THAT(StartAndCollectSortedByField(plan.get(), sink_gen, 0), Finishes(ResultWith(UnorderedElementsAreArray({ExecBatchFromJSON( - {int64(), utf8()}, parallel ? R"([[3600, "alfa"], [2000, "beta"]])" - : R"([[36, "alfa"], [20, "beta"]])")})))); + {int64(), utf8()}, parallel ? R"([[2000, "beta"], [3600, "alfa"]])" + : R"([[20, "beta"], [36, "alfa"]])")})))); } } @@ -1123,7 +1129,7 @@ TEST(ExecPlanExecution, AggregationPreservesOptions) { .AddToPlan(plan.get())); } - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), + ASSERT_THAT(StartAndCollectSortedByField(plan.get(), sink_gen, 1), Finishes(ResultWith(UnorderedElementsAreArray({ ExecBatchFromJSON({int64(), utf8()}, R"([[500, "alfa"], [200, "beta"], [200, "gama"]])"), @@ -1211,7 +1217,7 @@ TEST(ExecPlanExecution, ScalarSourceGroupedSum) { }) .AddToPlan(plan.get())); - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), + ASSERT_THAT(StartAndCollectSortedByField(plan.get(), sink_gen, 0), Finishes(ResultWith(UnorderedElementsAreArray({ ExecBatchFromJSON({int64(), boolean()}, R"([[6, true], [18, false]])"), })))); @@ -1223,8 +1229,8 @@ TEST(ExecPlanExecution, SelfInnerHashJoinSink) { auto input = MakeGroupableBatches(); - auto exec_ctx = arrow::internal::make_unique( - default_memory_pool(), parallel ? 
arrow::internal::GetCpuThreadPool() : nullptr); + auto exec_ctx = arrow::internal::make_unique(default_memory_pool()); + exec_ctx->set_use_threads(parallel); ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get())); AsyncGenerator> sink_gen; @@ -1280,8 +1286,8 @@ TEST(ExecPlanExecution, SelfOuterHashJoinSink) { auto input = MakeGroupableBatches(); - auto exec_ctx = arrow::internal::make_unique( - default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr); + auto exec_ctx = arrow::internal::make_unique(default_memory_pool()); + exec_ctx->set_use_threads(parallel); ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get())); AsyncGenerator> sink_gen; diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index 13564c736b5..d2525325854 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -53,7 +53,10 @@ class SinkNode : public ExecNode { util::BackpressureOptions backpressure) : ExecNode(plan, std::move(inputs), {"collected"}, {}, /*num_outputs=*/0), - producer_(MakeProducer(generator, std::move(backpressure))) {} + producer_(MakeProducer(generator, std::move(backpressure))) { + DCHECK_EQ(1, inputs_.size()); + output_schema_ = inputs_[0]->output_schema(); + } static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { @@ -96,9 +99,9 @@ class SinkNode : public ExecNode { void StopProducing() override { EVENT(span_, "StopProducing"); - - Finish(); - inputs_[0]->StopProducing(this); + if (input_counter_.Cancel()) { + Finish(); + } } Future<> finished() override { return finished_; } @@ -128,7 +131,6 @@ class SinkNode : public ExecNode { if (input_counter_.Cancel()) { Finish(); } - inputs_[0]->StopProducing(this); } void InputFinished(ExecNode* input, int total_batches) override { @@ -140,9 +142,8 @@ class SinkNode : public ExecNode { protected: virtual void Finish() { - if (producer_.Close()) { - finished_.MarkFinished(); - } + producer_.Close(); + finished_.MarkFinished(); } AtomicCounter input_counter_; @@ -193,8 +194,9 @@ class ConsumingSinkNode : public ExecNode { void StopProducing() override { EVENT(span_, "StopProducing"); - Finish(Status::Invalid("ExecPlan was stopped early")); - inputs_[0]->StopProducing(this); + if (input_counter_.Cancel()) { + Finish(Status::OK()); + } } Future<> finished() override { return finished_; } @@ -218,7 +220,6 @@ class ConsumingSinkNode : public ExecNode { if (input_counter_.Cancel()) { Finish(std::move(consumption_status)); } - inputs_[0]->StopProducing(this); return; } @@ -234,8 +235,6 @@ class ConsumingSinkNode : public ExecNode { if (input_counter_.Cancel()) { Finish(std::move(error)); } - - inputs_[0]->StopProducing(this); } void InputFinished(ExecNode* input, int total_batches) override { diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 777faf22192..9f05bfd4f36 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -88,14 +88,12 @@ struct SourceNode : ExecNode { CallbackOptions options; auto executor = plan()->exec_context()->executor(); - if (executor) { - // These options will transfer execution to the desired Executor if necessary. - // This can happen for in-memory scans where batches didn't require - // any CPU work to decode. Otherwise, parsing etc should have already - // been placed us on the desired Executor and no queues will be pushed to. 
- options.executor = executor; - options.should_schedule = ShouldSchedule::IfDifferentExecutor; - } + // These options will transfer execution to the desired Executor if necessary. + // This can happen for in-memory scans where batches didn't require + // any CPU work to decode. Otherwise, parsing etc. should have already + // placed us on the desired Executor and no queues will be pushed to. + options.executor = executor; + options.should_schedule = ShouldSchedule::IfDifferentExecutor; finished_ = Loop([this, executor, options] { std::unique_lock lock(mutex_); diff --git a/cpp/src/arrow/compute/exec/test_util.cc b/cpp/src/arrow/compute/exec/test_util.cc index 7733ea46084..c9839a6c567 100644 --- a/cpp/src/arrow/compute/exec/test_util.cc +++ b/cpp/src/arrow/compute/exec/test_util.cc @@ -102,9 +102,6 @@ struct DummyNode : ExecNode { void StopProducing() override { if (started_) { - for (const auto& input : inputs_) { - input->StopProducing(this); - } if (stop_producing_) { stop_producing_(this); } @@ -129,6 +126,41 @@ struct DummyNode : ExecNode { bool started_ = false; }; +std::shared_ptr<Schema> AnonymousSchema(const ExecBatch& batch) { + FieldVector fields; + for (int i = 0; i < batch.num_values(); i++) { + fields.push_back(field("f" + std::to_string(i), batch[i].type())); + } + return schema(std::move(fields)); +} + +Result<std::shared_ptr<RecordBatch>> ExecBatchToAnonymousRecordBatch( + const ExecBatch& batch) { + return batch.ToRecordBatch(AnonymousSchema(batch)); +} + +Result<std::shared_ptr<Table>> ExecBatchesToAnonymousTable( + const std::vector<ExecBatch>& batches) { + DCHECK_GT(batches.size(), 0); + std::shared_ptr<Schema> schema = AnonymousSchema(batches[0]); + return TableFromExecBatches(schema, batches); +} + +std::vector<ExecBatch> TableToExecBatches(const Table& table) { + std::vector<ExecBatch> exec_batches; + std::shared_ptr<ChunkedArray> sample_array = table.column(0); + for (int i = 0; i < sample_array->num_chunks(); i++) { + std::vector<Datum> datums; + for (int j = 0; j < table.num_columns(); j++) { + DCHECK_EQ(sample_array->chunk(i)->length(), table.column(j)->chunk(i)->length()); + datums.push_back(table.column(j)->chunk(i)); + } + exec_batches.push_back( + ExecBatch(std::move(datums), sample_array->chunk(i)->length())); + } + return exec_batches; +} + } // namespace ExecNode* MakeDummyNode(ExecPlan* plan, std::string label, std::vector<ExecNode*> inputs, @@ -181,6 +213,14 @@ Future<std::vector<ExecBatch>> StartAndCollect( }); } +Future<std::vector<ExecBatch>> StartAndCollectSortedByField( + ExecPlan* plan, AsyncGenerator<util::optional<ExecBatch>> gen, int field_index) { + return StartAndCollect(plan, std::move(gen)) + .Then([field_index](const std::vector<ExecBatch>& batches) { + return SortBatchesByField(batches, field_index); + }); +} + BatchesWithSchema MakeBasicBatches() { BatchesWithSchema out; out.batches = { @@ -231,6 +271,24 @@ Result<std::shared_ptr<Table>> SortTableOnAllFields(const std::shared_ptr<Table>& tab) {
return tab_sorted.table(); } +Result<ExecBatch> SortBatchByField(const ExecBatch& batch, int field_index) { + ARROW_ASSIGN_OR_RAISE(auto sort_indices, + SortIndices(batch[field_index], SortOptions{})); + ARROW_ASSIGN_OR_RAISE(auto record_batch, ExecBatchToAnonymousRecordBatch(batch)); + ARROW_ASSIGN_OR_RAISE(auto sorted_data, Take(record_batch, sort_indices)); + return ExecBatch(*sorted_data.record_batch()); +} + +Result<std::vector<ExecBatch>> SortBatchesByField(const std::vector<ExecBatch>& batches, + int field_index) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table>
table, + ExecBatchesToAnonymousTable(batches)); + ARROW_ASSIGN_OR_RAISE(auto sort_indices, + SortIndices(table->column(field_index), SortOptions{})); + ARROW_ASSIGN_OR_RAISE(auto sorted_data, Take(table, sort_indices)); + return TableToExecBatches(*sorted_data.table()); +} + void AssertTablesEqual(const std::shared_ptr
<Table>& exp, const std::shared_ptr<Table>
& act) { ASSERT_EQ(exp->num_columns(), act->num_columns()); diff --git a/cpp/src/arrow/compute/exec/test_util.h b/cpp/src/arrow/compute/exec/test_util.h index 9347d1343f1..d53c53533ff 100644 --- a/cpp/src/arrow/compute/exec/test_util.h +++ b/cpp/src/arrow/compute/exec/test_util.h @@ -86,6 +86,10 @@ ARROW_TESTING_EXPORT Future<std::vector<ExecBatch>> StartAndCollect( ExecPlan* plan, AsyncGenerator<util::optional<ExecBatch>> gen); +ARROW_TESTING_EXPORT +Future<std::vector<ExecBatch>> StartAndCollectSortedByField( + ExecPlan* plan, AsyncGenerator<util::optional<ExecBatch>> gen, int field_index); + ARROW_TESTING_EXPORT BatchesWithSchema MakeBasicBatches(); @@ -99,6 +103,13 @@ BatchesWithSchema MakeRandomBatches(const std::shared_ptr<Schema>& schema, ARROW_TESTING_EXPORT Result<std::shared_ptr<Table>> SortTableOnAllFields(const std::shared_ptr
<Table>& tab); +ARROW_TESTING_EXPORT +Result<ExecBatch> SortBatchByField(const ExecBatch& batch, int field_index); + +ARROW_TESTING_EXPORT +Result<std::vector<ExecBatch>> SortBatchesByField(const std::vector<ExecBatch>& batches, + int field_index); + ARROW_TESTING_EXPORT void AssertTablesEqual(const std::shared_ptr
<Table>& exp, const std::shared_ptr<Table>
& act); diff --git a/cpp/src/arrow/compute/exec/union_node.cc b/cpp/src/arrow/compute/exec/union_node.cc index 9232516cc6d..827421610d8 100644 --- a/cpp/src/arrow/compute/exec/union_node.cc +++ b/cpp/src/arrow/compute/exec/union_node.cc @@ -131,18 +131,12 @@ class UnionNode : public ExecNode { if (batch_count_.Cancel()) { finished_.MarkFinished(); } - for (auto&& input : inputs_) { - input->StopProducing(this); - } } void StopProducing() override { if (batch_count_.Cancel()) { finished_.MarkFinished(); } - for (auto&& input : inputs_) { - input->StopProducing(this); - } } Future<> finished() override { return finished_; } diff --git a/cpp/src/arrow/compute/exec_test.cc b/cpp/src/arrow/compute/exec_test.cc index 198cb84ff5e..3a63b8f6649 100644 --- a/cpp/src/arrow/compute/exec_test.cc +++ b/cpp/src/arrow/compute/exec_test.cc @@ -44,6 +44,7 @@ #include "arrow/util/cpu_info.h" #include "arrow/util/logging.h" #include "arrow/util/make_unique.h" +#include "arrow/util/thread_pool.h" namespace arrow { @@ -70,10 +71,12 @@ TEST(ExecContext, BasicWorkings) { // Now, let's customize all the things LoggingMemoryPool my_pool(default_memory_pool()); std::unique_ptr custom_reg = FunctionRegistry::Make(); - ExecContext ctx(&my_pool, /*executor=*/nullptr, custom_reg.get()); + ASSERT_OK_AND_ASSIGN(auto executor, ::arrow::internal::ThreadPool::Make(1)); + ExecContext ctx(&my_pool, executor.get(), custom_reg.get()); ASSERT_EQ(custom_reg.get(), ctx.func_registry()); ASSERT_EQ(&my_pool, ctx.memory_pool()); + ASSERT_EQ(executor.get(), ctx.executor()); ctx.set_exec_chunksize(1 << 20); ASSERT_EQ(1 << 20, ctx.exec_chunksize()); diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index e292cf4a9bc..eefcad87486 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -81,7 +81,8 @@ InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches, } Result InMemoryFragment::ScanBatchesAsync( - const std::shared_ptr& options) { + const std::shared_ptr& options, + ::arrow::internal::Executor* cpu_executor) { struct State { State(std::shared_ptr fragment, int64_t batch_size) : fragment(std::move(fragment)), diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index 9f4fee52154..d12ee85c9d5 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -31,6 +31,7 @@ #include "arrow/util/macros.h" #include "arrow/util/mutex.h" #include "arrow/util/optional.h" +#include "arrow/util/thread_pool.h" namespace arrow { namespace dataset { @@ -55,9 +56,10 @@ class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this { /// The schema is cached after being read once, or may be specified at construction. Result> ReadPhysicalSchema(); - /// An asynchronous version of Scan + /// \brief Scans the fragment and returns the data as batches virtual Result ScanBatchesAsync( - const std::shared_ptr& options) = 0; + const std::shared_ptr& options, + ::arrow::internal::Executor* cpu_executor) = 0; /// \brief Count the number of rows in this fragment matching the filter using metadata /// only. That is, this method may perform I/O, but will not load data. 
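Every Fragment now receives the CPU executor explicitly through ScanBatchesAsync instead of reaching for the global thread pool, which is what lets the scanner run an entire pipeline on a SerialExecutor when use_threads is false. A minimal sketch of a trivial in-memory implementation body under the new signature, following the same MakeTransferredGenerator pattern that OneShotFragment uses in scanner.cc below (ScanVectorAsync and the local alias are illustrative, not part of this patch):

    #include <memory>
    #include <vector>
    #include "arrow/record_batch.h"
    #include "arrow/result.h"
    #include "arrow/util/async_generator.h"
    #include "arrow/util/thread_pool.h"

    using RecordBatchGenerator =
        arrow::AsyncGenerator<std::shared_ptr<arrow::RecordBatch>>;

    // In-memory batches need no I/O, so the only responsibility left is
    // handing continuations to the caller-supplied CPU executor.
    arrow::Result<RecordBatchGenerator> ScanVectorAsync(
        std::vector<std::shared_ptr<arrow::RecordBatch>> batches,
        ::arrow::internal::Executor* cpu_executor) {
      auto gen = arrow::MakeVectorGenerator(std::move(batches));
      // On a SerialExecutor this keeps work on the calling thread; on the
      // CPU thread pool it moves work off the I/O threads.
      return arrow::MakeTransferredGenerator(std::move(gen), cpu_executor);
    }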
@@ -119,7 +121,8 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment { compute::Expression = compute::literal(true)); Result ScanBatchesAsync( - const std::shared_ptr& options) override; + const std::shared_ptr& options, + ::arrow::internal::Executor* cpu_executor) override; Future> CountRows( compute::Expression predicate, const std::shared_ptr& options) override; diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index f4551c27590..cedaa7008f3 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -118,9 +118,10 @@ Result> FileFragment::ReadPhysicalSchemaImpl() { } Result FileFragment::ScanBatchesAsync( - const std::shared_ptr& options) { + const std::shared_ptr& options, + ::arrow::internal::Executor* cpu_executor) { auto self = std::dynamic_pointer_cast(shared_from_this()); - return format_->ScanBatchesAsync(options, self); + return format_->ScanBatchesAsync(options, self, cpu_executor); } Future> FileFragment::CountRows( @@ -332,13 +333,10 @@ class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer { util::SerializedAsyncTaskGroup task_group_; }; -} // namespace - -Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_options, - std::shared_ptr scanner) { +Future<> DoWriteAsync(const FileSystemDatasetWriteOptions& write_options, + std::shared_ptr scanner, + ::arrow::internal::Executor* cpu_executor) { const io::IOContext& io_context = scanner->options()->io_context; - auto cpu_executor = - scanner->options()->use_threads ? ::arrow::internal::GetCpuThreadPool() : nullptr; std::shared_ptr exec_context = std::make_shared(io_context.pool(), cpu_executor); @@ -366,7 +364,26 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio .AddToPlan(plan.get())); RETURN_NOT_OK(plan->StartProducing()); - return plan->finished().status(); + return plan->finished().Then([plan, exec_context]() { + // Keep plan and exec_context alive until the plan is finished + }); +} + +} // namespace + +Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_options, + std::shared_ptr scanner) { + if (scanner->options()->use_threads) { + return DoWriteAsync(std::move(write_options), std::move(scanner), + ::arrow::internal::GetCpuThreadPool()) + .status(); + } else { + return ::arrow::internal::SerialExecutor::RunInSerialExecutor< + ::arrow::internal::Empty>( + [write_options, scanner](::arrow::internal::Executor* executor) { + return DoWriteAsync(std::move(write_options), std::move(scanner), executor); + }); + } } Result MakeWriteNode(compute::ExecPlan* plan, diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 07b156778f6..6df5457a4f0 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -147,9 +147,26 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this> Inspect(const FileSource& source) const = 0; + /// \brief Scan the fragment returning the data as a stream of batches + /// + /// Typically it is expected that each generator task will involve some amount + /// of I/O work and some amount of CPU work. The format is responsible for transferring + /// the work back and forth between the I/O executor (obtained from the filesystem) and + /// CPU executor (passed in directly) as needed. + /// + /// If the underlying format does not support asynchronous operation then the correct + /// workaround is to schedule the entire task on the I/O executor. 
There is typically + /// no need to transfer back to the CPU executor as this will be done automatically by + /// the scan node. + /// + /// If the underlying format spends a lot of time blocked on I/O then it may be + /// necessary to make the I/O executor considerably larger than the number of cores on the + /// device to ensure maximum parallelism. This could potentially lead to excess context + /// switching, which is why an async approach is preferred. virtual Result<RecordBatchGenerator> ScanBatchesAsync( const std::shared_ptr<ScanOptions>& options, - const std::shared_ptr<FileFragment>& file) const = 0; + const std::shared_ptr<FileFragment>& file, + ::arrow::internal::Executor* cpu_executor) const = 0; virtual Future<util::optional<int64_t>> CountRows( const std::shared_ptr<FileFragment>& file, compute::Expression predicate, @@ -182,7 +199,8 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileFormat> Result<RecordBatchGenerator> ScanBatchesAsync( - const std::shared_ptr<ScanOptions>& options) override; + const std::shared_ptr<ScanOptions>& options, + ::arrow::internal::Executor* cpu_executor) override; Future<util::optional<int64_t>> CountRows( compute::Expression predicate, const std::shared_ptr<ScanOptions>& options) override; diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index 1cc7957083f..3c57b2a1e30 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -46,7 +46,6 @@ namespace arrow { using internal::checked_cast; using internal::checked_pointer_cast; using internal::Executor; -using internal::SerialExecutor; namespace dataset { @@ -245,11 +244,11 @@ Result<std::shared_ptr<Schema>> CsvFileFormat::Inspect(const FileSource& source) Result<RecordBatchGenerator> CsvFileFormat::ScanBatchesAsync( const std::shared_ptr<ScanOptions>& scan_options, - const std::shared_ptr<FileFragment>& file) const { + const std::shared_ptr<FileFragment>& file, + ::arrow::internal::Executor* cpu_executor) const { auto this_ = checked_pointer_cast(shared_from_this()); auto source = file->source(); - auto reader_fut = - OpenReaderAsync(source, *this, scan_options, ::arrow::internal::GetCpuThreadPool()); + auto reader_fut = OpenReaderAsync(source, *this, scan_options, cpu_executor); return GeneratorFromReader(std::move(reader_fut), scan_options->batch_size); } diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index 83dbb88b85f..7fdb82afbe4 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -55,7 +55,8 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { Result<RecordBatchGenerator> ScanBatchesAsync( const std::shared_ptr<ScanOptions>& scan_options, - const std::shared_ptr<FileFragment>& file) const override; + const std::shared_ptr<FileFragment>& file, + ::arrow::internal::Executor* cpu_executor) const override; Future<util::optional<int64_t>> CountRows( const std::shared_ptr<FileFragment>& file, compute::Expression predicate, diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc index 00644b46ebc..a63d2cf7ebd 100644 --- a/cpp/src/arrow/dataset/file_csv_test.cc +++ b/cpp/src/arrow/dataset/file_csv_test.cc @@ -98,7 +98,9 @@ class TestCsvFileFormat : public FileFormatFixtureMixin, } RecordBatchIterator Batches(Fragment* fragment) { - EXPECT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(opts_)); + EXPECT_OK_AND_ASSIGN( + auto batch_gen, + fragment->ScanBatchesAsync(opts_, ::arrow::internal::GetCpuThreadPool())); return MakeGeneratorIterator(batch_gen); } }; diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index e386c7dea8d..46100003f93 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -120,7 +120,8 @@ Result<std::shared_ptr<Schema>> IpcFileFormat::Inspect(const FileSource& source) Result<RecordBatchGenerator> IpcFileFormat::ScanBatchesAsync( const
std::shared_ptr& options, - const std::shared_ptr& file) const { + const std::shared_ptr& file, + ::arrow::internal::Executor* cpu_executor) const { auto self = shared_from_this(); auto source = file->source(); auto open_reader = OpenReaderAsync(source); @@ -143,10 +144,10 @@ Result IpcFileFormat::ScanBatchesAsync( RecordBatchGenerator generator; if (ipc_scan_options->cache_options) { // Transferring helps performance when coalescing - ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator( - /*coalesce=*/true, options->io_context, - *ipc_scan_options->cache_options, - ::arrow::internal::GetCpuThreadPool())); + ARROW_ASSIGN_OR_RAISE(generator, + reader->GetRecordBatchGenerator( + /*coalesce=*/true, options->io_context, + *ipc_scan_options->cache_options, cpu_executor)); } else { ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator( /*coalesce=*/false, options->io_context)); diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h index 29ce6be61d6..a0079424ec6 100644 --- a/cpp/src/arrow/dataset/file_ipc.h +++ b/cpp/src/arrow/dataset/file_ipc.h @@ -54,7 +54,8 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat { Result ScanBatchesAsync( const std::shared_ptr& options, - const std::shared_ptr& file) const override; + const std::shared_ptr& file, + ::arrow::internal::Executor* cpu_executor) const override; Future> CountRows( const std::shared_ptr& file, compute::Expression predicate, diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc index b085ad6a1de..e4c6e12569b 100644 --- a/cpp/src/arrow/dataset/file_ipc_test.cc +++ b/cpp/src/arrow/dataset/file_ipc_test.cc @@ -166,7 +166,8 @@ TEST_P(TestIpcFileFormatScan, FragmentScanOptions) { fragment_scan_options->options = std::make_shared(); fragment_scan_options->options->max_recursion_depth = 0; opts_->fragment_scan_options = fragment_scan_options; - ASSERT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(opts_)); + ASSERT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync( + opts_, ::arrow::internal::GetCpuThreadPool())); ASSERT_FINISHES_AND_RAISES(Invalid, CollectAsyncGenerator(batch_gen)); } INSTANTIATE_TEST_SUITE_P(TestScan, TestIpcFileFormatScan, diff --git a/cpp/src/arrow/dataset/file_orc.cc b/cpp/src/arrow/dataset/file_orc.cc index 49102f3deae..46c8dd21ee6 100644 --- a/cpp/src/arrow/dataset/file_orc.cc +++ b/cpp/src/arrow/dataset/file_orc.cc @@ -154,7 +154,8 @@ Result> OrcFileFormat::Inspect(const FileSource& source) Result OrcFileFormat::ScanBatchesAsync( const std::shared_ptr& options, - const std::shared_ptr& file) const { + const std::shared_ptr& file, + ::arrow::internal::Executor* cpu_executor) const { // TODO investigate "true" async version // (https://issues.apache.org/jira/browse/ARROW-13795) ARROW_ASSIGN_OR_RAISE(auto task_iter, OrcScanTaskIterator::Make(options, file)); diff --git a/cpp/src/arrow/dataset/file_orc.h b/cpp/src/arrow/dataset/file_orc.h index 5bbe4df24ad..3805f81dddc 100644 --- a/cpp/src/arrow/dataset/file_orc.h +++ b/cpp/src/arrow/dataset/file_orc.h @@ -53,7 +53,8 @@ class ARROW_DS_EXPORT OrcFileFormat : public FileFormat { Result ScanBatchesAsync( const std::shared_ptr& options, - const std::shared_ptr& file) const override; + const std::shared_ptr& file, + ::arrow::internal::Executor* cpu_executor) const override; Future> CountRows( const std::shared_ptr& file, compute::Expression predicate, diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 
5fbd457eccd..73cea9bb0e7 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -375,7 +375,8 @@ Future> ParquetFileFormat::GetReader Result ParquetFileFormat::ScanBatchesAsync( const std::shared_ptr& options, - const std::shared_ptr& file) const { + const std::shared_ptr& file, + ::arrow::internal::Executor* cpu_executor) const { auto parquet_fragment = checked_pointer_cast(file); std::vector row_groups; bool pre_filtered = false; @@ -409,10 +410,9 @@ Result ParquetFileFormat::ScanBatchesAsync( // Assume 1 row group corresponds to 1 batch (this factor could be // improved by looking at metadata) int row_group_readahead = options->batch_readahead; - ARROW_ASSIGN_OR_RAISE( - auto generator, reader->GetRecordBatchGenerator( - reader, row_groups, column_projection, - ::arrow::internal::GetCpuThreadPool(), row_group_readahead)); + ARROW_ASSIGN_OR_RAISE(auto generator, reader->GetRecordBatchGenerator( + reader, row_groups, column_projection, + cpu_executor, row_group_readahead)); return generator; }; return MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options) diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 6f2f5420681..b9f60da83a4 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -97,7 +97,8 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { Result ScanBatchesAsync( const std::shared_ptr& options, - const std::shared_ptr& file) const override; + const std::shared_ptr& file, + ::arrow::internal::Executor* cpu_executor) const override; Future> CountRows( const std::shared_ptr& file, compute::Expression predicate, diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index d5c7a0b9850..ec3fa28507c 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -122,7 +122,9 @@ class ParquetFormatHelper { class TestParquetFileFormat : public FileFormatFixtureMixin { public: RecordBatchIterator Batches(Fragment* fragment) { - EXPECT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(opts_)); + EXPECT_OK_AND_ASSIGN( + auto batch_gen, + fragment->ScanBatchesAsync(opts_, ::arrow::internal::GetCpuThreadPool())); return MakeGeneratorIterator(batch_gen); } @@ -586,8 +588,10 @@ TEST_P(TestParquetFileFormatScan, ExplicitRowGroupSelection) { SetFilter(greater(field_ref("i64"), literal(3))); CountRowsAndBatchesInScan(row_groups_fragment({2, 3, 4, 5}), 4 + 5 + 6, 3); - ASSERT_OK_AND_ASSIGN(auto batch_gen, - row_groups_fragment({kNumRowGroups + 1})->ScanBatchesAsync(opts_)); + ASSERT_OK_AND_ASSIGN( + auto batch_gen, + row_groups_fragment({kNumRowGroups + 1}) + ->ScanBatchesAsync(opts_, ::arrow::internal::GetCpuThreadPool())); Status scan_status = CollectAsyncGenerator(batch_gen).status(); EXPECT_RAISES_WITH_MESSAGE_THAT( diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc index cc89c163cb7..db3ba88eb42 100644 --- a/cpp/src/arrow/dataset/file_test.cc +++ b/cpp/src/arrow/dataset/file_test.cc @@ -91,7 +91,8 @@ class MockFileFormat : public FileFormat { public: Result ScanBatchesAsync( const std::shared_ptr& options, - const std::shared_ptr& file) const override { + const std::shared_ptr& file, + ::arrow::internal::Executor* cpu_executor) const override { auto sch = schema({field("i32", int32())}); RecordBatchVector batches; for (int i = 0; i < kNumBatches; i++) { @@ -119,7 +120,8 @@ class MockFileFormat : public FileFormat { 
TEST(FileFormat, ScanAsync) { MockFileFormat format; auto scan_options = std::make_shared(); - ASSERT_OK_AND_ASSIGN(auto batch_gen, format.ScanBatchesAsync(scan_options, nullptr)); + ASSERT_OK_AND_ASSIGN(auto batch_gen, + format.ScanBatchesAsync(scan_options, nullptr, nullptr)); ASSERT_FINISHES_OK_AND_ASSIGN(auto batches, CollectAsyncGenerator(batch_gen)); ASSERT_EQ(kNumBatches, static_cast(batches.size())); for (int i = 0; i < kNumBatches; i++) { diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index b958f7b9e62..3d132aa4e39 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -186,6 +186,7 @@ class AsyncScanner : public Scanner, public std::enable_shared_from_this> TakeRows(const Array& indices) override; Result> Head(int64_t num_rows) override; Result> ToTable() override; + Future CountRowsAsync(Executor* cpu_executor); Result CountRows() override; Result> ToRecordBatchReader() override; const std::shared_ptr& dataset() const override; @@ -194,8 +195,7 @@ class AsyncScanner : public Scanner, public std::enable_shared_from_this ScanBatchesAsync(Executor* executor); Future<> VisitBatchesAsync(std::function visitor, Executor* executor); - Result ScanBatchesUnorderedAsync( - Executor* executor, bool sequence_fragments = false); + Result ScanBatchesUnorderedAsync(Executor* executor); Future> ToTableAsync(Executor* executor); Result GetFragments() const; @@ -205,8 +205,9 @@ class AsyncScanner : public Scanner, public std::enable_shared_from_this FragmentToBatches( const Enumerated>& fragment, - const std::shared_ptr& options) { - ARROW_ASSIGN_OR_RAISE(auto batch_gen, fragment.value->ScanBatchesAsync(options)); + const std::shared_ptr& options, Executor* cpu_executor) { + ARROW_ASSIGN_OR_RAISE(auto batch_gen, + fragment.value->ScanBatchesAsync(options, cpu_executor)); ArrayVector columns; for (const auto& field : options->dataset_schema->fields()) { // TODO(ARROW-7051): use helper to make empty batch @@ -228,11 +229,12 @@ Result FragmentToBatches( } Result> FragmentsToBatches( - FragmentGenerator fragment_gen, const std::shared_ptr& options) { + FragmentGenerator fragment_gen, const std::shared_ptr& options, + Executor* cpu_executor) { auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen)); return MakeMappedGenerator(std::move(enumerated_fragment_gen), [=](const Enumerated>& fragment) { - return FragmentToBatches(fragment, options); + return FragmentToBatches(fragment, options, cpu_executor); }); } @@ -248,13 +250,12 @@ class OneShotFragment : public Fragment { return Status::OK(); } Result ScanBatchesAsync( - const std::shared_ptr& options) override { + const std::shared_ptr& options, Executor* cpu_executor) override { RETURN_NOT_OK(CheckConsumed()); ARROW_ASSIGN_OR_RAISE( auto background_gen, MakeBackgroundGenerator(std::move(batch_it_), options->io_context.executor())); - return MakeTransferredGenerator(std::move(background_gen), - ::arrow::internal::GetCpuThreadPool()); + return MakeTransferredGenerator(std::move(background_gen), cpu_executor); } std::string type_name() const override { return "one-shot"; } @@ -276,20 +277,18 @@ Result AsyncScanner::GetFragments() const { } Result AsyncScanner::ScanBatches() { - ARROW_ASSIGN_OR_RAISE(auto batches_gen, - ScanBatchesAsync(::arrow::internal::GetCpuThreadPool())); - return MakeGeneratorIterator(std::move(batches_gen)); + return SerialExecutor::IterateGenerator( + [this](Executor* executor) { return ScanBatchesAsync(executor); }); } Result 
AsyncScanner::ScanBatchesUnordered() { - ARROW_ASSIGN_OR_RAISE(auto batches_gen, - ScanBatchesUnorderedAsync(::arrow::internal::GetCpuThreadPool())); - return MakeGeneratorIterator(std::move(batches_gen)); + return SerialExecutor::IterateGenerator( + [this](Executor* executor) { return ScanBatchesUnorderedAsync(executor); }); } Result> AsyncScanner::ToTable() { - auto table_fut = ToTableAsync(::arrow::internal::GetCpuThreadPool()); - return table_fut.result(); + return SerialExecutor::RunInSerialExecutor>( + [this](Executor* executor) { return ToTableAsync(executor); }); } Result AsyncScanner::ScanBatchesUnorderedAsync() { @@ -314,13 +313,8 @@ Result ToEnumeratedRecordBatch( } Result AsyncScanner::ScanBatchesUnorderedAsync( - Executor* cpu_executor, bool sequence_fragments) { - if (!scan_options_->use_threads) { - cpu_executor = nullptr; - } - + Executor* cpu_executor) { RETURN_NOT_OK(NormalizeScanOptions(scan_options_, dataset_->schema())); - auto exec_context = std::make_shared(scan_options_->pool, cpu_executor); @@ -337,8 +331,7 @@ Result AsyncScanner::ScanBatchesUnorderedAsync( RETURN_NOT_OK( compute::Declaration::Sequence( { - {"scan", ScanNodeOptions{dataset_, scan_options_, backpressure.toggle, - sequence_fragments}}, + {"scan", ScanNodeOptions{dataset_, scan_options_, backpressure.toggle}}, {"filter", compute::FilterNodeOptions{scan_options_->filter}}, {"augmented_project", compute::ProjectNodeOptions{std::move(exprs), std::move(names)}}, @@ -484,8 +477,7 @@ Result AsyncScanner::ScanBatchesAsync() { Result AsyncScanner::ScanBatchesAsync( Executor* cpu_executor) { - ARROW_ASSIGN_OR_RAISE(auto unordered, ScanBatchesUnorderedAsync( - cpu_executor, /*sequence_fragments=*/true)); + ARROW_ASSIGN_OR_RAISE(auto unordered, ScanBatchesUnorderedAsync(cpu_executor)); // We need an initial value sentinel, so we use one with fragment.index < 0 auto is_before_any = [](const EnumeratedRecordBatch& batch) { return batch.fragment.index < 0; @@ -597,33 +589,40 @@ Future> AsyncScanner::ToTableAsync(Executor* cpu_executor }); } -Result AsyncScanner::CountRows() { +Future AsyncScanner::CountRowsAsync(Executor* cpu_executor) { ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments()); - auto cpu_executor = - scan_options_->use_threads ? 
::arrow::internal::GetCpuThreadPool() : nullptr; - compute::ExecContext exec_context(scan_options_->pool, cpu_executor); + std::shared_ptr exec_context = + std::make_shared(scan_options_->pool, cpu_executor); - ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(&exec_context)); + ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(exec_context.get())); // Drop projection since we only need to count rows - const auto options = std::make_shared(*scan_options_); + auto options = std::make_shared(*scan_options_); ARROW_ASSIGN_OR_RAISE(auto empty_projection, ProjectionDescr::FromNames(std::vector(), *scan_options_->dataset_schema)); SetProjection(options.get(), empty_projection); - std::atomic total{0}; + struct CountRowsState { + explicit CountRowsState(std::shared_ptr options) + : options(std::move(options)) {} + + std::shared_ptr options; + std::atomic total{0}; + }; + std::shared_ptr state = std::make_shared(options); + CountRowsState* state_ptr = state.get(); fragment_gen = MakeMappedGenerator( - std::move(fragment_gen), [&](const std::shared_ptr& fragment) { - return fragment->CountRows(options->filter, options) + std::move(fragment_gen), [state_ptr](const std::shared_ptr& fragment) { + return fragment->CountRows(state_ptr->options->filter, state_ptr->options) .Then([&, fragment](util::optional fast_count) mutable -> std::shared_ptr { if (fast_count) { // fast path: got row count directly; skip scanning this fragment - total += *fast_count; - return std::make_shared(options->dataset_schema, - RecordBatchVector{}); + state_ptr->total += *fast_count; + return std::make_shared( + state_ptr->options->dataset_schema, RecordBatchVector{}); } // slow path: actually filter this fragment's batches @@ -636,10 +635,10 @@ Result AsyncScanner::CountRows() { RETURN_NOT_OK( compute::Declaration::Sequence( { - {"scan", ScanNodeOptions{std::make_shared( - scan_options_->dataset_schema, - std::move(fragment_gen)), - options}}, + {"scan", + ScanNodeOptions{std::make_shared(options->dataset_schema, + std::move(fragment_gen)), + options}}, {"project", compute::ProjectNodeOptions{{options->filter}, {"mask"}}}, {"aggregate", compute::AggregateNodeOptions{{compute::internal::Aggregate{ "sum", nullptr}}, @@ -650,13 +649,19 @@ Result AsyncScanner::CountRows() { .AddToPlan(plan.get())); RETURN_NOT_OK(plan->StartProducing()); - auto maybe_slow_count = sink_gen().result(); - plan->finished().Wait(); + return plan->finished().Then( + [sink_gen, state, plan, exec_context]() -> Result { + auto maybe_slow_count = sink_gen().result(); + ARROW_ASSIGN_OR_RAISE(auto slow_count, maybe_slow_count); + state->total += slow_count->values[0].scalar_as().value; - ARROW_ASSIGN_OR_RAISE(auto slow_count, maybe_slow_count); - total += slow_count->values[0].scalar_as().value; + return state->total.load(); + }); +} - return total.load(); +Result AsyncScanner::CountRows() { + return SerialExecutor::RunInSerialExecutor( + [this](Executor* executor) { return CountRowsAsync(executor); }); } Result> AsyncScanner::ToRecordBatchReader() { @@ -831,7 +836,6 @@ Result MakeScanNode(compute::ExecPlan* plan, auto scan_options = scan_node_options.scan_options; auto dataset = scan_node_options.dataset; const auto& backpressure_toggle = scan_node_options.backpressure_toggle; - bool require_sequenced_output = scan_node_options.require_sequenced_output; RETURN_NOT_OK(NormalizeScanOptions(scan_options, dataset->schema())); @@ -841,17 +845,12 @@ Result MakeScanNode(compute::ExecPlan* plan, auto fragment_gen = 
MakeVectorGenerator(std::move(fragments_vec)); ARROW_ASSIGN_OR_RAISE(auto batch_gen_gen, - FragmentsToBatches(std::move(fragment_gen), scan_options)); + FragmentsToBatches(std::move(fragment_gen), scan_options, + plan->exec_context()->executor())); AsyncGenerator merged_batch_gen; - if (require_sequenced_output) { - ARROW_ASSIGN_OR_RAISE(merged_batch_gen, - MakeSequencedMergedGenerator(std::move(batch_gen_gen), - scan_options->fragment_readahead)); - } else { - merged_batch_gen = - MakeMergedGenerator(std::move(batch_gen_gen), scan_options->fragment_readahead); - } + merged_batch_gen = + MakeMergedGenerator(std::move(batch_gen_gen), scan_options->fragment_readahead); auto batch_gen = MakeReadaheadGenerator(std::move(merged_batch_gen), scan_options->fragment_readahead); diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 013ba092b0c..7a1ef3c4992 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -406,15 +406,21 @@ class ARROW_DS_EXPORT ScanNodeOptions : public compute::ExecNodeOptions { std::shared_ptr dataset, std::shared_ptr scan_options, std::shared_ptr backpressure_toggle = NULLPTR, bool require_sequenced_output = false) - : dataset(std::move(dataset)), - scan_options(std::move(scan_options)), - backpressure_toggle(std::move(backpressure_toggle)), - require_sequenced_output(require_sequenced_output) {} + : dataset(std::move(dataset)), scan_options(std::move(scan_options)) {} + /// \brief The dataset to scan std::shared_ptr dataset; + /// \brief Instructions on how to scan the dataset. + /// + /// The use_threads option will be ignored. The scanner will always + /// use the exec plan's executor (which may be a serial executor) std::shared_ptr scan_options; + /// \brief An optional toggle to allow pausing the scan. + /// + /// The node will cease reading from the scanner while the toggle is closed. + /// The scanner will continue until its readahead queue has filled up at which + /// point scanning will pause. 
std::shared_ptr backpressure_toggle; - bool require_sequenced_output; }; /// @} diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 7fedb7d3c72..a93ddacbf30 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -48,6 +48,7 @@ using testing::UnorderedElementsAreArray; namespace arrow { +using internal::Executor; using internal::GetCpuThreadPool; using internal::Iota; @@ -481,8 +482,8 @@ class CountRowsOnlyFragment : public InMemoryFragment { } return Future>::MakeFinished(sum); } - Result ScanBatchesAsync( - const std::shared_ptr&) override { + Result ScanBatchesAsync(const std::shared_ptr&, + Executor* cpu_executor) override { return Status::Invalid("Don't scan me!"); } }; @@ -495,8 +496,8 @@ class ScanOnlyFragment : public InMemoryFragment { compute::Expression predicate, const std::shared_ptr&) override { return Future>::MakeFinished(util::nullopt); } - Result ScanBatchesAsync( - const std::shared_ptr&) override { + Result ScanBatchesAsync(const std::shared_ptr&, + Executor*) override { return MakeVectorGenerator(record_batches_); } }; @@ -588,7 +589,7 @@ class FailingFragment : public InMemoryFragment { public: using InMemoryFragment::InMemoryFragment; Result ScanBatchesAsync( - const std::shared_ptr& options) override { + const std::shared_ptr& options, Executor*) override { struct { Future> operator()() { if (index > 16) { @@ -612,7 +613,7 @@ class FailingScanFragment : public InMemoryFragment { // There are two places to fail - during iteration (covered by FailingFragment) or at // the initial scan (covered here) Result ScanBatchesAsync( - const std::shared_ptr& options) override { + const std::shared_ptr& options, Executor*) override { return Status::Invalid("Oh no, we failed!"); } }; @@ -767,7 +768,7 @@ class ControlledFragment : public Fragment { std::string type_name() const override { return "scanner_test.cc::ControlledFragment"; } Result ScanBatchesAsync( - const std::shared_ptr& options) override { + const std::shared_ptr& options, Executor*) override { return tracking_generator_; }; diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index 3f826fa09c9..6524f6b5b8e 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -158,7 +158,9 @@ class DatasetFixtureMixin : public ::testing::Test { /// record batches yielded by the data fragment. void AssertFragmentEquals(RecordBatchReader* expected, Fragment* fragment, bool ensure_drained = true) { - ASSERT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(options_)); + ASSERT_OK_AND_ASSIGN( + auto batch_gen, + fragment->ScanBatchesAsync(options_, ::arrow::internal::GetCpuThreadPool())); AssertScanTaskEquals(expected, batch_gen); if (ensure_drained) { @@ -581,7 +583,9 @@ class FileFormatScanMixin : public FileFormatFixtureMixin, // Scan the fragment directly, without using the scanner. 
RecordBatchIterator PhysicalBatches(std::shared_ptr fragment) { opts_->use_threads = GetParam().use_threads; - EXPECT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(opts_)); + EXPECT_OK_AND_ASSIGN( + auto batch_gen, + fragment->ScanBatchesAsync(opts_, ::arrow::internal::GetCpuThreadPool())); auto batch_it = MakeGeneratorIterator(std::move(batch_gen)); return batch_it; } @@ -880,7 +884,8 @@ class DummyFileFormat : public FileFormat { /// \brief Open a file for scanning (always returns an empty generator) Result ScanBatchesAsync( const std::shared_ptr& options, - const std::shared_ptr& fragment) const override { + const std::shared_ptr& fragment, + ::arrow::internal::Executor* cpu_executor) const override { return MakeEmptyGenerator>(); } @@ -920,7 +925,8 @@ class JSONRecordBatchFileFormat : public FileFormat { Result ScanBatchesAsync( const std::shared_ptr& options, - const std::shared_ptr& fragment) const override { + const std::shared_ptr& fragment, + ::arrow::internal::Executor* cpu_executor) const override { ARROW_ASSIGN_OR_RAISE(auto file, fragment->source().Open()); ARROW_ASSIGN_OR_RAISE(int64_t size, file->GetSize()); ARROW_ASSIGN_OR_RAISE(auto buffer, file->Read(size)); diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index d132198a259..4b7fffbf899 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -1445,23 +1445,6 @@ AsyncGenerator MakeMergedGenerator(AsyncGenerator> source, return MergedGenerator(std::move(source), max_subscriptions); } -template -Result> MakeSequencedMergedGenerator( - AsyncGenerator> source, int max_subscriptions) { - if (max_subscriptions < 0) { - return Status::Invalid("max_subscriptions must be a positive integer"); - } - if (max_subscriptions == 1) { - return Status::Invalid("Use MakeConcatenatedGenerator if max_subscriptions is 1"); - } - AsyncGenerator> autostarting_source = MakeMappedGenerator( - std::move(source), - [](const AsyncGenerator& sub) { return MakeAutoStartingGenerator(sub); }); - AsyncGenerator> sub_readahead = - MakeSerialReadaheadGenerator(std::move(autostarting_source), max_subscriptions - 1); - return MakeConcatenatedGenerator(std::move(sub_readahead)); -} - /// \brief Create a generator that takes in a stream of generators and pulls from each /// one in sequence. 
/// diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc index 724ad6651eb..bc2bbc5f01c 100644 --- a/cpp/src/arrow/util/async_generator_test.cc +++ b/cpp/src/arrow/util/async_generator_test.cc @@ -796,151 +796,6 @@ TEST_P(AutoStartingGeneratorTestFixture, CopySafe) { INSTANTIATE_TEST_SUITE_P(AutoStartingGeneratorTests, AutoStartingGeneratorTestFixture, ::testing::Values(false, true)); -class SeqMergedGeneratorTestFixture : public ::testing::Test { - protected: - SeqMergedGeneratorTestFixture() : tracked_source_(push_source_) {} - - void BeginCaptureOutput(AsyncGenerator gen) { - finished_ = VisitAsyncGenerator(std::move(gen), [this](TestInt val) { - sink_.push_back(val.value); - return Status::OK(); - }); - } - - void EmitItem(int sub_index, int value) { - EXPECT_LT(sub_index, push_subs_.size()); - push_subs_[sub_index].producer().Push(value); - } - - void EmitErrorItem(int sub_index) { - EXPECT_LT(sub_index, push_subs_.size()); - push_subs_[sub_index].producer().Push(Status::Invalid("XYZ")); - } - - void EmitSub() { - PushGenerator sub; - util::TrackingGenerator tracked_sub(sub); - tracked_subs_.push_back(tracked_sub); - push_subs_.push_back(std::move(sub)); - push_source_.producer().Push(std::move(tracked_sub)); - } - - void EmitErrorSub() { push_source_.producer().Push(Status::Invalid("XYZ")); } - - void FinishSub(int sub_index) { - EXPECT_LT(sub_index, tracked_subs_.size()); - push_subs_[sub_index].producer().Close(); - } - - void FinishSubs() { push_source_.producer().Close(); } - - void AssertFinishedOk() { ASSERT_FINISHES_OK(finished_); } - - void AssertFailed() { ASSERT_FINISHES_AND_RAISES(Invalid, finished_); } - - int NumItemsAskedFor(int sub_index) { - EXPECT_LT(sub_index, tracked_subs_.size()); - return tracked_subs_[sub_index].num_read(); - } - - int NumSubsAskedFor() { return tracked_source_.num_read(); } - - void AssertRead(std::vector values) { - ASSERT_EQ(values.size(), sink_.size()); - for (std::size_t i = 0; i < sink_.size(); i++) { - ASSERT_EQ(values[i], sink_[i]); - } - } - - PushGenerator> push_source_; - std::vector> push_subs_; - std::vector> tracked_subs_; - util::TrackingGenerator> tracked_source_; - Future<> finished_; - std::vector sink_; -}; - -TEST_F(SeqMergedGeneratorTestFixture, Basic) { - ASSERT_OK_AND_ASSIGN( - AsyncGenerator gen, - MakeSequencedMergedGenerator( - static_cast>>(tracked_source_), 4)); - // Should not initially ask for anything - ASSERT_EQ(0, NumSubsAskedFor()); - BeginCaptureOutput(gen); - // Should not read ahead async-reentrantly from source - ASSERT_EQ(1, NumSubsAskedFor()); - EmitSub(); - ASSERT_EQ(2, NumSubsAskedFor()); - // Should immediately start polling - ASSERT_EQ(1, NumItemsAskedFor(0)); - EmitSub(); - EmitSub(); - EmitSub(); - EmitSub(); - // Should limit how many subs it reads ahead - ASSERT_EQ(4, NumSubsAskedFor()); - // Should immediately start polling subs even if they aren't yet active - ASSERT_EQ(1, NumItemsAskedFor(1)); - ASSERT_EQ(1, NumItemsAskedFor(2)); - ASSERT_EQ(1, NumItemsAskedFor(3)); - // Items emitted on non-active subs should not be delivered and should not trigger - // further polling on the inactive sub - EmitItem(1, 0); - ASSERT_EQ(1, NumItemsAskedFor(1)); - AssertRead({}); - EmitItem(0, 1); - AssertRead({1}); - ASSERT_EQ(2, NumItemsAskedFor(0)); - EmitItem(0, 2); - AssertRead({1, 2}); - ASSERT_EQ(3, NumItemsAskedFor(0)); - // On finish it should move to the next sub and pull 1 item - FinishSub(0); - ASSERT_EQ(5, NumSubsAskedFor()); - ASSERT_EQ(2, 
NumItemsAskedFor(1)); - AssertRead({1, 2, 0}); - // Now finish all the subs and make sure an empty sub is ok - FinishSub(1); - FinishSub(2); - FinishSub(3); - FinishSub(4); - ASSERT_EQ(6, NumSubsAskedFor()); - FinishSubs(); - AssertFinishedOk(); -} - -TEST_F(SeqMergedGeneratorTestFixture, ErrorItem) { - ASSERT_OK_AND_ASSIGN( - AsyncGenerator gen, - MakeSequencedMergedGenerator( - static_cast>>(tracked_source_), 4)); - BeginCaptureOutput(gen); - EmitSub(); - EmitSub(); - EmitErrorItem(1); - // It will still read from the active sub and won't notice the error until it switches - // to the failing sub - EmitItem(0, 0); - AssertRead({0}); - FinishSub(0); - AssertFailed(); - FinishSub(1); - FinishSubs(); -} - -TEST_F(SeqMergedGeneratorTestFixture, ErrorSub) { - ASSERT_OK_AND_ASSIGN( - AsyncGenerator gen, - MakeSequencedMergedGenerator( - static_cast>>(tracked_source_), 4)); - BeginCaptureOutput(gen); - EmitSub(); - EmitErrorSub(); - FinishSub(0); - AssertFailed(); -} - TEST(TestAsyncUtil, FromVector) { AsyncGenerator gen; { diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 0e52c0a3981..354ec477678 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -29,8 +29,6 @@ #include #include -#include - namespace compute = ::arrow::compute; std::shared_ptr make_compute_options(std::string func_name,