From 6baa417077961a5877c0736c58b863c5213869a0 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Mon, 9 Sep 2024 23:08:20 +0800 Subject: [PATCH 1/8] X --- be/src/pipeline/exec/scan_operator.cpp | 18 +--- be/src/vec/exec/scan/scanner_context.cpp | 120 +++++++++++++---------- be/src/vec/exec/scan/scanner_context.h | 10 +- 3 files changed, 79 insertions(+), 69 deletions(-) diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 0c0cfb18c77e2c..0721a0309e4366 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -117,7 +117,8 @@ Status ScanLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(status); if (_scanner_ctx) { DCHECK(!_eos && _num_scanners->value() > 0); - RETURN_IF_ERROR(_scanner_ctx->init()); + RETURN_IF_ERROR( + _scanner_ctx->init(p.ignore_data_distribution(), p.is_file_scan_operator())); } _opened = true; return status; @@ -994,18 +995,9 @@ template Status ScanLocalState::_start_scanners( const std::list>& scanners) { auto& p = _parent->cast(); - _scanner_ctx = vectorized::ScannerContext::create_shared( - state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(), - state()->scan_queue_mem_limit(), _scan_dependency, - // NOTE: This will logic makes _max_thread_num of ScannerContext to be C(num of cores) * 2 - // For a query with C/2 instance and M scan node, scan task of this query will be C/2 * M * C*2 - // and will be C*C*N at most. - // 1. If data distribution is ignored , we use 1 instance to scan. - // 2. Else if this operator is not file scan operator, we use config::doris_scanner_thread_pool_thread_num scanners to scan. - // 3. Else, file scanner will consume much memory so we use config::doris_scanner_thread_pool_thread_num / query_parallel_instance_num scanners to scan. - p.ignore_data_distribution() || !p.is_file_scan_operator() - ? 
1 - : state()->query_parallel_instance_num()); + _scanner_ctx = vectorized::ScannerContext::create_shared(state(), this, p._output_tuple_desc, + p.output_row_descriptor(), scanners, + p.limit(), _scan_dependency); return Status::OK(); } diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 5cc20c214c103b..609469692de692 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -42,8 +42,7 @@ ScannerContext::ScannerContext( RuntimeState* state, pipeline::ScanLocalStateBase* local_state, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners, int64_t limit_, - int64_t max_bytes_in_blocks_queue, std::shared_ptr dependency, - const int num_parallel_instances) + std::shared_ptr dependency) : HasTaskExecutionCtx(state), _state(state), _local_state(local_state), @@ -53,53 +52,102 @@ ScannerContext::ScannerContext( _output_row_descriptor(output_row_descriptor), _batch_size(state->batch_size()), limit(limit_), - _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) * - num_parallel_instances), _scanner_scheduler(state->exec_env()->scanner_scheduler()), - _all_scanners(scanners.begin(), scanners.end()), - _num_parallel_instances(num_parallel_instances) { + _all_scanners(scanners.begin(), scanners.end()) { DCHECK(_output_row_descriptor == nullptr || _output_row_descriptor->tuple_descriptors().size() == 1); _query_id = _state->get_query_ctx()->query_id(); ctx_id = UniqueId::gen_uid().to_string(); - // Provide more memory for wide tables, increase proportionally by multiples of 300 - _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1; - if (scanners.empty()) { - _is_finished = true; - _set_scanner_done(); - } _scanners.enqueue_bulk(scanners.begin(), scanners.size()); if (limit < 0) { limit = -1; } MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio(); + _query_thread_context = {_query_id, _state->query_mem_tracker(), + _state->get_query_ctx()->workload_group()}; + _dependency = dependency; +} + +// After init function call, should not access _parent +Status ScannerContext::init(bool ignore_data_distribution, bool is_file_scan) { + _scanner_profile = _local_state->_scanner_profile; + _scanner_sched_counter = _local_state->_scanner_sched_counter; + _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num; + _scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer; + _scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time; + _scale_up_scanners_counter = _local_state->_scale_up_scanners_counter; + +#ifndef BE_TEST + // 3. get thread token + if (_state->get_query_ctx()) { + thread_token = _state->get_query_ctx()->get_token(); + _simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler(); + if (_simple_scan_scheduler) { + _should_reset_thread_name = false; + } + _remote_scan_task_scheduler = _state->get_query_ctx()->get_remote_scan_scheduler(); + } +#endif + _local_state->_runtime_profile->add_info_string("UseSpecificThreadToken", + thread_token == nullptr ? "False" : "True"); + + int num_parallel_instances = _state->query_parallel_instance_num(); + + // NOTE: When ignore_data_distribution is true or if_file_scan is false, the parallelism + // of the scan operator is regarded as 1 (actually maybe not). + // That will make the number of scan task can be submitted to the scheduler + // in a vary large value. This logicl is kept from the older implementation. 
+    // https://github.com/apache/doris/pull/28266
+    // https://github.com/apache/doris/pull/33223
+    if (_ignore_data_distribution || !is_file_scan) {
+        num_parallel_instances = 1;
+    }
+
+    // The caculation seems incorrect. When _ignore_data_distribution is true, num_parallel_instances will be 1.
+    // But actually we will have C*C scan tasks submited to the scheduler.
+    _max_bytes_in_queue =
+            std::max(_state->scan_queue_mem_limit(), (int64_t)1024) * num_parallel_instances;
+
+    // Provide more memory for wide tables, increase proportionally by multiples of 300
+    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
+
+    // TODO: Where is the proper position to place this code?
+    if (_all_scanners.empty()) {
+        _is_finished = true;
+        _set_scanner_done();
+    }
+
     // _max_thread_num controls how many scanners of this ScanOperator can be submitted to scheduler at a time.
     // The overall target of our system is to make full utilization of the resources.
-    // At the same time, we dont want too many tasks are queued by scheduler, that makes the query
-    // waiting too long, and existing task can not be scheduled in time.
-    // First of all, we try to make sure _max_thread_num of a ScanNode of a query on a single backend is less than
-    // config::doris_scanner_thread_pool_thread_num.
+    // At the same time, we don't want too many tasks queued in the scheduler, which is not necessary.
+    // So, first of all, we try to make sure _max_thread_num of a ScanNode of a query on a single backend is less than
+    // 2 * config::doris_scanner_thread_pool_thread_num, so that we can keep all IO threads busy.
     // For example, on a 64-core machine, the default value of config::doris_scanner_thread_pool_thread_num will be 64*2 =128.
     // and the num_parallel_instances of this scan operator will be 64/2=32.
-    // For a query who has two scan nodes, the _max_thread_num of each scan node instance will be 128 / 32 = 4.
-    // We have 32 instances of this scan operator, so for the ScanNode, we have 4 * 32 = 128 scanner tasks can be submitted at a time.
-    // Remember that we have to ScanNode in this query, so the total number of scanner tasks can be submitted at a time is 128 * 2 = 256.
+    // For a query that has one scan node, the _max_thread_num of each scan node instance will be 2 * 128 / 32 = 8.
+    // We have 32 instances of this scan operator, so for the ScanNode, 8 * 32 = 256 scanner tasks can be submitted at a time.
+    // The scanner thread pool has 128 threads, which means 128 tasks run in parallel while another 128 tasks wait in the queue.
+    // When the first 128 tasks are finished, the next 128 tasks will be taken from the queue and executed,
+    // and another 128 tasks will be submitted to the queue if there are remaining.
     _max_thread_num = _state->num_scanner_threads() > 0
                               ? _state->num_scanner_threads()
                               : config::doris_scanner_thread_pool_thread_num / num_parallel_instances;
     _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
     // In some situation, there are not too many big tablets involed, so we can reduce the thread number.
-    _max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size());
+    // NOTE: when _all_scanners.size is zero, the _max_thread_num will be 0.
+    _max_thread_num = std::min(_max_thread_num, (int32_t)_all_scanners.size());
+
+    // 1. Calculate max concurrency
     // For select * from table limit 10; should just use one thread. 
if (_local_state->should_run_serial()) { _max_thread_num = 1; } + // when user not specify scan_thread_num, so we can try downgrade _max_thread_num. // becaue we found in a table with 5k columns, column reader may ocuppy too much memory. // you can refer https://github.com/apache/doris/issues/35340 for details. - int32_t max_column_reader_num = state->query_options().max_column_reader_num; + int32_t max_column_reader_num = _state->query_options().max_column_reader_num; if (_max_thread_num != 1 && max_column_reader_num > 0) { int32_t scan_column_num = _output_tuple_desc->slots().size(); int32_t current_column_num = scan_column_num * _max_thread_num; @@ -109,7 +157,7 @@ ScannerContext::ScannerContext( if (new_max_thread_num < _max_thread_num) { int32_t origin_max_thread_num = _max_thread_num; _max_thread_num = new_max_thread_num; - LOG(INFO) << "downgrade query:" << print_id(state->query_id()) + LOG(INFO) << "downgrade query:" << print_id(_state->query_id()) << " scan's max_thread_num from " << origin_max_thread_num << " to " << _max_thread_num << ",column num: " << scan_column_num << ", max_column_reader_num: " << max_column_reader_num; @@ -117,35 +165,7 @@ ScannerContext::ScannerContext( } } - _query_thread_context = {_query_id, _state->query_mem_tracker(), - _state->get_query_ctx()->workload_group()}; - _dependency = dependency; -} - -// After init function call, should not access _parent -Status ScannerContext::init() { - _scanner_profile = _local_state->_scanner_profile; - _scanner_sched_counter = _local_state->_scanner_sched_counter; - _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num; - _scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer; - _scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time; - _scale_up_scanners_counter = _local_state->_scale_up_scanners_counter; - -#ifndef BE_TEST - // 3. get thread token - if (_state->get_query_ctx()) { - thread_token = _state->get_query_ctx()->get_token(); - _simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler(); - if (_simple_scan_scheduler) { - _should_reset_thread_name = false; - } - _remote_scan_task_scheduler = _state->get_query_ctx()->get_remote_scan_scheduler(); - } -#endif - COUNTER_SET(_local_state->_max_scanner_thread_num, (int64_t)_max_thread_num); - _local_state->_runtime_profile->add_info_string("UseSpecificThreadToken", - thread_token == nullptr ? "False" : "True"); // submit `_max_thread_num` running scanners to `ScannerScheduler` // When a running scanners is finished, it will submit one of the remaining scanners. 
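For reference, the block-queue budget that this patch moves from the constructor into ScannerContext::init() can be summarized with the standalone sketch below. This is an illustration only, not code from the patch: the helper name sketch_max_bytes_in_queue and its parameter list are assumptions made for the example.

// Illustrative sketch, assuming the patch-1 formula:
// budget = max(scan_queue_mem_limit, 1KB) * num_parallel_instances, scaled up for wide tables.
#include <algorithm>
#include <cstdint>

int64_t sketch_max_bytes_in_queue(int64_t scan_queue_mem_limit,  // session variable, ~100MB by default
                                  int num_parallel_instances,    // 1 when data distribution is ignored
                                  int num_slots) {               // slot count of the output tuple
    int64_t bytes = std::max(scan_queue_mem_limit, (int64_t)1024) * num_parallel_instances;
    // Wide tables get a proportionally larger budget, in steps of 300 columns.
    bytes *= num_slots / 300 + 1;
    return bytes;
}
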
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index f93d01eef88427..86c71de66fcff8 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -105,9 +105,7 @@ class ScannerContext : public std::enable_shared_from_this, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners, - int64_t limit_, int64_t max_bytes_in_blocks_queue, - std::shared_ptr dependency, - const int num_parallel_instances); + int64_t limit_, std::shared_ptr dependency); ~ScannerContext() override { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker); @@ -118,7 +116,7 @@ class ScannerContext : public std::enable_shared_from_this, } block.reset(); } - Status init(); + Status init(bool ignore_data_distribution, bool is_file_scan); vectorized::BlockUPtr get_free_block(bool force); void return_free_block(vectorized::BlockUPtr block); @@ -210,7 +208,7 @@ class ScannerContext : public std::enable_shared_from_this, int64_t limit; int32_t _max_thread_num = 0; - int64_t _max_bytes_in_queue; + int64_t _max_bytes_in_queue = 0; doris::vectorized::ScannerScheduler* _scanner_scheduler; SimplifiedScanScheduler* _simple_scan_scheduler = nullptr; SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr; @@ -220,7 +218,6 @@ class ScannerContext : public std::enable_shared_from_this, int32_t _num_running_scanners = 0; // weak pointer for _scanners, used in stop function std::vector> _all_scanners; - const int _num_parallel_instances; std::shared_ptr _scanner_profile; RuntimeProfile::Counter* _scanner_sched_counter = nullptr; RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr; @@ -229,6 +226,7 @@ class ScannerContext : public std::enable_shared_from_this, RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr; QueryThreadContext _query_thread_context; std::shared_ptr _dependency = nullptr; + bool _ignore_data_distribution = false; // for scaling up the running scanners size_t _estimated_block_size = 0; From 23a2efdb66d994b5a67844fd0d5c108835e04cfb Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Mon, 9 Sep 2024 23:12:36 +0800 Subject: [PATCH 2/8] REF --- be/src/vec/exec/scan/scanner_context.h | 1 - 1 file changed, 1 deletion(-) diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 86c71de66fcff8..44d87d79d9d6f2 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -226,7 +226,6 @@ class ScannerContext : public std::enable_shared_from_this, RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr; QueryThreadContext _query_thread_context; std::shared_ptr _dependency = nullptr; - bool _ignore_data_distribution = false; // for scaling up the running scanners size_t _estimated_block_size = 0; From d37aa09505d072baf025b40ad841583cb44cb50f Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Mon, 9 Sep 2024 23:20:45 +0800 Subject: [PATCH 3/8] X --- be/src/vec/exec/scan/scanner_context.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 609469692de692..abd83afd4974c8 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -99,7 +99,7 @@ Status ScannerContext::init(bool ignore_data_distribution, bool is_file_scan) { // in a vary large value. This logicl is kept from the older implementation. 
// https://github.com/apache/doris/pull/28266 // https://github.com/apache/doris/pull/33223 - if (_ignore_data_distribution || !is_file_scan) { + if (ignore_data_distribution || !is_file_scan) { num_parallel_instances = 1; } From e91aa7d91e847f6557823c3e1135a90ad1c575c2 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Tue, 10 Sep 2024 16:28:46 +0800 Subject: [PATCH 4/8] X --- be/src/pipeline/exec/scan_operator.cpp | 3 +-- be/src/vec/exec/scan/scanner_context.cpp | 12 ++++++------ be/src/vec/exec/scan/scanner_context.h | 2 +- .../java/org/apache/doris/qe/SessionVariable.java | 5 ++++- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 0721a0309e4366..dbf48749b57baf 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -117,8 +117,7 @@ Status ScanLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(status); if (_scanner_ctx) { DCHECK(!_eos && _num_scanners->value() > 0); - RETURN_IF_ERROR( - _scanner_ctx->init(p.ignore_data_distribution(), p.is_file_scan_operator())); + RETURN_IF_ERROR(_scanner_ctx->init(p.ignore_data_distribution())); } _opened = true; return status; diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index abd83afd4974c8..59f34aec20f0ee 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -69,7 +69,7 @@ ScannerContext::ScannerContext( } // After init function call, should not access _parent -Status ScannerContext::init(bool ignore_data_distribution, bool is_file_scan) { +Status ScannerContext::init(bool ignore_data_distribution) { _scanner_profile = _local_state->_scanner_profile; _scanner_sched_counter = _local_state->_scanner_sched_counter; _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num; @@ -98,15 +98,15 @@ Status ScannerContext::init(bool ignore_data_distribution, bool is_file_scan) { // That will make the number of scan task can be submitted to the scheduler // in a vary large value. This logicl is kept from the older implementation. // https://github.com/apache/doris/pull/28266 - // https://github.com/apache/doris/pull/33223 - if (ignore_data_distribution || !is_file_scan) { + if (ignore_data_distribution) { num_parallel_instances = 1; } - // The caculation seems incorrect. When _ignore_data_distribution is true, num_parallel_instances will be 1. - // But actually we will have C*C scan tasks submited to the scheduler. + // _max_bytes_in_queue controls the maximum memory that can be used by a single scan instance. + // scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value + // is larger than 10MB. 
_max_bytes_in_queue = - std::max(_state->scan_queue_mem_limit(), (int64_t)1024) * num_parallel_instances; + std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 1024 * 10); // Provide more memory for wide tables, increase proportionally by multiples of 300 _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1; diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 44d87d79d9d6f2..6a710efbf6bde5 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -116,7 +116,7 @@ class ScannerContext : public std::enable_shared_from_this, } block.reset(); } - Status init(bool ignore_data_distribution, bool is_file_scan); + Status init(bool ignore_data_distribution); vectorized::BlockUPtr get_free_block(bool force); void return_free_block(vectorized::BlockUPtr block); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 8db24006293249..0cb87ee7fa2513 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -693,7 +693,10 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT) public long maxExecMemByte = 2147483648L; - @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT) + @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT, + description = {"每个 Scan Instance 的 block queue 能够保存多少字节的 block", + "How many bytes of block can be saved in the block queue of each Scan Instance"}) + // 100MB public long maxScanQueueMemByte = 2147483648L / 20; @VariableMgr.VarAttr(name = NUM_SCANNER_THREADS, needForward = true, description = { From 76ae1294a554762bc53fb9c75a75184c630ba7e6 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Tue, 10 Sep 2024 17:02:05 +0800 Subject: [PATCH 5/8] FIX --- be/src/vec/exec/scan/scanner_context.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 59f34aec20f0ee..f9925c1e1aa9f2 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -93,7 +93,7 @@ Status ScannerContext::init(bool ignore_data_distribution) { int num_parallel_instances = _state->query_parallel_instance_num(); - // NOTE: When ignore_data_distribution is true or if_file_scan is false, the parallelism + // NOTE: When ignore_data_distribution is true, the parallelism // of the scan operator is regarded as 1 (actually maybe not). // That will make the number of scan task can be submitted to the scheduler // in a vary large value. This logicl is kept from the older implementation. @@ -106,7 +106,7 @@ Status ScannerContext::init(bool ignore_data_distribution) { // scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value // is larger than 10MB. _max_bytes_in_queue = - std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 1024 * 10); + std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10); // Provide more memory for wide tables, increase proportionally by multiples of 300 _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1; @@ -132,7 +132,7 @@ Status ScannerContext::init(bool ignore_data_distribution) { _max_thread_num = _state->num_scanner_threads() > 0 ? 
_state->num_scanner_threads() - : config::doris_scanner_thread_pool_thread_num / num_parallel_instances; + : 2 * (config::doris_scanner_thread_pool_thread_num / num_parallel_instances); _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num; // In some situation, there are not too many big tablets involed, so we can reduce the thread number. // NOTE: when _all_scanners.size is zero, the _max_thread_num will be 0. From d6b88d80c718ed34d8cafad084da48f58708d37c Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Wed, 11 Sep 2024 00:24:22 +0800 Subject: [PATCH 6/8] X --- be/src/pipeline/exec/scan_operator.cpp | 8 ++++---- be/src/vec/exec/scan/scanner_context.cpp | 12 ++++++------ be/src/vec/exec/scan/scanner_context.h | 6 ++++-- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index dbf48749b57baf..c6bfbf0825cc9d 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -117,7 +117,7 @@ Status ScanLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(status); if (_scanner_ctx) { DCHECK(!_eos && _num_scanners->value() > 0); - RETURN_IF_ERROR(_scanner_ctx->init(p.ignore_data_distribution())); + RETURN_IF_ERROR(_scanner_ctx->init()); } _opened = true; return status; @@ -994,9 +994,9 @@ template Status ScanLocalState::_start_scanners( const std::list>& scanners) { auto& p = _parent->cast(); - _scanner_ctx = vectorized::ScannerContext::create_shared(state(), this, p._output_tuple_desc, - p.output_row_descriptor(), scanners, - p.limit(), _scan_dependency); + _scanner_ctx = vectorized::ScannerContext::create_shared( + state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(), + _scan_dependency, _p.ignore_data_distribution); return Status::OK(); } diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index f9925c1e1aa9f2..404e204c3fa45f 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -42,7 +42,7 @@ ScannerContext::ScannerContext( RuntimeState* state, pipeline::ScanLocalStateBase* local_state, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners, int64_t limit_, - std::shared_ptr dependency) + std::shared_ptr dependency, bool ignore_data_distribution) : HasTaskExecutionCtx(state), _state(state), _local_state(local_state), @@ -53,7 +53,8 @@ ScannerContext::ScannerContext( _batch_size(state->batch_size()), limit(limit_), _scanner_scheduler(state->exec_env()->scanner_scheduler()), - _all_scanners(scanners.begin(), scanners.end()) { + _all_scanners(scanners.begin(), scanners.end()), + _ignore_data_distribution(ignore_data_distribution) { DCHECK(_output_row_descriptor == nullptr || _output_row_descriptor->tuple_descriptors().size() == 1); _query_id = _state->get_query_ctx()->query_id(); @@ -69,7 +70,7 @@ ScannerContext::ScannerContext( } // After init function call, should not access _parent -Status ScannerContext::init(bool ignore_data_distribution) { +Status ScannerContext::init() { _scanner_profile = _local_state->_scanner_profile; _scanner_sched_counter = _local_state->_scanner_sched_counter; _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num; @@ -98,15 +99,14 @@ Status ScannerContext::init(bool ignore_data_distribution) { // That will make the number of scan task can be submitted to the scheduler // in a vary large value. 
This logicl is kept from the older implementation. // https://github.com/apache/doris/pull/28266 - if (ignore_data_distribution) { + if (_ignore_data_distribution) { num_parallel_instances = 1; } // _max_bytes_in_queue controls the maximum memory that can be used by a single scan instance. // scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value // is larger than 10MB. - _max_bytes_in_queue = - std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10); + _max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10); // Provide more memory for wide tables, increase proportionally by multiples of 300 _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1; diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 6a710efbf6bde5..70649f1c1266c4 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -105,7 +105,8 @@ class ScannerContext : public std::enable_shared_from_this, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners, - int64_t limit_, std::shared_ptr dependency); + int64_t limit_, std::shared_ptr dependency, + bool ignore_data_distribution); ~ScannerContext() override { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker); @@ -116,7 +117,7 @@ class ScannerContext : public std::enable_shared_from_this, } block.reset(); } - Status init(bool ignore_data_distribution); + Status init(); vectorized::BlockUPtr get_free_block(bool force); void return_free_block(vectorized::BlockUPtr block); @@ -226,6 +227,7 @@ class ScannerContext : public std::enable_shared_from_this, RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr; QueryThreadContext _query_thread_context; std::shared_ptr _dependency = nullptr; + bool _ignore_data_distribution = false; // for scaling up the running scanners size_t _estimated_block_size = 0; From 0e051e5aa8f138be74032b9bd74e9b2afc6dfd62 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Wed, 11 Sep 2024 00:45:52 +0800 Subject: [PATCH 7/8] X --- be/src/pipeline/exec/scan_operator.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index c6bfbf0825cc9d..adae4912af9d91 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -996,7 +996,7 @@ Status ScanLocalState::_start_scanners( auto& p = _parent->cast(); _scanner_ctx = vectorized::ScannerContext::create_shared( state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(), - _scan_dependency, _p.ignore_data_distribution); + _scan_dependency, p.ignore_data_distribution); return Status::OK(); } From aa5f88250f276aa60acfb974bc9540d7a2baa365 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Wed, 11 Sep 2024 00:46:19 +0800 Subject: [PATCH 8/8] X --- be/src/pipeline/exec/scan_operator.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index adae4912af9d91..eb30d62495d485 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -996,7 +996,7 @@ Status ScanLocalState::_start_scanners( auto& p = _parent->cast(); _scanner_ctx = vectorized::ScannerContext::create_shared( state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(), - 
_scan_dependency, p.ignore_data_distribution); + _scan_dependency, p.ignore_data_distribution()); return Status::OK(); }
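
Taken together, the series leaves the per-instance scanner concurrency computed roughly as sketched below. This is an illustrative summary only; the helper name and parameter list are assumptions for the example, and the real logic lives in ScannerContext::init() as of patch 8.

// Illustrative sketch of the final concurrency limit, not code from the patch.
#include <algorithm>
#include <cstdint>

int32_t sketch_max_thread_num(int32_t user_scanner_threads,    // session hint, 0 means "not set"
                              int32_t pool_threads,            // config::doris_scanner_thread_pool_thread_num
                              int32_t num_parallel_instances,  // forced to 1 when data distribution is ignored
                              int32_t num_scanners) {
    int32_t n = user_scanner_threads > 0
                        ? user_scanner_threads
                        : 2 * (pool_threads / num_parallel_instances);
    n = n == 0 ? 1 : n;
    // Do not run more concurrent scanner tasks than there are scanners;
    // as the NOTE in the patch points out, this becomes 0 when num_scanners is 0.
    return std::min(n, num_scanners);
}

// Worked example from the patch comments: pool_threads = 128 and 32 instances give
// 2 * 128 / 32 = 8 tasks per instance, i.e. 8 * 32 = 256 tasks for the whole ScanNode.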