35 changes: 24 additions & 11 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -140,6 +140,14 @@ PipelineFragmentContext::~PipelineFragmentContext() {
}
}
_tasks.clear();
for (auto& holder : _task_holders) {
if (holder == nullptr) {
break;
}
auto expected = TaskState::VALID;
CHECK(holder->state.compare_exchange_strong(expected, TaskState::INVALID));
}
_task_holders.clear();
for (auto& runtime_states : _task_runtime_states) {
for (auto& runtime_state : runtime_states) {
runtime_state.reset();
@@ -363,6 +371,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
const auto target_size = request.local_params.size();
_tasks.resize(target_size);
_runtime_filter_states.resize(target_size);
_task_holders.resize(target_size * _pipelines.size());
_task_runtime_states.resize(_pipelines.size());
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
_task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
@@ -453,6 +462,12 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
task_runtime_state->set_task(task.get());
pipeline_id_to_task.insert({pipeline->id(), task.get()});
_tasks[i].emplace_back(std::move(task));
auto holder = std::make_shared<TaskHolder>(_tasks[i].back().get());
_tasks[i].back()->set_holder(holder);
_task_holders[cur_task_id] = holder;
DCHECK(_task_holders[cur_task_id]);
}
}

@@ -1677,17 +1692,15 @@ Status PipelineFragmentContext::submit() {
int submit_tasks = 0;
Status st;
auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
for (auto& task : _tasks) {
for (auto& t : task) {
st = scheduler->schedule_task(t.get());
if (!st) {
cancel(Status::InternalError("submit context to executor fail"));
std::lock_guard<std::mutex> l(_task_mutex);
_total_tasks = submit_tasks;
break;
}
submit_tasks++;
}
for (size_t i = 0; i < _total_tasks; i++) {
st = scheduler->schedule_task(_task_holders[i]);
if (!st) {
cancel(Status::InternalError("submit context to executor fail"));
std::lock_guard<std::mutex> l(_task_mutex);
_total_tasks = submit_tasks;
break;
}
submit_tasks++;
}
if (!st.ok()) {
std::lock_guard<std::mutex> l(_task_mutex);
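Note: task_queue.h, where TaskHolder is declared, is not part of this diff, so the exact definition is not visible here. A minimal sketch consistent with how the holder is used above (a raw back-pointer to the task plus an atomic state that the fragment context CASes from VALID to INVALID in its destructor) might look like the following; the member layout and enum values are assumptions, not the actual Doris declaration.

#include <atomic>
#include <memory>

namespace doris::pipeline {

class PipelineTask;

// Hypothetical sketch of the holder handed to the task queues. The atomic
// state is the only thing a worker may trust before dereferencing `task`:
// VALID -> RUNNING is claimed by a worker, VALID -> INVALID is claimed by
// ~PipelineFragmentContext() above, so a stale queue entry is never executed.
enum class TaskState : int { VALID, INVALID, RUNNING };

struct TaskHolder {
    explicit TaskHolder(PipelineTask* task_) : task(task_), state(TaskState::VALID) {}

    PipelineTask* task;
    std::atomic<TaskState> state;
};

using TaskHolderSPtr = std::shared_ptr<TaskHolder>;

} // namespace doris::pipeline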
1 change: 1 addition & 0 deletions be/src/pipeline/pipeline_fragment_context.h
@@ -227,6 +227,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
OperatorPtr _root_op = nullptr;
// this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
std::vector<std::vector<std::unique_ptr<PipelineTask>>> _tasks;
std::vector<TaskHolderSPtr> _task_holders;

// TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
// of it in pipeline task not the fragment_context
29 changes: 26 additions & 3 deletions be/src/pipeline/pipeline_task.cpp
@@ -394,7 +394,7 @@ Status PipelineTask::execute(bool* eos) {
}
}

RETURN_IF_ERROR(get_task_queue()->push_back(this));
RETURN_IF_ERROR(get_task_queue()->push_back(_holder));
return Status::OK();
}

@@ -511,7 +511,7 @@ std::string PipelineTask::debug_string() {
(void*)this, _index, _opened, _eos, _finalized, _dry_run, elapsed,
_wake_up_early.load(),
cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL",
is_running());
_holder ? _holder->state.load() == TaskState::RUNNING : false);
for (size_t i = 0; i < _operators.size(); i++) {
fmt::format_to(debug_string_buffer, "\n{}",
_opened && !_finalized ? _operators[i]->debug_string(_state, i)
@@ -554,7 +554,30 @@ std::string PipelineTask::debug_string() {

void PipelineTask::wake_up() {
// call by dependency
static_cast<void>(get_task_queue()->push_back(this));
static_cast<void>(get_task_queue()->push_back(_holder));
}

void PipelineTask::clear_blocking_state() {
_state->get_query_ctx()->get_execution_dependency()->set_always_ready();
// We use a lock to assure all dependencies are not deconstructed here.
std::unique_lock<std::mutex> lc(_dependency_lock);
if (!_finalized) {
_execution_dep->set_always_ready();
for (auto* dep : _filter_dependencies) {
dep->set_always_ready();
}
for (auto& deps : _read_dependencies) {
for (auto* dep : deps) {
dep->set_always_ready();
}
}
for (auto* dep : _write_dependencies) {
dep->set_always_ready();
}
for (auto* dep : _finish_dependencies) {
dep->set_always_ready();
}
}
}

QueryContext* PipelineTask::query_context() {
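The consumer side of the holder (the scheduler's worker loop) is not shown in this diff; the sketch below is an assumption about how a worker would claim a holder before touching the raw PipelineTask pointer, matching the VALID/RUNNING/INVALID states used above. run_one and its state transitions are illustrative only.

#include "pipeline/pipeline_task.h"
#include "pipeline/task_queue.h"

using namespace doris;
using namespace doris::pipeline;

// Hypothetical worker-loop step: take a holder from the queue, claim it via
// CAS, and only then dereference the raw task pointer. If the fragment
// context already flipped the state to INVALID, the entry is silently dropped.
void run_one(MultiCoreTaskQueue& queue, int core_id) {
    TaskHolderSPtr holder = queue.take(core_id); // may block up to the queue timeout
    if (holder == nullptr) {
        return; // queue closed or timed out
    }
    auto expected = TaskState::VALID;
    if (!holder->state.compare_exchange_strong(expected, TaskState::RUNNING)) {
        return; // task was invalidated; do not touch holder->task
    }
    bool eos = false;
    Status st = holder->task->execute(&eos); // execute() re-queues the holder when it yields
    // Hand the slot back so a later wake_up()/re-queue or the fragment
    // destructor (which expects VALID) can take over; this exact back-transition
    // is an assumption of the sketch.
    auto running = TaskState::RUNNING;
    holder->state.compare_exchange_strong(running, TaskState::VALID);
    static_cast<void>(st);
}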
33 changes: 5 additions & 28 deletions be/src/pipeline/pipeline_task.h
@@ -27,6 +27,7 @@
#include "pipeline/dependency.h"
#include "pipeline/exec/operator.h"
#include "pipeline/pipeline.h"
#include "pipeline/task_queue.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/core/block.h"
@@ -41,8 +42,6 @@ class PipelineFragmentContext;

namespace doris::pipeline {

class MultiCoreTaskQueue;
class PriorityTaskQueue;
class Dependency;

class PipelineTask {
@@ -137,28 +136,7 @@ class PipelineTask {

void set_wake_up_early() { _wake_up_early = true; }

void clear_blocking_state() {
_state->get_query_ctx()->get_execution_dependency()->set_always_ready();
// We use a lock to assure all dependencies are not deconstructed here.
std::unique_lock<std::mutex> lc(_dependency_lock);
if (!_finalized) {
_execution_dep->set_always_ready();
for (auto* dep : _filter_dependencies) {
dep->set_always_ready();
}
for (auto& deps : _read_dependencies) {
for (auto* dep : deps) {
dep->set_always_ready();
}
}
for (auto* dep : _write_dependencies) {
dep->set_always_ready();
}
for (auto* dep : _finish_dependencies) {
dep->set_always_ready();
}
}
}
void clear_blocking_state();

void set_task_queue(MultiCoreTaskQueue* task_queue) { _task_queue = task_queue; }
MultiCoreTaskQueue* get_task_queue() { return _task_queue; }
@@ -194,9 +172,6 @@

void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }

bool is_running() { return _running.load(); }
void set_running(bool running) { _running = running; }

bool is_exceed_debug_timeout() {
if (_has_exceed_timeout) {
return true;
@@ -239,6 +214,8 @@

bool wake_up_early() const { return _wake_up_early; }

void set_holder(std::shared_ptr<TaskHolder> holder) { _holder = holder; }

private:
friend class RuntimeFilterDependency;
bool _is_blocked();
Expand Down Expand Up @@ -317,9 +294,9 @@ class PipelineTask {
std::atomic<bool> _finalized = false;
std::mutex _dependency_lock;

std::atomic<bool> _running = false;
std::atomic<bool> _eos = false;
std::atomic<bool> _wake_up_early = false;
std::shared_ptr<TaskHolder> _holder;
};

} // namespace doris::pipeline
58 changes: 34 additions & 24 deletions be/src/pipeline/task_queue.cpp
@@ -29,7 +29,9 @@
namespace doris::pipeline {
#include "common/compile_check_begin.h"

PipelineTask* SubTaskQueue::try_take(bool is_steal) {
TaskHolder::TaskHolder(PipelineTask* task_) : task(task_), state(TaskState::VALID) {}

TaskHolderSPtr SubTaskQueue::try_take(bool is_steal) {
if (_queue.empty()) {
return nullptr;
}
@@ -54,7 +56,7 @@ void PriorityTaskQueue::close() {
_wait_task.notify_all();
}

PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
TaskHolderSPtr PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
if (_total_task_size == 0 || _closed) {
return nullptr;
}
@@ -75,7 +77,7 @@ PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {

auto task = _sub_queues[level].try_take(is_steal);
if (task) {
task->update_queue_level(level);
task->task->update_queue_level(level);
_total_task_size--;
}
return task;
@@ -90,13 +92,13 @@ int PriorityTaskQueue::_compute_level(uint64_t runtime) {
return SUB_QUEUE_LEVEL - 1;
}

PipelineTask* PriorityTaskQueue::try_take(bool is_steal) {
TaskHolderSPtr PriorityTaskQueue::try_take(bool is_steal) {
// TODO other efficient lock? e.g. if get lock fail, return null_ptr
std::unique_lock<std::mutex> lock(_work_size_mutex);
return _try_take_unprotected(is_steal);
}

PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
TaskHolderSPtr PriorityTaskQueue::take(uint32_t timeout_ms) {
std::unique_lock<std::mutex> lock(_work_size_mutex);
auto task = _try_take_unprotected(false);
if (task) {
@@ -111,11 +113,11 @@ PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
}
}

Status PriorityTaskQueue::push(PipelineTask* task) {
Status PriorityTaskQueue::push(TaskHolderSPtr task) {
if (_closed) {
return Status::InternalError("WorkTaskQueue closed");
}
auto level = _compute_level(task->get_runtime_ns());
auto level = _compute_level(task->task->get_runtime_ns());
std::unique_lock<std::mutex> lock(_work_size_mutex);

// update empty queue's runtime, to avoid too high priority
@@ -132,8 +134,12 @@ Status PriorityTaskQueue::push(PipelineTask* task) {

MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;

static constexpr int NUM_EAGER_QUEUES = 1;
MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size)
: _prio_task_queues(core_size), _closed(false), _core_size(core_size) {}
: _prio_task_queues(core_size + NUM_EAGER_QUEUES),
_closed(false),
_core_size(core_size),
_urgent_queue_idx(core_size) {}

void MultiCoreTaskQueue::close() {
if (_closed) {
@@ -145,34 +151,36 @@ void MultiCoreTaskQueue::close() {
[](auto& prio_task_queue) { prio_task_queue.close(); });
}

PipelineTask* MultiCoreTaskQueue::take(int core_id) {
PipelineTask* task = nullptr;
TaskHolderSPtr MultiCoreTaskQueue::take(int core_id) {
TaskHolderSPtr task = nullptr;
while (!_closed) {
DCHECK(_prio_task_queues.size() > core_id)
<< " list size: " << _prio_task_queues.size() << " core_id: " << core_id
<< " _core_size: " << _core_size << " _next_core: " << _next_core.load();
task = _prio_task_queues[core_id].try_take(false);
if (task) {
task->set_core_id(core_id);
task->task->set_core_id(core_id);
break;
}
task = _steal_take(core_id);
if (task) {
break;
if (core_id != _urgent_queue_idx) {
task = _steal_take(core_id);
if (task) {
break;
}
}
task = _prio_task_queues[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
if (task) {
task->set_core_id(core_id);
task->task->set_core_id(core_id);
break;
}
}
if (task) {
task->pop_out_runnable_queue();
task->task->pop_out_runnable_queue();
}
return task;
}

PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
TaskHolderSPtr MultiCoreTaskQueue::_steal_take(int core_id) {
DCHECK(core_id < _core_size);
int next_id = core_id;
for (int i = 1; i < _core_size; ++i) {
@@ -183,25 +191,27 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
DCHECK(next_id < _core_size);
auto task = _prio_task_queues[next_id].try_take(true);
if (task) {
task->set_core_id(next_id);
task->task->set_core_id(next_id);
return task;
}
}
return nullptr;
}

Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
int core_id = task->get_previous_core_id();
Status MultiCoreTaskQueue::push_back(TaskHolderSPtr task) {
int core_id = task->task->get_previous_core_id();
if (core_id < 0) {
core_id = _next_core.fetch_add(1) % _core_size;
}
return push_back(task, core_id);
}

Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) {
DCHECK(core_id < _core_size);
task->put_in_runnable_queue();
return _prio_task_queues[core_id].push(task);
Status MultiCoreTaskQueue::push_back(TaskHolderSPtr task, int core_id) {
DCHECK(core_id < _core_size || task->task->query_context()->is_cancelled());
task->task->put_in_runnable_queue();
return _prio_task_queues[task->task->query_context()->is_cancelled() ? _urgent_queue_idx
: core_id]
.push(task);
}

void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
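For illustration, the routing decision added to push_back above reduces to an index choice: per-core queues occupy indices [0, core_size) and the single eager/urgent queue sits at index core_size, where tasks of cancelled queries are sent regardless of the core they last ran on. The standalone sketch below mirrors that selection with hypothetical names; it is not the actual Doris API.

#include <cassert>

// Per-core queues live at [0, core_size); one extra urgent queue is appended
// at index core_size (mirroring NUM_EAGER_QUEUES = 1 above).
int pick_queue_index(int core_id, int core_size, bool query_cancelled) {
    // Cancelled queries skip per-core placement so a dedicated worker can
    // drain and finalize them without competing with healthy queries.
    return query_cancelled ? core_size : core_id;
}

int main() {
    const int core_size = 4; // queues 0..3 are per-core, queue 4 is urgent
    assert(pick_queue_index(2, core_size, false) == 2);        // normal task stays on its core
    assert(pick_queue_index(2, core_size, true) == core_size); // cancelled task goes urgent
    return 0;
}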