From 4abd4f493eee2bf82a5bf687a8b5a7915ff71c6d Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Mon, 28 Oct 2024 17:52:47 +0800 Subject: [PATCH 1/4] NED --- be/src/pipeline/exec/scan_operator.cpp | 2 +- be/src/vec/exec/scan/scanner_context.cpp | 14 +++++++++----- be/src/vec/exec/scan/scanner_context.h | 3 ++- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 7e91b501ab6c6a..b631d1270328a0 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -1040,7 +1040,7 @@ Status ScanLocalState::_start_scanners( auto& p = _parent->cast(); _scanner_ctx = PipXScannerContext::create_shared( state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(), - _scan_dependency, p.ignore_data_distribution()); + _scan_dependency, p.ignore_data_distribution(), p.is_file_scan_operator()); return Status::OK(); } diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 5c71f7f7cb723e..13ded90872535d 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -43,7 +43,7 @@ using namespace std::chrono_literals; ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners, - int64_t limit_, bool ignore_data_distribution, + int64_t limit_, bool ignore_data_distribution, bool is_file_scan_operator, pipeline::ScanLocalStateBase* local_state) : HasTaskExecutionCtx(state), _state(state), @@ -56,7 +56,8 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu limit(limit_), _scanner_scheduler_global(state->exec_env()->scanner_scheduler()), _all_scanners(scanners.begin(), scanners.end()), - _ignore_data_distribution(ignore_data_distribution) { + _ignore_data_distribution(ignore_data_distribution), + _is_file_scan_operator(is_file_scan_operator) { DCHECK(_output_row_descriptor == nullptr || _output_row_descriptor->tuple_descriptors().size() == 1); _query_id = _state->get_query_ctx()->query_id(); @@ -160,7 +161,8 @@ Status ScannerContext::init() { } // _scannner_scheduler will be used to submit scan task. - if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size) { + if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size || + _is_file_scan_operator) { submit_many_scan_tasks_for_potential_performance_issue = false; } @@ -183,8 +185,10 @@ Status ScannerContext::init() { if (submit_many_scan_tasks_for_potential_performance_issue || _ignore_data_distribution) { _max_thread_num = config::doris_scanner_thread_pool_thread_num / 1; } else { - _max_thread_num = - 4 * (config::doris_scanner_thread_pool_thread_num / num_parallel_instances); + const int scale_arg = _is_file_scan_operator ? 1 : 4; + _max_thread_num = scale_arg * (config::doris_scanner_thread_pool_thread_num / + num_parallel_instances); + // In some rare cases, user may set num_parallel_instances to 1 handly to make many query could be executed // in parallel. We need to make sure the _max_thread_num is smaller than previous value. _max_thread_num = diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 449a5b1b470de0..ddce8268f9dab7 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -107,7 +107,7 @@ class ScannerContext : public std::enable_shared_from_this, ScannerContext(RuntimeState* state, VScanNode* parent, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners, int64_t limit_, - bool ignore_data_distribution, + bool ignore_data_distribution, bool is_file_scan_operator, pipeline::ScanLocalStateBase* local_state = nullptr); ~ScannerContext() override { @@ -234,6 +234,7 @@ class ScannerContext : public std::enable_shared_from_this, RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr; QueryThreadContext _query_thread_context; bool _ignore_data_distribution = false; + bool _is_file_scan_operator; // for scaling up the running scanners size_t _estimated_block_size = 0; From 989b3805796b302414fb83b6622e4ce890ce9db9 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Mon, 28 Oct 2024 17:58:52 +0800 Subject: [PATCH 2/4] FMT --- be/src/vec/exec/scan/scanner_context.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 13ded90872535d..a913670379d10a 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -43,7 +43,8 @@ using namespace std::chrono_literals; ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners, - int64_t limit_, bool ignore_data_distribution, bool is_file_scan_operator, + int64_t limit_, bool ignore_data_distribution, + bool is_file_scan_operator, pipeline::ScanLocalStateBase* local_state) : HasTaskExecutionCtx(state), _state(state), From 217e8c34c703ade44559a3f1c65c04eda191a78f Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Mon, 28 Oct 2024 23:18:48 +0800 Subject: [PATCH 3/4] FIX COMPILE --- be/src/vec/exec/scan/pip_scanner_context.h | 21 +++++++++++++++++---- be/src/vec/exec/scan/scanner_context.h | 3 ++- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index b7684ac5fe3750..b0bbaa76f26706 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -32,8 +32,20 @@ class PipScannerContext final : public vectorized::ScannerContext { const RowDescriptor* output_row_descriptor, const std::list>& scanners, int64_t limit_, bool ignore_data_distribution) - : vectorized::ScannerContext(state, parent, output_tuple_desc, output_row_descriptor, - scanners, limit_, ignore_data_distribution) {} + : vectorized::ScannerContext( + state, parent, output_tuple_desc, output_row_descriptor, scanners, limit_, + ignore_data_distribution, + /*non-pipeine & old pipeine does not process file scan operator seperatyly*/ + /*they use state->query_parallel_instance_num() as num_parallel_instances, see: + _max_thread_num = _state->num_scanner_threads() > 0 + ? _state->num_scanner_threads() + : config::doris_scanner_thread_pool_thread_num / + (_local_state ? num_parallel_instances + : state->query_parallel_instance_num()); + */ + // so we set is_file_scan_operator to true + // so that _max_thread_num will be same like before for engine except for pipelineX + true) {} }; class PipXScannerContext final : public vectorized::ScannerContext { @@ -45,9 +57,10 @@ class PipXScannerContext final : public vectorized::ScannerContext { const RowDescriptor* output_row_descriptor, const std::list>& scanners, int64_t limit_, std::shared_ptr dependency, - bool ignore_data_distribution) + bool ignore_data_distribution, bool is_file_scan_operator) : vectorized::ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners, - limit_, ignore_data_distribution, local_state) { + limit_, ignore_data_distribution, is_file_scan_operator, + local_state) { _dependency = dependency; } diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index ddce8268f9dab7..681ee68bf38193 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -183,7 +183,8 @@ class ScannerContext : public std::enable_shared_from_this, ScannerContext(RuntimeState* state_, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners_, int64_t limit_, - bool ignore_data_distribution, pipeline::ScanLocalStateBase* local_state); + bool ignore_data_distribution, bool is_file_scan_operator, + pipeline::ScanLocalStateBase* local_state); /// Four criteria to determine whether to increase the parallelism of the scanners /// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up From f4d462968f2375c3b54d83e0a72c9065d9ed4f55 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Tue, 29 Oct 2024 10:16:22 +0800 Subject: [PATCH 4/4] NEED --- be/src/vec/exec/scan/scanner_context.cpp | 3 ++- be/src/vec/exec/scan/vscan_node.cpp | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index a913670379d10a..ee34c5fb774202 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -79,9 +79,10 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VS const RowDescriptor* output_row_descriptor, const std::list>& scanners, int64_t limit_, bool ignore_data_distribution, + bool is_file_scan_operator, pipeline::ScanLocalStateBase* local_state) : ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners, limit_, - ignore_data_distribution, local_state) { + ignore_data_distribution, is_file_scan_operator, local_state) { _parent = parent; // No need to increase scanner_ctx_cnt here. Since other constructor has already done it. diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index a7842188feb575..8348641e5ce0fe 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -321,7 +321,7 @@ void VScanNode::_start_scanners(const std::list } else { _scanner_ctx = ScannerContext::create_shared(_state, this, _output_tuple_desc, _output_row_descriptor.get(), scanners, - limit(), false); + limit(), false, true); } }