Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 19 additions & 14 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,30 @@ Status ScannerContext::init() {
_newly_create_free_blocks_num = _parent->_newly_create_free_blocks_num;
_queued_blocks_memory_usage = _parent->_queued_blocks_memory_usage;
_scanner_wait_batch_timer = _parent->_scanner_wait_batch_timer;
// 2. Calculate the number of free blocks that all scanners can use.
// 2. Calculate how many blocks need to be preallocated.
// The calculation logic is as follows:
// 1. Assuming that at most M rows can be scanned in one scan(config::doris_scanner_row_num),
// then figure out how many blocks are required for one scan(_block_per_scanner).
// 2. The maximum number of concurrency * the blocks required for one scan,
// that is, the number of blocks that all scanners can use.
// that is, the number of blocks that need to be pre-allocated
auto doris_scanner_row_num =
limit == -1 ? config::doris_scanner_row_num
: std::min(static_cast<int64_t>(config::doris_scanner_row_num), limit);
int real_block_size =
limit == -1 ? _batch_size : std::min(static_cast<int64_t>(_batch_size), limit);
_block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) / real_block_size;
_free_blocks_capacity = _max_thread_num * _block_per_scanner;
auto pre_alloc_block_count = _max_thread_num * _block_per_scanner;

// The free blocks is used for final output block of scanners.
// So use _output_tuple_desc;
int64_t free_blocks_memory_usage = 0;
for (int i = 0; i < pre_alloc_block_count; ++i) {
auto block = vectorized::Block::create_unique(_output_tuple_desc->slots(), real_block_size,
true /*ignore invalid slots*/);
free_blocks_memory_usage += block->allocated_bytes();
_free_blocks.emplace_back(std::move(block));
}
_free_blocks_memory_usage->add(free_blocks_memory_usage);

#ifndef BE_TEST
// 3. get thread token
Expand All @@ -110,6 +121,7 @@ Status ScannerContext::init() {

_num_unfinished_scanners = _scanners.size();

COUNTER_SET(_parent->_pre_alloc_free_blocks_num, (int64_t)pre_alloc_block_count);
COUNTER_SET(_parent->_max_scanner_thread_num, (int64_t)_max_thread_num);
_parent->_runtime_profile->add_info_string("UseSpecificThreadToken",
thread_token == nullptr ? "False" : "True");
Expand All @@ -121,12 +133,6 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block,
bool get_block_not_empty) {
{
std::lock_guard l(_free_blocks_lock);
*has_free_block = _free_blocks_capacity > 0;
// Always reduce _free_blocks_capacity by one since we always return a block
if (_free_blocks_capacity > 0) {
--_free_blocks_capacity;
}

if (!_free_blocks.empty()) {
if (!get_block_not_empty || _free_blocks.back()->mem_reuse()) {
auto block = std::move(_free_blocks.back());
Expand All @@ -136,6 +142,7 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block,
}
}
}
*has_free_block = false;

COUNTER_UPDATE(_newly_create_free_blocks_num, 1);
return vectorized::Block::create_unique(_real_tuple_desc->slots(), _batch_size,
Expand All @@ -147,7 +154,6 @@ void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block> block)
_free_blocks_memory_usage->add(block->allocated_bytes());
std::lock_guard l(_free_blocks_lock);
_free_blocks.emplace_back(std::move(block));
++_free_blocks_capacity;
}

void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) {
Expand Down Expand Up @@ -281,8 +287,6 @@ void ScannerContext::clear_and_join(VScanNode* node, RuntimeState* state) {
_close_and_clear_scanners(node, state);

_blocks_queue.clear();
std::unique_lock lock(_free_blocks_lock);
_free_blocks.clear();
}

bool ScannerContext::no_schedule() {
Expand Down Expand Up @@ -345,9 +349,10 @@ void ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current
int thread_slot_num = 0;
{
// If there are enough space in blocks queue,
// the scanner number depends on the _free_blocks_capacity
// the scanner number depends on the _free_blocks numbers
std::lock_guard f(_free_blocks_lock);
thread_slot_num = (_free_blocks_capacity + _block_per_scanner - 1) / _block_per_scanner;
thread_slot_num = _free_blocks.size() / _block_per_scanner;
thread_slot_num += (_free_blocks.size() % _block_per_scanner != 0);
thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners);
if (thread_slot_num <= 0) {
thread_slot_num = 1;
Expand Down
6 changes: 1 addition & 5 deletions be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,9 @@ class ScannerContext {
std::atomic_bool _should_stop = false;
std::atomic_bool _is_finished = false;

// Lazy-allocated blocks for all scanners to share, for memory reuse.
// Pre-allocated blocks for all scanners to share, for memory reuse.
doris::Mutex _free_blocks_lock;
std::vector<vectorized::BlockUPtr> _free_blocks;
// The current number of free blocks available to the scanners.
// Used to limit the memory usage of the scanner.
// NOTE: this is NOT the size of `_free_blocks`.
int32_t _free_blocks_capacity = 0;

int _batch_size;
// The limit from SQL's limit clause
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/vscan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ Status VScanNode::_init_profile() {
// time of scan thread to wait for worker thread of the thread pool
_scanner_wait_worker_timer = ADD_TIMER(_runtime_profile, "ScannerWorkerWaitTime");

_pre_alloc_free_blocks_num =
ADD_COUNTER(_runtime_profile, "PreAllocFreeBlocksNum", TUnit::UNIT);
_max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT);

return Status::OK();
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/vscan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ class VScanNode : public ExecNode {
RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr;
RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr;
// Num of pre allocated free blocks
RuntimeProfile::Counter* _pre_alloc_free_blocks_num = nullptr;
// Num of newly created free blocks when running query
RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
// Max num of scanner thread
Expand Down