diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index c383c6092af..7a1c387c1aa 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -436,42 +436,21 @@ void MapNode::SubmitTask(std::function(ExecBatch)> map_fn, return Status::OK(); }; - if (executor_) { - status = task_group_.AddTask([this, task]() -> Result> { - return this->executor_->Submit(this->stop_source_.token(), [this, task]() { - auto status = task(); - if (this->input_counter_.Increment()) { - this->Finish(status); - } - return status; - }); - }); - } else { - status = task(); - if (input_counter_.Increment()) { - this->Finish(status); - } - } - // If we get a cancelled status from AddTask it means this node was stopped - // or errored out already so we can just drop the task. - if (!status.ok() && !status.IsCancelled()) { + status = task(); + if (!status.ok()) { if (input_counter_.Cancel()) { this->Finish(status); } inputs_[0]->StopProducing(this); return; } + if (input_counter_.Increment()) { + this->Finish(); + } } void MapNode::Finish(Status finish_st /*= Status::OK()*/) { - if (executor_) { - task_group_.End().AddCallback([this, finish_st](const Status& st) { - Status final_status = finish_st & st; - this->finished_.MarkFinished(final_status); - }); - } else { - this->finished_.MarkFinished(finish_st); - } + this->finished_.MarkFinished(finish_st); } std::shared_ptr MakeGeneratorReader( diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index 85bfd5f756d..4d0fb1f3243 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -300,9 +300,6 @@ class MapNode : public ExecNode { // Counter for the number of batches received AtomicCounter input_counter_; - // The task group for the corresponding batches - util::AsyncTaskGroup task_group_; - ::arrow::internal::Executor* executor_; // Variable used to cancel remaining tasks in the executor