diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index d19787825546de..aa3891a5a2c77c 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -333,24 +333,29 @@ void TaskScheduler::_do_work(size_t index) { void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state, Status exec_status) { auto status = task->try_close(exec_status); - if (!status.ok() && state != PipelineTaskState::CANCELED) { - // Call `close` if `try_close` failed to make sure allocated resources are released - static_cast(task->close(exec_status)); + auto cancel = [&]() { task->query_context()->cancel(true, status.to_string(), Status::Cancelled(status.to_string())); state = PipelineTaskState::CANCELED; - } else if (task->is_pending_finish()) { - task->set_state(PipelineTaskState::PENDING_FINISH); - static_cast(_blocked_task_scheduler->add_blocked_task(task)); - return; - } else { + }; + + auto try_close_failed = !status.ok() && state != PipelineTaskState::CANCELED; + if (try_close_failed) { + cancel(); + // Call `close` if `try_close` failed to make sure allocated resources are released + static_cast(task->close(exec_status)); + } else if (!task->is_pending_finish()) { status = task->close(exec_status); if (!status.ok() && state != PipelineTaskState::CANCELED) { - task->query_context()->cancel(true, status.to_string(), - Status::Cancelled(status.to_string())); - state = PipelineTaskState::CANCELED; + cancel(); } } + + if (task->is_pending_finish()) { + task->set_state(PipelineTaskState::PENDING_FINISH); + static_cast(_blocked_task_scheduler->add_blocked_task(task)); + return; + } task->set_state(state); task->set_close_pipeline_time(); task->release_dependency();