diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp index 92d68751353f68..798277230bd262 100644 --- a/be/src/util/blocking_priority_queue.hpp +++ b/be/src/util/blocking_priority_queue.hpp @@ -42,7 +42,9 @@ class BlockingPriorityQueue { _max_element(max_elements), _upgrade_counter(0), _total_get_wait_time(0), - _total_put_wait_time(0) {} + _total_put_wait_time(0), + _get_waiting(0), + _put_waiting(0) {} // Get an element from the queue, waiting indefinitely (or until timeout) for one to become available. // Returns false if we were shut down prior to getting the element, and there @@ -55,17 +57,29 @@ class BlockingPriorityQueue { bool wait_successful = false; #if !defined(USE_BTHREAD_SCANNER) if (timeout_ms > 0) { - wait_successful = _get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms), - [this] { return _shutdown || !_queue.empty(); }); + while (!(_shutdown || !_queue.empty())) { + ++_get_waiting; + if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms)) == + std::cv_status::timeout) { + // timeout + wait_successful = _shutdown || !_queue.empty(); + break; + } + } } else { - _get_cv.wait(unique_lock, [this] { return _shutdown || !_queue.empty(); }); + while (!(_shutdown || !_queue.empty())) { + ++_get_waiting; + _get_cv.wait(unique_lock); + } wait_successful = true; } #else if (timeout_ms > 0) { wait_successful = true; while (!(_shutdown || !_queue.empty())) { - if (_get_cv.wait_for(unique_lock, timeout_ms * 1000) != 0) { + ++_get_waiting; + if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms)) == + std::cv_status::timeout) { // timeout wait_successful = _shutdown || !_queue.empty(); break; @@ -95,7 +109,11 @@ class BlockingPriorityQueue { *out = _queue.top(); _queue.pop(); ++_upgrade_counter; - _put_cv.notify_one(); + if (_put_waiting > 0) { + --_put_waiting; + unique_lock.unlock(); + _put_cv.notify_one(); + } return true; } else { assert(_shutdown); @@ -131,7 +149,11 @@ class BlockingPriorityQueue { _queue.pop(); ++_upgrade_counter; _total_get_wait_time += timer.elapsed_time(); - _put_cv.notify_one(); + if (_put_waiting > 0) { + --_put_waiting; + unique_lock.unlock(); + _put_cv.notify_one(); + } return true; } @@ -145,6 +167,7 @@ class BlockingPriorityQueue { timer.start(); std::unique_lock unique_lock(_lock); while (!(_shutdown || _queue.size() < _max_element)) { + ++_put_waiting; _put_cv.wait(unique_lock); } _total_put_wait_time += timer.elapsed_time(); @@ -154,7 +177,11 @@ class BlockingPriorityQueue { } _queue.push(val); - _get_cv.notify_one(); + if (_get_waiting > 0) { + --_get_waiting; + unique_lock.unlock(); + _get_cv.notify_one(); + } return true; } @@ -163,8 +190,11 @@ class BlockingPriorityQueue { std::unique_lock unique_lock(_lock); if (_queue.size() < _max_element && !_shutdown) { _queue.push(val); - unique_lock.unlock(); - _get_cv.notify_one(); + if (_get_waiting > 0) { + --_get_waiting; + unique_lock.unlock(); + _get_cv.notify_one(); + } return true; } return false; @@ -204,6 +234,8 @@ class BlockingPriorityQueue { int _upgrade_counter; std::atomic _total_get_wait_time; std::atomic _total_put_wait_time; + size_t _get_waiting; + size_t _put_waiting; }; -} // namespace doris +} // namespace doris \ No newline at end of file diff --git a/be/src/util/blocking_queue.hpp b/be/src/util/blocking_queue.hpp index 0b66220a371114..6f325b7d6ab43d 100644 --- a/be/src/util/blocking_queue.hpp +++ b/be/src/util/blocking_queue.hpp @@ -41,7 +41,9 @@ class BlockingQueue { : _shutdown(false), _max_elements(max_elements), _total_get_wait_time(0), - _total_put_wait_time(0) {} + _total_put_wait_time(0), + _get_waiting(0), + _put_waiting(0) {} // Get an element from the queue, waiting indefinitely for one to become available. // Returns false if we were shut down prior to getting the element, and there @@ -50,13 +52,20 @@ class BlockingQueue { MonotonicStopWatch timer; timer.start(); std::unique_lock unique_lock(_lock); - _get_cv.wait(unique_lock, [this] { return _shutdown || !_list.empty(); }); + while (!(_shutdown || !_list.empty())) { + ++_get_waiting; + _get_cv.wait(unique_lock); + } _total_get_wait_time += timer.elapsed_time(); if (!_list.empty()) { *out = _list.front(); _list.pop_front(); - _put_cv.notify_one(); + if (_put_waiting > 0) { + --_put_waiting; + unique_lock.unlock(); + _put_cv.notify_one(); + } return true; } else { assert(_shutdown); @@ -70,7 +79,10 @@ class BlockingQueue { MonotonicStopWatch timer; timer.start(); std::unique_lock unique_lock(_lock); - _put_cv.wait(unique_lock, [this] { return _shutdown || _list.size() < _max_elements; }); + while (!(_shutdown || _list.size() < _max_elements)) { + ++_put_waiting; + _put_cv.wait(unique_lock); + } _total_put_wait_time += timer.elapsed_time(); if (_shutdown) { @@ -78,7 +90,11 @@ class BlockingQueue { } _list.push_back(val); - _get_cv.notify_one(); + if (_get_waiting > 0) { + --_get_waiting; + unique_lock.unlock(); + _get_cv.notify_one(); + } return true; } @@ -98,7 +114,11 @@ class BlockingQueue { } _list.push_back(val); - _get_cv.notify_one(); + if (_get_waiting > 0) { + --_get_waiting; + unique_lock.unlock(); + _get_cv.notify_one(); + } return true; } @@ -136,6 +156,8 @@ class BlockingQueue { std::list _list; std::atomic _total_get_wait_time; std::atomic _total_put_wait_time; + size_t _get_waiting; + size_t _put_waiting; }; } // namespace doris diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 6c40ccc242e135..1cd708cf273184 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -97,7 +97,7 @@ void ScannerScheduler::stop() { _scheduler_pool->wait(); _local_scan_thread_pool->join(); - _remote_scan_thread_pool->wait(); + _remote_scan_thread_pool->join(); _limited_scan_thread_pool->wait(); _group_local_scan_thread_pool->wait(); @@ -123,15 +123,9 @@ Status ScannerScheduler::init(ExecEnv* env) { config::doris_scanner_thread_pool_queue_size, "local_scan"); // 3. remote scan thread pool - _remote_thread_pool_max_size = config::doris_max_remote_scanner_thread_pool_thread_num != -1 - ? config::doris_max_remote_scanner_thread_pool_thread_num - : std::max(512, CpuInfo::num_cores() * 10); - static_cast( - ThreadPoolBuilder("RemoteScanThreadPool") - .set_min_threads(config::doris_scanner_thread_pool_thread_num) // 48 default - .set_max_threads(_remote_thread_pool_max_size) - .set_max_queue_size(config::doris_scanner_thread_pool_queue_size) - .build(&_remote_scan_thread_pool)); + _remote_scan_thread_pool = std::make_unique( + config::doris_remote_scanner_thread_pool_thread_num, + config::doris_remote_scanner_thread_pool_queue_size, "RemoteScanThreadPool"); // 4. limited scan thread pool static_cast(ThreadPoolBuilder("LimitedScanThreadPool") @@ -269,9 +263,12 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { ret = _local_scan_thread_pool->offer(task); } } else { - ret = _remote_scan_thread_pool->submit_func([this, scanner = *iter, ctx] { + PriorityThreadPool::Task task; + task.work_function = [this, scanner = *iter, ctx] { this->_scanner_scan(this, ctx, scanner); - }); + }; + task.priority = nice; + ret = _remote_scan_thread_pool->offer(task); } if (ret) { this_run.erase(iter++); @@ -482,7 +479,7 @@ void ScannerScheduler::_register_metrics() { REGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size, [this]() { return _remote_scan_thread_pool->get_queue_size(); }); REGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num, - [this]() { return _remote_scan_thread_pool->num_threads(); }); + [this]() { return _remote_scan_thread_pool->get_active_threads(); }); REGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size, [this]() { return _limited_scan_thread_pool->get_queue_size(); }); REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num, @@ -503,5 +500,4 @@ void ScannerScheduler::_deregister_metrics() { DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size); DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num); } - } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 9825085fb1c03f..27befc4328f94f 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -111,7 +111,7 @@ class ScannerScheduler { // _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, etc.) // _limited_scan_thread_pool is a special pool for queries with resource limit std::unique_ptr _local_scan_thread_pool; - std::unique_ptr _remote_scan_thread_pool; + std::unique_ptr _remote_scan_thread_pool; std::unique_ptr _limited_scan_thread_pool; std::unique_ptr _task_group_local_scan_queue;