Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ void PipelineTask::_fresh_profile_counter() {
COUNTER_SET(_schedule_counts, (int64_t)_schedule_time);
COUNTER_SET(_wait_sink_timer, (int64_t)_wait_sink_watcher.elapsed_time());
COUNTER_SET(_wait_worker_timer, (int64_t)_wait_worker_watcher.elapsed_time());
COUNTER_SET(_wait_schedule_timer, (int64_t)_wait_schedule_watcher.elapsed_time());
COUNTER_SET(_begin_execute_timer, _begin_execute_time);
COUNTER_SET(_eos_timer, _eos_time);
COUNTER_SET(_src_pending_finish_over_timer, _src_pending_finish_over_time);
Expand Down Expand Up @@ -116,7 +115,6 @@ void PipelineTask::_init_profile() {
_wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime");
_wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime");
_wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
_wait_schedule_timer = ADD_TIMER(_task_profile, "WaitScheduleTime");
_block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT);
_block_by_source_counts = ADD_COUNTER(_task_profile, "NumBlockedBySrcTimes", TUnit::UNIT);
_block_by_sink_counts = ADD_COUNTER(_task_profile, "NumBlockedBySinkTimes", TUnit::UNIT);
Expand Down
4 changes: 0 additions & 4 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,6 @@ class PipelineTask {
_wait_worker_watcher.start();
}
void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
void start_schedule_watcher() { _wait_schedule_watcher.start(); }
void stop_schedule_watcher() { _wait_schedule_watcher.stop(); }
PipelineTaskState get_state() { return _cur_state; }
void set_state(PipelineTaskState state);

Expand Down Expand Up @@ -311,8 +309,6 @@ class PipelineTask {
MonotonicStopWatch _wait_worker_watcher;
RuntimeProfile::Counter* _wait_worker_timer;
// TODO we should calculate the time between when really runnable and runnable
MonotonicStopWatch _wait_schedule_watcher;
RuntimeProfile::Counter* _wait_schedule_timer;
RuntimeProfile::Counter* _yield_counts;
RuntimeProfile::Counter* _core_change_times;

Expand Down
31 changes: 12 additions & 19 deletions be/src/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ void BlockedTaskScheduler::_schedule() {
_started.store(true);
std::list<PipelineTask*> local_blocked_tasks;
int empty_times = 0;
std::vector<PipelineTask*> ready_tasks;

while (!_shutdown) {
{
Expand All @@ -105,6 +104,7 @@ void BlockedTaskScheduler::_schedule() {
}
}

auto origin_local_block_tasks_size = local_blocked_tasks.size();
auto iter = local_blocked_tasks.begin();
vectorized::VecDateTimeValue now = vectorized::VecDateTimeValue::local_time();
while (iter != local_blocked_tasks.end()) {
Expand All @@ -116,57 +116,52 @@ void BlockedTaskScheduler::_schedule() {
VLOG_DEBUG << "Task pending" << task->debug_string();
iter++;
} else {
_make_task_run(local_blocked_tasks, iter, ready_tasks,
PipelineTaskState::PENDING_FINISH);
_make_task_run(local_blocked_tasks, iter, PipelineTaskState::PENDING_FINISH);
}
} else if (task->query_context()->is_cancelled()) {
_make_task_run(local_blocked_tasks, iter, ready_tasks);
_make_task_run(local_blocked_tasks, iter);
} else if (task->query_context()->is_timeout(now)) {
LOG(WARNING) << "Timeout, query_id=" << print_id(task->query_context()->query_id())
<< ", instance_id=" << print_id(task->instance_id())
<< ", task info: " << task->debug_string();

task->query_context()->cancel(true, "", Status::Cancelled(""));
_make_task_run(local_blocked_tasks, iter, ready_tasks);
_make_task_run(local_blocked_tasks, iter);
} else if (state == PipelineTaskState::BLOCKED_FOR_DEPENDENCY) {
if (task->has_dependency()) {
iter++;
} else {
_make_task_run(local_blocked_tasks, iter, ready_tasks);
_make_task_run(local_blocked_tasks, iter);
}
} else if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) {
if (task->source_can_read()) {
_make_task_run(local_blocked_tasks, iter, ready_tasks);
_make_task_run(local_blocked_tasks, iter);
} else {
iter++;
}
} else if (state == PipelineTaskState::BLOCKED_FOR_RF) {
if (task->runtime_filters_are_ready_or_timeout()) {
_make_task_run(local_blocked_tasks, iter, ready_tasks);
_make_task_run(local_blocked_tasks, iter);
} else {
iter++;
}
} else if (state == PipelineTaskState::BLOCKED_FOR_SINK) {
if (task->sink_can_write()) {
_make_task_run(local_blocked_tasks, iter, ready_tasks);
_make_task_run(local_blocked_tasks, iter);
} else {
iter++;
}
} else {
// TODO: DCHECK the state
_make_task_run(local_blocked_tasks, iter, ready_tasks);
_make_task_run(local_blocked_tasks, iter);
}
}

if (ready_tasks.empty()) {
if (origin_local_block_tasks_size == 0 ||
local_blocked_tasks.size() == origin_local_block_tasks_size) {
empty_times += 1;
} else {
empty_times = 0;
for (auto& task : ready_tasks) {
task->stop_schedule_watcher();
_task_queue->push_back(task);
}
ready_tasks.clear();
}

if (empty_times != 0 && (empty_times & (EMPTY_TIMES_TO_YIELD - 1)) == 0) {
Expand All @@ -186,13 +181,11 @@ void BlockedTaskScheduler::_schedule() {

void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
std::list<PipelineTask*>::iterator& task_itr,
std::vector<PipelineTask*>& ready_tasks,
PipelineTaskState t_state) {
auto task = *task_itr;
task->start_schedule_watcher();
task->set_state(t_state);
local_tasks.erase(task_itr++);
ready_tasks.emplace_back(task);
_task_queue->push_back(task);
}

TaskScheduler::~TaskScheduler() {
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/task_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ class BlockedTaskScheduler {
void _schedule();
void _make_task_run(std::list<PipelineTask*>& local_tasks,
std::list<PipelineTask*>::iterator& task_itr,
std::vector<PipelineTask*>& ready_tasks,
PipelineTaskState state = PipelineTaskState::RUNNABLE);
};

Expand Down