From 181cacd42d970439210d477fd1083f4105b270d1 Mon Sep 17 00:00:00 2001 From: xiaokang Date: Fri, 11 Mar 2022 23:23:58 +0800 Subject: [PATCH 1/2] fix olap table scan and sink memory comsumption problem --- be/src/common/config.h | 4 +++- be/src/exec/olap_scan_node.cpp | 24 ++++++++++++++++++++---- be/src/exec/olap_scan_node.h | 6 ++++++ be/src/exec/olap_scanner.cpp | 10 +++++----- be/src/exec/tablet_sink.cpp | 15 +++++++++++++-- be/src/exec/tablet_sink.h | 3 +++ 6 files changed, 50 insertions(+), 12 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 893c5bdcc4015c..8d3819d973b9b7 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -169,8 +169,10 @@ CONF_mInt32(doris_scan_range_row_count, "524288"); CONF_mInt32(doris_scan_range_max_mb, "0"); // size of scanner queue between scanner thread and compute thread CONF_mInt32(doris_scanner_queue_size, "1024"); -// single read execute fragment row size +// single read execute fragment row number CONF_mInt32(doris_scanner_row_num, "16384"); +// single read execute fragment row bytes +CONF_mInt32(doris_scanner_row_bytes, "10485760"); // number of max scan keys CONF_mInt32(doris_max_scan_key_num, "1024"); // the max number of push down values of a single column. diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 8a6f3946e35289..0e750bde44d2d1 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -77,6 +77,8 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) { _max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column; } + _max_scanner_queue_size_bytes = query_options.mem_limit / 20; //TODO: session variable percent + /// TODO: could one filter used in the different scan_node ? int filter_size = _runtime_filter_descs.size(); _runtime_filter_ctxs.resize(filter_size); @@ -309,6 +311,7 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo materialized_batch = _materialized_row_batches.front(); DCHECK(materialized_batch != nullptr); _materialized_row_batches.pop_front(); + _materialized_row_batches_bytes -= materialized_batch->tuple_data_pool()->total_reserved_bytes(); } } @@ -395,12 +398,14 @@ Status OlapScanNode::close(RuntimeState* state) { } _materialized_row_batches.clear(); + _materialized_row_batches_bytes = 0; for (auto row_batch : _scan_row_batches) { delete row_batch; } _scan_row_batches.clear(); + _scan_row_batches_bytes = 0; // OlapScanNode terminate by exception // so that initiative close the Scanner @@ -1376,6 +1381,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { int max_thread = _max_materialized_row_batches; if (config::doris_scanner_row_num > state->batch_size()) { max_thread /= config::doris_scanner_row_num / state->batch_size(); + if (max_thread <= 0) max_thread = 1; } // read from scanner while (LIKELY(status.ok())) { @@ -1394,7 +1400,9 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { // How many thread can apply to this query size_t thread_slot_num = 0; mem_consume = _scanner_mem_tracker->consumption(); - if (mem_consume < (mem_limit * 6) / 10) { + // check limit for total memory and _scan_row_batches memory + if (mem_consume < (mem_limit * 6) / 10 && + _scan_row_batches_bytes < _max_scanner_queue_size_bytes / 2) { thread_slot_num = max_thread - assigned_thread_num; } else { // Memory already exceed @@ -1474,6 +1482,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { if (LIKELY(!_scan_row_batches.empty())) { scan_batch = _scan_row_batches.front(); _scan_row_batches.pop_front(); + _scan_row_batches_bytes -= scan_batch->tuple_data_pool()->total_reserved_bytes(); // delete scan_batch if transfer thread should be stopped // because scan_batch wouldn't be useful anymore @@ -1574,10 +1583,12 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { // need yield this thread when we do enough work. However, OlapStorage read // data in pre-aggregate mode, then we can't use storage returned data to // judge if we need to yield. So we record all raw data read in this round - // scan, if this exceed threshold, we yield this thread. + // scan, if this exceed row number or bytes threshold, we yield this thread. int64_t raw_rows_read = scanner->raw_rows_read(); int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num; - while (!eos && raw_rows_read < raw_rows_threshold) { + int64_t raw_bytes_read = 0; + int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; + while (!eos && raw_rows_read < raw_rows_threshold && raw_bytes_read < raw_bytes_threshold) { if (UNLIKELY(_transfer_done)) { eos = true; status = Status::Cancelled("Cancelled"); @@ -1601,6 +1612,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { row_batch = nullptr; } else { row_batchs.push_back(row_batch); + raw_bytes_read += row_batch->tuple_data_pool()->total_reserved_bytes(); } raw_rows_read = scanner->raw_rows_read(); } @@ -1629,6 +1641,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { } else { for (auto rb : row_batchs) { _scan_row_batches.push_back(rb); + _scan_row_batches_bytes += rb->tuple_data_pool()->total_reserved_bytes(); } } // If eos is true, we will process out of this lock block. @@ -1668,13 +1681,16 @@ Status OlapScanNode::add_one_batch(RowBatch* row_batch) { { std::unique_lock l(_row_batches_lock); - while (UNLIKELY(_materialized_row_batches.size() >= _max_materialized_row_batches && + // check queue limit for both both batch size and bytes + while (UNLIKELY((_materialized_row_batches.size() >= _max_materialized_row_batches || + _materialized_row_batches_bytes >= _max_scanner_queue_size_bytes / 2) && !_transfer_done)) { _row_batch_consumed_cv.wait(l); } VLOG_CRITICAL << "Push row_batch to materialized_row_batches"; _materialized_row_batches.push_back(row_batch); + _materialized_row_batches_bytes += row_batch->tuple_data_pool()->total_reserved_bytes(); } // remove one batch, notify main thread _row_batch_added_cv.notify_one(); diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index b4c3dde1f28f15..44a5cd245f502e 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -210,6 +210,8 @@ class OlapScanNode : public ScanNode { std::condition_variable _row_batch_consumed_cv; std::list _materialized_row_batches; + // to limit _materialized_row_batches_bytes < _max_scanner_queue_size_bytes / 2 + std::atomic_size_t _materialized_row_batches_bytes = 0; std::mutex _scan_batches_lock; std::condition_variable _scan_batch_added_cv; @@ -217,10 +219,14 @@ class OlapScanNode : public ScanNode { std::condition_variable _scan_thread_exit_cv; std::list _scan_row_batches; + // to limit _scan_row_batches_bytes < _max_scanner_queue_size_bytes / 2 + std::atomic_size_t _scan_row_batches_bytes = 0; std::list _olap_scanners; int _max_materialized_row_batches; + // to limit _materialized_row_batches_bytes and _scan_row_batches_bytes + size_t _max_scanner_queue_size_bytes; bool _start; // Used in Scan thread to ensure thread-safe std::atomic_bool _scanner_done; diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 71cd4c3a445caf..d711acd199cd87 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -264,11 +264,14 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { std::unique_ptr mem_pool(new MemPool(_mem_tracker.get())); int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num; + int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; { SCOPED_TIMER(_parent->_scan_timer); while (true) { - // Batch is full, break - if (batch->is_full()) { + // Batch is full or reach raw_rows_threshold or raw_bytes_threshold, break + if (batch->is_full() || + batch->tuple_data_pool()->total_reserved_bytes() >= raw_bytes_threshold || + raw_rows_read() >= raw_rows_threshold) { _update_realtime_counter(); break; } @@ -420,9 +423,6 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { } } while (false); - if (raw_rows_read() >= raw_rows_threshold) { - break; - } } } diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 757abe2071781e..c4faf128a915b4 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -102,6 +102,7 @@ Status NodeChannel::init(RuntimeState* state) { _rpc_timeout_ms = state->query_options().query_timeout * 1000; _timeout_watch.start(); + _max_pending_batches_bytes = _parent->_load_mem_limit / 20; //TODO: session variable percent _load_info = "load_id=" + print_id(_parent->_load_id) + ", txn_id=" + std::to_string(_parent->_txn_id); @@ -246,7 +247,10 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) { // But there is still some unfinished things, we do mem limit here temporarily. // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below. // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close(). - while (!_cancelled && _parent->_mem_tracker->any_limit_exceeded() && _pending_batches_num > 0) { + while (!_cancelled && + _pending_batches_num > 0 && + (_pending_batches_bytes > _max_pending_batches_bytes || + _parent->_mem_tracker->any_limit_exceeded())) { SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns); SleepFor(MonoDelta::FromMilliseconds(10)); } @@ -256,6 +260,7 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) { { SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns); std::lock_guard l(_pending_batches_lock); + _pending_batches_bytes += _cur_batch->tuple_data_pool()->total_reserved_bytes(); //To simplify the add_row logic, postpone adding batch into req until the time of sending req _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); _pending_batches_num++; @@ -294,7 +299,10 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) { // But there is still some unfinished things, we do mem limit here temporarily. // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below. // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close(). - while (!_cancelled && _parent->_mem_tracker->any_limit_exceeded() && _pending_batches_num > 0) { + while (!_cancelled && + _pending_batches_num > 0 && + (_pending_batches_bytes > _max_pending_batches_bytes || + _parent->_mem_tracker->any_limit_exceeded())) { SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns); SleepFor(MonoDelta::FromMilliseconds(10)); } @@ -304,6 +312,7 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) { { SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns); std::lock_guard l(_pending_batches_lock); + _pending_batches_bytes += _cur_batch->tuple_data_pool()->total_reserved_bytes(); //To simplify the add_row logic, postpone adding batch into req until the time of sending req _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); _pending_batches_num++; @@ -339,6 +348,7 @@ Status NodeChannel::mark_close() { { debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; std::lock_guard l(_pending_batches_lock); + _pending_batches_bytes += _cur_batch->tuple_data_pool()->total_reserved_bytes(); _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); _pending_batches_num++; DCHECK(_pending_batches.back().second.eos()); @@ -445,6 +455,7 @@ void NodeChannel::try_send_batch() { send_batch = std::move(_pending_batches.front()); _pending_batches.pop(); _pending_batches_num--; + _pending_batches_bytes -= send_batch.first->tuple_data_pool()->total_reserved_bytes(); } auto row_batch = std::move(send_batch.first); diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index af6cffdb4431b5..5c0330c2b39a53 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -256,6 +256,9 @@ class NodeChannel { using AddBatchReq = std::pair, PTabletWriterAddBatchRequest>; std::queue _pending_batches; std::atomic _pending_batches_num {0}; + // limit _pending_batches size + std::atomic _pending_batches_bytes {0}; + size_t _max_pending_batches_bytes {10 * 1024 * 1024}; std::shared_ptr _stub = nullptr; RefCountClosure* _open_closure = nullptr; From fa86f4db013609eb1f7c9005c20de8378d9b76df Mon Sep 17 00:00:00 2001 From: xiaokang Date: Sun, 13 Mar 2022 16:32:10 +0800 Subject: [PATCH 2/2] transport memory problem fix to vectorized olap table scan --- be/src/vec/exec/volap_scan_node.cpp | 48 ++++++++++++++++++++++++----- be/src/vec/exec/volap_scanner.cpp | 8 ++++- 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 88dfa3e4ab1d26..76d6a43cbd4ef2 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -112,6 +112,9 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) { // 3 transfer result block when queue is not empty if (LIKELY(!_scan_blocks.empty())) { blocks.swap(_scan_blocks); + for (auto b : blocks) { + _scan_row_batches_bytes -= b->allocated_bytes(); + } // delete scan_block if transfer thread should be stopped // because scan_block wouldn't be useful anymore if (UNLIKELY(_transfer_done)) { @@ -193,12 +196,15 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { // need yield this thread when we do enough work. However, OlapStorage read // data in pre-aggregate mode, then we can't use storage returned data to // judge if we need to yield. So we record all raw data read in this round - // scan, if this exceed threshold, we yield this thread. + // scan, if this exceed row number or bytes threshold, we yield this thread. int64_t raw_rows_read = scanner->raw_rows_read(); int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num; + int64_t raw_bytes_read = 0; + int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; bool get_free_block = true; - while (!eos && raw_rows_read < raw_rows_threshold && get_free_block) { + while (!eos && raw_rows_read < raw_rows_threshold && + raw_bytes_read < raw_bytes_threshold && get_free_block) { if (UNLIKELY(_transfer_done)) { eos = true; status = Status::Cancelled("Cancelled"); @@ -216,6 +222,9 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { eos = true; break; } + + raw_bytes_read += block->allocated_bytes(); + // 4. if status not ok, change status_. if (UNLIKELY(block->rows() == 0)) { std::lock_guard l(_free_blocks_lock); @@ -254,6 +263,9 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { } else { std::lock_guard l(_scan_blocks_lock); _scan_blocks.insert(_scan_blocks.end(), blocks.begin(), blocks.end()); + for (auto b : blocks) { + _scan_row_batches_bytes += b->allocated_bytes(); + } } // If eos is true, we will process out of this lock block. if (eos) { @@ -286,13 +298,18 @@ Status VOlapScanNode::_add_blocks(std::vector& block) { { std::unique_lock l(_blocks_lock); - while (UNLIKELY(_materialized_blocks.size() >= _max_materialized_blocks && + // check queue limit for both block queue size and bytes + while (UNLIKELY((_materialized_blocks.size() >= _max_materialized_blocks || + _materialized_row_batches_bytes >= _max_scanner_queue_size_bytes / 2) && !_transfer_done)) { _block_consumed_cv.wait(l); } VLOG_CRITICAL << "Push block to materialized_blocks"; _materialized_blocks.insert(_materialized_blocks.end(), block.cbegin(), block.cend()); + for (auto b : block) { + _materialized_row_batches_bytes += b->allocated_bytes(); + } } // remove one block, notify main thread _block_added_cv.notify_one(); @@ -399,7 +416,9 @@ Status VOlapScanNode::close(RuntimeState* state) { // which may lead to potential performance problems. we should rethink whether to delete the transfer thread std::for_each(_materialized_blocks.begin(), _materialized_blocks.end(), std::default_delete()); + _materialized_row_batches_bytes = 0; std::for_each(_scan_blocks.begin(), _scan_blocks.end(), std::default_delete()); + _scan_row_batches_bytes = 0; std::for_each(_free_blocks.begin(), _free_blocks.end(), std::default_delete()); _mem_tracker->release(_buffered_bytes); @@ -473,6 +492,7 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) { materialized_block = _materialized_blocks.back(); DCHECK(materialized_block != NULL); _materialized_blocks.pop_back(); + _materialized_row_batches_bytes -= materialized_block->allocated_bytes(); } } @@ -531,15 +551,29 @@ Block* VOlapScanNode::_alloc_block(bool& get_free_block) { int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per_scanner) { std::list olap_scanners; int assigned_thread_num = _running_thread; + size_t max_thread = std::min(_volap_scanners.size(), + static_cast(config::doris_scanner_thread_pool_thread_num)); // copy to local { // How many thread can apply to this query size_t thread_slot_num = 0; { - std::lock_guard l(_free_blocks_lock); - thread_slot_num = _free_blocks.size() / block_per_scanner; - thread_slot_num += (_free_blocks.size() % block_per_scanner != 0); - if (thread_slot_num == 0) thread_slot_num++; + if (_scan_row_batches_bytes < _max_scanner_queue_size_bytes / 2) { + std::lock_guard l(_free_blocks_lock); + thread_slot_num = _free_blocks.size() / block_per_scanner; + thread_slot_num += (_free_blocks.size() % block_per_scanner != 0); + thread_slot_num = std::min(thread_slot_num, max_thread - assigned_thread_num); + if (thread_slot_num <= 0) thread_slot_num = 1; + } else { + std::lock_guard l(_scan_blocks_lock); + if (_scan_blocks.empty()) { + // Just for notify if _scan_blocks is empty and no running thread + if (assigned_thread_num == 0) { + thread_slot_num = 1; + // NOTE: if olap_scanners_ is empty, scanner_done_ should be true + } + } + } } { diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp index 6a6e771a5660e1..b01bd601369fc6 100644 --- a/be/src/vec/exec/volap_scanner.cpp +++ b/be/src/vec/exec/volap_scanner.cpp @@ -42,6 +42,7 @@ Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, bo DCHECK(block->rows() == 0); int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num; + int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; if (!block->mem_reuse()) { for (const auto slot_desc : _tuple_desc->slots()) { block->insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(), @@ -65,7 +66,12 @@ Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, bo RETURN_IF_ERROR( VExprContext::filter_block(_vconjunct_ctx, block, _tuple_desc->slots().size())); - } while (block->rows() == 0 && !(*eof) && raw_rows_read() < raw_rows_threshold); + } while (block->rows() == 0 && !(*eof) && raw_rows_read() < raw_rows_threshold && + block->allocated_bytes() < raw_bytes_threshold); + // NOTE: + // There is no need to check raw_bytes_threshold since block->rows() == 0 is checked first. + // But checking raw_bytes_threshold is still added here for consistency with raw_rows_threshold + // and olap_scanner.cpp. return Status::OK(); }