Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 6 additions & 27 deletions cpp/src/arrow/compute/exec/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -436,42 +436,21 @@ void MapNode::SubmitTask(std::function<Result<ExecBatch>(ExecBatch)> map_fn,
return Status::OK();
};

if (executor_) {
status = task_group_.AddTask([this, task]() -> Result<Future<>> {
return this->executor_->Submit(this->stop_source_.token(), [this, task]() {
auto status = task();
if (this->input_counter_.Increment()) {
this->Finish(status);
}
return status;
});
});
} else {
status = task();
if (input_counter_.Increment()) {
this->Finish(status);
}
}
// If we get a cancelled status from AddTask it means this node was stopped
// or errored out already so we can just drop the task.
if (!status.ok() && !status.IsCancelled()) {
status = task();
if (!status.ok()) {
if (input_counter_.Cancel()) {
this->Finish(status);
}
inputs_[0]->StopProducing(this);
return;
}
if (input_counter_.Increment()) {
this->Finish();
}
}

void MapNode::Finish(Status finish_st /*= Status::OK()*/) {
if (executor_) {
task_group_.End().AddCallback([this, finish_st](const Status& st) {
Status final_status = finish_st & st;
this->finished_.MarkFinished(final_status);
});
} else {
this->finished_.MarkFinished(finish_st);
}
this->finished_.MarkFinished(finish_st);
}

std::shared_ptr<RecordBatchReader> MakeGeneratorReader(
Expand Down
3 changes: 0 additions & 3 deletions cpp/src/arrow/compute/exec/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,6 @@ class MapNode : public ExecNode {
// Counter for the number of batches received
AtomicCounter input_counter_;

// The task group for the corresponding batches
util::AsyncTaskGroup task_group_;

::arrow::internal::Executor* executor_;

// Variable used to cancel remaining tasks in the executor
Expand Down