From b363cb9050c45008dd2d5a32be147d34464c6d09 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Fri, 8 Nov 2024 20:59:53 +0800 Subject: [PATCH 1/2] NED --- be/src/pipeline/exec/scan_operator.cpp | 2 +- be/src/vec/exec/scan/scanner_context.cpp | 16 +++++++++++----- be/src/vec/exec/scan/scanner_context.h | 3 ++- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index f55ef7da36981c..21c3103fe5a708 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -990,7 +990,7 @@ Status ScanLocalState::_start_scanners( auto& p = _parent->cast(); _scanner_ctx = vectorized::ScannerContext::create_shared( state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(), - _scan_dependency, p.is_serial_operator()); + _scan_dependency, p.is_serial_operator(), p.is_file_scan_operator()); return Status::OK(); } diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index bea222bd0f35b0..f13c937d99007d 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -46,7 +46,8 @@ ScannerContext::ScannerContext( RuntimeState* state, pipeline::ScanLocalStateBase* local_state, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners, int64_t limit_, - std::shared_ptr dependency, bool ignore_data_distribution) + std::shared_ptr dependency, bool ignore_data_distribution, + bool is_file_scan_operator) : HasTaskExecutionCtx(state), _state(state), _local_state(local_state), @@ -58,7 +59,8 @@ ScannerContext::ScannerContext( limit(limit_), _scanner_scheduler_global(state->exec_env()->scanner_scheduler()), _all_scanners(scanners.begin(), scanners.end()), - _ignore_data_distribution(ignore_data_distribution) { + _ignore_data_distribution(ignore_data_distribution), + 
is_file_scan_operator(is_file_scan_operator) { DCHECK(_output_row_descriptor == nullptr || _output_row_descriptor->tuple_descriptors().size() == 1); _query_id = _state->get_query_ctx()->query_id(); @@ -143,7 +145,10 @@ Status ScannerContext::init() { } // _scannner_scheduler will be used to submit scan task. - if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size) { + // file_scan_operator currently has performance issue if we submit too many scan tasks to scheduler. + // we should fix this problem in the future. + if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size || + is_file_scan_operator) { submit_many_scan_tasks_for_potential_performance_issue = false; } @@ -166,8 +171,9 @@ Status ScannerContext::init() { if (submit_many_scan_tasks_for_potential_performance_issue || _ignore_data_distribution) { _max_thread_num = config::doris_scanner_thread_pool_thread_num / 1; } else { - _max_thread_num = - 4 * (config::doris_scanner_thread_pool_thread_num / num_parallel_instances); + const size_t factor = is_file_scan_operator ? 1 : 4; + _max_thread_num = factor * (config::doris_scanner_thread_pool_thread_num / + num_parallel_instances); // In some rare cases, user may set num_parallel_instances to 1 handly to make many query could be executed // in parallel. We need to make sure the _max_thread_num is smaller than previous value. 
_max_thread_num = diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index c70313c98bca65..221167358e6f0c 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -107,7 +107,7 @@ class ScannerContext : public std::enable_shared_from_this, const RowDescriptor* output_row_descriptor, const std::list>& scanners, int64_t limit_, std::shared_ptr dependency, - bool ignore_data_distribution); + bool ignore_data_distribution, bool is_file_scan_operator); ~ScannerContext() override { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker); @@ -214,6 +214,7 @@ class ScannerContext : public std::enable_shared_from_this, QueryThreadContext _query_thread_context; std::shared_ptr _dependency = nullptr; bool _ignore_data_distribution = false; + bool is_file_scan_operator = false; // for scaling up the running scanners size_t _estimated_block_size = 0; From 2f5d51f5ff7485f08d08814ed37e6b8cee8f9bd8 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Mon, 11 Nov 2024 10:39:05 +0800 Subject: [PATCH 2/2] REFINE --- be/src/vec/exec/scan/scanner_context.cpp | 6 +++--- be/src/vec/exec/scan/scanner_context.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index f13c937d99007d..d37d26b09f7815 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -60,7 +60,7 @@ ScannerContext::ScannerContext( _scanner_scheduler_global(state->exec_env()->scanner_scheduler()), _all_scanners(scanners.begin(), scanners.end()), _ignore_data_distribution(ignore_data_distribution), - is_file_scan_operator(is_file_scan_operator) { + _is_file_scan_operator(is_file_scan_operator) { DCHECK(_output_row_descriptor == nullptr || _output_row_descriptor->tuple_descriptors().size() == 1); _query_id = _state->get_query_ctx()->query_id(); @@ -148,7 +148,7 @@ Status 
ScannerContext::init() { // file_scan_operator currently has performance issue if we submit too many scan tasks to scheduler. // we should fix this problem in the future. if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size || - is_file_scan_operator) { + _is_file_scan_operator) { submit_many_scan_tasks_for_potential_performance_issue = false; } @@ -171,7 +171,7 @@ Status ScannerContext::init() { if (submit_many_scan_tasks_for_potential_performance_issue || _ignore_data_distribution) { _max_thread_num = config::doris_scanner_thread_pool_thread_num / 1; } else { - const size_t factor = is_file_scan_operator ? 1 : 4; + const size_t factor = _is_file_scan_operator ? 1 : 4; _max_thread_num = factor * (config::doris_scanner_thread_pool_thread_num / num_parallel_instances); // In some rare cases, user may set num_parallel_instances to 1 handly to make many query could be executed diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 221167358e6f0c..82e0a06799940b 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -214,7 +214,7 @@ class ScannerContext : public std::enable_shared_from_this, QueryThreadContext _query_thread_context; std::shared_ptr _dependency = nullptr; bool _ignore_data_distribution = false; - bool is_file_scan_operator = false; + bool _is_file_scan_operator = false; // for scaling up the running scanners size_t _estimated_block_size = 0;