Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 10 additions & 40 deletions be/src/util/blocking_priority_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ class BlockingPriorityQueue {
_max_element(max_elements),
_upgrade_counter(0),
_total_get_wait_time(0),
_total_put_wait_time(0),
_get_waiting(0),
_put_waiting(0) {}
_total_put_wait_time(0) {}

// Get an element from the queue, waiting indefinitely (or until timeout) for one to become available.
// Returns false if we were shut down prior to getting the element, and there
Expand All @@ -55,20 +53,10 @@ class BlockingPriorityQueue {
std::unique_lock unique_lock(_lock);
bool wait_successful = false;
if (timeout_ms > 0) {
while (!(_shutdown || !_queue.empty())) {
++_get_waiting;
if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms)) ==
std::cv_status::timeout) {
// timeout
wait_successful = _shutdown || !_queue.empty();
break;
}
}
wait_successful = _get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms),
[this] { return _shutdown || !_queue.empty(); });
} else {
while (!(_shutdown || !_queue.empty())) {
++_get_waiting;
_get_cv.wait(unique_lock);
}
_get_cv.wait(unique_lock, [this] { return _shutdown || !_queue.empty(); });
wait_successful = true;
}
_total_get_wait_time += timer.elapsed_time();
Expand All @@ -88,11 +76,7 @@ class BlockingPriorityQueue {
*out = _queue.top();
_queue.pop();
++_upgrade_counter;
if (_put_waiting > 0) {
--_put_waiting;
unique_lock.unlock();
_put_cv.notify_one();
}
_put_cv.notify_one();
return true;
} else {
assert(_shutdown);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: use of undeclared identifier 'assert' [clang-diagnostic-error]

                assert(_shutdown);
                ^

Expand Down Expand Up @@ -128,11 +112,7 @@ class BlockingPriorityQueue {
_queue.pop();
++_upgrade_counter;
_total_get_wait_time += timer.elapsed_time();
if (_put_waiting > 0) {
--_put_waiting;
unique_lock.unlock();
_put_cv.notify_one();
}
_put_cv.notify_one();
return true;
}

Expand All @@ -146,7 +126,6 @@ class BlockingPriorityQueue {
timer.start();
std::unique_lock unique_lock(_lock);
while (!(_shutdown || _queue.size() < _max_element)) {
++_put_waiting;
_put_cv.wait(unique_lock);
}
_total_put_wait_time += timer.elapsed_time();
Expand All @@ -156,11 +135,7 @@ class BlockingPriorityQueue {
}

_queue.push(val);
if (_get_waiting > 0) {
--_get_waiting;
unique_lock.unlock();
_get_cv.notify_one();
}
_get_cv.notify_one();
return true;
}

Expand All @@ -169,11 +144,8 @@ class BlockingPriorityQueue {
std::unique_lock unique_lock(_lock);
if (_queue.size() < _max_element && !_shutdown) {
_queue.push(val);
if (_get_waiting > 0) {
--_get_waiting;
unique_lock.unlock();
_get_cv.notify_one();
}
unique_lock.unlock();
_get_cv.notify_one();
return true;
}
return false;
Expand Down Expand Up @@ -213,8 +185,6 @@ class BlockingPriorityQueue {
int _upgrade_counter;
std::atomic<uint64_t> _total_get_wait_time;
std::atomic<uint64_t> _total_put_wait_time;
size_t _get_waiting;
size_t _put_waiting;
};

} // namespace doris
} // namespace doris
34 changes: 6 additions & 28 deletions be/src/util/blocking_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ class BlockingQueue {
: _shutdown(false),
_max_elements(max_elements),
_total_get_wait_time(0),
_total_put_wait_time(0),
_get_waiting(0),
_put_waiting(0) {}
_total_put_wait_time(0) {}

// Get an element from the queue, waiting indefinitely for one to become available.
// Returns false if we were shut down prior to getting the element, and there
Expand All @@ -52,20 +50,13 @@ class BlockingQueue {
MonotonicStopWatch timer;
timer.start();
std::unique_lock<std::mutex> unique_lock(_lock);
while (!(_shutdown || !_list.empty())) {
++_get_waiting;
_get_cv.wait(unique_lock);
}
_get_cv.wait(unique_lock, [this] { return _shutdown || !_list.empty(); });
_total_get_wait_time += timer.elapsed_time();

if (!_list.empty()) {
*out = _list.front();
_list.pop_front();
if (_put_waiting > 0) {
--_put_waiting;
unique_lock.unlock();
_put_cv.notify_one();
}
_put_cv.notify_one();
return true;
} else {
assert(_shutdown);
Expand All @@ -79,22 +70,15 @@ class BlockingQueue {
MonotonicStopWatch timer;
timer.start();
std::unique_lock<std::mutex> unique_lock(_lock);
while (!(_shutdown || _list.size() < _max_elements)) {
++_put_waiting;
_put_cv.wait(unique_lock);
}
_put_cv.wait(unique_lock, [this] { return _shutdown || _list.size() < _max_elements; });
_total_put_wait_time += timer.elapsed_time();

if (_shutdown) {
return false;
}

_list.push_back(val);
if (_get_waiting > 0) {
--_get_waiting;
unique_lock.unlock();
_get_cv.notify_one();
}
_get_cv.notify_one();
return true;
}

Expand All @@ -114,11 +98,7 @@ class BlockingQueue {
}

_list.push_back(val);
if (_get_waiting > 0) {
--_get_waiting;
unique_lock.unlock();
_get_cv.notify_one();
}
_get_cv.notify_one();
return true;
}

Expand Down Expand Up @@ -156,8 +136,6 @@ class BlockingQueue {
std::list<T> _list;
std::atomic<uint64_t> _total_get_wait_time;
std::atomic<uint64_t> _total_put_wait_time;
size_t _get_waiting;
size_t _put_waiting;
};

} // namespace doris
29 changes: 12 additions & 17 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void ScannerScheduler::stop() {

_scheduler_pool->wait();
_local_scan_thread_pool->join();
_remote_scan_thread_pool->join();
_remote_scan_thread_pool->wait();
_limited_scan_thread_pool->wait();
_group_local_scan_thread_pool->wait();

Expand Down Expand Up @@ -126,9 +126,12 @@ Status ScannerScheduler::init(ExecEnv* env) {
_remote_thread_pool_max_size = config::doris_max_remote_scanner_thread_pool_thread_num != -1
? config::doris_max_remote_scanner_thread_pool_thread_num
: std::max(512, CpuInfo::num_cores() * 10);
_remote_scan_thread_pool = std::make_unique<PriorityThreadPool>(
_remote_thread_pool_max_size, config::doris_remote_scanner_thread_pool_queue_size,
"RemoteScanThreadPool");
static_cast<void>(
ThreadPoolBuilder("RemoteScanThreadPool")
.set_min_threads(config::doris_scanner_thread_pool_thread_num) // 48 default
.set_max_threads(_remote_thread_pool_max_size)
.set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
.build(&_remote_scan_thread_pool));

// 4. limited scan thread pool
static_cast<void>(ThreadPoolBuilder("LimitedScanThreadPool")
Expand Down Expand Up @@ -246,6 +249,7 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
(*iter)->start_wait_worker_timer();
TabletStorageType type = (*iter)->get_storage_type();
bool ret = false;
(void) ret;
if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
auto work_func = [this, scanner = *iter, ctx] {
Expand All @@ -270,19 +274,9 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
ret = _local_scan_thread_pool->offer(task);
}
} else {
PriorityThreadPool::Task task;
task.work_function = [this, scanner = *iter, ctx] {
ret = _remote_scan_thread_pool->submit_func([this, scanner = *iter, ctx] {
this->_scanner_scan(this, ctx, scanner);
};
task.priority = nice;
ret = _remote_scan_thread_pool->offer(task);
}
if (ret) {
this_run.erase(iter++);
} else {
ctx->set_status_on_error(
Status::InternalError("failed to submit scanner to scanner pool"));
break;
});
}
}
}
Expand Down Expand Up @@ -450,7 +444,7 @@ void ScannerScheduler::_register_metrics() {
REGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size,
[this]() { return _remote_scan_thread_pool->get_queue_size(); });
REGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num,
[this]() { return _remote_scan_thread_pool->get_active_threads(); });
[this]() { return _remote_scan_thread_pool->num_threads(); });
REGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size,
[this]() { return _limited_scan_thread_pool->get_queue_size(); });
REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num,
Expand All @@ -471,4 +465,5 @@ void ScannerScheduler::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num);
}

} // namespace doris::vectorized
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/scanner_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class ScannerScheduler {
// _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, etc.)
// _limited_scan_thread_pool is a special pool for queries with resource limit
std::unique_ptr<PriorityThreadPool> _local_scan_thread_pool;
std::unique_ptr<PriorityThreadPool> _remote_scan_thread_pool;
std::unique_ptr<ThreadPool> _remote_scan_thread_pool;
std::unique_ptr<ThreadPool> _limited_scan_thread_pool;

std::unique_ptr<taskgroup::ScanTaskTaskGroupQueue> _task_group_local_scan_queue;
Expand Down