From 10afd61e32c54ec0f0768d4d859b7263fef8b618 Mon Sep 17 00:00:00 2001 From: Alexander Date: Wed, 22 Sep 2021 22:14:59 -0500 Subject: [PATCH 01/21] init wip wip w second attemp --- cpp/src/arrow/CMakeLists.txt | 1 + cpp/src/arrow/compute/exec/filter_node.cc | 45 +++++++++++-- cpp/src/arrow/dataset/scanner_test.cc | 4 +- cpp/src/arrow/util/task_scheduler.cc | 77 ++++++++++++++++++++++ cpp/src/arrow/util/task_scheduler.h | 80 +++++++++++++++++++++++ cpp/src/arrow/util/thread_pool.h | 5 +- 6 files changed, 202 insertions(+), 10 deletions(-) create mode 100644 cpp/src/arrow/util/task_scheduler.cc create mode 100644 cpp/src/arrow/util/task_scheduler.h diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index b82cccf24df..bdf863660ea 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -218,6 +218,7 @@ set(ARROW_SRCS util/task_group.cc util/tdigest.cc util/thread_pool.cc + util/task_scheduler.cc util/time.cc util/trie.cc util/unreachable.cc diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index 81fa9b17645..e5583d7d9ac 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -24,10 +24,11 @@ #include "arrow/compute/exec/util.h" #include "arrow/datum.h" #include "arrow/result.h" +#include "arrow/util/async_util.h" #include "arrow/util/checked_cast.h" #include "arrow/util/future.h" #include "arrow/util/logging.h" - +#include "arrow/util/thread_pool.h" namespace arrow { using internal::checked_cast; @@ -98,12 +99,32 @@ class FilterNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { DCHECK_EQ(input, inputs_[0]); + auto executor = plan()->exec_context()->executor(); + if (executor) { + auto maybe_future = executor->Submit([this, batch] { + auto maybe_filtered = DoFilter(std::move(batch)); + if (ErrorIfNotOk(maybe_filtered.status())) + Status::Invalid("Filter output not OK"); + + maybe_filtered->guarantee = batch.guarantee; + this->outputs_[0]->InputReceived(this, maybe_filtered.MoveValueUnsafe()); + return Status::OK(); + }); + if (!maybe_future.ok()) { + outputs_[0]->ErrorReceived(this, maybe_future.status()); + } - auto maybe_filtered = DoFilter(std::move(batch)); - if (ErrorIfNotOk(maybe_filtered.status())) return; + auto status = task_group_.AddTask(maybe_future.MoveValueUnsafe()); + if (!status.ok()) { + outputs_[0]->ErrorReceived(this, std::move(status)); + } + } else { + auto maybe_filtered = DoFilter(std::move(batch)); + if (ErrorIfNotOk(maybe_filtered.status())) return; - maybe_filtered->guarantee = batch.guarantee; - outputs_[0]->InputReceived(this, maybe_filtered.MoveValueUnsafe()); + maybe_filtered->guarantee = batch.guarantee; + outputs_[0]->InputReceived(this, maybe_filtered.MoveValueUnsafe()); + } } void ErrorReceived(ExecNode* input, Status error) override { @@ -114,6 +135,9 @@ class FilterNode : public ExecNode { void InputFinished(ExecNode* input, int total_batches) override { DCHECK_EQ(input, inputs_[0]); outputs_[0]->InputFinished(this, total_batches); + + task_group_.WaitForTasksToFinish().AddCallback( + [this](const Status& status) { this->finished_.MarkFinished(status); }); } Status StartProducing() override { return Status::OK(); } @@ -129,13 +153,22 @@ class FilterNode : public ExecNode { void StopProducing() override { inputs_[0]->StopProducing(this); } - Future<> finished() override { return inputs_[0]->finished(); } + Future<> finished() override { + auto executor = plan()->exec_context()->executor(); + if (executor) { + return finished_; + } + return inputs_[0]->finished(); + } protected: std::string ToStringExtra() const override { return "filter=" + filter_.ToString(); } private: Expression filter_; + + Future<> finished_ = Future<>::Make(); + util::AsyncTaskGroup task_group_; }; } // namespace diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 0c6c3277290..d0aaf6fb6bd 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -38,6 +38,7 @@ #include "arrow/testing/matchers.h" #include "arrow/testing/util.h" #include "arrow/util/range.h" +#include "arrow/util/task_scheduler.h" #include "arrow/util/vector.h" using testing::ElementsAre; @@ -1402,7 +1403,8 @@ TEST(ScanNode, MinimalEndToEnd) { // NB: This test is here for didactic purposes // Specify a MemoryPool and ThreadPool for the ExecPlan - compute::ExecContext exec_context(default_memory_pool(), GetCpuThreadPool()); + compute::ExecContext exec_context(default_memory_pool(), + ::arrow::internal::GetCpuTaskScheduler()); // ensure arrow::dataset node factories are in the registry arrow::dataset::internal::Initialize(); diff --git a/cpp/src/arrow/util/task_scheduler.cc b/cpp/src/arrow/util/task_scheduler.cc new file mode 100644 index 00000000000..dc340ed84e4 --- /dev/null +++ b/cpp/src/arrow/util/task_scheduler.cc @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/util/task_scheduler.h" +#include +#include +#include "arrow/util/thread_pool.h" + +namespace arrow { + +namespace internal { + +TaskScheduler::TaskScheduler() : Executor() {} + +TaskScheduler::~TaskScheduler() {} + +int TaskScheduler::GetCapacity() { return thread_pool_->GetCapacity(); } + +bool TaskScheduler::OwnsThisThread() { return thread_pool_->OwnsThisThread(); } + +Status TaskScheduler::SpawnReal(TaskHints hints, FnOnce task, + StopToken stop_token, StopCallback&& stop_callback) { + auto status = thread_pool_->SpawnReal(hints, std::forward>(task), + std::move(stop_token), std::move(stop_callback)); + return status; +} + +Result> TaskScheduler::Make(int threads) { + auto scheduler = std::shared_ptr(new TaskScheduler()); + auto maybe_pool = ThreadPool::Make(threads); + if (!maybe_pool.ok()) { + maybe_pool.status().Abort("Failed to create global CPU thread pool"); + } + scheduler->thread_pool_ = maybe_pool.ValueOrDie(); + + return scheduler; +} + +Result> TaskScheduler::MakeEternal(int threads) { + auto scheduler = std::shared_ptr(new TaskScheduler()); + auto maybe_pool = ThreadPool::MakeEternal(ThreadPool::DefaultCapacity()); + if (!maybe_pool.ok()) { + maybe_pool.status().Abort("Failed to create global CPU thread pool"); + } + scheduler->thread_pool_ = maybe_pool.ValueOrDie(); + return scheduler; +} + +std::shared_ptr TaskScheduler::MakeCpuTaskScheduler() { + auto maybe_scheduler = TaskScheduler::MakeEternal(ThreadPool::DefaultCapacity()); + if (!maybe_scheduler.ok()) { + maybe_scheduler.status().Abort("Failed to create global CPU thread pool"); + } + return *std::move(maybe_scheduler); +} + +TaskScheduler* GetCpuTaskScheduler() { + static std::shared_ptr singleton = TaskScheduler::MakeCpuTaskScheduler(); + return singleton.get(); +} + +} // namespace internal +} // namespace arrow diff --git a/cpp/src/arrow/util/task_scheduler.h b/cpp/src/arrow/util/task_scheduler.h new file mode 100644 index 00000000000..de964c4be77 --- /dev/null +++ b/cpp/src/arrow/util/task_scheduler.h @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#ifndef _WIN32 +#include +#endif + +#include +#include +#include +#include +#include +#include + +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/cancel.h" +#include "arrow/util/functional.h" +#include "arrow/util/future.h" +#include "arrow/util/macros.h" +#include "arrow/util/thread_pool.h" +#include "arrow/util/visibility.h" + +#if defined(_MSC_VER) +// Disable harmless warning for decorated name length limit +#pragma warning(disable : 4503) +#endif + +namespace arrow { +namespace internal { + +class ARROW_EXPORT TaskScheduler : public Executor { + public: + static Result> Make(int threads); + static Result> MakeEternal(int threads); + + ~TaskScheduler() override; + + int GetCapacity() override; + + bool OwnsThisThread() override; + + std::shared_ptr pool() { return thread_pool_; } + + protected: + friend ARROW_EXPORT TaskScheduler* GetCpuTaskScheduler(); + + TaskScheduler(); + + Status SpawnReal(TaskHints hints, FnOnce task, StopToken, + StopCallback&&) override; + + static std::shared_ptr MakeCpuTaskScheduler(); + + private: + std::shared_ptr thread_pool_; + // std::queue> task_queue_; + std::atomic active_tasks_counter_; +}; + +ARROW_EXPORT TaskScheduler* GetCpuTaskScheduler(); + +} // namespace internal +} // namespace arrow \ No newline at end of file diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index 9ac8e36a3d8..493fa3b6804 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -341,6 +341,8 @@ class ARROW_EXPORT ThreadPool : public Executor { // tasks are finished. Status Shutdown(bool wait = true); + Status SpawnReal(TaskHints hints, FnOnce task, StopToken, + StopCallback&&) override; struct State; protected: @@ -350,9 +352,6 @@ class ARROW_EXPORT ThreadPool : public Executor { ThreadPool(); - Status SpawnReal(TaskHints hints, FnOnce task, StopToken, - StopCallback&&) override; - // Collect finished worker threads, making sure the OS threads have exited void CollectFinishedWorkersUnlocked(); // Launch a given number of additional workers From 3494a48ae01fee51a117e10f9e3a5a13cfab55d0 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 23 Sep 2021 10:44:23 -0500 Subject: [PATCH 02/21] use a task_queue minor change test CI --- cpp/src/arrow/compute/exec/filter_node.cc | 41 ++++++++++++++++------- cpp/src/arrow/util/task_scheduler.h | 3 +- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index e5583d7d9ac..9ba0e066daf 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -99,13 +99,14 @@ class FilterNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { DCHECK_EQ(input, inputs_[0]); + if (finished_.is_finished()) { + return; + } auto executor = plan()->exec_context()->executor(); if (executor) { auto maybe_future = executor->Submit([this, batch] { auto maybe_filtered = DoFilter(std::move(batch)); - if (ErrorIfNotOk(maybe_filtered.status())) - Status::Invalid("Filter output not OK"); - + if (ErrorIfNotOk(maybe_filtered.status())) return Status::OK(); maybe_filtered->guarantee = batch.guarantee; this->outputs_[0]->InputReceived(this, maybe_filtered.MoveValueUnsafe()); return Status::OK(); @@ -121,10 +122,13 @@ class FilterNode : public ExecNode { } else { auto maybe_filtered = DoFilter(std::move(batch)); if (ErrorIfNotOk(maybe_filtered.status())) return; - maybe_filtered->guarantee = batch.guarantee; outputs_[0]->InputReceived(this, maybe_filtered.MoveValueUnsafe()); } + if (batch_count_.Increment()) { + task_group_.WaitForTasksToFinish().AddCallback( + [this](const Status& status) { this->finished_.MarkFinished(status); }); + } } void ErrorReceived(ExecNode* input, Status error) override { @@ -134,13 +138,20 @@ class FilterNode : public ExecNode { void InputFinished(ExecNode* input, int total_batches) override { DCHECK_EQ(input, inputs_[0]); - outputs_[0]->InputFinished(this, total_batches); - task_group_.WaitForTasksToFinish().AddCallback( - [this](const Status& status) { this->finished_.MarkFinished(status); }); + total_batches_.fetch_add(total_batches); + + outputs_[0]->InputFinished(this, total_batches); + if (batch_count_.SetTotal(total_batches_.load())) { + task_group_.WaitForTasksToFinish().AddCallback( + [this](const Status& status) { this->finished_.MarkFinished(status); }); + } } - Status StartProducing() override { return Status::OK(); } + Status StartProducing() override { + finished_ = Future<>::Make(); + return Status::OK(); + } void PauseProducing(ExecNode* output) override {} @@ -151,7 +162,12 @@ class FilterNode : public ExecNode { StopProducing(); } - void StopProducing() override { inputs_[0]->StopProducing(this); } + void StopProducing() override { + if (batch_count_.Cancel()) { + finished_.MarkFinished(); + } + inputs_[0]->StopProducing(this); + } Future<> finished() override { auto executor = plan()->exec_context()->executor(); @@ -166,15 +182,14 @@ class FilterNode : public ExecNode { private: Expression filter_; - - Future<> finished_ = Future<>::Make(); + AtomicCounter batch_count_; + std::atomic total_batches_{0}; + Future<> finished_ = Future<>::MakeFinished(); util::AsyncTaskGroup task_group_; }; - } // namespace namespace internal { - void RegisterFilterNode(ExecFactoryRegistry* registry) { DCHECK_OK(registry->AddFactory("filter", FilterNode::Make)); } diff --git a/cpp/src/arrow/util/task_scheduler.h b/cpp/src/arrow/util/task_scheduler.h index de964c4be77..7c27522c27c 100644 --- a/cpp/src/arrow/util/task_scheduler.h +++ b/cpp/src/arrow/util/task_scheduler.h @@ -71,7 +71,8 @@ class ARROW_EXPORT TaskScheduler : public Executor { private: std::shared_ptr thread_pool_; // std::queue> task_queue_; - std::atomic active_tasks_counter_; + // std::atomic active_tasks_counter_; + // }; ARROW_EXPORT TaskScheduler* GetCpuTaskScheduler(); From 2c2ae119f02d78b284b32fd866521964fb45460e Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 23 Sep 2021 17:33:51 -0500 Subject: [PATCH 03/21] bug solved :D minor fix --- cpp/src/arrow/compute/exec/filter_node.cc | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index 9ba0e066daf..a4fe049b481 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -164,18 +164,13 @@ class FilterNode : public ExecNode { void StopProducing() override { if (batch_count_.Cancel()) { - finished_.MarkFinished(); + task_group_.WaitForTasksToFinish().AddCallback( + [this](const Status& status) { this->finished_.MarkFinished(status); }); } inputs_[0]->StopProducing(this); } - Future<> finished() override { - auto executor = plan()->exec_context()->executor(); - if (executor) { - return finished_; - } - return inputs_[0]->finished(); - } + Future<> finished() override { return finished_; } protected: std::string ToStringExtra() const override { return "filter=" + filter_.ToString(); } From ca48f026f7300a0a65378956a0c3c412a08e03a9 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 23 Sep 2021 17:34:03 -0500 Subject: [PATCH 04/21] refactor filter_node --- cpp/src/arrow/compute/exec/exec_plan.cc | 15 ++++++++ cpp/src/arrow/compute/exec/exec_plan.h | 13 +++++++ cpp/src/arrow/compute/exec/filter_node.cc | 45 ++++++----------------- 3 files changed, 39 insertions(+), 34 deletions(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 502f0d06693..3b7b93be7f8 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -32,6 +32,7 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/logging.h" #include "arrow/util/optional.h" +#include "arrow/util/thread_pool.h" namespace arrow { @@ -224,6 +225,7 @@ ExecNode::ExecNode(ExecPlan* plan, NodeVector inputs, for (auto input : inputs_) { input->outputs_.push_back(this); } + finished_ = Future<>::Make(); } Status ExecNode::Validate() const { @@ -287,6 +289,19 @@ bool ExecNode::ErrorIfNotOk(Status status) { return true; } +Status ExecNode::SubmitTask(std::function task) { + auto executor = plan()->exec_context()->executor(); + auto maybe_future = executor->Submit(std::move(task)); + if (!maybe_future.ok()) { + outputs_[0]->ErrorReceived(this, maybe_future.status()); + } + auto status = task_group_.AddTask(maybe_future.MoveValueUnsafe()); + if (!status.ok()) { + outputs_[0]->ErrorReceived(this, std::move(status)); + } + return Status::OK(); +} + std::shared_ptr MakeGeneratorReader( std::shared_ptr schema, std::function>()> gen, MemoryPool* pool) { diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index 73611bc992c..82f884e9a3e 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -23,8 +23,10 @@ #include #include "arrow/compute/exec.h" +#include "arrow/compute/exec/util.h" #include "arrow/compute/type_fwd.h" #include "arrow/type_fwd.h" +#include "arrow/util/async_util.h" #include "arrow/util/macros.h" #include "arrow/util/optional.h" #include "arrow/util/visibility.h" @@ -221,6 +223,10 @@ class ARROW_EXPORT ExecNode { std::string ToString() const; + bool has_executor() { return plan()->exec_context()->executor() != nullptr; } + + Status SubmitTask(std::function task); + protected: ExecNode(ExecPlan* plan, NodeVector inputs, std::vector input_labels, std::shared_ptr output_schema, int num_outputs); @@ -241,6 +247,13 @@ class ARROW_EXPORT ExecNode { std::shared_ptr output_schema_; int num_outputs_; NodeVector outputs_; + + // Counter of batches received + AtomicCounter batch_count_; + // Future to sync finished + Future<> finished_ = Future<>::MakeFinished(); + // Async task group used when executor is available + util::AsyncTaskGroup task_group_; }; /// \brief An extensible registry for factories of ExecNodes diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index a4fe049b481..cf97e2e3bad 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -21,14 +21,12 @@ #include "arrow/compute/exec.h" #include "arrow/compute/exec/expression.h" #include "arrow/compute/exec/options.h" -#include "arrow/compute/exec/util.h" #include "arrow/datum.h" #include "arrow/result.h" -#include "arrow/util/async_util.h" #include "arrow/util/checked_cast.h" #include "arrow/util/future.h" #include "arrow/util/logging.h" -#include "arrow/util/thread_pool.h" + namespace arrow { using internal::checked_cast; @@ -102,28 +100,17 @@ class FilterNode : public ExecNode { if (finished_.is_finished()) { return; } - auto executor = plan()->exec_context()->executor(); - if (executor) { - auto maybe_future = executor->Submit([this, batch] { - auto maybe_filtered = DoFilter(std::move(batch)); - if (ErrorIfNotOk(maybe_filtered.status())) return Status::OK(); - maybe_filtered->guarantee = batch.guarantee; - this->outputs_[0]->InputReceived(this, maybe_filtered.MoveValueUnsafe()); - return Status::OK(); - }); - if (!maybe_future.ok()) { - outputs_[0]->ErrorReceived(this, maybe_future.status()); - } - - auto status = task_group_.AddTask(maybe_future.MoveValueUnsafe()); - if (!status.ok()) { - outputs_[0]->ErrorReceived(this, std::move(status)); - } - } else { + auto task = [this, batch]() { auto maybe_filtered = DoFilter(std::move(batch)); - if (ErrorIfNotOk(maybe_filtered.status())) return; + if (ErrorIfNotOk(maybe_filtered.status())) return Status::OK(); maybe_filtered->guarantee = batch.guarantee; outputs_[0]->InputReceived(this, maybe_filtered.MoveValueUnsafe()); + return Status::OK(); + }; + if (this->has_executor()) { + DCHECK_OK(this->SubmitTask(task)); + } else { + DCHECK_OK(task()); } if (batch_count_.Increment()) { task_group_.WaitForTasksToFinish().AddCallback( @@ -138,20 +125,14 @@ class FilterNode : public ExecNode { void InputFinished(ExecNode* input, int total_batches) override { DCHECK_EQ(input, inputs_[0]); - - total_batches_.fetch_add(total_batches); - outputs_[0]->InputFinished(this, total_batches); - if (batch_count_.SetTotal(total_batches_.load())) { + if (batch_count_.SetTotal(total_batches)) { task_group_.WaitForTasksToFinish().AddCallback( [this](const Status& status) { this->finished_.MarkFinished(status); }); } } - Status StartProducing() override { - finished_ = Future<>::Make(); - return Status::OK(); - } + Status StartProducing() override { return Status::OK(); } void PauseProducing(ExecNode* output) override {} @@ -177,10 +158,6 @@ class FilterNode : public ExecNode { private: Expression filter_; - AtomicCounter batch_count_; - std::atomic total_batches_{0}; - Future<> finished_ = Future<>::MakeFinished(); - util::AsyncTaskGroup task_group_; }; } // namespace From 054a6de003aa47eb8a871cd87fc74c2d33d15a77 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 23 Sep 2021 17:35:29 -0500 Subject: [PATCH 05/21] make_task for project_node --- cpp/src/arrow/compute/exec/project_node.cc | 39 +++++++++++++++++----- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 70908dc4924..26bda036051 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -91,12 +91,25 @@ class ProjectNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { DCHECK_EQ(input, inputs_[0]); - - auto maybe_projected = DoProject(std::move(batch)); - if (ErrorIfNotOk(maybe_projected.status())) return; - - maybe_projected->guarantee = batch.guarantee; - outputs_[0]->InputReceived(this, maybe_projected.MoveValueUnsafe()); + if (finished_.is_finished()) { + return; + } + auto task = [this, batch]() { + auto maybe_projected = DoProject(std::move(batch)); + if (ErrorIfNotOk(maybe_projected.status())) return Status::OK(); + maybe_projected->guarantee = batch.guarantee; + outputs_[0]->InputReceived(this, maybe_projected.MoveValueUnsafe()); + return Status::OK(); + }; + if (this->has_executor()) { + DCHECK_OK(this->SubmitTask(task)); + } else { + DCHECK_OK(task()); + } + if (batch_count_.Increment()) { + task_group_.WaitForTasksToFinish().AddCallback( + [this](const Status& status) { this->finished_.MarkFinished(status); }); + } } void ErrorReceived(ExecNode* input, Status error) override { @@ -107,6 +120,10 @@ class ProjectNode : public ExecNode { void InputFinished(ExecNode* input, int total_batches) override { DCHECK_EQ(input, inputs_[0]); outputs_[0]->InputFinished(this, total_batches); + if (batch_count_.SetTotal(total_batches)) { + task_group_.WaitForTasksToFinish().AddCallback( + [this](const Status& status) { this->finished_.MarkFinished(status); }); + } } Status StartProducing() override { return Status::OK(); } @@ -120,9 +137,15 @@ class ProjectNode : public ExecNode { StopProducing(); } - void StopProducing() override { inputs_[0]->StopProducing(this); } + void StopProducing() override { + if (batch_count_.Cancel()) { + task_group_.WaitForTasksToFinish().AddCallback( + [this](const Status& status) { this->finished_.MarkFinished(status); }); + } + inputs_[0]->StopProducing(this); + } - Future<> finished() override { return inputs_[0]->finished(); } + Future<> finished() override { return finished_; } protected: std::string ToStringExtra() const override { From 02041c956204d830ac4a0ef4644befa8efd7b814 Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 28 Sep 2021 09:30:31 -0500 Subject: [PATCH 06/21] last update --- cpp/src/arrow/CMakeLists.txt | 1 - cpp/src/arrow/compute/exec/exec_plan.cc | 16 ++--- cpp/src/arrow/compute/exec/exec_plan.h | 13 +++- cpp/src/arrow/compute/exec/filter_node.cc | 30 ++++++-- cpp/src/arrow/compute/exec/project_node.cc | 33 ++++++--- cpp/src/arrow/dataset/scanner_test.cc | 3 +- cpp/src/arrow/util/task_scheduler.cc | 77 -------------------- cpp/src/arrow/util/task_scheduler.h | 81 ---------------------- 8 files changed, 66 insertions(+), 188 deletions(-) delete mode 100644 cpp/src/arrow/util/task_scheduler.cc delete mode 100644 cpp/src/arrow/util/task_scheduler.h diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index bdf863660ea..b82cccf24df 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -218,7 +218,6 @@ set(ARROW_SRCS util/task_group.cc util/tdigest.cc util/thread_pool.cc - util/task_scheduler.cc util/time.cc util/trie.cc util/unreachable.cc diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 3b7b93be7f8..715629e1ec1 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -226,6 +226,11 @@ ExecNode::ExecNode(ExecPlan* plan, NodeVector inputs, input->outputs_.push_back(this); } finished_ = Future<>::Make(); + auto executor = plan->exec_context()->executor(); + if (executor) { + StopToken stop_token = stop_source_.token(); + task_group_ = TaskGroup::MakeThreaded(executor, stop_token); + } } Status ExecNode::Validate() const { @@ -290,15 +295,8 @@ bool ExecNode::ErrorIfNotOk(Status status) { } Status ExecNode::SubmitTask(std::function task) { - auto executor = plan()->exec_context()->executor(); - auto maybe_future = executor->Submit(std::move(task)); - if (!maybe_future.ok()) { - outputs_[0]->ErrorReceived(this, maybe_future.status()); - } - auto status = task_group_.AddTask(maybe_future.MoveValueUnsafe()); - if (!status.ok()) { - outputs_[0]->ErrorReceived(this, std::move(status)); - } + DCHECK(task_group_ != nullptr); + task_group_->Append(std::move(task)); return Status::OK(); } diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index 82f884e9a3e..3f3bc7cd675 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -27,11 +27,16 @@ #include "arrow/compute/type_fwd.h" #include "arrow/type_fwd.h" #include "arrow/util/async_util.h" +#include "arrow/util/cancel.h" #include "arrow/util/macros.h" #include "arrow/util/optional.h" +#include "arrow/util/task_group.h" #include "arrow/util/visibility.h" namespace arrow { + +using internal::TaskGroup; + namespace compute { class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this { @@ -248,12 +253,14 @@ class ARROW_EXPORT ExecNode { int num_outputs_; NodeVector outputs_; - // Counter of batches received + // Counter for the number of batches received AtomicCounter batch_count_; // Future to sync finished Future<> finished_ = Future<>::MakeFinished(); - // Async task group used when executor is available - util::AsyncTaskGroup task_group_; + // Variable used to stop producing from source + StopSource stop_source_; + // The task group for the corresponding batches + std::shared_ptr task_group_{nullptr}; }; /// \brief An extensible registry for factories of ExecNodes diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index cf97e2e3bad..a5b87dbad98 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -102,7 +102,7 @@ class FilterNode : public ExecNode { } auto task = [this, batch]() { auto maybe_filtered = DoFilter(std::move(batch)); - if (ErrorIfNotOk(maybe_filtered.status())) return Status::OK(); + if (ErrorIfNotOk(maybe_filtered.status())) return maybe_filtered.status(); maybe_filtered->guarantee = batch.guarantee; outputs_[0]->InputReceived(this, maybe_filtered.MoveValueUnsafe()); return Status::OK(); @@ -113,8 +113,13 @@ class FilterNode : public ExecNode { DCHECK_OK(task()); } if (batch_count_.Increment()) { - task_group_.WaitForTasksToFinish().AddCallback( - [this](const Status& status) { this->finished_.MarkFinished(status); }); + if (this->has_executor()) { + task_group_->FinishAsync().AddCallback([this](const Status& status) { + if (!this->finished_.is_finished()) this->finished_.MarkFinished(status); + }); + } else { + this->finished_.MarkFinished(); + } } } @@ -127,8 +132,13 @@ class FilterNode : public ExecNode { DCHECK_EQ(input, inputs_[0]); outputs_[0]->InputFinished(this, total_batches); if (batch_count_.SetTotal(total_batches)) { - task_group_.WaitForTasksToFinish().AddCallback( - [this](const Status& status) { this->finished_.MarkFinished(status); }); + if (this->has_executor()) { + task_group_->FinishAsync().AddCallback([this](const Status& status) { + if (!this->finished_.is_finished()) this->finished_.MarkFinished(status); + }); + } else { + this->finished_.MarkFinished(); + } } } @@ -145,8 +155,14 @@ class FilterNode : public ExecNode { void StopProducing() override { if (batch_count_.Cancel()) { - task_group_.WaitForTasksToFinish().AddCallback( - [this](const Status& status) { this->finished_.MarkFinished(status); }); + if (this->has_executor()) { + this->stop_source_.RequestStop(); + task_group_->FinishAsync().AddCallback([this](const Status& status) { + if (!this->finished_.is_finished()) this->finished_.MarkFinished(status); + }); + } else { + this->finished_.MarkFinished(); + } } inputs_[0]->StopProducing(this); } diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 26bda036051..18145bde20d 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -96,19 +96,25 @@ class ProjectNode : public ExecNode { } auto task = [this, batch]() { auto maybe_projected = DoProject(std::move(batch)); - if (ErrorIfNotOk(maybe_projected.status())) return Status::OK(); + if (ErrorIfNotOk(maybe_projected.status())) return maybe_projected.status(); maybe_projected->guarantee = batch.guarantee; outputs_[0]->InputReceived(this, maybe_projected.MoveValueUnsafe()); return Status::OK(); }; + if (this->has_executor()) { DCHECK_OK(this->SubmitTask(task)); } else { DCHECK_OK(task()); } if (batch_count_.Increment()) { - task_group_.WaitForTasksToFinish().AddCallback( - [this](const Status& status) { this->finished_.MarkFinished(status); }); + if (this->has_executor()) { + task_group_->FinishAsync().AddCallback([this](const Status& status) { + if (!this->finished_.is_finished()) this->finished_.MarkFinished(status); + }); + } else { + this->finished_.MarkFinished(); + } } } @@ -119,11 +125,16 @@ class ProjectNode : public ExecNode { void InputFinished(ExecNode* input, int total_batches) override { DCHECK_EQ(input, inputs_[0]); - outputs_[0]->InputFinished(this, total_batches); if (batch_count_.SetTotal(total_batches)) { - task_group_.WaitForTasksToFinish().AddCallback( - [this](const Status& status) { this->finished_.MarkFinished(status); }); + if (this->has_executor()) { + task_group_->FinishAsync().AddCallback([this](const Status& status) { + if (!this->finished_.is_finished()) this->finished_.MarkFinished(status); + }); + } else { + this->finished_.MarkFinished(); + } } + outputs_[0]->InputFinished(this, total_batches); } Status StartProducing() override { return Status::OK(); } @@ -139,8 +150,14 @@ class ProjectNode : public ExecNode { void StopProducing() override { if (batch_count_.Cancel()) { - task_group_.WaitForTasksToFinish().AddCallback( - [this](const Status& status) { this->finished_.MarkFinished(status); }); + if (this->has_executor()) { + this->stop_source_.RequestStop(); + task_group_->FinishAsync().AddCallback([this](const Status& status) { + if (!this->finished_.is_finished()) this->finished_.MarkFinished(status); + }); + } else { + this->finished_.MarkFinished(); + } } inputs_[0]->StopProducing(this); } diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index d0aaf6fb6bd..6235cf2fd50 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -38,7 +38,6 @@ #include "arrow/testing/matchers.h" #include "arrow/testing/util.h" #include "arrow/util/range.h" -#include "arrow/util/task_scheduler.h" #include "arrow/util/vector.h" using testing::ElementsAre; @@ -1404,7 +1403,7 @@ TEST(ScanNode, MinimalEndToEnd) { // Specify a MemoryPool and ThreadPool for the ExecPlan compute::ExecContext exec_context(default_memory_pool(), - ::arrow::internal::GetCpuTaskScheduler()); + ::arrow::internal::GetCpuThreadPool()); // ensure arrow::dataset node factories are in the registry arrow::dataset::internal::Initialize(); diff --git a/cpp/src/arrow/util/task_scheduler.cc b/cpp/src/arrow/util/task_scheduler.cc deleted file mode 100644 index dc340ed84e4..00000000000 --- a/cpp/src/arrow/util/task_scheduler.cc +++ /dev/null @@ -1,77 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "arrow/util/task_scheduler.h" -#include -#include -#include "arrow/util/thread_pool.h" - -namespace arrow { - -namespace internal { - -TaskScheduler::TaskScheduler() : Executor() {} - -TaskScheduler::~TaskScheduler() {} - -int TaskScheduler::GetCapacity() { return thread_pool_->GetCapacity(); } - -bool TaskScheduler::OwnsThisThread() { return thread_pool_->OwnsThisThread(); } - -Status TaskScheduler::SpawnReal(TaskHints hints, FnOnce task, - StopToken stop_token, StopCallback&& stop_callback) { - auto status = thread_pool_->SpawnReal(hints, std::forward>(task), - std::move(stop_token), std::move(stop_callback)); - return status; -} - -Result> TaskScheduler::Make(int threads) { - auto scheduler = std::shared_ptr(new TaskScheduler()); - auto maybe_pool = ThreadPool::Make(threads); - if (!maybe_pool.ok()) { - maybe_pool.status().Abort("Failed to create global CPU thread pool"); - } - scheduler->thread_pool_ = maybe_pool.ValueOrDie(); - - return scheduler; -} - -Result> TaskScheduler::MakeEternal(int threads) { - auto scheduler = std::shared_ptr(new TaskScheduler()); - auto maybe_pool = ThreadPool::MakeEternal(ThreadPool::DefaultCapacity()); - if (!maybe_pool.ok()) { - maybe_pool.status().Abort("Failed to create global CPU thread pool"); - } - scheduler->thread_pool_ = maybe_pool.ValueOrDie(); - return scheduler; -} - -std::shared_ptr TaskScheduler::MakeCpuTaskScheduler() { - auto maybe_scheduler = TaskScheduler::MakeEternal(ThreadPool::DefaultCapacity()); - if (!maybe_scheduler.ok()) { - maybe_scheduler.status().Abort("Failed to create global CPU thread pool"); - } - return *std::move(maybe_scheduler); -} - -TaskScheduler* GetCpuTaskScheduler() { - static std::shared_ptr singleton = TaskScheduler::MakeCpuTaskScheduler(); - return singleton.get(); -} - -} // namespace internal -} // namespace arrow diff --git a/cpp/src/arrow/util/task_scheduler.h b/cpp/src/arrow/util/task_scheduler.h deleted file mode 100644 index 7c27522c27c..00000000000 --- a/cpp/src/arrow/util/task_scheduler.h +++ /dev/null @@ -1,81 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#ifndef _WIN32 -#include -#endif - -#include -#include -#include -#include -#include -#include - -#include "arrow/result.h" -#include "arrow/status.h" -#include "arrow/util/cancel.h" -#include "arrow/util/functional.h" -#include "arrow/util/future.h" -#include "arrow/util/macros.h" -#include "arrow/util/thread_pool.h" -#include "arrow/util/visibility.h" - -#if defined(_MSC_VER) -// Disable harmless warning for decorated name length limit -#pragma warning(disable : 4503) -#endif - -namespace arrow { -namespace internal { - -class ARROW_EXPORT TaskScheduler : public Executor { - public: - static Result> Make(int threads); - static Result> MakeEternal(int threads); - - ~TaskScheduler() override; - - int GetCapacity() override; - - bool OwnsThisThread() override; - - std::shared_ptr pool() { return thread_pool_; } - - protected: - friend ARROW_EXPORT TaskScheduler* GetCpuTaskScheduler(); - - TaskScheduler(); - - Status SpawnReal(TaskHints hints, FnOnce task, StopToken, - StopCallback&&) override; - - static std::shared_ptr MakeCpuTaskScheduler(); - - private: - std::shared_ptr thread_pool_; - // std::queue> task_queue_; - // std::atomic active_tasks_counter_; - // -}; - -ARROW_EXPORT TaskScheduler* GetCpuTaskScheduler(); - -} // namespace internal -} // namespace arrow \ No newline at end of file From b1633cdfed095fc9002d4a1dee32476f8cb1e768 Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 28 Sep 2021 09:50:35 -0500 Subject: [PATCH 07/21] minor changes --- cpp/src/arrow/compute/exec/exec_plan.cc | 27 ++++++++++++++-- cpp/src/arrow/compute/exec/exec_plan.h | 10 +++++- cpp/src/arrow/compute/exec/filter_node.cc | 35 ++------------------- cpp/src/arrow/compute/exec/project_node.cc | 36 ++-------------------- 4 files changed, 40 insertions(+), 68 deletions(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 715629e1ec1..d12d65f4582 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -295,11 +295,34 @@ bool ExecNode::ErrorIfNotOk(Status status) { } Status ExecNode::SubmitTask(std::function task) { - DCHECK(task_group_ != nullptr); - task_group_->Append(std::move(task)); + if (finished_.is_finished()) { + return Status::OK(); + } + if (this->has_executor()) { + DCHECK(task_group_ != nullptr); + task_group_->Append(std::move(task)); + } else { + std::move(task)(); + } + if (batch_count_.Increment()) { + this->MarkFinished(); + } return Status::OK(); } +void ExecNode::MarkFinished(bool request_stop) { + if (this->has_executor()) { + if (request_stop) { + this->stop_source_.RequestStop(); + } + task_group_->FinishAsync().AddCallback([this](const Status& status) { + if (!this->finished_.is_finished()) this->finished_.MarkFinished(status); + }); + } else { + this->finished_.MarkFinished(); + } +} + std::shared_ptr MakeGeneratorReader( std::shared_ptr schema, std::function>()> gen, MemoryPool* pool) { diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index 3f3bc7cd675..65f5dc982a3 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -228,10 +228,18 @@ class ARROW_EXPORT ExecNode { std::string ToString() const; + /// \brief Is an executor available? bool has_executor() { return plan()->exec_context()->executor() != nullptr; } + /// \brief submit a task, independently if an executor is available or not Status SubmitTask(std::function task); + /// \brief Mark Future<> `finished_` as Finished, independently if an executor is + /// available or not + /// + /// A boolean var `request_stop` can be send to cancel remaining tasks in the executor. + void MarkFinished(bool request_stop = false); + protected: ExecNode(ExecPlan* plan, NodeVector inputs, std::vector input_labels, std::shared_ptr output_schema, int num_outputs); @@ -257,7 +265,7 @@ class ARROW_EXPORT ExecNode { AtomicCounter batch_count_; // Future to sync finished Future<> finished_ = Future<>::MakeFinished(); - // Variable used to stop producing from source + // Variable used to cancel remaining tasks in the executor StopSource stop_source_; // The task group for the corresponding batches std::shared_ptr task_group_{nullptr}; diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index a5b87dbad98..09eac8a7f54 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -97,9 +97,6 @@ class FilterNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { DCHECK_EQ(input, inputs_[0]); - if (finished_.is_finished()) { - return; - } auto task = [this, batch]() { auto maybe_filtered = DoFilter(std::move(batch)); if (ErrorIfNotOk(maybe_filtered.status())) return maybe_filtered.status(); @@ -107,20 +104,7 @@ class FilterNode : public ExecNode { outputs_[0]->InputReceived(this, maybe_filtered.MoveValueUnsafe()); return Status::OK(); }; - if (this->has_executor()) { - DCHECK_OK(this->SubmitTask(task)); - } else { - DCHECK_OK(task()); - } - if (batch_count_.Increment()) { - if (this->has_executor()) { - task_group_->FinishAsync().AddCallback([this](const Status& status) { - if (!this->finished_.is_finished()) this->finished_.MarkFinished(status); - }); - } else { - this->finished_.MarkFinished(); - } - } + DCHECK_OK(this->SubmitTask(task)); } void ErrorReceived(ExecNode* input, Status error) override { @@ -132,13 +116,7 @@ class FilterNode : public ExecNode { DCHECK_EQ(input, inputs_[0]); outputs_[0]->InputFinished(this, total_batches); if (batch_count_.SetTotal(total_batches)) { - if (this->has_executor()) { - task_group_->FinishAsync().AddCallback([this](const Status& status) { - if (!this->finished_.is_finished()) this->finished_.MarkFinished(status); - }); - } else { - this->finished_.MarkFinished(); - } + this->MarkFinished(); } } @@ -155,14 +133,7 @@ class FilterNode : public ExecNode { void StopProducing() override { if (batch_count_.Cancel()) { - if (this->has_executor()) { - this->stop_source_.RequestStop(); - task_group_->FinishAsync().AddCallback([this](const Status& status) { - if (!this->finished_.is_finished()) this->finished_.MarkFinished(status); - }); - } else { - this->finished_.MarkFinished(); - } + this->MarkFinished(/*request_stop=*/true); } inputs_[0]->StopProducing(this); } diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 18145bde20d..751a57e85f2 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -91,9 +91,6 @@ class ProjectNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { DCHECK_EQ(input, inputs_[0]); - if (finished_.is_finished()) { - return; - } auto task = [this, batch]() { auto maybe_projected = DoProject(std::move(batch)); if (ErrorIfNotOk(maybe_projected.status())) return maybe_projected.status(); @@ -101,21 +98,7 @@ class ProjectNode : public ExecNode { outputs_[0]->InputReceived(this, maybe_projected.MoveValueUnsafe()); return Status::OK(); }; - - if (this->has_executor()) { - DCHECK_OK(this->SubmitTask(task)); - } else { - DCHECK_OK(task()); - } - if (batch_count_.Increment()) { - if (this->has_executor()) { - task_group_->FinishAsync().AddCallback([this](const Status& status) { - if (!this->finished_.is_finished()) this->finished_.MarkFinished(status); - }); - } else { - this->finished_.MarkFinished(); - } - } + DCHECK_OK(this->SubmitTask(task)); } void ErrorReceived(ExecNode* input, Status error) override { @@ -126,13 +109,7 @@ class ProjectNode : public ExecNode { void InputFinished(ExecNode* input, int total_batches) override { DCHECK_EQ(input, inputs_[0]); if (batch_count_.SetTotal(total_batches)) { - if (this->has_executor()) { - task_group_->FinishAsync().AddCallback([this](const Status& status) { - if (!this->finished_.is_finished()) this->finished_.MarkFinished(status); - }); - } else { - this->finished_.MarkFinished(); - } + this->MarkFinished(); } outputs_[0]->InputFinished(this, total_batches); } @@ -150,14 +127,7 @@ class ProjectNode : public ExecNode { void StopProducing() override { if (batch_count_.Cancel()) { - if (this->has_executor()) { - this->stop_source_.RequestStop(); - task_group_->FinishAsync().AddCallback([this](const Status& status) { - if (!this->finished_.is_finished()) this->finished_.MarkFinished(status); - }); - } else { - this->finished_.MarkFinished(); - } + this->MarkFinished(/*request_stop=*/true); } inputs_[0]->StopProducing(this); } From 30bb0d190c9872c3f39d1b065affa00400123a70 Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 28 Sep 2021 15:51:43 -0500 Subject: [PATCH 08/21] addressing feedback minor changes --- cpp/src/arrow/compute/exec/exec_plan.cc | 2 +- cpp/src/arrow/compute/exec/exec_plan.h | 26 ++++++++++++------------- cpp/src/arrow/util/thread_pool.h | 5 +++-- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index d12d65f4582..db80d32bce9 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -302,7 +302,7 @@ Status ExecNode::SubmitTask(std::function task) { DCHECK(task_group_ != nullptr); task_group_->Append(std::move(task)); } else { - std::move(task)(); + return task(); } if (batch_count_.Increment()) { this->MarkFinished(); diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index 65f5dc982a3..6ea4d21f36f 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -228,18 +228,6 @@ class ARROW_EXPORT ExecNode { std::string ToString() const; - /// \brief Is an executor available? - bool has_executor() { return plan()->exec_context()->executor() != nullptr; } - - /// \brief submit a task, independently if an executor is available or not - Status SubmitTask(std::function task); - - /// \brief Mark Future<> `finished_` as Finished, independently if an executor is - /// available or not - /// - /// A boolean var `request_stop` can be send to cancel remaining tasks in the executor. - void MarkFinished(bool request_stop = false); - protected: ExecNode(ExecPlan* plan, NodeVector inputs, std::vector input_labels, std::shared_ptr output_schema, int num_outputs); @@ -251,6 +239,18 @@ class ARROW_EXPORT ExecNode { /// Provide extra info to include in the string representation. virtual std::string ToStringExtra() const; + /// \brief Is an executor available? + bool has_executor() const { return plan_->exec_context()->executor() != nullptr; } + + /// \brief submit a task, independently if an executor is available or not + Status SubmitTask(std::function task); + + /// \brief Mark Future<> `finished_` as Finished, independently if an executor is + /// available or not + /// + /// A boolean var `request_stop` can be send to cancel remaining tasks in the executor. + void MarkFinished(bool request_stop = false); + ExecPlan* plan_; std::string label_; @@ -264,7 +264,7 @@ class ARROW_EXPORT ExecNode { // Counter for the number of batches received AtomicCounter batch_count_; // Future to sync finished - Future<> finished_ = Future<>::MakeFinished(); + Future<> finished_; // Variable used to cancel remaining tasks in the executor StopSource stop_source_; // The task group for the corresponding batches diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index 493fa3b6804..9ac8e36a3d8 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -341,8 +341,6 @@ class ARROW_EXPORT ThreadPool : public Executor { // tasks are finished. Status Shutdown(bool wait = true); - Status SpawnReal(TaskHints hints, FnOnce task, StopToken, - StopCallback&&) override; struct State; protected: @@ -352,6 +350,9 @@ class ARROW_EXPORT ThreadPool : public Executor { ThreadPool(); + Status SpawnReal(TaskHints hints, FnOnce task, StopToken, + StopCallback&&) override; + // Collect finished worker threads, making sure the OS threads have exited void CollectFinishedWorkersUnlocked(); // Launch a given number of additional workers From 0867cede843ff020c3cf555cfa35ca55ab36c9b6 Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 1 Oct 2021 09:59:23 -0500 Subject: [PATCH 09/21] ready to final review --- cpp/src/arrow/CMakeLists.txt | 1 + .../compute/exec/exec_node_runner_impl.cc | 146 ++++++++++++++++++ .../compute/exec/exec_node_runner_impl.h | 63 ++++++++ cpp/src/arrow/compute/exec/exec_plan.cc | 36 ----- cpp/src/arrow/compute/exec/exec_plan.h | 24 --- cpp/src/arrow/compute/exec/filter_node.cc | 43 ++++-- cpp/src/arrow/compute/exec/project_node.cc | 40 +++-- 7 files changed, 267 insertions(+), 86 deletions(-) create mode 100644 cpp/src/arrow/compute/exec/exec_node_runner_impl.cc create mode 100644 cpp/src/arrow/compute/exec/exec_node_runner_impl.h diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index b82cccf24df..292548afcfc 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -375,6 +375,7 @@ if(ARROW_COMPUTE) compute/exec/aggregate_node.cc compute/exec/exec_plan.cc compute/exec/expression.cc + compute/exec/exec_node_runner_impl.cc compute/exec/filter_node.cc compute/exec/project_node.cc compute/exec/source_node.cc diff --git a/cpp/src/arrow/compute/exec/exec_node_runner_impl.cc b/cpp/src/arrow/compute/exec/exec_node_runner_impl.cc new file mode 100644 index 00000000000..a16f7b36d60 --- /dev/null +++ b/cpp/src/arrow/compute/exec/exec_node_runner_impl.cc @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "arrow/compute/exec/exec_node_runner_impl.h" + +#include +#include +#include + +#include "arrow/compute/exec/options.h" +#include "arrow/compute/exec/util.h" +#include "arrow/record_batch.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/util/async_util.h" +#include "arrow/util/cancel.h" +#include "arrow/util/thread_pool.h" + +namespace arrow { +namespace compute { + +class SimpleSyncRunnerImpl : public ExecNodeRunnerImpl { + public: + SimpleSyncRunnerImpl(ExecContext* ctx) : ctx_(ctx) {} + + Status SubmitTask(std::function task) override { + if (finished_.is_finished()) { + return Status::OK(); + } + auto status = task(); + if (!status.ok()) { + return status; + } + if (input_counter_.Increment()) { + this->MarkFinished(); + } + return Status::OK(); + } + + void MarkFinished(Status status = Status::OK()) override { + this->finished_.MarkFinished(status); + } + + void InputFinished(int total_batches) override { + if (input_counter_.SetTotal(total_batches)) { + this->MarkFinished(); + } + } + + void StopProducing() override { + if (input_counter_.Cancel()) { + this->MarkFinished(); + } + } + + protected: + // Executor context + ExecContext* ctx_; + + // Counter for the number of batches received + AtomicCounter input_counter_; +}; + +class SimpleParallelRunner : public SimpleSyncRunnerImpl { + public: + SimpleParallelRunner(ExecContext* ctx) : SimpleSyncRunnerImpl(ctx) { + executor_ = ctx->executor(); + } + + Status SubmitTask(std::function task) override { + if (finished_.is_finished()) { + return Status::OK(); + } + task_group_.AddTask([this, task]() -> Result> { + return this->executor_->Submit(this->stop_source_.token(), + [task]() { return task(); }); + }); + if (input_counter_.Increment()) { + this->MarkFinished(); + } + return Status::OK(); + } + + void MarkFinished(Status status = Status::OK()) override { + if (!status.ok()) { + StopProducing(); + bool cancelled = this->Cancel(); + DCHECK(cancelled); + this->MarkFinished(status); + return; + } + task_group_.End().AddCallback([this](const Status& status) { + if (!status.ok()) { + this->finished_.MarkFinished(); + } else if (!this->finished_.is_finished()) { + this->finished_.MarkFinished(status); + } + }); + } + + void StopProducing() override { + this->stop_source_.RequestStop(); + if (input_counter_.Cancel()) { + this->MarkFinished(); + } + } + + protected: + // The task group for the corresponding batches + util::AsyncTaskGroup task_group_; + + // Executor + ::arrow::internal::Executor* executor_; + + // Variable used to cancel remaining tasks in the executor + StopSource stop_source_; +}; + +Result> ExecNodeRunnerImpl::MakeSimpleSyncRunner( + ExecContext* ctx) { + std::unique_ptr impl{new SimpleSyncRunnerImpl(ctx)}; + return std::move(impl); +} + +Result> ExecNodeRunnerImpl::MakeSimpleParallelRunner( + ExecContext* ctx) { + std::unique_ptr impl{new SimpleParallelRunner(ctx)}; + return std::move(impl); +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/exec_node_runner_impl.h b/cpp/src/arrow/compute/exec/exec_node_runner_impl.h new file mode 100644 index 00000000000..f8b4e04ffe6 --- /dev/null +++ b/cpp/src/arrow/compute/exec/exec_node_runner_impl.h @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "arrow/compute/exec/util.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/util/future.h" + +namespace arrow { +namespace compute { + +class ExecNodeRunnerImpl { + public: + bool Cancel() { return input_counter_.Cancel(); } + + Future<> finished() { return finished_; } + + virtual ~ExecNodeRunnerImpl() = default; + + virtual Status SubmitTask(std::function task) = 0; + + virtual void MarkFinished(Status s = Status::OK()) = 0; + + virtual void InputFinished(int total_batches) = 0; + + virtual void StopProducing() = 0; + + static Result> MakeSimpleSyncRunner( + ExecContext* ctx); + + static Result> MakeSimpleParallelRunner( + ExecContext* ctx); + + protected: + // Counter for the number of batches received + AtomicCounter input_counter_; + // Future to sync finished + Future<> finished_ = Future<>::Make(); +}; + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index db80d32bce9..502f0d06693 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -32,7 +32,6 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/logging.h" #include "arrow/util/optional.h" -#include "arrow/util/thread_pool.h" namespace arrow { @@ -225,12 +224,6 @@ ExecNode::ExecNode(ExecPlan* plan, NodeVector inputs, for (auto input : inputs_) { input->outputs_.push_back(this); } - finished_ = Future<>::Make(); - auto executor = plan->exec_context()->executor(); - if (executor) { - StopToken stop_token = stop_source_.token(); - task_group_ = TaskGroup::MakeThreaded(executor, stop_token); - } } Status ExecNode::Validate() const { @@ -294,35 +287,6 @@ bool ExecNode::ErrorIfNotOk(Status status) { return true; } -Status ExecNode::SubmitTask(std::function task) { - if (finished_.is_finished()) { - return Status::OK(); - } - if (this->has_executor()) { - DCHECK(task_group_ != nullptr); - task_group_->Append(std::move(task)); - } else { - return task(); - } - if (batch_count_.Increment()) { - this->MarkFinished(); - } - return Status::OK(); -} - -void ExecNode::MarkFinished(bool request_stop) { - if (this->has_executor()) { - if (request_stop) { - this->stop_source_.RequestStop(); - } - task_group_->FinishAsync().AddCallback([this](const Status& status) { - if (!this->finished_.is_finished()) this->finished_.MarkFinished(status); - }); - } else { - this->finished_.MarkFinished(); - } -} - std::shared_ptr MakeGeneratorReader( std::shared_ptr schema, std::function>()> gen, MemoryPool* pool) { diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index 6ea4d21f36f..e97af4d34cf 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -23,20 +23,14 @@ #include #include "arrow/compute/exec.h" -#include "arrow/compute/exec/util.h" #include "arrow/compute/type_fwd.h" #include "arrow/type_fwd.h" -#include "arrow/util/async_util.h" -#include "arrow/util/cancel.h" #include "arrow/util/macros.h" #include "arrow/util/optional.h" -#include "arrow/util/task_group.h" #include "arrow/util/visibility.h" namespace arrow { -using internal::TaskGroup; - namespace compute { class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this { @@ -242,15 +236,6 @@ class ARROW_EXPORT ExecNode { /// \brief Is an executor available? bool has_executor() const { return plan_->exec_context()->executor() != nullptr; } - /// \brief submit a task, independently if an executor is available or not - Status SubmitTask(std::function task); - - /// \brief Mark Future<> `finished_` as Finished, independently if an executor is - /// available or not - /// - /// A boolean var `request_stop` can be send to cancel remaining tasks in the executor. - void MarkFinished(bool request_stop = false); - ExecPlan* plan_; std::string label_; @@ -260,15 +245,6 @@ class ARROW_EXPORT ExecNode { std::shared_ptr output_schema_; int num_outputs_; NodeVector outputs_; - - // Counter for the number of batches received - AtomicCounter batch_count_; - // Future to sync finished - Future<> finished_; - // Variable used to cancel remaining tasks in the executor - StopSource stop_source_; - // The task group for the corresponding batches - std::shared_ptr task_group_{nullptr}; }; /// \brief An extensible registry for factories of ExecNodes diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index 09eac8a7f54..97d949afe03 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -26,7 +26,7 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/future.h" #include "arrow/util/logging.h" - +#include "exec_node_runner_impl.h" namespace arrow { using internal::checked_cast; @@ -37,11 +37,13 @@ namespace { class FilterNode : public ExecNode { public: FilterNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, Expression filter) + std::shared_ptr output_schema, Expression filter, + std::unique_ptr impl) : ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"}, std::move(output_schema), /*num_outputs=*/1), - filter_(std::move(filter)) {} + filter_(std::move(filter)), + impl_(std::move(impl)) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { @@ -60,9 +62,16 @@ class FilterNode : public ExecNode { filter_expression.ToString(), " evaluates to ", filter_expression.type()->ToString()); } - + std::unique_ptr impl; + if (plan->exec_context()->executor() == nullptr) { + ARROW_ASSIGN_OR_RAISE( + impl, ExecNodeRunnerImpl::MakeSimpleSyncRunner(plan->exec_context())); + } else { + ARROW_ASSIGN_OR_RAISE( + impl, ExecNodeRunnerImpl::MakeSimpleParallelRunner(plan->exec_context())); + } return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), - std::move(filter_expression)); + std::move(filter_expression), std::move(impl)); } const char* kind_name() const override { return "FilterNode"; } @@ -99,12 +108,21 @@ class FilterNode : public ExecNode { DCHECK_EQ(input, inputs_[0]); auto task = [this, batch]() { auto maybe_filtered = DoFilter(std::move(batch)); - if (ErrorIfNotOk(maybe_filtered.status())) return maybe_filtered.status(); + if (ErrorIfNotOk(maybe_filtered.status())) { + return maybe_filtered.status(); + } maybe_filtered->guarantee = batch.guarantee; outputs_[0]->InputReceived(this, maybe_filtered.MoveValueUnsafe()); return Status::OK(); }; - DCHECK_OK(this->SubmitTask(task)); + auto status = impl_->SubmitTask(task); + if (!status.ok()) { + StopProducing(); + bool cancelled = impl_->Cancel(); + DCHECK(cancelled); + impl_->MarkFinished(status); + return; + } } void ErrorReceived(ExecNode* input, Status error) override { @@ -115,9 +133,7 @@ class FilterNode : public ExecNode { void InputFinished(ExecNode* input, int total_batches) override { DCHECK_EQ(input, inputs_[0]); outputs_[0]->InputFinished(this, total_batches); - if (batch_count_.SetTotal(total_batches)) { - this->MarkFinished(); - } + impl_->InputFinished(total_batches); } Status StartProducing() override { return Status::OK(); } @@ -132,19 +148,18 @@ class FilterNode : public ExecNode { } void StopProducing() override { - if (batch_count_.Cancel()) { - this->MarkFinished(/*request_stop=*/true); - } + impl_->StopProducing(); inputs_[0]->StopProducing(this); } - Future<> finished() override { return finished_; } + Future<> finished() override { return impl_->finished(); } protected: std::string ToStringExtra() const override { return "filter=" + filter_.ToString(); } private: Expression filter_; + std::unique_ptr impl_; }; } // namespace diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 751a57e85f2..2a03b28bbe9 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -29,6 +29,7 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/future.h" #include "arrow/util/logging.h" +#include "exec_node_runner_impl.h" namespace arrow { @@ -40,11 +41,13 @@ namespace { class ProjectNode : public ExecNode { public: ProjectNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, std::vector exprs) + std::shared_ptr output_schema, std::vector exprs, + std::unique_ptr impl) : ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"}, std::move(output_schema), /*num_outputs=*/1), - exprs_(std::move(exprs)) {} + exprs_(std::move(exprs)), + impl_(std::move(impl)) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { @@ -70,9 +73,17 @@ class ProjectNode : public ExecNode { fields[i] = field(std::move(names[i]), expr.type()); ++i; } - + std::unique_ptr impl; + if (plan->exec_context()->executor() == nullptr) { + ARROW_ASSIGN_OR_RAISE( + impl, ExecNodeRunnerImpl::MakeSimpleSyncRunner(plan->exec_context())); + } else { + ARROW_ASSIGN_OR_RAISE( + impl, ExecNodeRunnerImpl::MakeSimpleParallelRunner(plan->exec_context())); + } return plan->EmplaceNode(plan, std::move(inputs), - schema(std::move(fields)), std::move(exprs)); + schema(std::move(fields)), std::move(exprs), + std::move(impl)); } const char* kind_name() const override { return "ProjectNode"; } @@ -94,11 +105,19 @@ class ProjectNode : public ExecNode { auto task = [this, batch]() { auto maybe_projected = DoProject(std::move(batch)); if (ErrorIfNotOk(maybe_projected.status())) return maybe_projected.status(); + maybe_projected->guarantee = batch.guarantee; outputs_[0]->InputReceived(this, maybe_projected.MoveValueUnsafe()); return Status::OK(); }; - DCHECK_OK(this->SubmitTask(task)); + auto status = impl_->SubmitTask(task); + if (!status.ok()) { + StopProducing(); + bool cancelled = impl_->Cancel(); + DCHECK(cancelled); + impl_->MarkFinished(status); + return; + } } void ErrorReceived(ExecNode* input, Status error) override { @@ -108,10 +127,8 @@ class ProjectNode : public ExecNode { void InputFinished(ExecNode* input, int total_batches) override { DCHECK_EQ(input, inputs_[0]); - if (batch_count_.SetTotal(total_batches)) { - this->MarkFinished(); - } outputs_[0]->InputFinished(this, total_batches); + impl_->InputFinished(total_batches); } Status StartProducing() override { return Status::OK(); } @@ -126,13 +143,11 @@ class ProjectNode : public ExecNode { } void StopProducing() override { - if (batch_count_.Cancel()) { - this->MarkFinished(/*request_stop=*/true); - } + impl_->StopProducing(); inputs_[0]->StopProducing(this); } - Future<> finished() override { return finished_; } + Future<> finished() override { return impl_->finished(); } protected: std::string ToStringExtra() const override { @@ -152,6 +167,7 @@ class ProjectNode : public ExecNode { private: std::vector exprs_; + std::unique_ptr impl_; }; } // namespace From 93595ad35f80de200ee4a1573444db0c97e3b89a Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 1 Oct 2021 16:30:19 -0500 Subject: [PATCH 10/21] update > addr > CI --- cpp/src/arrow/compute/exec/exec_node_runner_impl.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/exec_node_runner_impl.cc b/cpp/src/arrow/compute/exec/exec_node_runner_impl.cc index a16f7b36d60..395ba6a8300 100644 --- a/cpp/src/arrow/compute/exec/exec_node_runner_impl.cc +++ b/cpp/src/arrow/compute/exec/exec_node_runner_impl.cc @@ -85,10 +85,13 @@ class SimpleParallelRunner : public SimpleSyncRunnerImpl { if (finished_.is_finished()) { return Status::OK(); } - task_group_.AddTask([this, task]() -> Result> { + auto status = task_group_.AddTask([this, task]() -> Result> { return this->executor_->Submit(this->stop_source_.token(), [task]() { return task(); }); }); + if (!status.ok()) { + return status; + } if (input_counter_.Increment()) { this->MarkFinished(); } From 173a373649d7ae5dd4f9b5675a7fc243029a6d62 Mon Sep 17 00:00:00 2001 From: Alexander Date: Sat, 2 Oct 2021 16:59:04 -0500 Subject: [PATCH 11/21] last fixes --- cpp/src/arrow/compute/exec/exec_node_runner_impl.cc | 12 ++++++------ cpp/src/arrow/compute/exec/filter_node.cc | 6 +++--- cpp/src/arrow/compute/exec/project_node.cc | 6 +++--- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/cpp/src/arrow/compute/exec/exec_node_runner_impl.cc b/cpp/src/arrow/compute/exec/exec_node_runner_impl.cc index 395ba6a8300..90f424496d6 100644 --- a/cpp/src/arrow/compute/exec/exec_node_runner_impl.cc +++ b/cpp/src/arrow/compute/exec/exec_node_runner_impl.cc @@ -35,7 +35,7 @@ namespace compute { class SimpleSyncRunnerImpl : public ExecNodeRunnerImpl { public: - SimpleSyncRunnerImpl(ExecContext* ctx) : ctx_(ctx) {} + explicit SimpleSyncRunnerImpl(ExecContext* ctx) : ctx_(ctx) {} Status SubmitTask(std::function task) override { if (finished_.is_finished()) { @@ -77,7 +77,7 @@ class SimpleSyncRunnerImpl : public ExecNodeRunnerImpl { class SimpleParallelRunner : public SimpleSyncRunnerImpl { public: - SimpleParallelRunner(ExecContext* ctx) : SimpleSyncRunnerImpl(ctx) { + explicit SimpleParallelRunner(ExecContext* ctx) : SimpleSyncRunnerImpl(ctx) { executor_ = ctx->executor(); } @@ -100,10 +100,10 @@ class SimpleParallelRunner : public SimpleSyncRunnerImpl { void MarkFinished(Status status = Status::OK()) override { if (!status.ok()) { - StopProducing(); - bool cancelled = this->Cancel(); - DCHECK(cancelled); - this->MarkFinished(status); + this->StopProducing(); + if (input_counter_.Cancel()) { + this->MarkFinished(); + } return; } task_group_.End().AddCallback([this](const Status& status) { diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index 97d949afe03..06f923accdc 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -118,9 +118,9 @@ class FilterNode : public ExecNode { auto status = impl_->SubmitTask(task); if (!status.ok()) { StopProducing(); - bool cancelled = impl_->Cancel(); - DCHECK(cancelled); - impl_->MarkFinished(status); + if (impl_->Cancel()) { + impl_->MarkFinished(status); + } return; } } diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 2a03b28bbe9..8941e11511c 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -113,9 +113,9 @@ class ProjectNode : public ExecNode { auto status = impl_->SubmitTask(task); if (!status.ok()) { StopProducing(); - bool cancelled = impl_->Cancel(); - DCHECK(cancelled); - impl_->MarkFinished(status); + if (impl_->Cancel()) { + impl_->MarkFinished(status); + } return; } } From 5d9367ea597cc0904d4470d03488dfa20678a40d Mon Sep 17 00:00:00 2001 From: Alexander Date: Sat, 2 Oct 2021 17:04:18 -0500 Subject: [PATCH 12/21] update sink_node: DCHECK(cancelled); // could generates an error - auto cancelled = impl_->Cancel(); - DCHECK(cancelled); // could generates an error - impl_->MarkFinished(status); + if (impl_->Cancel()) { + impl_->MarkFinished(status); + } --- cpp/src/arrow/compute/exec/sink_node.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index d84f3c44115..e1c10edd8a9 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -274,9 +274,9 @@ struct OrderBySinkNode final : public SinkNode { plan()->exec_context()->memory_pool()); if (ErrorIfNotOk(maybe_batch.status())) { StopProducing(); - bool cancelled = input_counter_.Cancel(); - DCHECK(cancelled); - finished_.MarkFinished(maybe_batch.status()); + if (input_counter_.Cancel()) { + finished_.MarkFinished(maybe_batch.status()); + } return; } auto record_batch = maybe_batch.MoveValueUnsafe(); From 6da90b0c58476d8239d5430ad2e0143710b2af3b Mon Sep 17 00:00:00 2001 From: Alexander Date: Sat, 2 Oct 2021 19:47:57 -0500 Subject: [PATCH 13/21] minor fix lint fix --- cpp/src/arrow/compute/exec/exec_plan.h | 3 --- cpp/src/arrow/compute/exec/filter_node.cc | 2 +- cpp/src/arrow/compute/exec/project_node.cc | 2 +- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index e97af4d34cf..571e3fb32c9 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -233,9 +233,6 @@ class ARROW_EXPORT ExecNode { /// Provide extra info to include in the string representation. virtual std::string ToStringExtra() const; - /// \brief Is an executor available? - bool has_executor() const { return plan_->exec_context()->executor() != nullptr; } - ExecPlan* plan_; std::string label_; diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index 06f923accdc..da29394ec22 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -19,6 +19,7 @@ #include "arrow/compute/api_vector.h" #include "arrow/compute/exec.h" +#include "arrow/compute/exec/exec_node_runner_impl.h" #include "arrow/compute/exec/expression.h" #include "arrow/compute/exec/options.h" #include "arrow/datum.h" @@ -26,7 +27,6 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/future.h" #include "arrow/util/logging.h" -#include "exec_node_runner_impl.h" namespace arrow { using internal::checked_cast; diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 8941e11511c..9c6a4c45044 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -21,6 +21,7 @@ #include "arrow/compute/api_vector.h" #include "arrow/compute/exec.h" +#include "arrow/compute/exec/exec_node_runner_impl.h" #include "arrow/compute/exec/expression.h" #include "arrow/compute/exec/options.h" #include "arrow/compute/exec/util.h" @@ -29,7 +30,6 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/future.h" #include "arrow/util/logging.h" -#include "exec_node_runner_impl.h" namespace arrow { From d5950fb945fcdef4838002155243279fc34c6f82 Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 5 Oct 2021 22:22:26 -0500 Subject: [PATCH 14/21] create MapNode --- cpp/src/arrow/CMakeLists.txt | 1 - .../compute/exec/exec_node_runner_impl.cc | 149 ------------------ .../compute/exec/exec_node_runner_impl.h | 63 -------- cpp/src/arrow/compute/exec/exec_plan.h | 107 +++++++++++++ cpp/src/arrow/compute/exec/filter_node.cc | 62 +------- cpp/src/arrow/compute/exec/project_node.cc | 63 +------- 6 files changed, 119 insertions(+), 326 deletions(-) delete mode 100644 cpp/src/arrow/compute/exec/exec_node_runner_impl.cc delete mode 100644 cpp/src/arrow/compute/exec/exec_node_runner_impl.h diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 292548afcfc..b82cccf24df 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -375,7 +375,6 @@ if(ARROW_COMPUTE) compute/exec/aggregate_node.cc compute/exec/exec_plan.cc compute/exec/expression.cc - compute/exec/exec_node_runner_impl.cc compute/exec/filter_node.cc compute/exec/project_node.cc compute/exec/source_node.cc diff --git a/cpp/src/arrow/compute/exec/exec_node_runner_impl.cc b/cpp/src/arrow/compute/exec/exec_node_runner_impl.cc deleted file mode 100644 index 90f424496d6..00000000000 --- a/cpp/src/arrow/compute/exec/exec_node_runner_impl.cc +++ /dev/null @@ -1,149 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -#include "arrow/compute/exec/exec_node_runner_impl.h" - -#include -#include -#include - -#include "arrow/compute/exec/options.h" -#include "arrow/compute/exec/util.h" -#include "arrow/record_batch.h" -#include "arrow/result.h" -#include "arrow/status.h" -#include "arrow/type.h" -#include "arrow/util/async_util.h" -#include "arrow/util/cancel.h" -#include "arrow/util/thread_pool.h" - -namespace arrow { -namespace compute { - -class SimpleSyncRunnerImpl : public ExecNodeRunnerImpl { - public: - explicit SimpleSyncRunnerImpl(ExecContext* ctx) : ctx_(ctx) {} - - Status SubmitTask(std::function task) override { - if (finished_.is_finished()) { - return Status::OK(); - } - auto status = task(); - if (!status.ok()) { - return status; - } - if (input_counter_.Increment()) { - this->MarkFinished(); - } - return Status::OK(); - } - - void MarkFinished(Status status = Status::OK()) override { - this->finished_.MarkFinished(status); - } - - void InputFinished(int total_batches) override { - if (input_counter_.SetTotal(total_batches)) { - this->MarkFinished(); - } - } - - void StopProducing() override { - if (input_counter_.Cancel()) { - this->MarkFinished(); - } - } - - protected: - // Executor context - ExecContext* ctx_; - - // Counter for the number of batches received - AtomicCounter input_counter_; -}; - -class SimpleParallelRunner : public SimpleSyncRunnerImpl { - public: - explicit SimpleParallelRunner(ExecContext* ctx) : SimpleSyncRunnerImpl(ctx) { - executor_ = ctx->executor(); - } - - Status SubmitTask(std::function task) override { - if (finished_.is_finished()) { - return Status::OK(); - } - auto status = task_group_.AddTask([this, task]() -> Result> { - return this->executor_->Submit(this->stop_source_.token(), - [task]() { return task(); }); - }); - if (!status.ok()) { - return status; - } - if (input_counter_.Increment()) { - this->MarkFinished(); - } - return Status::OK(); - } - - void MarkFinished(Status status = Status::OK()) override { - if (!status.ok()) { - this->StopProducing(); - if (input_counter_.Cancel()) { - this->MarkFinished(); - } - return; - } - task_group_.End().AddCallback([this](const Status& status) { - if (!status.ok()) { - this->finished_.MarkFinished(); - } else if (!this->finished_.is_finished()) { - this->finished_.MarkFinished(status); - } - }); - } - - void StopProducing() override { - this->stop_source_.RequestStop(); - if (input_counter_.Cancel()) { - this->MarkFinished(); - } - } - - protected: - // The task group for the corresponding batches - util::AsyncTaskGroup task_group_; - - // Executor - ::arrow::internal::Executor* executor_; - - // Variable used to cancel remaining tasks in the executor - StopSource stop_source_; -}; - -Result> ExecNodeRunnerImpl::MakeSimpleSyncRunner( - ExecContext* ctx) { - std::unique_ptr impl{new SimpleSyncRunnerImpl(ctx)}; - return std::move(impl); -} - -Result> ExecNodeRunnerImpl::MakeSimpleParallelRunner( - ExecContext* ctx) { - std::unique_ptr impl{new SimpleParallelRunner(ctx)}; - return std::move(impl); -} - -} // namespace compute -} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/exec_node_runner_impl.h b/cpp/src/arrow/compute/exec/exec_node_runner_impl.h deleted file mode 100644 index f8b4e04ffe6..00000000000 --- a/cpp/src/arrow/compute/exec/exec_node_runner_impl.h +++ /dev/null @@ -1,63 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include - -#include "arrow/compute/exec/util.h" -#include "arrow/result.h" -#include "arrow/status.h" -#include "arrow/type.h" -#include "arrow/util/future.h" - -namespace arrow { -namespace compute { - -class ExecNodeRunnerImpl { - public: - bool Cancel() { return input_counter_.Cancel(); } - - Future<> finished() { return finished_; } - - virtual ~ExecNodeRunnerImpl() = default; - - virtual Status SubmitTask(std::function task) = 0; - - virtual void MarkFinished(Status s = Status::OK()) = 0; - - virtual void InputFinished(int total_batches) = 0; - - virtual void StopProducing() = 0; - - static Result> MakeSimpleSyncRunner( - ExecContext* ctx); - - static Result> MakeSimpleParallelRunner( - ExecContext* ctx); - - protected: - // Counter for the number of batches received - AtomicCounter input_counter_; - // Future to sync finished - Future<> finished_ = Future<>::Make(); -}; - -} // namespace compute -} // namespace arrow diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index 571e3fb32c9..3dceb620232 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -23,10 +23,14 @@ #include #include "arrow/compute/exec.h" +#include "arrow/compute/exec/util.h" #include "arrow/compute/type_fwd.h" #include "arrow/type_fwd.h" +#include "arrow/util/async_util.h" +#include "arrow/util/cancel.h" #include "arrow/util/macros.h" #include "arrow/util/optional.h" +#include "arrow/util/thread_pool.h" #include "arrow/util/visibility.h" namespace arrow { @@ -244,6 +248,109 @@ class ARROW_EXPORT ExecNode { NodeVector outputs_; }; +class MapNode : public ExecNode { + public: + MapNode(ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema) + : ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"}, + std::move(output_schema), + /*num_outputs=*/1) { + executor_ = plan_->exec_context()->executor(); + } + + void SubmitTask(std::function task) { + if (finished_.is_finished()) { + return; + } + Status status; + if (executor_) { + status = task_group_.AddTask([this, task]() -> Result> { + return this->executor_->Submit(this->stop_source_.token(), [this, task]() { + auto status = task(); + if (this->input_counter_.Increment()) { + this->MarkFinished(status); + } + return status; + }); + }); + } else { + status = task(); + if (input_counter_.Increment()) { + this->MarkFinished(); + } + } + if (!status.ok()) { + StopProducing(); + return; + } + } + + void MarkFinished(Status status = Status::OK()) { + if (executor_) { + if (!status.ok()) { + this->StopProducing(); + } else { + task_group_.End().AddCallback( + [this](const Status& status) { this->finished_.MarkFinished(status); }); + } + } else { + this->finished_.MarkFinished(status); + } + } + + void ErrorReceived(ExecNode* input, Status error) override { + DCHECK_EQ(input, inputs_[0]); + outputs_[0]->ErrorReceived(this, std::move(error)); + } + + void InputFinished(ExecNode* input, int total_batches) override { + DCHECK_EQ(input, inputs_[0]); + outputs_[0]->InputFinished(this, total_batches); + if (input_counter_.SetTotal(total_batches)) { + this->MarkFinished(); + } + } + + Status StartProducing() override { return Status::OK(); } + + void PauseProducing(ExecNode* output) override {} + + void ResumeProducing(ExecNode* output) override {} + + void StopProducing(ExecNode* output) override { + DCHECK_EQ(output, outputs_[0]); + StopProducing(); + } + + void StopProducing() override { + if (executor_) { + this->stop_source_.RequestStop(); + } + if (input_counter_.Cancel()) { + this->MarkFinished(); + } + inputs_[0]->StopProducing(this); + } + + Future<> finished() override { return finished_; } + + protected: + // Counter for the number of batches received + AtomicCounter input_counter_; + + // Future to sync finished + Future<> finished_ = Future<>::Make(); + + // The task group for the corresponding batches + util::AsyncTaskGroup task_group_; + + // Executor + ::arrow::internal::Executor* executor_; + + // Variable used to cancel remaining tasks in the executor + StopSource stop_source_; +}; + /// \brief An extensible registry for factories of ExecNodes class ARROW_EXPORT ExecFactoryRegistry { public: diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index da29394ec22..c916f8aec99 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -19,7 +19,6 @@ #include "arrow/compute/api_vector.h" #include "arrow/compute/exec.h" -#include "arrow/compute/exec/exec_node_runner_impl.h" #include "arrow/compute/exec/expression.h" #include "arrow/compute/exec/options.h" #include "arrow/datum.h" @@ -34,16 +33,12 @@ using internal::checked_cast; namespace compute { namespace { -class FilterNode : public ExecNode { +class FilterNode : public MapNode { public: FilterNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, Expression filter, - std::unique_ptr impl) - : ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"}, - std::move(output_schema), - /*num_outputs=*/1), - filter_(std::move(filter)), - impl_(std::move(impl)) {} + std::shared_ptr output_schema, Expression filter) + : MapNode(plan, std::move(inputs), std::move(output_schema)), + filter_(std::move(filter)) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { @@ -62,16 +57,8 @@ class FilterNode : public ExecNode { filter_expression.ToString(), " evaluates to ", filter_expression.type()->ToString()); } - std::unique_ptr impl; - if (plan->exec_context()->executor() == nullptr) { - ARROW_ASSIGN_OR_RAISE( - impl, ExecNodeRunnerImpl::MakeSimpleSyncRunner(plan->exec_context())); - } else { - ARROW_ASSIGN_OR_RAISE( - impl, ExecNodeRunnerImpl::MakeSimpleParallelRunner(plan->exec_context())); - } return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), - std::move(filter_expression), std::move(impl)); + std::move(filter_expression)); } const char* kind_name() const override { return "FilterNode"; } @@ -115,51 +102,14 @@ class FilterNode : public ExecNode { outputs_[0]->InputReceived(this, maybe_filtered.MoveValueUnsafe()); return Status::OK(); }; - auto status = impl_->SubmitTask(task); - if (!status.ok()) { - StopProducing(); - if (impl_->Cancel()) { - impl_->MarkFinished(status); - } - return; - } - } - - void ErrorReceived(ExecNode* input, Status error) override { - DCHECK_EQ(input, inputs_[0]); - outputs_[0]->ErrorReceived(this, std::move(error)); - } - - void InputFinished(ExecNode* input, int total_batches) override { - DCHECK_EQ(input, inputs_[0]); - outputs_[0]->InputFinished(this, total_batches); - impl_->InputFinished(total_batches); + this->SubmitTask(task); } - Status StartProducing() override { return Status::OK(); } - - void PauseProducing(ExecNode* output) override {} - - void ResumeProducing(ExecNode* output) override {} - - void StopProducing(ExecNode* output) override { - DCHECK_EQ(output, outputs_[0]); - StopProducing(); - } - - void StopProducing() override { - impl_->StopProducing(); - inputs_[0]->StopProducing(this); - } - - Future<> finished() override { return impl_->finished(); } - protected: std::string ToStringExtra() const override { return "filter=" + filter_.ToString(); } private: Expression filter_; - std::unique_ptr impl_; }; } // namespace diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 9c6a4c45044..d046ea9077c 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -21,7 +21,6 @@ #include "arrow/compute/api_vector.h" #include "arrow/compute/exec.h" -#include "arrow/compute/exec/exec_node_runner_impl.h" #include "arrow/compute/exec/expression.h" #include "arrow/compute/exec/options.h" #include "arrow/compute/exec/util.h" @@ -38,16 +37,12 @@ using internal::checked_cast; namespace compute { namespace { -class ProjectNode : public ExecNode { +class ProjectNode : public MapNode { public: ProjectNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, std::vector exprs, - std::unique_ptr impl) - : ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"}, - std::move(output_schema), - /*num_outputs=*/1), - exprs_(std::move(exprs)), - impl_(std::move(impl)) {} + std::shared_ptr output_schema, std::vector exprs) + : MapNode(plan, std::move(inputs), std::move(output_schema)), + exprs_(std::move(exprs)) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { @@ -73,17 +68,8 @@ class ProjectNode : public ExecNode { fields[i] = field(std::move(names[i]), expr.type()); ++i; } - std::unique_ptr impl; - if (plan->exec_context()->executor() == nullptr) { - ARROW_ASSIGN_OR_RAISE( - impl, ExecNodeRunnerImpl::MakeSimpleSyncRunner(plan->exec_context())); - } else { - ARROW_ASSIGN_OR_RAISE( - impl, ExecNodeRunnerImpl::MakeSimpleParallelRunner(plan->exec_context())); - } return plan->EmplaceNode(plan, std::move(inputs), - schema(std::move(fields)), std::move(exprs), - std::move(impl)); + schema(std::move(fields)), std::move(exprs)); } const char* kind_name() const override { return "ProjectNode"; } @@ -110,45 +96,9 @@ class ProjectNode : public ExecNode { outputs_[0]->InputReceived(this, maybe_projected.MoveValueUnsafe()); return Status::OK(); }; - auto status = impl_->SubmitTask(task); - if (!status.ok()) { - StopProducing(); - if (impl_->Cancel()) { - impl_->MarkFinished(status); - } - return; - } - } - - void ErrorReceived(ExecNode* input, Status error) override { - DCHECK_EQ(input, inputs_[0]); - outputs_[0]->ErrorReceived(this, std::move(error)); - } - - void InputFinished(ExecNode* input, int total_batches) override { - DCHECK_EQ(input, inputs_[0]); - outputs_[0]->InputFinished(this, total_batches); - impl_->InputFinished(total_batches); + this->SubmitTask(task); } - Status StartProducing() override { return Status::OK(); } - - void PauseProducing(ExecNode* output) override {} - - void ResumeProducing(ExecNode* output) override {} - - void StopProducing(ExecNode* output) override { - DCHECK_EQ(output, outputs_[0]); - StopProducing(); - } - - void StopProducing() override { - impl_->StopProducing(); - inputs_[0]->StopProducing(this); - } - - Future<> finished() override { return impl_->finished(); } - protected: std::string ToStringExtra() const override { std::stringstream ss; @@ -167,7 +117,6 @@ class ProjectNode : public ExecNode { private: std::vector exprs_; - std::unique_ptr impl_; }; } // namespace From aff9d66b357a288d7f5550d7d1e5cafe4845244b Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 5 Oct 2021 23:20:38 -0500 Subject: [PATCH 15/21] minor changes --- cpp/src/arrow/compute/exec/exec_plan.h | 27 +++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index 3dceb620232..fc4ba3bbdfe 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -268,7 +268,7 @@ class MapNode : public ExecNode { return this->executor_->Submit(this->stop_source_.token(), [this, task]() { auto status = task(); if (this->input_counter_.Increment()) { - this->MarkFinished(status); + this->Finish(status); } return status; }); @@ -276,25 +276,26 @@ class MapNode : public ExecNode { } else { status = task(); if (input_counter_.Increment()) { - this->MarkFinished(); + this->Finish(status); } } if (!status.ok()) { - StopProducing(); + if (input_counter_.Cancel()) { + this->Finish(status); + } + inputs_[0]->StopProducing(this); return; } } - void MarkFinished(Status status = Status::OK()) { + void Finish(Status finish_st = Status::OK()) { if (executor_) { - if (!status.ok()) { - this->StopProducing(); - } else { - task_group_.End().AddCallback( - [this](const Status& status) { this->finished_.MarkFinished(status); }); - } + task_group_.End().AddCallback([this, finish_st](const Status& st) { + Status final_status = finish_st & st; + this->finished_.MarkFinished(final_status); + }); } else { - this->finished_.MarkFinished(status); + this->finished_.MarkFinished(finish_st); } } @@ -307,7 +308,7 @@ class MapNode : public ExecNode { DCHECK_EQ(input, inputs_[0]); outputs_[0]->InputFinished(this, total_batches); if (input_counter_.SetTotal(total_batches)) { - this->MarkFinished(); + this->Finish(); } } @@ -327,7 +328,7 @@ class MapNode : public ExecNode { this->stop_source_.RequestStop(); } if (input_counter_.Cancel()) { - this->MarkFinished(); + this->Finish(); } inputs_[0]->StopProducing(this); } From 35324d8ec518b7adee68f1c53af951229cdce212 Mon Sep 17 00:00:00 2001 From: Alexander Date: Wed, 6 Oct 2021 10:05:34 -0500 Subject: [PATCH 16/21] minor fixes --- cpp/src/arrow/compute/exec/exec_plan.h | 94 +++++++++++++--------- cpp/src/arrow/compute/exec/filter_node.cc | 12 +-- cpp/src/arrow/compute/exec/project_node.cc | 11 +-- 3 files changed, 60 insertions(+), 57 deletions(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index fc4ba3bbdfe..e2dc91db8b0 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -248,6 +248,14 @@ class ARROW_EXPORT ExecNode { NodeVector outputs_; }; +/// \brief MapNode is an ExecNode type class which process a task like filter/project +/// (See SubmitTask method) to each given ExecBatch object, which have one input, one +/// output, and are pure functions on the input +/// +/// A simple parallel runner is created with a "map_fn" which is just a function that +/// takes a batch in and returns a batch. This simple parallel runner also needs an +/// executor (use simple synchronous runner if there is no executor) + class MapNode : public ExecNode { public: MapNode(ExecPlan* plan, std::vector inputs, @@ -258,11 +266,57 @@ class MapNode : public ExecNode { executor_ = plan_->exec_context()->executor(); } - void SubmitTask(std::function task) { + void ErrorReceived(ExecNode* input, Status error) override { + DCHECK_EQ(input, inputs_[0]); + outputs_[0]->ErrorReceived(this, std::move(error)); + } + + void InputFinished(ExecNode* input, int total_batches) override { + DCHECK_EQ(input, inputs_[0]); + outputs_[0]->InputFinished(this, total_batches); + if (input_counter_.SetTotal(total_batches)) { + this->Finish(); + } + } + + Status StartProducing() override { return Status::OK(); } + + void PauseProducing(ExecNode* output) override {} + + void ResumeProducing(ExecNode* output) override {} + + void StopProducing(ExecNode* output) override { + DCHECK_EQ(output, outputs_[0]); + StopProducing(); + } + + void StopProducing() override { + if (executor_) { + this->stop_source_.RequestStop(); + } + if (input_counter_.Cancel()) { + this->Finish(); + } + inputs_[0]->StopProducing(this); + } + + Future<> finished() override { return finished_; } + + protected: + void SubmitTask(std::function(ExecBatch)> map_fn, ExecBatch batch) { + Status status; if (finished_.is_finished()) { return; } - Status status; + auto task = [this, map_fn, batch]() { + auto output_batch = map_fn(std::move(batch)); + if (ErrorIfNotOk(output_batch.status())) { + return output_batch.status(); + } + output_batch->guarantee = batch.guarantee; + outputs_[0]->InputReceived(this, output_batch.MoveValueUnsafe()); + return Status::OK(); + }; if (executor_) { status = task_group_.AddTask([this, task]() -> Result> { return this->executor_->Submit(this->stop_source_.token(), [this, task]() { @@ -299,42 +353,6 @@ class MapNode : public ExecNode { } } - void ErrorReceived(ExecNode* input, Status error) override { - DCHECK_EQ(input, inputs_[0]); - outputs_[0]->ErrorReceived(this, std::move(error)); - } - - void InputFinished(ExecNode* input, int total_batches) override { - DCHECK_EQ(input, inputs_[0]); - outputs_[0]->InputFinished(this, total_batches); - if (input_counter_.SetTotal(total_batches)) { - this->Finish(); - } - } - - Status StartProducing() override { return Status::OK(); } - - void PauseProducing(ExecNode* output) override {} - - void ResumeProducing(ExecNode* output) override {} - - void StopProducing(ExecNode* output) override { - DCHECK_EQ(output, outputs_[0]); - StopProducing(); - } - - void StopProducing() override { - if (executor_) { - this->stop_source_.RequestStop(); - } - if (input_counter_.Cancel()) { - this->Finish(); - } - inputs_[0]->StopProducing(this); - } - - Future<> finished() override { return finished_; } - protected: // Counter for the number of batches received AtomicCounter input_counter_; diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index c916f8aec99..f685f7ba36b 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -93,16 +93,8 @@ class FilterNode : public MapNode { void InputReceived(ExecNode* input, ExecBatch batch) override { DCHECK_EQ(input, inputs_[0]); - auto task = [this, batch]() { - auto maybe_filtered = DoFilter(std::move(batch)); - if (ErrorIfNotOk(maybe_filtered.status())) { - return maybe_filtered.status(); - } - maybe_filtered->guarantee = batch.guarantee; - outputs_[0]->InputReceived(this, maybe_filtered.MoveValueUnsafe()); - return Status::OK(); - }; - this->SubmitTask(task); + auto func = [this](ExecBatch batch) { return DoFilter(std::move(batch)); }; + this->SubmitTask(func, std::move(batch)); } protected: diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index d046ea9077c..34cf6cfc6cb 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -88,15 +88,8 @@ class ProjectNode : public MapNode { void InputReceived(ExecNode* input, ExecBatch batch) override { DCHECK_EQ(input, inputs_[0]); - auto task = [this, batch]() { - auto maybe_projected = DoProject(std::move(batch)); - if (ErrorIfNotOk(maybe_projected.status())) return maybe_projected.status(); - - maybe_projected->guarantee = batch.guarantee; - outputs_[0]->InputReceived(this, maybe_projected.MoveValueUnsafe()); - return Status::OK(); - }; - this->SubmitTask(task); + auto func = [this](ExecBatch batch) { return DoProject(std::move(batch)); }; + this->SubmitTask(func, std::move(batch)); } protected: From ff578d76872500911ffb1dd39f87d031719621da Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 7 Oct 2021 23:13:28 -0500 Subject: [PATCH 17/21] minor changes and benchmark added --- cpp/src/arrow/compute/exec/exec_plan.cc | 97 +++++++++++ cpp/src/arrow/compute/exec/exec_plan.h | 95 ++--------- cpp/src/arrow/compute/exec/filter_node.cc | 2 +- cpp/src/arrow/compute/exec/project_node.cc | 2 +- cpp/src/arrow/dataset/CMakeLists.txt | 3 + cpp/src/arrow/dataset/scanner_benchmark.cc | 186 +++++++++++++++++++++ 6 files changed, 299 insertions(+), 86 deletions(-) create mode 100644 cpp/src/arrow/dataset/scanner_benchmark.cc diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 502f0d06693..a03f8d4b9d8 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -287,6 +287,103 @@ bool ExecNode::ErrorIfNotOk(Status status) { return true; } +MapNode::MapNode(ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema) + : ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"}, + std::move(output_schema), + /*num_outputs=*/1) { + executor_ = plan_->exec_context()->executor(); +} + +void MapNode::ErrorReceived(ExecNode* input, Status error) { + DCHECK_EQ(input, inputs_[0]); + outputs_[0]->ErrorReceived(this, std::move(error)); +} + +void MapNode::InputFinished(ExecNode* input, int total_batches) { + DCHECK_EQ(input, inputs_[0]); + outputs_[0]->InputFinished(this, total_batches); + if (input_counter_.SetTotal(total_batches)) { + this->Finish(); + } +} + +Status MapNode::StartProducing() { return Status::OK(); } + +void MapNode::PauseProducing(ExecNode* output) {} + +void MapNode::ResumeProducing(ExecNode* output) {} + +void MapNode::StopProducing(ExecNode* output) { + DCHECK_EQ(output, outputs_[0]); + StopProducing(); +} + +void MapNode::StopProducing() { + if (executor_) { + this->stop_source_.RequestStop(); + } + if (input_counter_.Cancel()) { + this->Finish(); + } + inputs_[0]->StopProducing(this); +} + +Future<> MapNode::finished() { return finished_; } + +void MapNode::SubmitTask(std::function(ExecBatch)> map_fn, + ExecBatch batch) { + Status status; + if (finished_.is_finished()) { + return; + } + auto task = [this, map_fn, batch]() { + auto guarantee = batch.guarantee; + auto output_batch = map_fn(std::move(batch)); + if (ErrorIfNotOk(output_batch.status())) { + return output_batch.status(); + } + output_batch->guarantee = guarantee; + outputs_[0]->InputReceived(this, output_batch.MoveValueUnsafe()); + return Status::OK(); + }; + + if (executor_) { + status = task_group_.AddTask([this, task]() -> Result> { + return this->executor_->Submit(this->stop_source_.token(), [this, task]() { + auto status = task(); + if (this->input_counter_.Increment()) { + this->Finish(status); + } + return status; + }); + }); + } else { + status = task(); + if (input_counter_.Increment()) { + this->Finish(status); + } + } + if (!status.ok()) { + if (input_counter_.Cancel()) { + this->Finish(status); + } + inputs_[0]->StopProducing(this); + return; + } +} + +void MapNode::Finish(Status finish_st /*= Status::OK()*/) { + if (executor_) { + task_group_.End().AddCallback([this, finish_st](const Status& st) { + Status final_status = finish_st & st; + this->finished_.MarkFinished(final_status); + }); + } else { + this->finished_.MarkFinished(finish_st); + } +} + std::shared_ptr MakeGeneratorReader( std::shared_ptr schema, std::function>()> gen, MemoryPool* pool) { diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index e2dc91db8b0..ec286df6553 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -30,7 +30,6 @@ #include "arrow/util/cancel.h" #include "arrow/util/macros.h" #include "arrow/util/optional.h" -#include "arrow/util/thread_pool.h" #include "arrow/util/visibility.h" namespace arrow { @@ -259,99 +258,28 @@ class ARROW_EXPORT ExecNode { class MapNode : public ExecNode { public: MapNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema) - : ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"}, - std::move(output_schema), - /*num_outputs=*/1) { - executor_ = plan_->exec_context()->executor(); - } + std::shared_ptr output_schema); - void ErrorReceived(ExecNode* input, Status error) override { - DCHECK_EQ(input, inputs_[0]); - outputs_[0]->ErrorReceived(this, std::move(error)); - } + void ErrorReceived(ExecNode* input, Status error) override; - void InputFinished(ExecNode* input, int total_batches) override { - DCHECK_EQ(input, inputs_[0]); - outputs_[0]->InputFinished(this, total_batches); - if (input_counter_.SetTotal(total_batches)) { - this->Finish(); - } - } + void InputFinished(ExecNode* input, int total_batches) override; - Status StartProducing() override { return Status::OK(); } + Status StartProducing() override; - void PauseProducing(ExecNode* output) override {} + void PauseProducing(ExecNode* output) override; - void ResumeProducing(ExecNode* output) override {} + void ResumeProducing(ExecNode* output) override; - void StopProducing(ExecNode* output) override { - DCHECK_EQ(output, outputs_[0]); - StopProducing(); - } + void StopProducing(ExecNode* output) override; - void StopProducing() override { - if (executor_) { - this->stop_source_.RequestStop(); - } - if (input_counter_.Cancel()) { - this->Finish(); - } - inputs_[0]->StopProducing(this); - } + void StopProducing() override; - Future<> finished() override { return finished_; } + Future<> finished() override; protected: - void SubmitTask(std::function(ExecBatch)> map_fn, ExecBatch batch) { - Status status; - if (finished_.is_finished()) { - return; - } - auto task = [this, map_fn, batch]() { - auto output_batch = map_fn(std::move(batch)); - if (ErrorIfNotOk(output_batch.status())) { - return output_batch.status(); - } - output_batch->guarantee = batch.guarantee; - outputs_[0]->InputReceived(this, output_batch.MoveValueUnsafe()); - return Status::OK(); - }; - if (executor_) { - status = task_group_.AddTask([this, task]() -> Result> { - return this->executor_->Submit(this->stop_source_.token(), [this, task]() { - auto status = task(); - if (this->input_counter_.Increment()) { - this->Finish(status); - } - return status; - }); - }); - } else { - status = task(); - if (input_counter_.Increment()) { - this->Finish(status); - } - } - if (!status.ok()) { - if (input_counter_.Cancel()) { - this->Finish(status); - } - inputs_[0]->StopProducing(this); - return; - } - } + void SubmitTask(std::function(ExecBatch)> map_fn, ExecBatch batch); - void Finish(Status finish_st = Status::OK()) { - if (executor_) { - task_group_.End().AddCallback([this, finish_st](const Status& st) { - Status final_status = finish_st & st; - this->finished_.MarkFinished(final_status); - }); - } else { - this->finished_.MarkFinished(finish_st); - } - } + void Finish(Status finish_st = Status::OK()); protected: // Counter for the number of batches received @@ -363,7 +291,6 @@ class MapNode : public ExecNode { // The task group for the corresponding batches util::AsyncTaskGroup task_group_; - // Executor ::arrow::internal::Executor* executor_; // Variable used to cancel remaining tasks in the executor diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index f685f7ba36b..518a43d40ee 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -94,7 +94,7 @@ class FilterNode : public MapNode { void InputReceived(ExecNode* input, ExecBatch batch) override { DCHECK_EQ(input, inputs_[0]); auto func = [this](ExecBatch batch) { return DoFilter(std::move(batch)); }; - this->SubmitTask(func, std::move(batch)); + this->SubmitTask(std::move(func), std::move(batch)); } protected: diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 34cf6cfc6cb..b627efd81d2 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -89,7 +89,7 @@ class ProjectNode : public MapNode { void InputReceived(ExecNode* input, ExecBatch batch) override { DCHECK_EQ(input, inputs_[0]); auto func = [this](ExecBatch batch) { return DoProject(std::move(batch)); }; - this->SubmitTask(func, std::move(batch)); + this->SubmitTask(std::move(func), std::move(batch)); } protected: diff --git a/cpp/src/arrow/dataset/CMakeLists.txt b/cpp/src/arrow/dataset/CMakeLists.txt index c601e9fb1e2..984116a0047 100644 --- a/cpp/src/arrow/dataset/CMakeLists.txt +++ b/cpp/src/arrow/dataset/CMakeLists.txt @@ -133,10 +133,13 @@ endif() if(ARROW_BUILD_BENCHMARKS) add_arrow_benchmark(file_benchmark PREFIX "arrow-dataset") + add_arrow_benchmark(scanner_benchmark PREFIX "arrow-dataset") if(ARROW_BUILD_STATIC) target_link_libraries(arrow-dataset-file-benchmark PUBLIC arrow_dataset_static) + target_link_libraries(arrow-dataset-scanner-benchmark PUBLIC arrow_dataset_static) else() target_link_libraries(arrow-dataset-file-benchmark PUBLIC arrow_dataset_shared) + target_link_libraries(arrow-dataset-scanner-benchmark PUBLIC arrow_dataset_static) endif() endif() diff --git a/cpp/src/arrow/dataset/scanner_benchmark.cc b/cpp/src/arrow/dataset/scanner_benchmark.cc new file mode 100644 index 00000000000..2e90e34db01 --- /dev/null +++ b/cpp/src/arrow/dataset/scanner_benchmark.cc @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "benchmark/benchmark.h" + +#include "arrow/api.h" +#include "arrow/compute/api.h" +#include "arrow/compute/exec/options.h" +#include "arrow/compute/exec/test_util.h" +#include "arrow/dataset/dataset.h" +#include "arrow/dataset/scanner.h" +#include "arrow/dataset/scanner_internal.h" +#include "arrow/dataset/test_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/matchers.h" +#include "arrow/testing/random.h" + +namespace arrow { +namespace compute { + +constexpr auto kSeed = 0x0ff1ce; + +void GenerateBatchesFromSchema(const std::shared_ptr& schema, size_t num_batches, + BatchesWithSchema* out_batches, int multiplicity = 1, + int64_t batch_size = 4) { + ::arrow::random::RandomArrayGenerator rng_(kSeed); + if (num_batches == 0) { + auto empty_record_batch = ExecBatch(*rng_.BatchOf(schema->fields(), 0)); + out_batches->batches.push_back(empty_record_batch); + } else { + for (size_t j = 0; j < num_batches; j++) { + out_batches->batches.push_back( + ExecBatch(*rng_.BatchOf(schema->fields(), batch_size))); + } + } + + size_t batch_count = out_batches->batches.size(); + for (int repeat = 1; repeat < multiplicity; ++repeat) { + for (size_t i = 0; i < batch_count; ++i) { + out_batches->batches.push_back(out_batches->batches[i]); + } + } + out_batches->schema = schema; +} + +RecordBatchVector GenerateBatches(const std::shared_ptr& schema, + size_t num_batches, size_t batch_size) { + BatchesWithSchema input_batches; + + RecordBatchVector batches; + GenerateBatchesFromSchema(schema, num_batches, &input_batches, 1, batch_size); + + for (const auto& batch : input_batches.batches) { + batches.push_back(batch.ToRecordBatch(schema).MoveValueUnsafe()); + } + return batches; +} + +} // namespace compute + +namespace dataset { + +void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool use_executor) { + // NB: This test is here for didactic purposes + + // Specify a MemoryPool and ThreadPool for the ExecPlan + compute::ExecContext exec_context( + default_memory_pool(), + use_executor ? ::arrow::internal::GetCpuThreadPool() : nullptr); + + // ensure arrow::dataset node factories are in the registry + arrow::dataset::internal::Initialize(); + + // A ScanNode is constructed from an ExecPlan (into which it is inserted), + // a Dataset (whose batches will be scanned), and ScanOptions (to specify a filter for + // predicate pushdown, a projection to skip materialization of unnecessary columns, + // ...) + ASSERT_OK_AND_ASSIGN(std::shared_ptr plan, + compute::ExecPlan::Make(&exec_context)); + + std::shared_ptr sch = schema({field("a", int32()), field("b", boolean())}); + RecordBatchVector batches = + ::arrow::compute::GenerateBatches(sch, num_batches, batch_size); + + std::shared_ptr dataset = std::make_shared(sch, batches); + + auto options = std::make_shared(); + // sync scanning is not supported by ScanNode + options->use_async = true; + // specify the filter + compute::Expression b_is_true = field_ref("b"); + options->filter = b_is_true; + // for now, specify the projection as the full project expression (eventually this can + // just be a list of materialized field names) + compute::Expression a_times_2 = call("multiply", {field_ref("a"), literal(2)}); + options->projection = + call("make_struct", {a_times_2}, compute::MakeStructOptions{{"a * 2"}}); + + // construct the scan node + ASSERT_OK_AND_ASSIGN( + compute::ExecNode * scan, + compute::MakeExecNode("scan", plan.get(), {}, ScanNodeOptions{dataset, options})); + + // pipe the scan node into a filter node + ASSERT_OK_AND_ASSIGN(compute::ExecNode * filter, + compute::MakeExecNode("filter", plan.get(), {scan}, + compute::FilterNodeOptions{b_is_true})); + + // pipe the filter node into a project node + // NB: we're using the project node factory which preserves fragment/batch index + // tagging, so we *can* reorder later if we choose. The tags will not appear in + // our output. + ASSERT_OK_AND_ASSIGN(compute::ExecNode * project, + compute::MakeExecNode("augmented_project", plan.get(), {filter}, + compute::ProjectNodeOptions{{a_times_2}})); + + // finally, pipe the project node into a sink node + AsyncGenerator> sink_gen; + ASSERT_OK_AND_ASSIGN(compute::ExecNode * sink, + compute::MakeExecNode("ordered_sink", plan.get(), {project}, + compute::SinkNodeOptions{&sink_gen})); + + ASSERT_NE(sink, nullptr); + + // translate sink_gen (async) to sink_reader (sync) + std::shared_ptr sink_reader = compute::MakeGeneratorReader( + schema({field("a * 2", int32())}), std::move(sink_gen), exec_context.memory_pool()); + + // start the ExecPlan + ASSERT_OK(plan->StartProducing()); + + // collect sink_reader into a Table + ASSERT_OK_AND_ASSIGN(auto collected, Table::FromRecordBatchReader(sink_reader.get())); + + // Sort table + ASSERT_OK_AND_ASSIGN( + auto indices, + compute::SortIndices(collected, compute::SortOptions({compute::SortKey( + "a * 2", compute::SortOrder::Ascending)}))); + ASSERT_OK_AND_ASSIGN(auto sorted, compute::Take(collected, indices)); + + // wait 1s for completion + ASSERT_TRUE(plan->finished().Wait(/*seconds=*/1)) << "ExecPlan didn't finish within 1s"; +} + +static void MinimalEndToEndBench(benchmark::State& state) { + size_t num_batches = state.range(0); + size_t batch_size = state.range(1); + bool use_executor = state.range(2); + + for (auto _ : state) { + MinimalEndToEndScan(num_batches, batch_size, use_executor); + } + state.SetItemsProcessed(state.iterations()); +} + +BENCHMARK(MinimalEndToEndBench) + ->Args({100, 10, false}) + ->Args({1000, 100, false}) + ->Args({10000, 100, false}) + ->Args({10000, 1000, false}) + ->Args({10000, 10000, false}) + ->Args({100, 10, true}) + ->Args({1000, 100, true}) + ->Args({10000, 100, true}) + ->Args({10000, 1000, true}) + ->Args({10000, 10000, true}) + ->MinTime(1.0) + ->Unit(benchmark::TimeUnit::kMillisecond); + +} // namespace dataset +} // namespace arrow From c20e5b35dfdd3618a42bac5ad3d437248eeedf76 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 7 Oct 2021 23:38:49 -0500 Subject: [PATCH 18/21] minor change --- cpp/src/arrow/dataset/scanner_benchmark.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner_benchmark.cc b/cpp/src/arrow/dataset/scanner_benchmark.cc index 2e90e34db01..a28f6433e3d 100644 --- a/cpp/src/arrow/dataset/scanner_benchmark.cc +++ b/cpp/src/arrow/dataset/scanner_benchmark.cc @@ -22,8 +22,8 @@ #include "arrow/compute/exec/options.h" #include "arrow/compute/exec/test_util.h" #include "arrow/dataset/dataset.h" +#include "arrow/dataset/plan.h" #include "arrow/dataset/scanner.h" -#include "arrow/dataset/scanner_internal.h" #include "arrow/dataset/test_util.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/matchers.h" @@ -83,7 +83,7 @@ void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool use_executo use_executor ? ::arrow::internal::GetCpuThreadPool() : nullptr); // ensure arrow::dataset node factories are in the registry - arrow::dataset::internal::Initialize(); + ::arrow::dataset::internal::Initialize(); // A ScanNode is constructed from an ExecPlan (into which it is inserted), // a Dataset (whose batches will be scanned), and ScanOptions (to specify a filter for From c8cc2428c5091f362fbf71fc5271db6411f250fa Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 8 Oct 2021 09:29:31 -0500 Subject: [PATCH 19/21] minor changes to the bench code --- cpp/src/arrow/dataset/scanner_benchmark.cc | 73 ++++++++++++++-------- 1 file changed, 48 insertions(+), 25 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner_benchmark.cc b/cpp/src/arrow/dataset/scanner_benchmark.cc index a28f6433e3d..ff760fd9820 100644 --- a/cpp/src/arrow/dataset/scanner_benchmark.cc +++ b/cpp/src/arrow/dataset/scanner_benchmark.cc @@ -74,6 +74,28 @@ RecordBatchVector GenerateBatches(const std::shared_ptr& schema, namespace dataset { +static std::map, RecordBatchVector> datasets; + +void StoreBatches(size_t num_batches, size_t batch_size, + const RecordBatchVector& batches) { + datasets[std::make_pair(num_batches, batch_size)] = batches; +} + +RecordBatchVector GetBatches(size_t num_batches, size_t batch_size) { + auto iter = datasets.find(std::make_pair(num_batches, batch_size)); + if (iter == datasets.end()) { + return RecordBatchVector{}; + } + return iter->second; +} + +std::shared_ptr GetSchema() { + static std::shared_ptr s = schema({field("a", int32()), field("b", boolean())}); + return s; +} + +size_t GetBytesForSchema() { return sizeof(int32_t) + sizeof(bool); } + void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool use_executor) { // NB: This test is here for didactic purposes @@ -92,11 +114,10 @@ void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool use_executo ASSERT_OK_AND_ASSIGN(std::shared_ptr plan, compute::ExecPlan::Make(&exec_context)); - std::shared_ptr sch = schema({field("a", int32()), field("b", boolean())}); - RecordBatchVector batches = - ::arrow::compute::GenerateBatches(sch, num_batches, batch_size); + RecordBatchVector batches = GetBatches(num_batches, batch_size); - std::shared_ptr dataset = std::make_shared(sch, batches); + std::shared_ptr dataset = + std::make_shared(GetSchema(), batches); auto options = std::make_shared(); // sync scanning is not supported by ScanNode @@ -131,7 +152,7 @@ void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool use_executo // finally, pipe the project node into a sink node AsyncGenerator> sink_gen; ASSERT_OK_AND_ASSIGN(compute::ExecNode * sink, - compute::MakeExecNode("ordered_sink", plan.get(), {project}, + compute::MakeExecNode("sink", plan.get(), {project}, compute::SinkNodeOptions{&sink_gen})); ASSERT_NE(sink, nullptr); @@ -146,12 +167,7 @@ void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool use_executo // collect sink_reader into a Table ASSERT_OK_AND_ASSIGN(auto collected, Table::FromRecordBatchReader(sink_reader.get())); - // Sort table - ASSERT_OK_AND_ASSIGN( - auto indices, - compute::SortIndices(collected, compute::SortOptions({compute::SortKey( - "a * 2", compute::SortOrder::Ascending)}))); - ASSERT_OK_AND_ASSIGN(auto sorted, compute::Take(collected, indices)); + ASSERT_GT(collected->num_rows(), 0); // wait 1s for completion ASSERT_TRUE(plan->finished().Wait(/*seconds=*/1)) << "ExecPlan didn't finish within 1s"; @@ -165,22 +181,29 @@ static void MinimalEndToEndBench(benchmark::State& state) { for (auto _ : state) { MinimalEndToEndScan(num_batches, batch_size, use_executor); } - state.SetItemsProcessed(state.iterations()); + state.SetItemsProcessed(state.iterations() * num_batches); + state.SetBytesProcessed(state.iterations() * num_batches * batch_size * + GetBytesForSchema()); +} + +static const std::vector kWorkload = {100, 1000, 10000, 100000}; + +static void MinimalEndToEnd_Customize(benchmark::internal::Benchmark* b) { + for (const int32_t num_batches : kWorkload) { + for (const int batch_size : {10, 100, 1000}) { + for (const bool use_executor : {true, false}) { + b->Args({num_batches, batch_size, use_executor}); + RecordBatchVector batches = + ::arrow::compute::GenerateBatches(GetSchema(), num_batches, batch_size); + StoreBatches(num_batches, batch_size, batches); + } + } + } + b->ArgNames({"num_batches", "batch_size", "use_executor"}); + b->UseRealTime(); } -BENCHMARK(MinimalEndToEndBench) - ->Args({100, 10, false}) - ->Args({1000, 100, false}) - ->Args({10000, 100, false}) - ->Args({10000, 1000, false}) - ->Args({10000, 10000, false}) - ->Args({100, 10, true}) - ->Args({1000, 100, true}) - ->Args({10000, 100, true}) - ->Args({10000, 1000, true}) - ->Args({10000, 10000, true}) - ->MinTime(1.0) - ->Unit(benchmark::TimeUnit::kMillisecond); +BENCHMARK(MinimalEndToEndBench)->Apply(MinimalEndToEnd_Customize); } // namespace dataset } // namespace arrow From a985186e6c59fb4d4f5dcc04a64834c2f3439ac8 Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 11 Oct 2021 09:43:17 -0500 Subject: [PATCH 20/21] add async_mode flag --- cpp/src/arrow/compute/exec/exec_plan.cc | 6 +++-- cpp/src/arrow/compute/exec/exec_plan.h | 4 +-- cpp/src/arrow/compute/exec/filter_node.cc | 7 ++--- cpp/src/arrow/compute/exec/options.h | 12 ++++++--- cpp/src/arrow/compute/exec/project_node.cc | 8 +++--- cpp/src/arrow/dataset/scanner_benchmark.cc | 31 +++++++++++----------- 6 files changed, 39 insertions(+), 29 deletions(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index a03f8d4b9d8..e3ebfe5a6d6 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -288,11 +288,13 @@ bool ExecNode::ErrorIfNotOk(Status status) { } MapNode::MapNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema) + std::shared_ptr output_schema, bool async_mode) : ExecNode(plan, std::move(inputs), /*input_labels=*/{"target"}, std::move(output_schema), /*num_outputs=*/1) { - executor_ = plan_->exec_context()->executor(); + if (async_mode) { + executor_ = plan_->exec_context()->executor(); + } } void MapNode::ErrorReceived(ExecNode* input, Status error) { diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index ec286df6553..b60e6915f7e 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -258,7 +258,7 @@ class ARROW_EXPORT ExecNode { class MapNode : public ExecNode { public: MapNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema); + std::shared_ptr output_schema, bool async_mode); void ErrorReceived(ExecNode* input, Status error) override; @@ -291,7 +291,7 @@ class MapNode : public ExecNode { // The task group for the corresponding batches util::AsyncTaskGroup task_group_; - ::arrow::internal::Executor* executor_; + ::arrow::internal::Executor* executor_{nullptr}; // Variable used to cancel remaining tasks in the executor StopSource stop_source_; diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index 518a43d40ee..2e6d974dc13 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -36,8 +36,8 @@ namespace { class FilterNode : public MapNode { public: FilterNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, Expression filter) - : MapNode(plan, std::move(inputs), std::move(output_schema)), + std::shared_ptr output_schema, Expression filter, bool async_mode) + : MapNode(plan, std::move(inputs), std::move(output_schema), async_mode), filter_(std::move(filter)) {} static Result Make(ExecPlan* plan, std::vector inputs, @@ -58,7 +58,8 @@ class FilterNode : public MapNode { filter_expression.type()->ToString()); } return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), - std::move(filter_expression)); + std::move(filter_expression), + filter_options.async_mode); } const char* kind_name() const override { return "FilterNode"; } diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index cde118acd8a..1fc6db642e0 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -58,10 +58,11 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions { /// excluded in the batch emitted by this node. class ARROW_EXPORT FilterNodeOptions : public ExecNodeOptions { public: - explicit FilterNodeOptions(Expression filter_expression) - : filter_expression(std::move(filter_expression)) {} + explicit FilterNodeOptions(Expression filter_expression, bool async_mode = true) + : filter_expression(std::move(filter_expression)), async_mode(async_mode) {} Expression filter_expression; + bool async_mode; }; /// \brief Make a node which executes expressions on input batches, producing new batches. @@ -73,11 +74,14 @@ class ARROW_EXPORT FilterNodeOptions : public ExecNodeOptions { class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions { public: explicit ProjectNodeOptions(std::vector expressions, - std::vector names = {}) - : expressions(std::move(expressions)), names(std::move(names)) {} + std::vector names = {}, bool async_mode = true) + : expressions(std::move(expressions)), + names(std::move(names)), + async_mode(async_mode) {} std::vector expressions; std::vector names; + bool async_mode; }; /// \brief Make a node which aggregates input batches, optionally grouped by keys. diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index b627efd81d2..c675acb3d98 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -40,8 +40,9 @@ namespace { class ProjectNode : public MapNode { public: ProjectNode(ExecPlan* plan, std::vector inputs, - std::shared_ptr output_schema, std::vector exprs) - : MapNode(plan, std::move(inputs), std::move(output_schema)), + std::shared_ptr output_schema, std::vector exprs, + bool async_mode) + : MapNode(plan, std::move(inputs), std::move(output_schema), async_mode), exprs_(std::move(exprs)) {} static Result Make(ExecPlan* plan, std::vector inputs, @@ -69,7 +70,8 @@ class ProjectNode : public MapNode { ++i; } return plan->EmplaceNode(plan, std::move(inputs), - schema(std::move(fields)), std::move(exprs)); + schema(std::move(fields)), std::move(exprs), + project_options.async_mode); } const char* kind_name() const override { return "ProjectNode"; } diff --git a/cpp/src/arrow/dataset/scanner_benchmark.cc b/cpp/src/arrow/dataset/scanner_benchmark.cc index ff760fd9820..e3021794c5d 100644 --- a/cpp/src/arrow/dataset/scanner_benchmark.cc +++ b/cpp/src/arrow/dataset/scanner_benchmark.cc @@ -96,13 +96,12 @@ std::shared_ptr GetSchema() { size_t GetBytesForSchema() { return sizeof(int32_t) + sizeof(bool); } -void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool use_executor) { +void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool async_mode) { // NB: This test is here for didactic purposes // Specify a MemoryPool and ThreadPool for the ExecPlan - compute::ExecContext exec_context( - default_memory_pool(), - use_executor ? ::arrow::internal::GetCpuThreadPool() : nullptr); + compute::ExecContext exec_context(default_memory_pool(), + ::arrow::internal::GetCpuThreadPool()); // ensure arrow::dataset node factories are in the registry ::arrow::dataset::internal::Initialize(); @@ -137,17 +136,19 @@ void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool use_executo compute::MakeExecNode("scan", plan.get(), {}, ScanNodeOptions{dataset, options})); // pipe the scan node into a filter node - ASSERT_OK_AND_ASSIGN(compute::ExecNode * filter, - compute::MakeExecNode("filter", plan.get(), {scan}, - compute::FilterNodeOptions{b_is_true})); + ASSERT_OK_AND_ASSIGN( + compute::ExecNode * filter, + compute::MakeExecNode("filter", plan.get(), {scan}, + compute::FilterNodeOptions{b_is_true, async_mode})); // pipe the filter node into a project node // NB: we're using the project node factory which preserves fragment/batch index // tagging, so we *can* reorder later if we choose. The tags will not appear in // our output. - ASSERT_OK_AND_ASSIGN(compute::ExecNode * project, - compute::MakeExecNode("augmented_project", plan.get(), {filter}, - compute::ProjectNodeOptions{{a_times_2}})); + ASSERT_OK_AND_ASSIGN( + compute::ExecNode * project, + compute::MakeExecNode("augmented_project", plan.get(), {filter}, + compute::ProjectNodeOptions{{a_times_2}, {}, async_mode})); // finally, pipe the project node into a sink node AsyncGenerator> sink_gen; @@ -176,10 +177,10 @@ void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool use_executo static void MinimalEndToEndBench(benchmark::State& state) { size_t num_batches = state.range(0); size_t batch_size = state.range(1); - bool use_executor = state.range(2); + bool async_mode = state.range(2); for (auto _ : state) { - MinimalEndToEndScan(num_batches, batch_size, use_executor); + MinimalEndToEndScan(num_batches, batch_size, async_mode); } state.SetItemsProcessed(state.iterations() * num_batches); state.SetBytesProcessed(state.iterations() * num_batches * batch_size * @@ -191,15 +192,15 @@ static const std::vector kWorkload = {100, 1000, 10000, 100000}; static void MinimalEndToEnd_Customize(benchmark::internal::Benchmark* b) { for (const int32_t num_batches : kWorkload) { for (const int batch_size : {10, 100, 1000}) { - for (const bool use_executor : {true, false}) { - b->Args({num_batches, batch_size, use_executor}); + for (const bool async_mode : {true, false}) { + b->Args({num_batches, batch_size, async_mode}); RecordBatchVector batches = ::arrow::compute::GenerateBatches(GetSchema(), num_batches, batch_size); StoreBatches(num_batches, batch_size, batches); } } } - b->ArgNames({"num_batches", "batch_size", "use_executor"}); + b->ArgNames({"num_batches", "batch_size", "async_mode"}); b->UseRealTime(); } From 92ed4db1cec04d84440d576e3f2d347c00364020 Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 11 Oct 2021 10:03:13 -0500 Subject: [PATCH 21/21] minor CI fix --- cpp/src/arrow/compute/exec/exec_plan.cc | 2 ++ cpp/src/arrow/compute/exec/exec_plan.h | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index e3ebfe5a6d6..92654ffb252 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -294,6 +294,8 @@ MapNode::MapNode(ExecPlan* plan, std::vector inputs, /*num_outputs=*/1) { if (async_mode) { executor_ = plan_->exec_context()->executor(); + } else { + executor_ = nullptr; } } diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index b60e6915f7e..cad087ef679 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -291,7 +291,7 @@ class MapNode : public ExecNode { // The task group for the corresponding batches util::AsyncTaskGroup task_group_; - ::arrow::internal::Executor* executor_{nullptr}; + ::arrow::internal::Executor* executor_; // Variable used to cancel remaining tasks in the executor StopSource stop_source_;