From d7bc877888bc0b010f8eb199c7d0e71b88c013ff Mon Sep 17 00:00:00 2001 From: freemandealer Date: Thu, 21 Dec 2023 12:38:03 +0800 Subject: [PATCH] Revert "[Opt](scanner-scheduler) Optimize `BlockingQueue`, `BlockingPriorityQueue` and change remote scan thread pool. (#26784)" This is only a test to see if the commit causing load performance drop This reverts commit 0491437a8692a30c9fe9888551c72ab22fe7101c. --- be/src/util/blocking_priority_queue.hpp | 50 +++++----------------- be/src/util/blocking_queue.hpp | 34 +++------------ be/src/vec/exec/scan/scanner_scheduler.cpp | 8 ++-- be/src/vec/exec/scan/scanner_scheduler.h | 2 +- 4 files changed, 21 insertions(+), 73 deletions(-) diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp index bfc1c34e8f16d7..7843d23077abe4 100644 --- a/be/src/util/blocking_priority_queue.hpp +++ b/be/src/util/blocking_priority_queue.hpp @@ -41,9 +41,7 @@ class BlockingPriorityQueue { _max_element(max_elements), _upgrade_counter(0), _total_get_wait_time(0), - _total_put_wait_time(0), - _get_waiting(0), - _put_waiting(0) {} + _total_put_wait_time(0) {} // Get an element from the queue, waiting indefinitely (or until timeout) for one to become available. // Returns false if we were shut down prior to getting the element, and there @@ -55,20 +53,10 @@ class BlockingPriorityQueue { std::unique_lock unique_lock(_lock); bool wait_successful = false; if (timeout_ms > 0) { - while (!(_shutdown || !_queue.empty())) { - ++_get_waiting; - if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms)) == - std::cv_status::timeout) { - // timeout - wait_successful = _shutdown || !_queue.empty(); - break; - } - } + wait_successful = _get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms), + [this] { return _shutdown || !_queue.empty(); }); } else { - while (!(_shutdown || !_queue.empty())) { - ++_get_waiting; - _get_cv.wait(unique_lock); - } + _get_cv.wait(unique_lock, [this] { return _shutdown || !_queue.empty(); }); wait_successful = true; } _total_get_wait_time += timer.elapsed_time(); @@ -88,11 +76,7 @@ class BlockingPriorityQueue { *out = _queue.top(); _queue.pop(); ++_upgrade_counter; - if (_put_waiting > 0) { - --_put_waiting; - unique_lock.unlock(); - _put_cv.notify_one(); - } + _put_cv.notify_one(); return true; } else { assert(_shutdown); @@ -128,11 +112,7 @@ class BlockingPriorityQueue { _queue.pop(); ++_upgrade_counter; _total_get_wait_time += timer.elapsed_time(); - if (_put_waiting > 0) { - --_put_waiting; - unique_lock.unlock(); - _put_cv.notify_one(); - } + _put_cv.notify_one(); return true; } @@ -146,7 +126,6 @@ class BlockingPriorityQueue { timer.start(); std::unique_lock unique_lock(_lock); while (!(_shutdown || _queue.size() < _max_element)) { - ++_put_waiting; _put_cv.wait(unique_lock); } _total_put_wait_time += timer.elapsed_time(); @@ -156,11 +135,7 @@ class BlockingPriorityQueue { } _queue.push(val); - if (_get_waiting > 0) { - --_get_waiting; - unique_lock.unlock(); - _get_cv.notify_one(); - } + _get_cv.notify_one(); return true; } @@ -169,11 +144,8 @@ class BlockingPriorityQueue { std::unique_lock unique_lock(_lock); if (_queue.size() < _max_element && !_shutdown) { _queue.push(val); - if (_get_waiting > 0) { - --_get_waiting; - unique_lock.unlock(); - _get_cv.notify_one(); - } + unique_lock.unlock(); + _get_cv.notify_one(); return true; } return false; @@ -213,8 +185,6 @@ class BlockingPriorityQueue { int _upgrade_counter; std::atomic _total_get_wait_time; std::atomic _total_put_wait_time; - size_t _get_waiting; - size_t _put_waiting; }; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/util/blocking_queue.hpp b/be/src/util/blocking_queue.hpp index 6f325b7d6ab43d..0b66220a371114 100644 --- a/be/src/util/blocking_queue.hpp +++ b/be/src/util/blocking_queue.hpp @@ -41,9 +41,7 @@ class BlockingQueue { : _shutdown(false), _max_elements(max_elements), _total_get_wait_time(0), - _total_put_wait_time(0), - _get_waiting(0), - _put_waiting(0) {} + _total_put_wait_time(0) {} // Get an element from the queue, waiting indefinitely for one to become available. // Returns false if we were shut down prior to getting the element, and there @@ -52,20 +50,13 @@ class BlockingQueue { MonotonicStopWatch timer; timer.start(); std::unique_lock unique_lock(_lock); - while (!(_shutdown || !_list.empty())) { - ++_get_waiting; - _get_cv.wait(unique_lock); - } + _get_cv.wait(unique_lock, [this] { return _shutdown || !_list.empty(); }); _total_get_wait_time += timer.elapsed_time(); if (!_list.empty()) { *out = _list.front(); _list.pop_front(); - if (_put_waiting > 0) { - --_put_waiting; - unique_lock.unlock(); - _put_cv.notify_one(); - } + _put_cv.notify_one(); return true; } else { assert(_shutdown); @@ -79,10 +70,7 @@ class BlockingQueue { MonotonicStopWatch timer; timer.start(); std::unique_lock unique_lock(_lock); - while (!(_shutdown || _list.size() < _max_elements)) { - ++_put_waiting; - _put_cv.wait(unique_lock); - } + _put_cv.wait(unique_lock, [this] { return _shutdown || _list.size() < _max_elements; }); _total_put_wait_time += timer.elapsed_time(); if (_shutdown) { @@ -90,11 +78,7 @@ class BlockingQueue { } _list.push_back(val); - if (_get_waiting > 0) { - --_get_waiting; - unique_lock.unlock(); - _get_cv.notify_one(); - } + _get_cv.notify_one(); return true; } @@ -114,11 +98,7 @@ class BlockingQueue { } _list.push_back(val); - if (_get_waiting > 0) { - --_get_waiting; - unique_lock.unlock(); - _get_cv.notify_one(); - } + _get_cv.notify_one(); return true; } @@ -156,8 +136,6 @@ class BlockingQueue { std::list _list; std::atomic _total_get_wait_time; std::atomic _total_put_wait_time; - size_t _get_waiting; - size_t _put_waiting; }; } // namespace doris diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 2e4db75a241b2c..c905cc638f009a 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -97,7 +97,7 @@ void ScannerScheduler::stop() { _scheduler_pool->wait(); _local_scan_thread_pool->join(); - _remote_scan_thread_pool->join(); + _remote_scan_thread_pool->wait(); _limited_scan_thread_pool->wait(); _group_local_scan_thread_pool->wait(); @@ -259,8 +259,7 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr ctx) { work_func, ctx, ctx->get_task_group()->local_scan_task_entity(), nice}; ret = _task_group_local_scan_queue->push_back(scan_task); } else { - PriorityThreadPool::Task task; - task.work_function = [this, scanner = *iter, ctx] { + ret = _remote_scan_thread_pool->submit_func([this, scanner = *iter, ctx] { this->_scanner_scan(this, ctx, scanner); }; task.priority = nice; @@ -451,7 +450,7 @@ void ScannerScheduler::_register_metrics() { REGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size, [this]() { return _remote_scan_thread_pool->get_queue_size(); }); REGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num, - [this]() { return _remote_scan_thread_pool->get_active_threads(); }); + [this]() { return _remote_scan_thread_pool->num_threads(); }); REGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size, [this]() { return _limited_scan_thread_pool->get_queue_size(); }); REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num, @@ -472,4 +471,5 @@ void ScannerScheduler::_deregister_metrics() { DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size); DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num); } + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 91d341613dfce1..60d3c50bdc8a06 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -112,7 +112,7 @@ class ScannerScheduler { // _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, etc.) // _limited_scan_thread_pool is a special pool for queries with resource limit std::unique_ptr _local_scan_thread_pool; - std::unique_ptr _remote_scan_thread_pool; + std::unique_ptr _remote_scan_thread_pool; std::unique_ptr _limited_scan_thread_pool; std::unique_ptr _task_group_local_scan_queue;