From 592a371179b3444929f2a66ccdb1558eb0d41861 Mon Sep 17 00:00:00 2001
From: Gabriel
Date: Wed, 18 Dec 2024 21:41:50 +0800
Subject: [PATCH 1/3] [Improvement](scheduler) Use a separate eager queue to execute canceled tasks

---
 be/src/pipeline/pipeline_fragment_context.cpp | 27 ++++----
 be/src/pipeline/pipeline_fragment_context.h   |  1 +
 be/src/pipeline/pipeline_task.cpp             | 27 +++++++-
 be/src/pipeline/pipeline_task.h               | 29 ++-------
 be/src/pipeline/task_queue.cpp                | 60 ++++++++++--------
 be/src/pipeline/task_queue.h                  | 45 +++++++++-----
 be/src/pipeline/task_scheduler.cpp            | 61 +++++++++++++------
 be/src/pipeline/task_scheduler.h              |  2 +-
 8 files changed, 156 insertions(+), 96 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 5ae89db55a45ac..f70d3e925cd8f0 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -140,6 +140,11 @@ PipelineFragmentContext::~PipelineFragmentContext() {
         }
     }
     _tasks.clear();
+    for (auto& holder : _task_holders) {
+        auto expected = TaskState::VALID;
+        CHECK(holder->state.compare_exchange_strong(expected, TaskState::INVALID));
+    }
+    _task_holders.clear();
     for (auto& runtime_states : _task_runtime_states) {
         for (auto& runtime_state : runtime_states) {
             runtime_state.reset();
@@ -453,6 +458,8 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
         task_runtime_state->set_task(task.get());
         pipeline_id_to_task.insert({pipeline->id(), task.get()});
         _tasks[i].emplace_back(std::move(task));
+        _task_holders.emplace_back(
+                std::shared_ptr<TaskHolder>(new TaskHolder(_tasks[i].back().get())));
     }
 }
 
@@ -1677,17 +1684,15 @@ Status PipelineFragmentContext::submit() {
     int submit_tasks = 0;
     Status st;
     auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
-    for (auto& task : _tasks) {
-        for (auto& t : task) {
-            st = scheduler->schedule_task(t.get());
-            if (!st) {
-                cancel(Status::InternalError("submit context to executor fail"));
-                std::lock_guard<std::mutex> l(_task_mutex);
-                _total_tasks = submit_tasks;
-                break;
-            }
-            submit_tasks++;
-        }
+    for (auto& holder : _task_holders) {
+        st = scheduler->schedule_task(holder);
+        if (!st) {
+            cancel(Status::InternalError("submit context to executor fail"));
+            std::lock_guard<std::mutex> l(_task_mutex);
+            _total_tasks = submit_tasks;
+            break;
+        }
+        submit_tasks++;
     }
     if (!st.ok()) {
         std::lock_guard<std::mutex> l(_task_mutex);
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 1674afa886d520..59f9bd18af5632 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -227,6 +227,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
     OperatorPtr _root_op = nullptr;
     // this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
     std::vector<std::vector<std::unique_ptr<PipelineTask>>> _tasks;
+    std::vector<TaskHolderSPtr> _task_holders;
 
     // TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
     // of it in pipeline task not the fragment_context
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 5ed725010ec364..587dd120cb525f 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -394,7 +394,7 @@ Status PipelineTask::execute(bool* eos) {
         }
     }
 
-    RETURN_IF_ERROR(get_task_queue()->push_back(this));
+    RETURN_IF_ERROR(get_task_queue()->push_back(_holder));
     return Status::OK();
 }
 
@@ -554,7 +554,30 @@ std::string PipelineTask::debug_string() {
 
 void PipelineTask::wake_up() {
     // call by dependency
-    static_cast<void>(get_task_queue()->push_back(this));
+    static_cast<void>(get_task_queue()->push_back(_holder));
+}
+
+void PipelineTask::clear_blocking_state() {
+    _state->get_query_ctx()->get_execution_dependency()->set_always_ready();
+    // We use a lock to assure all dependencies are not deconstructed here.
+    std::unique_lock<std::mutex> lc(_dependency_lock);
+    if (!_finalized) {
+        _execution_dep->set_always_ready();
+        for (auto* dep : _filter_dependencies) {
+            dep->set_always_ready();
+        }
+        for (auto& deps : _read_dependencies) {
+            for (auto* dep : deps) {
+                dep->set_always_ready();
+            }
+        }
+        for (auto* dep : _write_dependencies) {
+            dep->set_always_ready();
+        }
+        for (auto* dep : _finish_dependencies) {
+            dep->set_always_ready();
+        }
+    }
 }
 
 QueryContext* PipelineTask::query_context() {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 1a31e5954f479c..57d1d61619bbd3 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -27,6 +27,7 @@
 #include "pipeline/dependency.h"
 #include "pipeline/exec/operator.h"
 #include "pipeline/pipeline.h"
+#include "pipeline/task_queue.h"
 #include "util/runtime_profile.h"
 #include "util/stopwatch.hpp"
 #include "vec/core/block.h"
@@ -41,8 +42,6 @@ class PipelineFragmentContext;
 
 namespace doris::pipeline {
 
-class MultiCoreTaskQueue;
-class PriorityTaskQueue;
 class Dependency;
 
 class PipelineTask {
@@ -137,28 +136,7 @@ class PipelineTask {
 
     void set_wake_up_early() { _wake_up_early = true; }
 
-    void clear_blocking_state() {
-        _state->get_query_ctx()->get_execution_dependency()->set_always_ready();
-        // We use a lock to assure all dependencies are not deconstructed here.
-        std::unique_lock<std::mutex> lc(_dependency_lock);
-        if (!_finalized) {
-            _execution_dep->set_always_ready();
-            for (auto* dep : _filter_dependencies) {
-                dep->set_always_ready();
-            }
-            for (auto& deps : _read_dependencies) {
-                for (auto* dep : deps) {
-                    dep->set_always_ready();
-                }
-            }
-            for (auto* dep : _write_dependencies) {
-                dep->set_always_ready();
-            }
-            for (auto* dep : _finish_dependencies) {
-                dep->set_always_ready();
-            }
-        }
-    }
+    void clear_blocking_state();
 
     void set_task_queue(MultiCoreTaskQueue* task_queue) { _task_queue = task_queue; }
     MultiCoreTaskQueue* get_task_queue() { return _task_queue; }
@@ -239,6 +217,8 @@ class PipelineTask {
 
     bool wake_up_early() const { return _wake_up_early; }
 
+    void set_holder(std::shared_ptr<TaskHolder> holder) { _holder = holder; }
+
 private:
     friend class RuntimeFilterDependency;
     bool _is_blocked();
@@ -320,6 +300,7 @@ class PipelineTask {
     std::atomic<bool> _running = false;
     std::atomic<bool> _eos = false;
     std::atomic<bool> _wake_up_early = false;
+    std::shared_ptr<TaskHolder> _holder;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index ea812ca9b12dd6..d2830d7fc6b631 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -29,7 +29,11 @@ namespace doris::pipeline {
 
 #include "common/compile_check_begin.h"
 
-PipelineTask* SubTaskQueue::try_take(bool is_steal) {
+TaskHolder::TaskHolder(PipelineTask* task_) : task(task_), state(TaskState::VALID) {
+    task_->set_holder(shared_from_this());
+}
+
+TaskHolderSPtr SubTaskQueue::try_take(bool is_steal) {
     if (_queue.empty()) {
         return nullptr;
     }
@@ -54,7 +58,7 @@ void PriorityTaskQueue::close() {
     _wait_task.notify_all();
 }
 
-PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
+TaskHolderSPtr PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
     if (_total_task_size == 0 || _closed) {
         return nullptr;
     }
@@ -75,7 +79,7 @@ PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
 
     auto task = _sub_queues[level].try_take(is_steal);
     if (task) {
-        task->update_queue_level(level);
+        task->task->update_queue_level(level);
         _total_task_size--;
     }
     return task;
@@ -90,13 +94,13 @@ int PriorityTaskQueue::_compute_level(uint64_t runtime) {
     return SUB_QUEUE_LEVEL - 1;
 }
 
-PipelineTask* PriorityTaskQueue::try_take(bool is_steal) {
+TaskHolderSPtr PriorityTaskQueue::try_take(bool is_steal) {
    // TODO other efficient lock? e.g. if get lock fail, return null_ptr
     std::unique_lock<std::mutex> lock(_work_size_mutex);
     return _try_take_unprotected(is_steal);
 }
 
-PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
+TaskHolderSPtr PriorityTaskQueue::take(uint32_t timeout_ms) {
     std::unique_lock<std::mutex> lock(_work_size_mutex);
     auto task = _try_take_unprotected(false);
     if (task) {
@@ -111,11 +115,11 @@ PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
     }
 }
 
-Status PriorityTaskQueue::push(PipelineTask* task) {
+Status PriorityTaskQueue::push(TaskHolderSPtr task) {
     if (_closed) {
         return Status::InternalError("WorkTaskQueue closed");
     }
-    auto level = _compute_level(task->get_runtime_ns());
+    auto level = _compute_level(task->task->get_runtime_ns());
     std::unique_lock<std::mutex> lock(_work_size_mutex);
 
     // update empty queue's runtime, to avoid too high priority
@@ -132,8 +136,12 @@ Status PriorityTaskQueue::push(PipelineTask* task) {
 
 MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;
 
+static constexpr int NUM_EAGER_QUEUES = 1;
 MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size)
-        : _prio_task_queues(core_size), _closed(false), _core_size(core_size) {}
+        : _prio_task_queues(core_size + NUM_EAGER_QUEUES),
+          _closed(false),
+          _core_size(core_size),
+          _urgent_queue_idx(core_size) {}
 
 void MultiCoreTaskQueue::close() {
     if (_closed) {
@@ -145,34 +153,36 @@ void MultiCoreTaskQueue::close() {
                   [](auto& prio_task_queue) { prio_task_queue.close(); });
 }
 
-PipelineTask* MultiCoreTaskQueue::take(int core_id) {
-    PipelineTask* task = nullptr;
+TaskHolderSPtr MultiCoreTaskQueue::take(int core_id) {
+    TaskHolderSPtr task = nullptr;
     while (!_closed) {
         DCHECK(_prio_task_queues.size() > core_id)
                 << " list size: " << _prio_task_queues.size() << " core_id: " << core_id
                 << " _core_size: " << _core_size << " _next_core: " << _next_core.load();
         task = _prio_task_queues[core_id].try_take(false);
         if (task) {
-            task->set_core_id(core_id);
+            task->task->set_core_id(core_id);
             break;
         }
-        task = _steal_take(core_id);
-        if (task) {
-            break;
+        if (core_id != _urgent_queue_idx) {
+            task = _steal_take(core_id);
+            if (task) {
+                break;
+            }
         }
         task = _prio_task_queues[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
         if (task) {
-            task->set_core_id(core_id);
+            task->task->set_core_id(core_id);
             break;
         }
     }
     if (task) {
-        task->pop_out_runnable_queue();
+        task->task->pop_out_runnable_queue();
     }
     return task;
 }
 
-PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
+TaskHolderSPtr MultiCoreTaskQueue::_steal_take(int core_id) {
     DCHECK(core_id < _core_size);
     int next_id = core_id;
     for (int i = 1; i < _core_size; ++i) {
@@ -183,25 +193,27 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
         DCHECK(next_id < _core_size);
         auto task = _prio_task_queues[next_id].try_take(true);
         if (task) {
-            task->set_core_id(next_id);
+            task->task->set_core_id(next_id);
             return task;
         }
     }
     return nullptr;
 }
 
-Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
-    int core_id = task->get_previous_core_id();
+Status MultiCoreTaskQueue::push_back(TaskHolderSPtr task) {
+    int core_id = task->task->get_previous_core_id();
     if (core_id < 0) {
         core_id = _next_core.fetch_add(1) % _core_size;
     }
     return push_back(task, core_id);
 }
 
-Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) {
-    DCHECK(core_id < _core_size);
-    task->put_in_runnable_queue();
-    return _prio_task_queues[core_id].push(task);
+Status MultiCoreTaskQueue::push_back(TaskHolderSPtr task, int core_id) {
+    DCHECK(core_id < _core_size || task->task->query_context()->is_cancelled());
+    task->task->put_in_runnable_queue();
+    return _prio_task_queues[task->task->query_context()->is_cancelled() ? _urgent_queue_idx
+                                                                         : core_id]
+            .push(task);
 }
 
 void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 1651eb50cac4ab..5da6c6cab80db9 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -28,19 +28,35 @@
 #include <memory>
 #include <queue>
 
+#include "common/cast_set.h"
 #include "common/status.h"
-#include "pipeline_task.h"
 
 namespace doris::pipeline {
 #include "common/compile_check_begin.h"
 
+class PipelineTask;
+
+enum class TaskState {
+    VALID = 0,   // Valid task which is not running
+    RUNNING = 1, // Valid task which is executed by a thread
+    INVALID = 2, // Invalid task which already de-constructed.
+};
+
+struct TaskHolder : public std::enable_shared_from_this<TaskHolder> {
+    PipelineTask* task;
+    std::atomic<TaskState> state;
+    TaskHolder(PipelineTask* task_);
+};
+
+using TaskHolderSPtr = std::shared_ptr<TaskHolder>;
+
 class SubTaskQueue {
     friend class PriorityTaskQueue;
 
 public:
-    void push_back(PipelineTask* task) { _queue.emplace(task); }
+    void push_back(TaskHolderSPtr task) { _queue.emplace(task); }
 
-    PipelineTask* try_take(bool is_steal);
+    TaskHolderSPtr try_take(bool is_steal);
 
     void set_level_factor(double level_factor) { _level_factor = level_factor; }
 
@@ -58,7 +74,7 @@ class SubTaskQueue {
     bool empty() { return _queue.empty(); }
 
 private:
-    std::queue<PipelineTask*> _queue;
+    std::queue<TaskHolderSPtr> _queue;
     // depends on LEVEL_QUEUE_TIME_FACTOR
     double _level_factor = 1;
 
@@ -72,18 +88,18 @@ class PriorityTaskQueue {
 
     void close();
 
-    PipelineTask* try_take(bool is_steal);
+    TaskHolderSPtr try_take(bool is_steal);
 
-    PipelineTask* take(uint32_t timeout_ms = 0);
+    TaskHolderSPtr take(uint32_t timeout_ms = 0);
 
-    Status push(PipelineTask* task);
+    Status push(TaskHolderSPtr task);
 
     void inc_sub_queue_runtime(int level, uint64_t runtime) {
         _sub_queues[level].inc_runtime(runtime);
     }
 
 private:
-    PipelineTask* _try_take_unprotected(bool is_steal);
+    TaskHolderSPtr _try_take_unprotected(bool is_steal);
     static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
     static constexpr size_t SUB_QUEUE_LEVEL = 6;
     SubTaskQueue _sub_queues[SUB_QUEUE_LEVEL];
@@ -112,25 +128,26 @@ class MultiCoreTaskQueue {
 
     void close();
 
     // Get the task by core id.
-    PipelineTask* take(int core_id);
+    TaskHolderSPtr take(int core_id);
 
     // TODO combine these methods to `push_back(task, core_id = -1)`
-    Status push_back(PipelineTask* task);
+    Status push_back(TaskHolderSPtr task);
 
-    Status push_back(PipelineTask* task, int core_id);
+    Status push_back(TaskHolderSPtr task, int core_id);
 
     void update_statistics(PipelineTask* task, int64_t time_spent);
 
-    int cores() const { return _core_size; }
+    int num_queues() const { return cast_set<int>(_prio_task_queues.size()); }
 
 private:
-    PipelineTask* _steal_take(int core_id);
+    TaskHolderSPtr _steal_take(int core_id);
 
     std::vector<PriorityTaskQueue> _prio_task_queues;
     std::atomic<int> _next_core = 0;
    std::atomic<bool> _closed;
 
-    int _core_size;
+    const int _core_size;
+    const int _urgent_queue_idx;
 
     static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
 };
 #include "common/compile_check_end.h"
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 45898e764175b2..2cc3e654a95a84 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -51,33 +51,37 @@ TaskScheduler::~TaskScheduler() {
 }
 
 Status TaskScheduler::start() {
-    int cores = _task_queue.cores();
+    int num_queues = _task_queue.num_queues();
     RETURN_IF_ERROR(ThreadPoolBuilder(_name)
-                            .set_min_threads(cores)
-                            .set_max_threads(cores)
+                            .set_min_threads(num_queues)
+                            .set_max_threads(num_queues)
                             .set_max_queue_size(0)
                             .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
                             .build(&_fix_thread_pool));
-    LOG_INFO("TaskScheduler set cores").tag("size", cores);
-    _markers.resize(cores, true);
-    for (int i = 0; i < cores; ++i) {
+    LOG_INFO("TaskScheduler set cores").tag("size", num_queues);
+    _markers.resize(num_queues, true);
+    for (int i = 0; i < num_queues; ++i) {
         RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); }));
     }
     return Status::OK();
 }
 
-Status TaskScheduler::schedule_task(PipelineTask* task) {
+Status TaskScheduler::schedule_task(TaskHolderSPtr task) {
     return _task_queue.push_back(task);
 }
 
 // after _close_task, task maybe destructed.
-void _close_task(PipelineTask* task, Status exec_status) {
+void _close_task(TaskHolderSPtr holder, Status exec_status) {
     // Has to attach memory tracker here, because the close task will also release some memory.
     // Should count the memory to the query or the query's memory will not decrease when part of
     // task finished.
+    auto* task = holder->task;
     SCOPED_ATTACH_TASK(task->runtime_state());
     if (task->is_finalized()) {
-        task->set_running(false);
+        {
+            auto expected = TaskState::RUNNING;
+            CHECK(holder->state.compare_exchange_strong(expected, TaskState::VALID));
+        }
         return;
     }
     // close_a_pipeline may delete fragment context and will core in some defer
@@ -94,21 +98,32 @@ void _close_task(PipelineTask* task, Status exec_status) {
     }
     task->finalize();
     task->set_running(false);
+    {
+        auto expected = TaskState::RUNNING;
+        CHECK(holder->state.compare_exchange_strong(expected, TaskState::VALID));
+    }
     task->fragment_context()->close_a_pipeline(task->pipeline_id());
 }
 
 void TaskScheduler::_do_work(int index) {
     while (_markers[index]) {
-        auto* task = _task_queue.take(index);
-        if (!task) {
+        auto holder = _task_queue.take(index);
+        if (!holder) {
             continue;
         }
-        if (task->is_running()) {
-            static_cast<void>(_task_queue.push_back(task, index));
-            continue;
+        {
+            auto expected = TaskState::VALID;
+            if (!holder->state.compare_exchange_strong(expected, TaskState::RUNNING)) {
+                if (expected == TaskState::RUNNING) {
+                    static_cast<void>(_task_queue.push_back(holder, index));
+                } else {
+                    DCHECK(expected == TaskState::INVALID);
+                }
+                continue;
+            }
         }
+        auto* task = holder->task;
         task->log_detail_if_need();
-        task->set_running(true);
         task->set_task_queue(&_task_queue);
         auto* fragment_ctx = task->fragment_context();
         bool canceled = fragment_ctx->is_canceled();
@@ -122,7 +137,7 @@ void TaskScheduler::_do_work(int index) {
             // If pipeline is canceled, it will report after pipeline closed, and will propagate
             // errors to downstream through exchange. So, here we needn't send_report.
             // fragment_ctx->send_report(true);
-            _close_task(task, fragment_ctx->get_query_ctx()->exec_status());
+            _close_task(holder, fragment_ctx->get_query_ctx()->exec_status());
             continue;
         }
 
@@ -165,7 +180,7 @@ void TaskScheduler::_do_work(int index) {
             LOG(WARNING) << fmt::format("Pipeline task failed. query_id: {} reason: {}",
                                         print_id(task->query_context()->query_id()),
                                         status.to_string());
-            _close_task(task, status);
+            _close_task(holder, status);
             continue;
         }
         fragment_ctx->trigger_report_if_necessary();
@@ -175,15 +190,21 @@ void TaskScheduler::_do_work(int index) {
             // added to running queue when dependency is ready.
             if (task->is_pending_finish()) {
                 // Only meet eos, should set task to PENDING_FINISH state
-                task->set_running(false);
+                {
+                    auto expected = TaskState::RUNNING;
+                    CHECK(holder->state.compare_exchange_strong(expected, TaskState::VALID));
+                }
             } else {
                 Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
-                _close_task(task, exec_status);
+                _close_task(holder, exec_status);
             }
             continue;
         }
 
-        task->set_running(false);
+        {
+            auto expected = TaskState::RUNNING;
+            CHECK(holder->state.compare_exchange_strong(expected, TaskState::VALID));
+        }
     }
 }
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 3c1b08063dfa61..84bba2ae101948 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -51,7 +51,7 @@ class TaskScheduler {
 
     ~TaskScheduler();
 
-    Status schedule_task(PipelineTask* task);
+    Status schedule_task(TaskHolderSPtr task);
 
     Status start();

From 8c8388fbc64833c7fe264b20c2bf2304663f626e Mon Sep 17 00:00:00 2001
From: Gabriel
Date: Tue, 31 Dec 2024 11:21:50 +0800
Subject: [PATCH 2/3] update

---
 be/src/pipeline/pipeline_fragment_context.cpp | 14 ++++++++++----
 be/src/pipeline/task_queue.cpp                |  4 +---
 be/src/pipeline/task_queue.h                  |  2 +-
 3 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index f70d3e925cd8f0..ab618c35622c7e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -141,6 +141,9 @@ PipelineFragmentContext::~PipelineFragmentContext() {
     }
     _tasks.clear();
     for (auto& holder : _task_holders) {
+        if (holder == nullptr) {
+            break;
+        }
         auto expected = TaskState::VALID;
         CHECK(holder->state.compare_exchange_strong(expected, TaskState::INVALID));
     }
@@ -368,6 +371,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
     const auto target_size = request.local_params.size();
     _tasks.resize(target_size);
     _runtime_filter_states.resize(target_size);
+    _task_holders.resize(target_size * _pipelines.size());
    _task_runtime_states.resize(_pipelines.size());
     for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
         _task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
@@ -458,8 +462,10 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
         task_runtime_state->set_task(task.get());
         pipeline_id_to_task.insert({pipeline->id(), task.get()});
         _tasks[i].emplace_back(std::move(task));
-        _task_holders.emplace_back(
-                std::shared_ptr<TaskHolder>(new TaskHolder(_tasks[i].back().get())));
+        auto holder = std::make_shared<TaskHolder>(_tasks[i].back().get());
+        _tasks[i].back()->set_holder(holder);
+        _task_holders[cur_task_id] = holder;
+        DCHECK(_task_holders[cur_task_id]);
     }
 }
 
@@ -1684,8 +1690,8 @@ Status PipelineFragmentContext::submit() {
     int submit_tasks = 0;
     Status st;
     auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
-    for (auto& holder : _task_holders) {
-        st = scheduler->schedule_task(holder);
+    for (size_t i = 0; i < _total_tasks; i++) {
+        st = scheduler->schedule_task(_task_holders[i]);
         if (!st) {
             cancel(Status::InternalError("submit context to executor fail"));
             std::lock_guard<std::mutex> l(_task_mutex);
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index d2830d7fc6b631..39367c7f711860 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -29,9 +29,7 @@ namespace doris::pipeline {
 
 #include "common/compile_check_begin.h"
 
-TaskHolder::TaskHolder(PipelineTask* task_) : task(task_), state(TaskState::VALID) {
-    task_->set_holder(shared_from_this());
-}
+TaskHolder::TaskHolder(PipelineTask* task_) : task(task_), state(TaskState::VALID) {}
 
 TaskHolderSPtr SubTaskQueue::try_take(bool is_steal) {
     if (_queue.empty()) {
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 5da6c6cab80db9..9f254ffe69f67f 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -42,7 +42,7 @@ enum class TaskState {
     INVALID = 2, // Invalid task which already de-constructed.
 };
 
-struct TaskHolder : public std::enable_shared_from_this<TaskHolder> {
+struct TaskHolder {
     PipelineTask* task;
     std::atomic<TaskState> state;
     TaskHolder(PipelineTask* task_);

From 2f6c7f622d28669979d2a58bcad05339c5890d19 Mon Sep 17 00:00:00 2001
From: Gabriel
Date: Tue, 31 Dec 2024 11:38:36 +0800
Subject: [PATCH 3/3] update

---
 be/src/pipeline/pipeline_task.cpp  | 2 +-
 be/src/pipeline/pipeline_task.h    | 4 ----
 be/src/pipeline/task_scheduler.cpp | 1 -
 3 files changed, 1 insertion(+), 6 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 587dd120cb525f..a1361071032cdb 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -511,7 +511,7 @@ std::string PipelineTask::debug_string() {
                    (void*)this, _index, _opened, _eos, _finalized, _dry_run, elapsed,
                    _wake_up_early.load(),
                    cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL",
-                   is_running());
+                   _holder ? _holder->state.load() == TaskState::RUNNING : false);
     for (size_t i = 0; i < _operators.size(); i++) {
         fmt::format_to(debug_string_buffer, "\n{}",
                        _opened && !_finalized ? _operators[i]->debug_string(_state, i)
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 57d1d61619bbd3..f4f82f5026c6de 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -172,9 +172,6 @@ class PipelineTask {
 
     void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
 
-    bool is_running() { return _running.load(); }
-    void set_running(bool running) { _running = running; }
-
     bool is_exceed_debug_timeout() {
         if (_has_exceed_timeout) {
             return true;
@@ -297,7 +294,6 @@ class PipelineTask {
     std::atomic<bool> _finalized = false;
     std::mutex _dependency_lock;
 
-    std::atomic<bool> _running = false;
     std::atomic<bool> _eos = false;
     std::atomic<bool> _wake_up_early = false;
     std::shared_ptr<TaskHolder> _holder;
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 2cc3e654a95a84..3449d4790f8d31 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -97,7 +97,6 @@ void _close_task(TaskHolderSPtr holder, Status exec_status) {
         task->fragment_context()->cancel(status);
     }
     task->finalize();
-    task->set_running(false);
     {
         auto expected = TaskState::RUNNING;
         CHECK(holder->state.compare_exchange_strong(expected, TaskState::VALID));
     }
     task->fragment_context()->close_a_pipeline(task->pipeline_id());
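
Editor's note: the series above replaces the old boolean `_running` flag with an atomic three-state handoff on a shared TaskHolder (VALID -> RUNNING -> VALID while scheduling, VALID -> INVALID once the fragment destructs), and adds one trailing "eager" queue so canceled tasks skip the per-core priority queues. The following is a minimal, self-contained sketch of that protocol; Holder, the queue array, and the ids are simplified stand-ins for illustration only, not the actual Doris types or APIs.

// Standalone sketch of the holder state machine and eager-queue routing.
#include <atomic>
#include <cassert>
#include <deque>
#include <iostream>
#include <memory>

enum class TaskState { VALID, RUNNING, INVALID };

struct Holder {
    explicit Holder(int id) : task_id(id) {}
    int task_id; // stand-in for PipelineTask*
    std::atomic<TaskState> state{TaskState::VALID};
};
using HolderPtr = std::shared_ptr<Holder>;

int main() {
    // core_size per-core queues plus one trailing eager queue for canceled tasks.
    constexpr int core_size = 2;
    constexpr int urgent_queue_idx = core_size;
    std::deque<HolderPtr> queues[core_size + 1];

    auto holder = std::make_shared<Holder>(42);
    bool canceled = true;
    // push_back: a canceled task is routed to the eager queue so it releases
    // its resources without waiting behind healthy tasks.
    queues[canceled ? urgent_queue_idx : 0].push_back(holder);

    // Worker: claim the task by CAS VALID -> RUNNING. A failed CAS tells us
    // why: RUNNING means another thread still owns it (requeue and retry),
    // INVALID means the fragment already destroyed the task (drop the holder).
    auto expected = TaskState::VALID;
    if (holder->state.compare_exchange_strong(expected, TaskState::RUNNING)) {
        std::cout << "executing task " << holder->task_id << "\n";
        // ... run one slice of the pipeline task ...
        expected = TaskState::RUNNING; // release: RUNNING -> VALID
        bool released = holder->state.compare_exchange_strong(expected, TaskState::VALID);
        assert(released);
        (void)released;
    }

    // Fragment destructor: VALID -> INVALID forbids any later execution. If a
    // worker were still in RUNNING, this CAS would fail (the patch CHECKs it).
    expected = TaskState::VALID;
    bool invalidated = holder->state.compare_exchange_strong(expected, TaskState::INVALID);
    assert(invalidated);
    (void)invalidated;
    return 0;
}

Because the holder is a shared_ptr, a worker that dequeues it after the fragment is gone still touches valid memory: it merely observes INVALID and drops its reference, which is what lets the scheduler and the fragment tear down independently.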