diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index 617cd7a78d110a..293769162f6aa3 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -130,37 +130,46 @@ Status PriorityTaskQueue::push(PipelineTask* task) {
     return Status::OK();
 }
 
-int PriorityTaskQueue::task_size() {
-    std::unique_lock<std::mutex> lock(_work_size_mutex);
-    return _total_task_size;
-}
-
 MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;
 
-MultiCoreTaskQueue::MultiCoreTaskQueue(size_t core_size) : TaskQueue(core_size), _closed(false) {
-    _prio_task_queue_list.reset(new PriorityTaskQueue[core_size]);
+MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size) : TaskQueue(core_size), _closed(false) {
+    _prio_task_queue_list =
+            std::make_shared<std::vector<std::unique_ptr<PriorityTaskQueue>>>(core_size);
+    for (int i = 0; i < core_size; i++) {
+        (*_prio_task_queue_list)[i] = std::make_unique<PriorityTaskQueue>();
+    }
 }
 
 void MultiCoreTaskQueue::close() {
+    if (_closed) {
+        return;
+    }
     _closed = true;
     for (int i = 0; i < _core_size; ++i) {
-        _prio_task_queue_list[i].close();
+        (*_prio_task_queue_list)[i]->close();
     }
+    std::atomic_store(&_prio_task_queue_list,
+                      std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>>(nullptr));
 }
 
-PipelineTask* MultiCoreTaskQueue::take(size_t core_id) {
+PipelineTask* MultiCoreTaskQueue::take(int core_id) {
     PipelineTask* task = nullptr;
+    auto prio_task_queue_list =
+            std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
     while (!_closed) {
-        task = _prio_task_queue_list[core_id].try_take(false);
+        DCHECK(prio_task_queue_list->size() > core_id)
+                << " list size: " << prio_task_queue_list->size() << " core_id: " << core_id
+                << " _core_size: " << _core_size << " _next_core: " << _next_core.load();
+        task = (*prio_task_queue_list)[core_id]->try_take(false);
         if (task) {
             task->set_core_id(core_id);
             break;
         }
-        task = _steal_take(core_id);
+        task = _steal_take(core_id, *prio_task_queue_list);
         if (task) {
             break;
         }
-        task = _prio_task_queue_list[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
+        task = (*prio_task_queue_list)[core_id]->take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
         if (task) {
             task->set_core_id(core_id);
             break;
@@ -172,16 +181,17 @@ PipelineTask* MultiCoreTaskQueue::take(size_t core_id) {
     return task;
 }
 
-PipelineTask* MultiCoreTaskQueue::_steal_take(size_t core_id) {
+PipelineTask* MultiCoreTaskQueue::_steal_take(
+        int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list) {
     DCHECK(core_id < _core_size);
-    size_t next_id = core_id;
-    for (size_t i = 1; i < _core_size; ++i) {
+    int next_id = core_id;
+    for (int i = 1; i < _core_size; ++i) {
         ++next_id;
         if (next_id == _core_size) {
             next_id = 0;
         }
         DCHECK(next_id < _core_size);
-        auto task = _prio_task_queue_list[next_id].try_take(true);
+        auto task = prio_task_queue_list[next_id]->try_take(true);
         if (task) {
             task->set_core_id(next_id);
             return task;
@@ -198,10 +208,12 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
     return push_back(task, core_id);
 }
 
-Status MultiCoreTaskQueue::push_back(PipelineTask* task, size_t core_id) {
+Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) {
     DCHECK(core_id < _core_size);
     task->put_in_runnable_queue();
-    return _prio_task_queue_list[core_id].push(task);
+    auto prio_task_queue_list =
+            std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
+    return (*prio_task_queue_list)[core_id]->push(task);
 }
 
 } // namespace pipeline
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 02994511019f7d..3ac9de460250d0 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -37,25 +37,25 @@ namespace pipeline {
 
 class TaskQueue {
 public:
-    TaskQueue(size_t core_size) : _core_size(core_size) {}
+    TaskQueue(int core_size) : _core_size(core_size) {}
     virtual ~TaskQueue();
     virtual void close() = 0;
     // Get the task by core id.
     // TODO: To think the logic is useful?
-    virtual PipelineTask* take(size_t core_id) = 0;
+    virtual PipelineTask* take(int core_id) = 0;
 
     // push from scheduler
     virtual Status push_back(PipelineTask* task) = 0;
 
     // push from worker
-    virtual Status push_back(PipelineTask* task, size_t core_id) = 0;
+    virtual Status push_back(PipelineTask* task, int core_id) = 0;
 
     virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}
 
     int cores() const { return _core_size; }
 
 protected:
-    size_t _core_size;
+    int _core_size;
     static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
 };
 
@@ -105,8 +105,6 @@ class PriorityTaskQueue {
         _sub_queues[level].inc_runtime(runtime);
     }
 
-    int task_size();
-
 private:
     PipelineTask* _try_take_unprotected(bool is_steal);
     static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
@@ -130,32 +128,34 @@ class PriorityTaskQueue {
 // Need consider NUMA architecture
 class MultiCoreTaskQueue : public TaskQueue {
 public:
-    explicit MultiCoreTaskQueue(size_t core_size);
+    explicit MultiCoreTaskQueue(int core_size);
 
     ~MultiCoreTaskQueue() override;
 
     void close() override;
 
     // Get the task by core id.
-    // TODO: To think the logic is useful?
-    PipelineTask* take(size_t core_id) override;
+    PipelineTask* take(int core_id) override;
 
     // TODO combine these methods to `push_back(task, core_id = -1)`
     Status push_back(PipelineTask* task) override;
 
-    Status push_back(PipelineTask* task, size_t core_id) override;
+    Status push_back(PipelineTask* task, int core_id) override;
 
     void update_statistics(PipelineTask* task, int64_t time_spent) override {
         task->inc_runtime_ns(time_spent);
-        _prio_task_queue_list[task->get_core_id()].inc_sub_queue_runtime(task->get_queue_level(),
-                                                                         time_spent);
+        auto prio_task_queue_list =
+                std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
+        (*prio_task_queue_list)[task->get_core_id()]->inc_sub_queue_runtime(task->get_queue_level(),
+                                                                            time_spent);
     }
 
 private:
-    PipelineTask* _steal_take(size_t core_id);
+    PipelineTask* _steal_take(
+            int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list);
 
-    std::unique_ptr<PriorityTaskQueue[]> _prio_task_queue_list;
-    std::atomic<size_t> _next_core = 0;
+    std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>> _prio_task_queue_list;
+    std::atomic<int> _next_core = 0;
     std::atomic<bool> _closed;
 };
 
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index f2c86168180910..de697469575bc9 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -205,13 +205,13 @@ TaskScheduler::~TaskScheduler() {
 
 Status TaskScheduler::start() {
     int cores = _task_queue->cores();
-    // Must be mutil number of cpu cores
     RETURN_IF_ERROR(ThreadPoolBuilder(_name)
                             .set_min_threads(cores)
                             .set_max_threads(cores)
                             .set_max_queue_size(0)
                             .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
                             .build(&_fix_thread_pool));
+    LOG_INFO("TaskScheduler set cores").tag("size", cores);
     _markers.reserve(cores);
     for (size_t i = 0; i < cores; ++i) {
         _markers.push_back(std::make_unique<std::atomic<bool>>(true));