48 changes: 30 additions & 18 deletions be/src/pipeline/task_queue.cpp
@@ -130,37 +130,46 @@ Status PriorityTaskQueue::push(PipelineTask* task) {
return Status::OK();
}

int PriorityTaskQueue::task_size() {
std::unique_lock<std::mutex> lock(_work_size_mutex);
return _total_task_size;
}

MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;

MultiCoreTaskQueue::MultiCoreTaskQueue(size_t core_size) : TaskQueue(core_size), _closed(false) {
_prio_task_queue_list.reset(new PriorityTaskQueue[core_size]);
MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size) : TaskQueue(core_size), _closed(false) {
_prio_task_queue_list =
std::make_shared<std::vector<std::unique_ptr<PriorityTaskQueue>>>(core_size);
for (int i = 0; i < core_size; i++) {
(*_prio_task_queue_list)[i] = std::make_unique<PriorityTaskQueue>();
}
}

void MultiCoreTaskQueue::close() {
if (_closed) {
return;
}
_closed = true;
for (int i = 0; i < _core_size; ++i) {
_prio_task_queue_list[i].close();
(*_prio_task_queue_list)[i]->close();
}
std::atomic_store(&_prio_task_queue_list,
std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>>(nullptr));
}

PipelineTask* MultiCoreTaskQueue::take(size_t core_id) {
PipelineTask* MultiCoreTaskQueue::take(int core_id) {
PipelineTask* task = nullptr;
auto prio_task_queue_list =
std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
while (!_closed) {
task = _prio_task_queue_list[core_id].try_take(false);
DCHECK(prio_task_queue_list->size() > core_id)
<< " list size: " << prio_task_queue_list->size() << " core_id: " << core_id
<< " _core_size: " << _core_size << " _next_core: " << _next_core.load();
task = (*prio_task_queue_list)[core_id]->try_take(false);
if (task) {
task->set_core_id(core_id);
break;
}
task = _steal_take(core_id);
task = _steal_take(core_id, *prio_task_queue_list);
if (task) {
break;
}
task = _prio_task_queue_list[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
task = (*prio_task_queue_list)[core_id]->take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
if (task) {
task->set_core_id(core_id);
break;
@@ -172,16 +181,17 @@ PipelineTask* MultiCoreTaskQueue::take(size_t core_id) {
return task;
}

PipelineTask* MultiCoreTaskQueue::_steal_take(size_t core_id) {
PipelineTask* MultiCoreTaskQueue::_steal_take(
int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list) {
DCHECK(core_id < _core_size);
size_t next_id = core_id;
for (size_t i = 1; i < _core_size; ++i) {
int next_id = core_id;
for (int i = 1; i < _core_size; ++i) {
++next_id;
if (next_id == _core_size) {
next_id = 0;
}
DCHECK(next_id < _core_size);
auto task = _prio_task_queue_list[next_id].try_take(true);
auto task = prio_task_queue_list[next_id]->try_take(true);
if (task) {
task->set_core_id(next_id);
return task;
@@ -198,10 +208,12 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
return push_back(task, core_id);
}

Status MultiCoreTaskQueue::push_back(PipelineTask* task, size_t core_id) {
Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) {
DCHECK(core_id < _core_size);
task->put_in_runnable_queue();
return _prio_task_queue_list[core_id].push(task);
auto prio_task_queue_list =
std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
return (*prio_task_queue_list)[core_id]->push(task);
}

} // namespace pipeline
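The core of this change is replacing the raw `PriorityTaskQueue[]` array with a `std::shared_ptr` to a vector of per-core queues that readers fetch with `std::atomic_load_explicit` and `close()` clears with `std::atomic_store`. A thread that has already loaded the pointer keeps the queues alive even if `close()` runs concurrently. The sketch below is a self-contained illustration of that pattern, not the actual Doris classes; `Queue` and `QueueList` are stand-ins.

```cpp
#include <atomic>
#include <memory>
#include <vector>

// Stand-in for PriorityTaskQueue: just enough surface for the sketch.
struct Queue {
    void push(int /*value*/) {}
    void close() {}
};

class QueueList {
public:
    explicit QueueList(int cores) : _queues(std::make_shared<std::vector<Queue>>(cores)) {}

    void close() {
        // Load once so the vector stays alive while we close the queues, then
        // publish nullptr; readers that already hold a copy are unaffected.
        auto queues = std::atomic_load_explicit(&_queues, std::memory_order_relaxed);
        if (!queues) {
            return;
        }
        for (auto& q : *queues) {
            q.close();
        }
        std::atomic_store(&_queues, std::shared_ptr<std::vector<Queue>>(nullptr));
    }

    // Mirrors MultiCoreTaskQueue::push_back/take: one atomic load, then every
    // access goes through the local shared_ptr that keeps the vector alive.
    bool push(int core_id, int value) {
        auto queues = std::atomic_load_explicit(&_queues, std::memory_order_relaxed);
        if (!queues) {
            return false;  // already closed
        }
        (*queues)[core_id].push(value);
        return true;
    }

private:
    std::shared_ptr<std::vector<Queue>> _queues;
};
```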
30 changes: 15 additions & 15 deletions be/src/pipeline/task_queue.h
@@ -37,25 +37,25 @@ namespace pipeline {

class TaskQueue {
public:
TaskQueue(size_t core_size) : _core_size(core_size) {}
TaskQueue(int core_size) : _core_size(core_size) {}
virtual ~TaskQueue();
virtual void close() = 0;
// Get the task by core id.
// TODO: To think the logic is useful?
virtual PipelineTask* take(size_t core_id) = 0;
virtual PipelineTask* take(int core_id) = 0;

// push from scheduler
virtual Status push_back(PipelineTask* task) = 0;

// push from worker
virtual Status push_back(PipelineTask* task, size_t core_id) = 0;
virtual Status push_back(PipelineTask* task, int core_id) = 0;

virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}

int cores() const { return _core_size; }

protected:
size_t _core_size;
int _core_size;
static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
};

@@ -105,8 +105,6 @@ class PriorityTaskQueue {
_sub_queues[level].inc_runtime(runtime);
}

int task_size();

private:
PipelineTask* _try_take_unprotected(bool is_steal);
static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
@@ -130,32 +128,34 @@ class PriorityTaskQueue {
// Need consider NUMA architecture
class MultiCoreTaskQueue : public TaskQueue {
public:
explicit MultiCoreTaskQueue(size_t core_size);
explicit MultiCoreTaskQueue(int core_size);

~MultiCoreTaskQueue() override;

void close() override;

// Get the task by core id.
// TODO: To think the logic is useful?
PipelineTask* take(size_t core_id) override;
PipelineTask* take(int core_id) override;

// TODO combine these methods to `push_back(task, core_id = -1)`
Status push_back(PipelineTask* task) override;

Status push_back(PipelineTask* task, size_t core_id) override;
Status push_back(PipelineTask* task, int core_id) override;

void update_statistics(PipelineTask* task, int64_t time_spent) override {
task->inc_runtime_ns(time_spent);
_prio_task_queue_list[task->get_core_id()].inc_sub_queue_runtime(task->get_queue_level(),
time_spent);
auto prio_task_queue_list =
std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
(*prio_task_queue_list)[task->get_core_id()]->inc_sub_queue_runtime(task->get_queue_level(),
time_spent);
}

private:
PipelineTask* _steal_take(size_t core_id);
PipelineTask* _steal_take(
int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list);

std::unique_ptr<PriorityTaskQueue[]> _prio_task_queue_list;
std::atomic<size_t> _next_core = 0;
std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>> _prio_task_queue_list;
std::atomic<int> _next_core = 0;
std::atomic<bool> _closed;
};

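`_steal_take` now receives the already-loaded queue vector by reference, so the whole round-robin probe works against one consistent snapshot of the list instead of re-reading the member for every core. The standalone sketch below restates that wrap-around stealing loop with stub types (`StubQueue`, `StubTask`) in place of `PriorityTaskQueue` and `PipelineTask`.

```cpp
#include <memory>
#include <vector>

struct StubTask {
    int core_id = -1;
};

struct StubQueue {
    // In the real queue, try_take(true) only hands out tasks that may be stolen.
    StubTask* try_take(bool /*is_steal*/) { return nullptr; }
};

StubTask* steal_take(int core_id, int core_size,
                     std::vector<std::unique_ptr<StubQueue>>& queues) {
    int next_id = core_id;
    for (int i = 1; i < core_size; ++i) {  // visit every core except our own
        if (++next_id == core_size) {
            next_id = 0;  // wrap around to the first core
        }
        if (StubTask* task = queues[next_id]->try_take(true)) {
            task->core_id = next_id;  // record which queue the task came from
            return task;
        }
    }
    return nullptr;  // nothing to steal; the caller falls back to a timed wait
}
```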
2 changes: 1 addition & 1 deletion be/src/pipeline/task_scheduler.cpp
@@ -205,13 +205,13 @@ TaskScheduler::~TaskScheduler() {

Status TaskScheduler::start() {
int cores = _task_queue->cores();
// Must be mutil number of cpu cores
RETURN_IF_ERROR(ThreadPoolBuilder(_name)
.set_min_threads(cores)
.set_max_threads(cores)
.set_max_queue_size(0)
.set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
.build(&_fix_thread_pool));
LOG_INFO("TaskScheduler set cores").tag("size", cores);
_markers.reserve(cores);
for (size_t i = 0; i < cores; ++i) {
_markers.push_back(std::make_unique<std::atomic<bool>>(true));
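For context, `TaskScheduler::start` above builds a fixed pool with one worker thread per core plus a per-core `std::atomic<bool>` marker used for shutdown. The sketch below reproduces that shape with plain `std::thread` and a dummy queue; it is illustrative only and omits the real `ThreadPoolBuilder`, cgroup CPU control, and task execution.

```cpp
#include <atomic>
#include <memory>
#include <thread>
#include <vector>

// Stand-in for MultiCoreTaskQueue: take() would normally block for up to
// WAIT_CORE_TASK_TIMEOUT_MS and return nullptr on timeout or shutdown.
struct DummyQueue {
    int cores() const { return 4; }
    void* take(int /*core_id*/) { return nullptr; }
};

int main() {
    DummyQueue queue;
    const int cores = queue.cores();

    // One shutdown marker per worker, as in TaskScheduler::start.
    std::vector<std::unique_ptr<std::atomic<bool>>> markers;
    markers.reserve(cores);
    for (int i = 0; i < cores; ++i) {
        markers.push_back(std::make_unique<std::atomic<bool>>(true));
    }

    // One worker per core, each pinned to its core_id for take().
    std::vector<std::thread> workers;
    for (int i = 0; i < cores; ++i) {
        workers.emplace_back([&queue, &markers, i] {
            while (markers[i]->load()) {
                if (queue.take(i) != nullptr) {
                    // ... execute the pipeline task here ...
                }
            }
        });
    }

    // Shutdown: clear every marker so the workers fall out of their loops.
    for (auto& m : markers) {
        m->store(false);
    }
    for (auto& t : workers) {
        t.join();
    }
    return 0;
}
```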