diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index f55ef7da36981c..21c3103fe5a708 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -990,7 +990,7 @@ Status ScanLocalState<Derived>::_start_scanners(
     auto& p = _parent->cast<typename Derived::Parent>();
     _scanner_ctx = vectorized::ScannerContext::create_shared(
             state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
-            _scan_dependency, p.is_serial_operator());
+            _scan_dependency, p.is_serial_operator(), p.is_file_scan_operator());
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index bea222bd0f35b0..d37d26b09f7815 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -46,7 +46,8 @@ ScannerContext::ScannerContext(
         RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
         const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor,
         const std::list<std::shared_ptr<ScannerDelegate>>& scanners, int64_t limit_,
-        std::shared_ptr<pipeline::Dependency> dependency, bool ignore_data_distribution)
+        std::shared_ptr<pipeline::Dependency> dependency, bool ignore_data_distribution,
+        bool is_file_scan_operator)
         : HasTaskExecutionCtx(state),
           _state(state),
           _local_state(local_state),
@@ -58,7 +59,8 @@ ScannerContext::ScannerContext(
           limit(limit_),
           _scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
           _all_scanners(scanners.begin(), scanners.end()),
-          _ignore_data_distribution(ignore_data_distribution) {
+          _ignore_data_distribution(ignore_data_distribution),
+          _is_file_scan_operator(is_file_scan_operator) {
     DCHECK(_output_row_descriptor == nullptr ||
            _output_row_descriptor->tuple_descriptors().size() == 1);
     _query_id = _state->get_query_ctx()->query_id();
@@ -143,7 +145,10 @@ Status ScannerContext::init() {
     }
 
     // _scannner_scheduler will be used to submit scan task.
-    if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size) {
+    // file_scan_operator currently has a performance issue if we submit too many scan tasks to the scheduler.
+    // We should fix this problem in the future.
+    if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size ||
+        _is_file_scan_operator) {
         submit_many_scan_tasks_for_potential_performance_issue = false;
     }
 
@@ -166,8 +171,9 @@ Status ScannerContext::init() {
     if (submit_many_scan_tasks_for_potential_performance_issue || _ignore_data_distribution) {
         _max_thread_num = config::doris_scanner_thread_pool_thread_num / 1;
     } else {
-        _max_thread_num =
-                4 * (config::doris_scanner_thread_pool_thread_num / num_parallel_instances);
+        const size_t factor = _is_file_scan_operator ? 1 : 4;
+        _max_thread_num = factor * (config::doris_scanner_thread_pool_thread_num /
+                                    num_parallel_instances);
         // In some rare cases, user may set num_parallel_instances to 1 handly to make many query could be executed
         // in parallel. We need to make sure the _max_thread_num is smaller than previous value.
         _max_thread_num =
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index c70313c98bca65..82e0a06799940b 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -107,7 +107,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
                    const RowDescriptor* output_row_descriptor,
                    const std::list<std::shared_ptr<ScannerDelegate>>& scanners, int64_t limit_,
                    std::shared_ptr<pipeline::Dependency> dependency,
-                   bool ignore_data_distribution);
+                   bool ignore_data_distribution, bool is_file_scan_operator);
 
     ~ScannerContext() override {
         SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
@@ -214,6 +214,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
     QueryThreadContext _query_thread_context;
     std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
     bool _ignore_data_distribution = false;
+    bool _is_file_scan_operator = false;
 
     // for scaling up the running scanners
     size_t _estimated_block_size = 0;
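
For context, below is a minimal standalone sketch (not the Doris source; the pool size of 48 and the omission of the later clamping steps are illustrative assumptions) of how the new `_is_file_scan_operator` flag changes the scanner thread budget computed in `ScannerContext::init()`:

```cpp
// Sketch only: mirrors the branching added by this patch, under assumed values.
#include <cstdint>
#include <iostream>

// Stand-in for config::doris_scanner_thread_pool_thread_num (assumed value).
constexpr int64_t kScannerThreadPoolThreadNum = 48;

int64_t max_scan_threads(bool is_file_scan_operator, bool submit_many_tasks,
                         bool ignore_data_distribution, int64_t num_parallel_instances) {
    // The patch forces the "submit many scan tasks" path off for file scan operators,
    // so they always fall through to the factor-based branch below.
    if (is_file_scan_operator) {
        submit_many_tasks = false;
    }
    if (submit_many_tasks || ignore_data_distribution) {
        return kScannerThreadPoolThreadNum;
    }
    // File scans now use factor 1 instead of 4, requesting fewer scanner threads.
    const int64_t factor = is_file_scan_operator ? 1 : 4;
    return factor * (kScannerThreadPoolThreadNum / num_parallel_instances);
}

int main() {
    // With 8 parallel instances: an internal scan may request 4 * (48 / 8) = 24 threads,
    // while a file scan is now limited to 48 / 8 = 6.
    std::cout << max_scan_threads(false, false, false, 8) << "\n"; // prints 24
    std::cout << max_scan_threads(true, false, false, 8) << "\n";  // prints 6
}
```

The net effect, per the comment added in `ScannerContext::init()`, is to cap how aggressively file scan operators submit tasks to the scanner scheduler until the underlying performance issue is addressed.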