From 047ba7aef6f9bf8256dbf7d2f1b642d5906c89c8 Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Sat, 18 Nov 2023 02:10:28 +0800 Subject: [PATCH] Revert "[Bug](pipeline) try fix the exchange sink buffer result error (#27052)" This reverts commit 7ef1f7e511c74d0d72ddeb0bd854ae4dd2d754e0. --- be/src/pipeline/task_scheduler.cpp | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index aa3891a5a2c77c..d19787825546de 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -333,28 +333,23 @@ void TaskScheduler::_do_work(size_t index) { void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state, Status exec_status) { auto status = task->try_close(exec_status); - auto cancel = [&]() { + if (!status.ok() && state != PipelineTaskState::CANCELED) { + // Call `close` if `try_close` failed to make sure allocated resources are released + static_cast(task->close(exec_status)); task->query_context()->cancel(true, status.to_string(), Status::Cancelled(status.to_string())); state = PipelineTaskState::CANCELED; - }; - - auto try_close_failed = !status.ok() && state != PipelineTaskState::CANCELED; - if (try_close_failed) { - cancel(); - // Call `close` if `try_close` failed to make sure allocated resources are released - static_cast(task->close(exec_status)); - } else if (!task->is_pending_finish()) { - status = task->close(exec_status); - if (!status.ok() && state != PipelineTaskState::CANCELED) { - cancel(); - } - } - - if (task->is_pending_finish()) { + } else if (task->is_pending_finish()) { task->set_state(PipelineTaskState::PENDING_FINISH); static_cast(_blocked_task_scheduler->add_blocked_task(task)); return; + } else { + status = task->close(exec_status); + if (!status.ok() && state != PipelineTaskState::CANCELED) { + task->query_context()->cancel(true, status.to_string(), + Status::Cancelled(status.to_string())); + state = PipelineTaskState::CANCELED; + } } task->set_state(state); task->set_close_pipeline_time();