From e4fe0f813e7265c9030171322bb93c4f50e5fd5f Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Mon, 21 Mar 2022 22:20:41 +0530 Subject: [PATCH 1/3] remove: thread tasks in exec plan --- cpp/src/arrow/compute/exec/exec_plan.cc | 26 ++++--------------------- cpp/src/arrow/compute/exec/exec_plan.h | 3 --- 2 files changed, 4 insertions(+), 25 deletions(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index c383c6092af..b73c523810d 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -436,22 +436,11 @@ void MapNode::SubmitTask(std::function(ExecBatch)> map_fn, return Status::OK(); }; - if (executor_) { - status = task_group_.AddTask([this, task]() -> Result> { - return this->executor_->Submit(this->stop_source_.token(), [this, task]() { - auto status = task(); - if (this->input_counter_.Increment()) { - this->Finish(status); - } - return status; - }); - }); - } else { - status = task(); - if (input_counter_.Increment()) { - this->Finish(status); - } + status = task(); + if (input_counter_.Increment()) { + this->Finish(status); } + // If we get a cancelled status from AddTask it means this node was stopped // or errored out already so we can just drop the task. if (!status.ok() && !status.IsCancelled()) { @@ -464,14 +453,7 @@ void MapNode::SubmitTask(std::function(ExecBatch)> map_fn, } void MapNode::Finish(Status finish_st /*= Status::OK()*/) { - if (executor_) { - task_group_.End().AddCallback([this, finish_st](const Status& st) { - Status final_status = finish_st & st; - this->finished_.MarkFinished(final_status); - }); - } else { this->finished_.MarkFinished(finish_st); - } } std::shared_ptr MakeGeneratorReader( diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index 85bfd5f756d..4d0fb1f3243 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -300,9 +300,6 @@ class MapNode : public ExecNode { // Counter for the number of batches received AtomicCounter input_counter_; - // The task group for the corresponding batches - util::AsyncTaskGroup task_group_; - ::arrow::internal::Executor* executor_; // Variable used to cancel remaining tasks in the executor From 7cfe02c99b787c709c966cecfeb079cfc7687997 Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Sat, 26 Mar 2022 02:32:05 +0530 Subject: [PATCH 2/3] fix: cpp lint --- cpp/src/arrow/compute/exec/exec_plan.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index b73c523810d..e565b8ea09d 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -453,7 +453,7 @@ void MapNode::SubmitTask(std::function(ExecBatch)> map_fn, } void MapNode::Finish(Status finish_st /*= Status::OK()*/) { - this->finished_.MarkFinished(finish_st); + this->finished_.MarkFinished(finish_st); } std::shared_ptr MakeGeneratorReader( From d08b5fa10f49dc951b24c7e50dcf18cddeee04de Mon Sep 17 00:00:00 2001 From: Sanjiban Sengupta Date: Wed, 30 Mar 2022 04:40:39 +0530 Subject: [PATCH 3/3] review: removed comment, modified if checks --- cpp/src/arrow/compute/exec/exec_plan.cc | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index e565b8ea09d..7a1c387c1aa 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -437,19 +437,16 @@ void MapNode::SubmitTask(std::function(ExecBatch)> map_fn, }; status = task(); - if (input_counter_.Increment()) { - this->Finish(status); - } - - // If we get a cancelled status from AddTask it means this node was stopped - // or errored out already so we can just drop the task. - if (!status.ok() && !status.IsCancelled()) { + if (!status.ok()) { if (input_counter_.Cancel()) { this->Finish(status); } inputs_[0]->StopProducing(this); return; } + if (input_counter_.Increment()) { + this->Finish(); + } } void MapNode::Finish(Status finish_st /*= Status::OK()*/) {