From d8a7e054986d209a4a864d8b9e9501bb8851e8f4 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 31 May 2023 22:04:02 +0800 Subject: [PATCH] Revert "[Enhancement](scanner) allocate blocks in scanner_context on demand and free them on close (#19389)" This reverts commit 6efe6ef6e8f98642e43f6c3ad1b3d7141518db95. --- be/src/vec/exec/scan/scanner_context.cpp | 33 ++++++++++++++---------- be/src/vec/exec/scan/scanner_context.h | 6 +---- be/src/vec/exec/scan/vscan_node.cpp | 2 ++ be/src/vec/exec/scan/vscan_node.h | 2 ++ 4 files changed, 24 insertions(+), 19 deletions(-) diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index b50aa62629da0a..d5c200581bbf83 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -85,19 +85,30 @@ Status ScannerContext::init() { _newly_create_free_blocks_num = _parent->_newly_create_free_blocks_num; _queued_blocks_memory_usage = _parent->_queued_blocks_memory_usage; _scanner_wait_batch_timer = _parent->_scanner_wait_batch_timer; - // 2. Calculate the number of free blocks that all scanners can use. + // 2. Calculate how many blocks need to be preallocated. // The calculation logic is as follows: // 1. Assuming that at most M rows can be scanned in one scan(config::doris_scanner_row_num), // then figure out how many blocks are required for one scan(_block_per_scanner). // 2. The maximum number of concurrency * the blocks required for one scan, - // that is, the number of blocks that all scanners can use. + // that is, the number of blocks that need to be pre-allocated auto doris_scanner_row_num = limit == -1 ? config::doris_scanner_row_num : std::min(static_cast(config::doris_scanner_row_num), limit); int real_block_size = limit == -1 ? _batch_size : std::min(static_cast(_batch_size), limit); _block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) / real_block_size; - _free_blocks_capacity = _max_thread_num * _block_per_scanner; + auto pre_alloc_block_count = _max_thread_num * _block_per_scanner; + + // The free blocks is used for final output block of scanners. + // So use _output_tuple_desc; + int64_t free_blocks_memory_usage = 0; + for (int i = 0; i < pre_alloc_block_count; ++i) { + auto block = vectorized::Block::create_unique(_output_tuple_desc->slots(), real_block_size, + true /*ignore invalid slots*/); + free_blocks_memory_usage += block->allocated_bytes(); + _free_blocks.emplace_back(std::move(block)); + } + _free_blocks_memory_usage->add(free_blocks_memory_usage); #ifndef BE_TEST // 3. get thread token @@ -110,6 +121,7 @@ Status ScannerContext::init() { _num_unfinished_scanners = _scanners.size(); + COUNTER_SET(_parent->_pre_alloc_free_blocks_num, (int64_t)pre_alloc_block_count); COUNTER_SET(_parent->_max_scanner_thread_num, (int64_t)_max_thread_num); _parent->_runtime_profile->add_info_string("UseSpecificThreadToken", thread_token == nullptr ? "False" : "True"); @@ -121,12 +133,6 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block, bool get_block_not_empty) { { std::lock_guard l(_free_blocks_lock); - *has_free_block = _free_blocks_capacity > 0; - // Always reduce _free_blocks_capacity by one since we always return a block - if (_free_blocks_capacity > 0) { - --_free_blocks_capacity; - } - if (!_free_blocks.empty()) { if (!get_block_not_empty || _free_blocks.back()->mem_reuse()) { auto block = std::move(_free_blocks.back()); @@ -136,6 +142,7 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block, } } } + *has_free_block = false; COUNTER_UPDATE(_newly_create_free_blocks_num, 1); return vectorized::Block::create_unique(_real_tuple_desc->slots(), _batch_size, @@ -147,7 +154,6 @@ void ScannerContext::return_free_block(std::unique_ptr block) _free_blocks_memory_usage->add(block->allocated_bytes()); std::lock_guard l(_free_blocks_lock); _free_blocks.emplace_back(std::move(block)); - ++_free_blocks_capacity; } void ScannerContext::append_blocks_to_queue(std::vector& blocks) { @@ -281,8 +287,6 @@ void ScannerContext::clear_and_join(VScanNode* node, RuntimeState* state) { _close_and_clear_scanners(node, state); _blocks_queue.clear(); - std::unique_lock lock(_free_blocks_lock); - _free_blocks.clear(); } bool ScannerContext::no_schedule() { @@ -345,9 +349,10 @@ void ScannerContext::get_next_batch_of_scanners(std::list* current int thread_slot_num = 0; { // If there are enough space in blocks queue, - // the scanner number depends on the _free_blocks_capacity + // the scanner number depends on the _free_blocks numbers std::lock_guard f(_free_blocks_lock); - thread_slot_num = (_free_blocks_capacity + _block_per_scanner - 1) / _block_per_scanner; + thread_slot_num = _free_blocks.size() / _block_per_scanner; + thread_slot_num += (_free_blocks.size() % _block_per_scanner != 0); thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners); if (thread_slot_num <= 0) { thread_slot_num = 1; diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index e0bb78908f98b9..0b51758399514b 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -194,13 +194,9 @@ class ScannerContext { std::atomic_bool _should_stop = false; std::atomic_bool _is_finished = false; - // Lazy-allocated blocks for all scanners to share, for memory reuse. + // Pre-allocated blocks for all scanners to share, for memory reuse. doris::Mutex _free_blocks_lock; std::vector _free_blocks; - // The current number of free blocks available to the scanners. - // Used to limit the memory usage of the scanner. - // NOTE: this is NOT the size of `_free_blocks`. - int32_t _free_blocks_capacity = 0; int _batch_size; // The limit from SQL's limit clause diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index cf5516471c4005..875421bdd19c5e 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -288,6 +288,8 @@ Status VScanNode::_init_profile() { // time of scan thread to wait for worker thread of the thread pool _scanner_wait_worker_timer = ADD_TIMER(_runtime_profile, "ScannerWorkerWaitTime"); + _pre_alloc_free_blocks_num = + ADD_COUNTER(_runtime_profile, "PreAllocFreeBlocksNum", TUnit::UNIT); _max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT); return Status::OK(); diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index e4c2fb6118a91b..3b19d84dabfb41 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -356,6 +356,8 @@ class VScanNode : public ExecNode { RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr; RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr; RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr; + // Num of pre allocated free blocks + RuntimeProfile::Counter* _pre_alloc_free_blocks_num = nullptr; // Num of newly created free blocks when running query RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr; // Max num of scanner thread