10 changes: 9 additions & 1 deletion be/src/pipeline/exec/scan_operator.cpp
@@ -1040,7 +1040,15 @@ Status ScanLocalState<Derived>::_start_scanners(
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = PipXScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
_scan_dependency, p.ignore_data_distribution());
state()->scan_queue_mem_limit(), _scan_dependency,
// NOTE: This logic makes _max_thread_num of the ScannerContext equal to C (the number of cores) * 2.
// For a query with C/2 instances and M scan nodes, this query can have C/2 * M * C*2 scan tasks,
// which is C*C*M at most.
// 1. If data distribution is ignored, we use 1 instance to scan.
// 2. Otherwise, file scanners consume a lot of memory, so we use
//    config::doris_scanner_thread_pool_thread_num / query_parallel_instance_num scanners to scan.
p.ignore_data_distribution() && !p.is_file_scan_operator()
? 1
: state()->query_parallel_instance_num());
return Status::OK();
}
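To make the arithmetic in the NOTE above concrete, here is a minimal standalone sketch of the worst-case task count it describes; the function `upper_bound_scan_tasks` and its parameter names are illustrative inventions of ours, not part of the patch:

```cpp
#include <cstdint>
#include <iostream>

// Worst-case scan-task arithmetic from the NOTE above: with C cores, a query
// defaults to C/2 instances, each ScannerContext gets _max_thread_num = C*2,
// so M scan nodes can produce (C/2) * M * (C*2) = C*C*M tasks at most.
int64_t upper_bound_scan_tasks(int64_t cores, int64_t num_scan_nodes) {
    const int64_t instances = cores / 2;      // C/2 instances per query
    const int64_t max_thread_num = cores * 2; // per-instance _max_thread_num
    return instances * num_scan_nodes * max_thread_num; // == C*C*M
}

int main() {
    // e.g. 64 cores and 2 scan nodes: 32 * 2 * 128 = 64*64*2 = 8192 tasks
    std::cout << upper_bound_scan_tasks(64, 2) << '\n';
    return 0;
}
```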

14 changes: 9 additions & 5 deletions be/src/vec/exec/scan/pip_scanner_context.h
@@ -31,9 +31,11 @@ class PipScannerContext final : public vectorized::ScannerContext {
const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
int64_t limit_, bool ignore_data_distribution)
int64_t limit_, int64_t max_bytes_in_blocks_queue,
const int num_parallel_instances)
: vectorized::ScannerContext(state, parent, output_tuple_desc, output_row_descriptor,
scanners, limit_, ignore_data_distribution) {}
scanners, limit_, max_bytes_in_blocks_queue,
num_parallel_instances) {}
};

class PipXScannerContext final : public vectorized::ScannerContext {
@@ -44,10 +46,12 @@ class PipXScannerContext final : public vectorized::ScannerContext {
const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
int64_t limit_, std::shared_ptr<pipeline::Dependency> dependency,
bool ignore_data_distribution)
int64_t limit_, int64_t max_bytes_in_blocks_queue,
std::shared_ptr<pipeline::Dependency> dependency,
const int num_parallel_instances)
: vectorized::ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners,
limit_, ignore_data_distribution, local_state) {
limit_, max_bytes_in_blocks_queue, num_parallel_instances,
local_state) {
_dependency = dependency;
}

202 changes: 75 additions & 127 deletions be/src/vec/exec/scan/scanner_context.cpp
@@ -33,7 +33,6 @@
#include "runtime/runtime_state.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/exec/scan/vscan_node.h"

namespace doris::vectorized {
@@ -43,7 +42,8 @@ using namespace std::chrono_literals;
ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
int64_t limit_, bool ignore_data_distribution,
int64_t limit_, int64_t max_bytes_in_blocks_queue,
const int num_parallel_instances,
pipeline::ScanLocalStateBase* local_state)
: HasTaskExecutionCtx(state),
_state(state),
@@ -54,35 +54,93 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu
_output_row_descriptor(output_row_descriptor),
_batch_size(state->batch_size()),
limit(limit_),
_scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
_max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) *
num_parallel_instances),
_scanner_scheduler(state->exec_env()->scanner_scheduler()),
_all_scanners(scanners.begin(), scanners.end()),
_ignore_data_distribution(ignore_data_distribution) {
_num_parallel_instances(num_parallel_instances) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
_query_id = _state->get_query_ctx()->query_id();
ctx_id = UniqueId::gen_uid().to_string();
// Provide more memory for wide tables: scale the queue limit up by one extra multiple for every 300 columns.
_max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
if (scanners.empty()) {
_is_finished = true;
_set_scanner_done();
}
_scanners.enqueue_bulk(scanners.begin(), scanners.size());
if (limit < 0) {
limit = -1;
}
MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio();
// _max_thread_num controls how many scanners of this ScanOperator can be submitted to the scheduler at a time.
// The overall target of our system is to make full utilization of the resources.
// At the same time, we don't want too many tasks queued in the scheduler, which makes queries
// wait too long and prevents existing tasks from being scheduled in time.
// First of all, we try to make sure that _max_thread_num of a ScanNode of a query on a single backend is less than
// config::doris_scanner_thread_pool_thread_num.
// For example, on a 64-core machine, the default value of config::doris_scanner_thread_pool_thread_num is 64*2 = 128,
// and the num_parallel_instances of this scan operator will be 64/2 = 32.
// For a query that has two scan nodes, the _max_thread_num of each scan node instance will be 128 / 32 = 4.
// We have 32 instances of this scan operator, so for each ScanNode, 4 * 32 = 128 scanner tasks can be submitted at a time.
// Remember that this query has two ScanNodes, so the total number of scanner tasks that can be submitted at a time is 128 * 2 = 256.
_max_thread_num = _state->num_scanner_threads() > 0
? _state->num_scanner_threads()
: config::doris_scanner_thread_pool_thread_num /
(_local_state ? num_parallel_instances
: state->query_parallel_instance_num());
_max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
// In some situations there are not many big tablets involved, so we can reduce the thread number.
_max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size());
// 1. Calculate max concurrency.
// For `select * from table limit 10;` a single thread is enough.
if (_local_state && _local_state->should_run_serial()) {
_max_thread_num = 1;
}
// When the user does not specify scan_thread_num, we can try to downgrade _max_thread_num,
// because we found that in a table with 5k columns, column readers may occupy too much memory.
// See https://github.com/apache/doris/issues/35340 for details.
int32_t max_column_reader_num = state->query_options().max_column_reader_num;
if (_max_thread_num != 1 && max_column_reader_num > 0) {
int32_t scan_column_num = _output_tuple_desc->slots().size();
int32_t current_column_num = scan_column_num * _max_thread_num;
if (current_column_num > max_column_reader_num) {
int32_t new_max_thread_num = max_column_reader_num / scan_column_num;
new_max_thread_num = new_max_thread_num <= 0 ? 1 : new_max_thread_num;
if (new_max_thread_num < _max_thread_num) {
int32_t origin_max_thread_num = _max_thread_num;
_max_thread_num = new_max_thread_num;
LOG(INFO) << "downgrade query:" << print_id(state->query_id())
<< " scan's max_thread_num from " << origin_max_thread_num << " to "
<< _max_thread_num << ",column num: " << scan_column_num
<< ", max_column_reader_num: " << max_column_reader_num;
}
}
}

// 1. Calculate max concurrency.
// For `select * from table limit 10;` a single thread is enough.
if ((_parent && _parent->should_run_serial()) ||
(_local_state && _local_state->should_run_serial())) {
_max_thread_num = 1;
}
_query_thread_context = {_query_id, _state->query_mem_tracker(),
_state->get_query_ctx()->workload_group()};

DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
}
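Since the constructor above packs two sizing rules into a few dense lines, here is a self-contained sketch of both; the helper names are hypothetical, and the constants mirror the code above (a minimum of 1024 bytes, one extra multiple per 300 slots, and the column-reader cap from issue #35340):

```cpp
#include <algorithm>
#include <cstdint>

// Queue sizing as in the constructor: clamp the FE-provided limit to at least
// 1024 bytes, scale by the instance count, then grow by one extra multiple
// for every 300 slots to give wide tables more room.
int64_t max_bytes_in_queue(int64_t fe_limit, int num_parallel_instances, int num_slots) {
    int64_t bytes = std::max<int64_t>(fe_limit, 1024) * num_parallel_instances;
    bytes *= num_slots / 300 + 1;
    return bytes;
}

// Thread-count downgrade as in the constructor: keep the total number of open
// column readers (columns * threads) under max_column_reader_num.
int32_t downgraded_thread_num(int32_t max_thread_num, int32_t scan_column_num,
                              int32_t max_column_reader_num) {
    if (max_thread_num == 1 || max_column_reader_num <= 0) {
        return max_thread_num;
    }
    if (scan_column_num * max_thread_num <= max_column_reader_num) {
        return max_thread_num;
    }
    return std::max<int32_t>(max_column_reader_num / scan_column_num, 1);
}
```

For example, a 5000-column table with max_column_reader_num = 20000 is capped at 20000 / 5000 = 4 scanner threads, no matter how large the thread pool is.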

ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VScanNode* parent,
const doris::TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
int64_t limit_, bool ignore_data_distribution,
int64_t limit_, int64_t max_bytes_in_blocks_queue,
const int num_parallel_instances,
pipeline::ScanLocalStateBase* local_state)
: ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners, limit_,
ignore_data_distribution, local_state) {
max_bytes_in_blocks_queue, num_parallel_instances, local_state) {
_parent = parent;

// No need to increase scanner_ctx_cnt here, since the delegated constructor has already done it.
DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
}

// After init() is called, _parent should not be accessed.
@@ -105,125 +163,15 @@ Status ScannerContext::init() {

#ifndef BE_TEST
// 3. get thread token
if (!_state->get_query_ctx()) {
return Status::InternalError("Query context of {} is not set",
print_id(_state->query_id()));
}

thread_token = _state->get_query_ctx()->get_token();

if (_state->get_query_ctx()->get_scan_scheduler()) {
_should_reset_thread_name = false;
}
#endif

const int num_parallel_instances = _state->query_parallel_instance_num();

// _max_bytes_in_queue controls the maximum memory that can be used by a single scan instance.
// scan_queue_mem_limit on the FE is 100MB by default; on the backend we make sure its actual value
// is at least 10MB.
_max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10);

// Provide more memory for wide tables: scale the queue limit up by one extra multiple for every 300 columns.
_max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;

// TODO: Where is the proper position to place this code?
if (_all_scanners.empty()) {
_is_finished = true;
_set_scanner_done();
}

// This is a transitional implementation.
// The logic is kept only because of a potential performance issue.
bool submit_many_scan_tasks_for_potential_performance_issue = true;
auto scanner = _all_scanners.front().lock();
DCHECK(scanner != nullptr);
// A query can have remote scan tasks and local scan tasks at the same time,
// so we need to compute the _scanner_scheduler in each scan operator instead of the query context.
SimplifiedScanScheduler* simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler();
SimplifiedScanScheduler* remote_scan_task_scheduler =
_state->get_query_ctx()->get_remote_scan_scheduler();
if (scanner->_scanner->get_storage_type() == TabletStorageType::STORAGE_TYPE_LOCAL) {
// scan_scheduler could be empty if query does not have a workload group.
if (simple_scan_scheduler) {
_scanner_scheduler = simple_scan_scheduler;
} else {
_scanner_scheduler = _scanner_scheduler_global->get_local_scan_thread_pool();
}
} else {
// remote_scan_task_scheduler could be empty if query does not have a workload group.
if (remote_scan_task_scheduler) {
_scanner_scheduler = remote_scan_task_scheduler;
} else {
_scanner_scheduler = _scanner_scheduler_global->get_remote_scan_thread_pool();
}
}

// _scanner_scheduler will be used to submit scan tasks.
if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size) {
submit_many_scan_tasks_for_potential_performance_issue = false;
}

// _max_thread_num controls how many scanners of this ScanOperator can be submitted to the scheduler at a time.
// The overall target of our system is to make full utilization of the resources, while not queueing
// more tasks in the scheduler than necessary.
// So, first of all, we try to make sure that _max_thread_num of a ScanNode of a query on a single backend is less than
// 2 * config::doris_scanner_thread_pool_thread_num, so that we can keep all io threads busy.
// For example, on a 64-core machine, the default value of config::doris_scanner_thread_pool_thread_num is 64*2 = 128,
// and the num_parallel_instances of this scan operator will be 64/2 = 32.
// For a query that has one scan node, the _max_thread_num of each scan node instance will be 4 * 128 / 32 = 16.
// We have 32 instances of this scan operator, so for the ScanNode, 16 * 32 = 8 * 64 = 512 scanner tasks can be submitted at a time.
_max_thread_num = _state->num_scanner_threads() > 0 ? _state->num_scanner_threads() : 0;

if (_max_thread_num == 0) {
// NOTE: When ignore_data_distribution is true, the parallelism
// of the scan operator is regarded as 1 (though it may actually not be).
// That allows a very large number of scan tasks to be submitted to the scheduler.
// This logic is kept from the older implementation.
if (submit_many_scan_tasks_for_potential_performance_issue || _ignore_data_distribution) {
_max_thread_num = config::doris_scanner_thread_pool_thread_num / 1;
} else {
_max_thread_num =
4 * (config::doris_scanner_thread_pool_thread_num / num_parallel_instances);
// In some rare cases, a user may set num_parallel_instances to 1 manually so that many queries
// can be executed in parallel. We need to make sure _max_thread_num is still capped by
// config::doris_scanner_thread_pool_thread_num.
_max_thread_num =
std::min(_max_thread_num, config::doris_scanner_thread_pool_thread_num);
}
}

_max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
// In some situations there are not many big tablets involved, so we can reduce the thread number.
// NOTE: when _all_scanners.size() is zero, _max_thread_num will be 0.
_max_thread_num = std::min(_max_thread_num, (int32_t)_all_scanners.size());

// 1. Calculate max concurrency.
// For `select * from table limit 10;` a single thread is enough.
if ((_parent && _parent->should_run_serial()) ||
(_local_state && _local_state->should_run_serial())) {
_max_thread_num = 1;
}

// When the user does not specify scan_thread_num, we can try to downgrade _max_thread_num,
// because we found that in a table with 5k columns, column readers may occupy too much memory.
// See https://github.com/apache/doris/issues/35340 for details.
int32_t max_column_reader_num = _state->query_options().max_column_reader_num;
if (_max_thread_num != 1 && max_column_reader_num > 0) {
int32_t scan_column_num = _output_tuple_desc->slots().size();
int32_t current_column_num = scan_column_num * _max_thread_num;
if (current_column_num > max_column_reader_num) {
int32_t new_max_thread_num = max_column_reader_num / scan_column_num;
new_max_thread_num = new_max_thread_num <= 0 ? 1 : new_max_thread_num;
if (new_max_thread_num < _max_thread_num) {
int32_t origin_max_thread_num = _max_thread_num;
_max_thread_num = new_max_thread_num;
LOG(INFO) << "downgrade query:" << print_id(_state->query_id())
<< " scan's max_thread_num from " << origin_max_thread_num << " to "
<< _max_thread_num << ",column num: " << scan_column_num
<< ", max_column_reader_num: " << max_column_reader_num;
}
if (_state->get_query_ctx()) {
thread_token = _state->get_query_ctx()->get_token();
_simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler();
if (_simple_scan_scheduler) {
_should_reset_thread_name = false;
}
_remote_scan_task_scheduler = _state->get_query_ctx()->get_remote_scan_scheduler();
}
#endif
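For reference, the scheduler selection deleted above reduces to one decision: prefer the workload group's scheduler when present, otherwise fall back to the global local or remote thread pool, split by storage type. A minimal self-contained sketch of that choice follows; the stub types stand in for Doris's SimplifiedScanScheduler and ScannerScheduler and are our own:

```cpp
// Stub types standing in for Doris's scheduler classes; illustrative only.
struct Scheduler {};
struct GlobalScheduler {
    Scheduler local_pool, remote_pool;
    Scheduler* get_local_scan_thread_pool() { return &local_pool; }
    Scheduler* get_remote_scan_thread_pool() { return &remote_pool; }
};

// Workload-group schedulers may be null when the query has no workload group;
// in that case, fall back to the global pools, split by storage type.
Scheduler* pick_scheduler(bool is_local_storage, Scheduler* wg_local,
                          Scheduler* wg_remote, GlobalScheduler* global) {
    if (is_local_storage) {
        return wg_local ? wg_local : global->get_local_scan_thread_pool();
    }
    return wg_remote ? wg_remote : global->get_remote_scan_thread_pool();
}
```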

if (_parent) {
COUNTER_SET(_parent->_max_scanner_thread_num, (int64_t)_max_thread_num);
@@ -289,7 +237,7 @@ bool ScannerContext::empty_in_queue(int id) {
Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
_scanner_sched_counter->update(1);
_num_scheduled_scanners++;
return _scanner_scheduler_global->submit(shared_from_this(), scan_task);
return _scanner_scheduler->submit(shared_from_this(), scan_task);
}

void ScannerContext::append_block_to_queue(std::shared_ptr<ScanTask> scan_task) {