Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,10 @@ CONF_mInt32(doris_scan_range_row_count, "524288");
CONF_mInt32(doris_scan_range_max_mb, "0");
// size of scanner queue between scanner thread and compute thread
CONF_mInt32(doris_scanner_queue_size, "1024");
// single read execute fragment row size
// single read execute fragment row number
CONF_mInt32(doris_scanner_row_num, "16384");
// single read execute fragment row bytes
CONF_mInt32(doris_scanner_row_bytes, "10485760");
// number of max scan keys
CONF_mInt32(doris_max_scan_key_num, "1024");
// the max number of push down values of a single column.
Expand Down
24 changes: 20 additions & 4 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
_max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column;
}

_max_scanner_queue_size_bytes = query_options.mem_limit / 20; //TODO: session variable percent

/// TODO: could one filter used in the different scan_node ?
int filter_size = _runtime_filter_descs.size();
_runtime_filter_ctxs.resize(filter_size);
Expand Down Expand Up @@ -309,6 +311,7 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo
materialized_batch = _materialized_row_batches.front();
DCHECK(materialized_batch != nullptr);
_materialized_row_batches.pop_front();
_materialized_row_batches_bytes -= materialized_batch->tuple_data_pool()->total_reserved_bytes();
}
}

Expand Down Expand Up @@ -395,12 +398,14 @@ Status OlapScanNode::close(RuntimeState* state) {
}

_materialized_row_batches.clear();
_materialized_row_batches_bytes = 0;

for (auto row_batch : _scan_row_batches) {
delete row_batch;
}

_scan_row_batches.clear();
_scan_row_batches_bytes = 0;

// OlapScanNode terminate by exception
// so that initiative close the Scanner
Expand Down Expand Up @@ -1376,6 +1381,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
int max_thread = _max_materialized_row_batches;
if (config::doris_scanner_row_num > state->batch_size()) {
max_thread /= config::doris_scanner_row_num / state->batch_size();
if (max_thread <= 0) max_thread = 1;
}
// read from scanner
while (LIKELY(status.ok())) {
Expand All @@ -1394,7 +1400,9 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
// How many thread can apply to this query
size_t thread_slot_num = 0;
mem_consume = _scanner_mem_tracker->consumption();
if (mem_consume < (mem_limit * 6) / 10) {
// check limit for total memory and _scan_row_batches memory
if (mem_consume < (mem_limit * 6) / 10 &&
_scan_row_batches_bytes < _max_scanner_queue_size_bytes / 2) {
thread_slot_num = max_thread - assigned_thread_num;
} else {
// Memory already exceed
Expand Down Expand Up @@ -1474,6 +1482,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
if (LIKELY(!_scan_row_batches.empty())) {
scan_batch = _scan_row_batches.front();
_scan_row_batches.pop_front();
_scan_row_batches_bytes -= scan_batch->tuple_data_pool()->total_reserved_bytes();

// delete scan_batch if transfer thread should be stopped
// because scan_batch wouldn't be useful anymore
Expand Down Expand Up @@ -1574,10 +1583,12 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
// need yield this thread when we do enough work. However, OlapStorage read
// data in pre-aggregate mode, then we can't use storage returned data to
// judge if we need to yield. So we record all raw data read in this round
// scan, if this exceed threshold, we yield this thread.
// scan, if this exceed row number or bytes threshold, we yield this thread.
int64_t raw_rows_read = scanner->raw_rows_read();
int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
while (!eos && raw_rows_read < raw_rows_threshold) {
int64_t raw_bytes_read = 0;
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
while (!eos && raw_rows_read < raw_rows_threshold && raw_bytes_read < raw_bytes_threshold) {
if (UNLIKELY(_transfer_done)) {
eos = true;
status = Status::Cancelled("Cancelled");
Expand All @@ -1601,6 +1612,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
row_batch = nullptr;
} else {
row_batchs.push_back(row_batch);
raw_bytes_read += row_batch->tuple_data_pool()->total_reserved_bytes();
}
raw_rows_read = scanner->raw_rows_read();
}
Expand Down Expand Up @@ -1629,6 +1641,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
} else {
for (auto rb : row_batchs) {
_scan_row_batches.push_back(rb);
_scan_row_batches_bytes += rb->tuple_data_pool()->total_reserved_bytes();
}
}
// If eos is true, we will process out of this lock block.
Expand Down Expand Up @@ -1668,13 +1681,16 @@ Status OlapScanNode::add_one_batch(RowBatch* row_batch) {
{
std::unique_lock<std::mutex> l(_row_batches_lock);

while (UNLIKELY(_materialized_row_batches.size() >= _max_materialized_row_batches &&
// check queue limit for both both batch size and bytes
while (UNLIKELY((_materialized_row_batches.size() >= _max_materialized_row_batches ||
_materialized_row_batches_bytes >= _max_scanner_queue_size_bytes / 2) &&
!_transfer_done)) {
_row_batch_consumed_cv.wait(l);
}

VLOG_CRITICAL << "Push row_batch to materialized_row_batches";
_materialized_row_batches.push_back(row_batch);
_materialized_row_batches_bytes += row_batch->tuple_data_pool()->total_reserved_bytes();
}
// remove one batch, notify main thread
_row_batch_added_cv.notify_one();
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/olap_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,17 +210,23 @@ class OlapScanNode : public ScanNode {
std::condition_variable _row_batch_consumed_cv;

std::list<RowBatch*> _materialized_row_batches;
// to limit _materialized_row_batches_bytes < _max_scanner_queue_size_bytes / 2
std::atomic_size_t _materialized_row_batches_bytes = 0;

std::mutex _scan_batches_lock;
std::condition_variable _scan_batch_added_cv;
std::atomic_int _running_thread = 0;
std::condition_variable _scan_thread_exit_cv;

std::list<RowBatch*> _scan_row_batches;
// to limit _scan_row_batches_bytes < _max_scanner_queue_size_bytes / 2
std::atomic_size_t _scan_row_batches_bytes = 0;

std::list<OlapScanner*> _olap_scanners;

int _max_materialized_row_batches;
// to limit _materialized_row_batches_bytes and _scan_row_batches_bytes
size_t _max_scanner_queue_size_bytes;
bool _start;
// Used in Scan thread to ensure thread-safe
std::atomic_bool _scanner_done;
Expand Down
10 changes: 5 additions & 5 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,14 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {

std::unique_ptr<MemPool> mem_pool(new MemPool(_mem_tracker.get()));
int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num;
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
{
SCOPED_TIMER(_parent->_scan_timer);
while (true) {
// Batch is full, break
if (batch->is_full()) {
// Batch is full or reach raw_rows_threshold or raw_bytes_threshold, break
if (batch->is_full() ||
batch->tuple_data_pool()->total_reserved_bytes() >= raw_bytes_threshold ||
raw_rows_read() >= raw_rows_threshold) {
_update_realtime_counter();
break;
}
Expand Down Expand Up @@ -420,9 +423,6 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
}
} while (false);

if (raw_rows_read() >= raw_rows_threshold) {
break;
}
}
}

Expand Down
15 changes: 13 additions & 2 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Status NodeChannel::init(RuntimeState* state) {

_rpc_timeout_ms = state->query_options().query_timeout * 1000;
_timeout_watch.start();
_max_pending_batches_bytes = _parent->_load_mem_limit / 20; //TODO: session variable percent

_load_info = "load_id=" + print_id(_parent->_load_id) +
", txn_id=" + std::to_string(_parent->_txn_id);
Expand Down Expand Up @@ -246,7 +247,10 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
// But there is still some unfinished things, we do mem limit here temporarily.
// _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
// It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close().
while (!_cancelled && _parent->_mem_tracker->any_limit_exceeded() && _pending_batches_num > 0) {
while (!_cancelled &&
_pending_batches_num > 0 &&
(_pending_batches_bytes > _max_pending_batches_bytes ||
_parent->_mem_tracker->any_limit_exceeded())) {
SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
SleepFor(MonoDelta::FromMilliseconds(10));
}
Expand All @@ -256,6 +260,7 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
{
SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
std::lock_guard<std::mutex> l(_pending_batches_lock);
_pending_batches_bytes += _cur_batch->tuple_data_pool()->total_reserved_bytes();
//To simplify the add_row logic, postpone adding batch into req until the time of sending req
_pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request);
_pending_batches_num++;
Expand Down Expand Up @@ -294,7 +299,10 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
// But there is still some unfinished things, we do mem limit here temporarily.
// _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
// It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close().
while (!_cancelled && _parent->_mem_tracker->any_limit_exceeded() && _pending_batches_num > 0) {
while (!_cancelled &&
_pending_batches_num > 0 &&
(_pending_batches_bytes > _max_pending_batches_bytes ||
_parent->_mem_tracker->any_limit_exceeded())) {
SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
SleepFor(MonoDelta::FromMilliseconds(10));
}
Expand All @@ -304,6 +312,7 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
{
SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
std::lock_guard<std::mutex> l(_pending_batches_lock);
_pending_batches_bytes += _cur_batch->tuple_data_pool()->total_reserved_bytes();
//To simplify the add_row logic, postpone adding batch into req until the time of sending req
_pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request);
_pending_batches_num++;
Expand Down Expand Up @@ -339,6 +348,7 @@ Status NodeChannel::mark_close() {
{
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
std::lock_guard<std::mutex> l(_pending_batches_lock);
_pending_batches_bytes += _cur_batch->tuple_data_pool()->total_reserved_bytes();
_pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request);
_pending_batches_num++;
DCHECK(_pending_batches.back().second.eos());
Expand Down Expand Up @@ -445,6 +455,7 @@ void NodeChannel::try_send_batch() {
send_batch = std::move(_pending_batches.front());
_pending_batches.pop();
_pending_batches_num--;
_pending_batches_bytes -= send_batch.first->tuple_data_pool()->total_reserved_bytes();
}

auto row_batch = std::move(send_batch.first);
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/tablet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ class NodeChannel {
using AddBatchReq = std::pair<std::unique_ptr<RowBatch>, PTabletWriterAddBatchRequest>;
std::queue<AddBatchReq> _pending_batches;
std::atomic<int> _pending_batches_num {0};
// limit _pending_batches size
std::atomic<size_t> _pending_batches_bytes {0};
size_t _max_pending_batches_bytes {10 * 1024 * 1024};

std::shared_ptr<PBackendService_Stub> _stub = nullptr;
RefCountClosure<PTabletWriterOpenResult>* _open_closure = nullptr;
Expand Down
48 changes: 41 additions & 7 deletions be/src/vec/exec/volap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
// 3 transfer result block when queue is not empty
if (LIKELY(!_scan_blocks.empty())) {
blocks.swap(_scan_blocks);
for (auto b : blocks) {
_scan_row_batches_bytes -= b->allocated_bytes();
}
// delete scan_block if transfer thread should be stopped
// because scan_block wouldn't be useful anymore
if (UNLIKELY(_transfer_done)) {
Expand Down Expand Up @@ -193,12 +196,15 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
// need yield this thread when we do enough work. However, OlapStorage read
// data in pre-aggregate mode, then we can't use storage returned data to
// judge if we need to yield. So we record all raw data read in this round
// scan, if this exceed threshold, we yield this thread.
// scan, if this exceed row number or bytes threshold, we yield this thread.
int64_t raw_rows_read = scanner->raw_rows_read();
int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
int64_t raw_bytes_read = 0;
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
bool get_free_block = true;

while (!eos && raw_rows_read < raw_rows_threshold && get_free_block) {
while (!eos && raw_rows_read < raw_rows_threshold &&
raw_bytes_read < raw_bytes_threshold && get_free_block) {
if (UNLIKELY(_transfer_done)) {
eos = true;
status = Status::Cancelled("Cancelled");
Expand All @@ -216,6 +222,9 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
eos = true;
break;
}

raw_bytes_read += block->allocated_bytes();

// 4. if status not ok, change status_.
if (UNLIKELY(block->rows() == 0)) {
std::lock_guard<std::mutex> l(_free_blocks_lock);
Expand Down Expand Up @@ -254,6 +263,9 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
} else {
std::lock_guard<std::mutex> l(_scan_blocks_lock);
_scan_blocks.insert(_scan_blocks.end(), blocks.begin(), blocks.end());
for (auto b : blocks) {
_scan_row_batches_bytes += b->allocated_bytes();
}
}
// If eos is true, we will process out of this lock block.
if (eos) {
Expand Down Expand Up @@ -286,13 +298,18 @@ Status VOlapScanNode::_add_blocks(std::vector<Block*>& block) {
{
std::unique_lock<std::mutex> l(_blocks_lock);

while (UNLIKELY(_materialized_blocks.size() >= _max_materialized_blocks &&
// check queue limit for both block queue size and bytes
while (UNLIKELY((_materialized_blocks.size() >= _max_materialized_blocks ||
_materialized_row_batches_bytes >= _max_scanner_queue_size_bytes / 2) &&
!_transfer_done)) {
_block_consumed_cv.wait(l);
}

VLOG_CRITICAL << "Push block to materialized_blocks";
_materialized_blocks.insert(_materialized_blocks.end(), block.cbegin(), block.cend());
for (auto b : block) {
_materialized_row_batches_bytes += b->allocated_bytes();
}
}
// remove one block, notify main thread
_block_added_cv.notify_one();
Expand Down Expand Up @@ -399,7 +416,9 @@ Status VOlapScanNode::close(RuntimeState* state) {
// which may lead to potential performance problems. we should rethink whether to delete the transfer thread
std::for_each(_materialized_blocks.begin(), _materialized_blocks.end(),
std::default_delete<Block>());
_materialized_row_batches_bytes = 0;
std::for_each(_scan_blocks.begin(), _scan_blocks.end(), std::default_delete<Block>());
_scan_row_batches_bytes = 0;
std::for_each(_free_blocks.begin(), _free_blocks.end(), std::default_delete<Block>());
_mem_tracker->release(_buffered_bytes);

Expand Down Expand Up @@ -473,6 +492,7 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) {
materialized_block = _materialized_blocks.back();
DCHECK(materialized_block != NULL);
_materialized_blocks.pop_back();
_materialized_row_batches_bytes -= materialized_block->allocated_bytes();
}
}

Expand Down Expand Up @@ -531,15 +551,29 @@ Block* VOlapScanNode::_alloc_block(bool& get_free_block) {
int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per_scanner) {
std::list<VOlapScanner*> olap_scanners;
int assigned_thread_num = _running_thread;
size_t max_thread = std::min(_volap_scanners.size(),
static_cast<size_t>(config::doris_scanner_thread_pool_thread_num));
// copy to local
{
// How many thread can apply to this query
size_t thread_slot_num = 0;
{
std::lock_guard<std::mutex> l(_free_blocks_lock);
thread_slot_num = _free_blocks.size() / block_per_scanner;
thread_slot_num += (_free_blocks.size() % block_per_scanner != 0);
if (thread_slot_num == 0) thread_slot_num++;
if (_scan_row_batches_bytes < _max_scanner_queue_size_bytes / 2) {
std::lock_guard<std::mutex> l(_free_blocks_lock);
thread_slot_num = _free_blocks.size() / block_per_scanner;
thread_slot_num += (_free_blocks.size() % block_per_scanner != 0);
thread_slot_num = std::min(thread_slot_num, max_thread - assigned_thread_num);
if (thread_slot_num <= 0) thread_slot_num = 1;
} else {
std::lock_guard<std::mutex> l(_scan_blocks_lock);
if (_scan_blocks.empty()) {
// Just for notify if _scan_blocks is empty and no running thread
if (assigned_thread_num == 0) {
thread_slot_num = 1;
// NOTE: if olap_scanners_ is empty, scanner_done_ should be true
}
}
}
}

{
Expand Down
8 changes: 7 additions & 1 deletion be/src/vec/exec/volap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, bo
DCHECK(block->rows() == 0);

int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num;
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
if (!block->mem_reuse()) {
for (const auto slot_desc : _tuple_desc->slots()) {
block->insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
Expand All @@ -65,7 +66,12 @@ Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, bo

RETURN_IF_ERROR(
VExprContext::filter_block(_vconjunct_ctx, block, _tuple_desc->slots().size()));
} while (block->rows() == 0 && !(*eof) && raw_rows_read() < raw_rows_threshold);
} while (block->rows() == 0 && !(*eof) && raw_rows_read() < raw_rows_threshold &&
block->allocated_bytes() < raw_bytes_threshold);
// NOTE:
// There is no need to check raw_bytes_threshold since block->rows() == 0 is checked first.
// But checking raw_bytes_threshold is still added here for consistency with raw_rows_threshold
// and olap_scanner.cpp.

return Status::OK();
}
Expand Down