From 6c5ab11a0fddd6d1205dc8ed17127130d8f95700 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Thu, 17 Oct 2024 17:42:57 +0800 Subject: [PATCH 1/2] NEED --- be/src/pipeline/exec/scan_operator.cpp | 10 +- be/src/vec/exec/scan/pip_scanner_context.h | 12 +- be/src/vec/exec/scan/scanner_context.cpp | 202 +++++++++++------- be/src/vec/exec/scan/scanner_context.h | 17 +- be/src/vec/exec/scan/scanner_scheduler.cpp | 8 +- be/src/vec/exec/scan/scanner_scheduler.h | 6 + be/src/vec/exec/scan/vscan_node.cpp | 15 +- .../org/apache/doris/qe/SessionVariable.java | 5 +- 8 files changed, 161 insertions(+), 114 deletions(-) diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 9f3d0111c2ff18..2c3e85b972b054 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -1040,15 +1040,7 @@ Status ScanLocalState::_start_scanners( auto& p = _parent->cast(); _scanner_ctx = PipXScannerContext::create_shared( state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(), - state()->scan_queue_mem_limit(), _scan_dependency, - // NOTE: This will logic makes _max_thread_num of ScannerContext to be C(num of cores) * 2 - // For a query with C/2 instance and M scan node, scan task of this query will be C/2 * M * C*2 - // and will be C*C*N at most. - // 1. If data distribution is ignored , we use 1 instance to scan. - // 2. Else, file scanner will consume much memory so we use config::doris_scanner_thread_pool_thread_num / query_parallel_instance_num scanners to scan. - p.ignore_data_distribution() && !p.is_file_scan_operator() - ? 1 - : state()->query_parallel_instance_num()); + _scan_dependency, p.ignore_data_distribution()); return Status::OK(); } diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index e93e9956ff4096..96579d67b26ab1 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -31,11 +31,9 @@ class PipScannerContext final : public vectorized::ScannerContext { const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners, - int64_t limit_, int64_t max_bytes_in_blocks_queue, - const int num_parallel_instances) + int64_t limit_, bool ignore_data_distribution) : vectorized::ScannerContext(state, parent, output_tuple_desc, output_row_descriptor, - scanners, limit_, max_bytes_in_blocks_queue, - num_parallel_instances) {} + scanners, limit_, ignore_data_distribution) {} }; class PipXScannerContext final : public vectorized::ScannerContext { @@ -46,11 +44,11 @@ class PipXScannerContext final : public vectorized::ScannerContext { const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners, - int64_t limit_, int64_t max_bytes_in_blocks_queue, + int64_t limit_, std::shared_ptr dependency, - const int num_parallel_instances) + bool ignore_data_distribution) : vectorized::ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners, - limit_, max_bytes_in_blocks_queue, num_parallel_instances, + limit_, ignore_data_distribution, local_state) { _dependency = dependency; } diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 0fa6c1dae1e452..5c71f7f7cb723e 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -33,6 +33,7 @@ #include "runtime/runtime_state.h" #include "util/uid_util.h" #include 
"vec/core/block.h" +#include "vec/exec/scan/scanner_scheduler.h" #include "vec/exec/scan/vscan_node.h" namespace doris::vectorized { @@ -42,8 +43,7 @@ using namespace std::chrono_literals; ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners, - int64_t limit_, int64_t max_bytes_in_blocks_queue, - const int num_parallel_instances, + int64_t limit_, bool ignore_data_distribution, pipeline::ScanLocalStateBase* local_state) : HasTaskExecutionCtx(state), _state(state), @@ -54,93 +54,35 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu _output_row_descriptor(output_row_descriptor), _batch_size(state->batch_size()), limit(limit_), - _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) * - num_parallel_instances), - _scanner_scheduler(state->exec_env()->scanner_scheduler()), + _scanner_scheduler_global(state->exec_env()->scanner_scheduler()), _all_scanners(scanners.begin(), scanners.end()), - _num_parallel_instances(num_parallel_instances) { + _ignore_data_distribution(ignore_data_distribution) { DCHECK(_output_row_descriptor == nullptr || _output_row_descriptor->tuple_descriptors().size() == 1); _query_id = _state->get_query_ctx()->query_id(); ctx_id = UniqueId::gen_uid().to_string(); - // Provide more memory for wide tables, increase proportionally by multiples of 300 - _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1; - if (scanners.empty()) { - _is_finished = true; - _set_scanner_done(); - } _scanners.enqueue_bulk(scanners.begin(), scanners.size()); if (limit < 0) { limit = -1; } MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio(); - // _max_thread_num controls how many scanners of this ScanOperator can be submitted to scheduler at a time. - // The overall target of our system is to make full utilization of the resources. - // At the same time, we dont want too many tasks are queued by scheduler, that makes the query - // waiting too long, and existing task can not be scheduled in time. - // First of all, we try to make sure _max_thread_num of a ScanNode of a query on a single backend is less than - // config::doris_scanner_thread_pool_thread_num. - // For example, on a 64-core machine, the default value of config::doris_scanner_thread_pool_thread_num will be 64*2 =128. - // and the num_parallel_instances of this scan operator will be 64/2=32. - // For a query who has two scan nodes, the _max_thread_num of each scan node instance will be 128 / 32 = 4. - // We have 32 instances of this scan operator, so for the ScanNode, we have 4 * 32 = 128 scanner tasks can be submitted at a time. - // Remember that we have to ScanNode in this query, so the total number of scanner tasks can be submitted at a time is 128 * 2 = 256. - _max_thread_num = _state->num_scanner_threads() > 0 - ? _state->num_scanner_threads() - : config::doris_scanner_thread_pool_thread_num / - (_local_state ? num_parallel_instances - : state->query_parallel_instance_num()); - _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num; - // In some situation, there are not too many big tablets involed, so we can reduce the thread number. - _max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size()); - // 1. Calculate max concurrency - // For select * from table limit 10; should just use one thread. 
- if (_local_state && _local_state->should_run_serial()) { - _max_thread_num = 1; - } - // when user not specify scan_thread_num, so we can try downgrade _max_thread_num. - // becaue we found in a table with 5k columns, column reader may ocuppy too much memory. - // you can refer https://github.com/apache/doris/issues/35340 for details. - int32_t max_column_reader_num = state->query_options().max_column_reader_num; - if (_max_thread_num != 1 && max_column_reader_num > 0) { - int32_t scan_column_num = _output_tuple_desc->slots().size(); - int32_t current_column_num = scan_column_num * _max_thread_num; - if (current_column_num > max_column_reader_num) { - int32_t new_max_thread_num = max_column_reader_num / scan_column_num; - new_max_thread_num = new_max_thread_num <= 0 ? 1 : new_max_thread_num; - if (new_max_thread_num < _max_thread_num) { - int32_t origin_max_thread_num = _max_thread_num; - _max_thread_num = new_max_thread_num; - LOG(INFO) << "downgrade query:" << print_id(state->query_id()) - << " scan's max_thread_num from " << origin_max_thread_num << " to " - << _max_thread_num << ",column num: " << scan_column_num - << ", max_column_reader_num: " << max_column_reader_num; - } - } - } - - // 1. Calculate max concurrency - // For select * from table limit 10; should just use one thread. - if ((_parent && _parent->should_run_serial()) || - (_local_state && _local_state->should_run_serial())) { - _max_thread_num = 1; - } _query_thread_context = {_query_id, _state->query_mem_tracker(), _state->get_query_ctx()->workload_group()}; + + DorisMetrics::instance()->scanner_ctx_cnt->increment(1); } ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VScanNode* parent, const doris::TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners, - int64_t limit_, int64_t max_bytes_in_blocks_queue, - const int num_parallel_instances, + int64_t limit_, bool ignore_data_distribution, pipeline::ScanLocalStateBase* local_state) : ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners, limit_, - max_bytes_in_blocks_queue, num_parallel_instances, local_state) { + ignore_data_distribution, local_state) { _parent = parent; - DorisMetrics::instance()->scanner_ctx_cnt->increment(1); + // No need to increase scanner_ctx_cnt here, since the other constructor has already done it. } // After init function call, should not access _parent @@ -163,16 +105,126 @@ Status ScannerContext::init() { #ifndef BE_TEST // 3. get thread token - if (_state->get_query_ctx()) { - thread_token = _state->get_query_ctx()->get_token(); - _simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler(); - if (_simple_scan_scheduler) { - _should_reset_thread_name = false; - } - _remote_scan_task_scheduler = _state->get_query_ctx()->get_remote_scan_scheduler(); + if (!_state->get_query_ctx()) { + return Status::InternalError("Query context of {} is not set", + print_id(_state->query_id())); + } + + thread_token = _state->get_query_ctx()->get_token(); + + if (_state->get_query_ctx()->get_scan_scheduler()) { + _should_reset_thread_name = false; } #endif + const int num_parallel_instances = _state->query_parallel_instance_num(); + + // _max_bytes_in_queue controls the maximum memory that can be used by a single scan instance. + // scan_queue_mem_limit on FE is 100MB by default; on the backend we make sure its actual value + // is at least 10MB.
+ _max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10); + + // Provide more memory for wide tables, increase proportionally by multiples of 300 + _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1; + + // TODO: Where is the proper position to place this code? + if (_all_scanners.empty()) { + _is_finished = true; + _set_scanner_done(); + } + + // This is a trick implementation. + // The logic is kept only because of a potential performance issue. + bool submit_many_scan_tasks_for_potential_performance_issue = true; + auto scanner = _all_scanners.front().lock(); + DCHECK(scanner != nullptr); + // A query could have remote scan tasks and local scan tasks at the same time, + // so we need to compute the _scanner_scheduler in each scan operator instead of in the query context. + SimplifiedScanScheduler* simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler(); + SimplifiedScanScheduler* remote_scan_task_scheduler = + _state->get_query_ctx()->get_remote_scan_scheduler(); + if (scanner->_scanner->get_storage_type() == TabletStorageType::STORAGE_TYPE_LOCAL) { + // scan_scheduler could be empty if the query does not have a workload group. + if (simple_scan_scheduler) { + _scanner_scheduler = simple_scan_scheduler; + } else { + _scanner_scheduler = _scanner_scheduler_global->get_local_scan_thread_pool(); + } + } else { + // remote_scan_task_scheduler could be empty if the query does not have a workload group. + if (remote_scan_task_scheduler) { + _scanner_scheduler = remote_scan_task_scheduler; + } else { + _scanner_scheduler = _scanner_scheduler_global->get_remote_scan_thread_pool(); + } + } + + // _scanner_scheduler will be used to submit scan tasks. + if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size) { + submit_many_scan_tasks_for_potential_performance_issue = false; + } + + // _max_thread_num controls how many scanners of this ScanOperator can be submitted to the scheduler at a time. + // The overall target of our system is to fully utilize the resources. + // At the same time, we don't want too many tasks queued in the scheduler; that is not necessary. + // So, first of all, we try to make sure the _max_thread_num of a ScanNode of a query on a single backend is less than + // 2 * config::doris_scanner_thread_pool_thread_num, so that we can keep all IO threads busy. + // For example, on a 64-core machine, the default value of config::doris_scanner_thread_pool_thread_num will be 64 * 2 = 128, + // and the num_parallel_instances of this scan operator will be 64 / 2 = 32. + // For a query that has one scan node, the _max_thread_num of each scan node instance will be 4 * 128 / 32 = 16. + // We have 32 instances of this scan operator, so for the ScanNode we can have 16 * 32 = 512 scanner tasks submitted at a time. + _max_thread_num = _state->num_scanner_threads() > 0 ? _state->num_scanner_threads() : 0; + + if (_max_thread_num == 0) { + // NOTE: When ignore_data_distribution is true, the parallelism + // of the scan operator is regarded as 1 (though in fact it may not be). + // That allows a very large number of scan tasks to be submitted to the scheduler. + // This logic is kept from the older implementation.
+ if (submit_many_scan_tasks_for_potential_performance_issue || _ignore_data_distribution) { + _max_thread_num = config::doris_scanner_thread_pool_thread_num / 1; + } else { + _max_thread_num = + 4 * (config::doris_scanner_thread_pool_thread_num / num_parallel_instances); + // In some rare cases, the user may manually set num_parallel_instances to 1 so that many + // queries can be executed in parallel. We need to make sure _max_thread_num does not exceed + // its previous value. + _max_thread_num = + std::min(_max_thread_num, config::doris_scanner_thread_pool_thread_num); + } + } + + _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num; + // In some situations, there are not many big tablets involved, so we can reduce the thread number. + // NOTE: when _all_scanners.size() is zero, _max_thread_num will be 0. + _max_thread_num = std::min(_max_thread_num, (int32_t)_all_scanners.size()); + + // 1. Calculate max concurrency + // For select * from table limit 10; we should just use one thread. + if ((_parent && _parent->should_run_serial()) || + (_local_state && _local_state->should_run_serial())) { + _max_thread_num = 1; + } + + // When the user does not specify scan_thread_num, we can try to downgrade _max_thread_num, + // because we found that in a table with 5k columns, the column readers may occupy too much memory. + // You can refer to https://github.com/apache/doris/issues/35340 for details. + int32_t max_column_reader_num = _state->query_options().max_column_reader_num; + if (_max_thread_num != 1 && max_column_reader_num > 0) { + int32_t scan_column_num = _output_tuple_desc->slots().size(); + int32_t current_column_num = scan_column_num * _max_thread_num; + if (current_column_num > max_column_reader_num) { + int32_t new_max_thread_num = max_column_reader_num / scan_column_num; + new_max_thread_num = new_max_thread_num <= 0 ?
1 : new_max_thread_num; + if (new_max_thread_num < _max_thread_num) { + int32_t origin_max_thread_num = _max_thread_num; + _max_thread_num = new_max_thread_num; + LOG(INFO) << "downgrade query:" << print_id(_state->query_id()) + << " scan's max_thread_num from " << origin_max_thread_num << " to " + << _max_thread_num << ",column num: " << scan_column_num + << ", max_column_reader_num: " << max_column_reader_num; + } + } + } + if (_parent) { COUNTER_SET(_parent->_max_scanner_thread_num, (int64_t)_max_thread_num); _parent->_runtime_profile->add_info_string("UseSpecificThreadToken", @@ -237,7 +289,7 @@ bool ScannerContext::empty_in_queue(int id) { Status ScannerContext::submit_scan_task(std::shared_ptr scan_task) { _scanner_sched_counter->update(1); _num_scheduled_scanners++; - return _scanner_scheduler->submit(shared_from_this(), scan_task); + return _scanner_scheduler_global->submit(shared_from_this(), scan_task); } void ScannerContext::append_block_to_queue(std::shared_ptr scan_task) { diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index fe72b740a1b239..cb374bdcb8deb2 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -107,7 +107,7 @@ class ScannerContext : public std::enable_shared_from_this, ScannerContext(RuntimeState* state, VScanNode* parent, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners, int64_t limit_, - int64_t max_bytes_in_blocks_queue, const int num_parallel_instances = 1, + bool ignore_data_distribution, pipeline::ScanLocalStateBase* local_state = nullptr); ~ScannerContext() override { @@ -162,9 +162,7 @@ class ScannerContext : public std::enable_shared_from_this, virtual bool empty_in_queue(int id); - SimplifiedScanScheduler* get_simple_scan_scheduler() { return _simple_scan_scheduler; } - - SimplifiedScanScheduler* get_remote_scan_scheduler() { return _remote_scan_task_scheduler; } + SimplifiedScanScheduler* get_scan_scheduler() { return _scanner_scheduler; } void stop_scanners(RuntimeState* state); @@ -185,7 +183,7 @@ class ScannerContext : public std::enable_shared_from_this, ScannerContext(RuntimeState* state_, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners_, int64_t limit_, - int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances, + bool ignore_data_distribution, pipeline::ScanLocalStateBase* local_state); /// Four criteria to determine whether to increase the parallelism of the scanners @@ -220,17 +218,15 @@ class ScannerContext : public std::enable_shared_from_this, int64_t limit; int32_t _max_thread_num = 0; - int64_t _max_bytes_in_queue; - doris::vectorized::ScannerScheduler* _scanner_scheduler; - SimplifiedScanScheduler* _simple_scan_scheduler = nullptr; - SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr; + int64_t _max_bytes_in_queue = 0; + doris::vectorized::ScannerScheduler* _scanner_scheduler_global = nullptr; + SimplifiedScanScheduler* _scanner_scheduler = nullptr; moodycamel::ConcurrentQueue> _scanners; int32_t _num_scheduled_scanners = 0; int32_t _num_finished_scanners = 0; int32_t _num_running_scanners = 0; // weak pointer for _scanners, used in stop function std::vector> _all_scanners; - const int _num_parallel_instances; std::shared_ptr _scanner_profile; RuntimeProfile::Counter* _scanner_sched_counter = nullptr; RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr; @@ -238,6 +234,7 @@ class 
ScannerContext : public std::enable_shared_from_this, RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr; RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr; QueryThreadContext _query_thread_context; + bool _ignore_data_distribution = false; // for scaling up the running scanners size_t _estimated_block_size = 0; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index f69165186c5933..f7b6887d7466f8 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -169,13 +169,7 @@ Status ScannerScheduler::submit(std::shared_ptr ctx, scanner_delegate->_scanner->start_wait_worker_timer(); TabletStorageType type = scanner_delegate->_scanner->get_storage_type(); auto sumbit_task = [&]() { - bool is_local = type == TabletStorageType::STORAGE_TYPE_LOCAL; - SimplifiedScanScheduler* scan_sched = - is_local ? ctx->get_simple_scan_scheduler() : ctx->get_remote_scan_scheduler(); - if (!scan_sched) { // query without workload group - scan_sched = - is_local ? _local_scan_thread_pool.get() : _remote_scan_thread_pool.get(); - } + SimplifiedScanScheduler* scan_sched = ctx->get_scan_scheduler(); auto work_func = [scanner_ref = scan_task, ctx]() { DorisMetrics::instance()->scanner_task_queued->increment(-1); DorisMetrics::instance()->scanner_task_running->increment(1); diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index f832e348088cdb..56c49368598adc 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -72,6 +72,12 @@ class ScannerScheduler { static int get_remote_scan_thread_queue_size(); + SimplifiedScanScheduler* get_local_scan_thread_pool() { return _local_scan_thread_pool.get(); } + + SimplifiedScanScheduler* get_remote_scan_thread_pool() { + return _remote_scan_thread_pool.get(); + } + private: static void _scanner_scan(std::shared_ptr ctx, std::shared_ptr scan_task); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 9f7b6fa06649c2..e3d9f916b5d751 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -306,14 +306,19 @@ Status VScanNode::_init_profile() { void VScanNode::_start_scanners(const std::list>& scanners, const int query_parallel_instance_num) { if (_is_pipeline_scan) { - int max_queue_size = _shared_scan_opt ? std::max(query_parallel_instance_num, 1) : 1; - _scanner_ctx = pipeline::PipScannerContext::create_shared( - _state, this, _output_tuple_desc, _output_row_descriptor.get(), scanners, limit(), - _state->scan_queue_mem_limit(), max_queue_size); + // int max_queue_size = _shared_scan_opt ? std::max(query_parallel_instance_num, 1) : 1; + // In previous versions (maybe up to 2.1.6), the code above was used to set max_queue_size. + // Actually, this value does not take effect for the pipeline engine, see + // https://github.com/apache/doris/blob/967801ca666f84a60010bef1f6ca5ca777d5b901/be/src/vec/exec/scan/scanner_context.cpp#L91 + // The actual working value comes from state->query_parallel_instance_num(), + // so removing the code above changes nothing for the pipeline.
+ _scanner_ctx = pipeline::PipScannerContext::create_shared(_state, this, _output_tuple_desc, + _output_row_descriptor.get(), + scanners, limit(), false); } else { _scanner_ctx = ScannerContext::create_shared(_state, this, _output_tuple_desc, _output_row_descriptor.get(), scanners, - limit(), _state->scan_queue_mem_limit()); + limit(), false); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 79503ca77d83bb..5a5feef0e422d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -674,7 +674,10 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT) public long maxExecMemByte = 2147483648L; - @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT) + @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT, + description = {"每个 Scan Instance 的 block queue 能够保存多少字节的 block", + "How many bytes of block can be saved in the block queue of each Scan Instance"}) + // 100MB public long maxScanQueueMemByte = 2147483648L / 20; @VariableMgr.VarAttr(name = NUM_SCANNER_THREADS, needForward = true, description = { From 9bb76ac9ed804a6882650ce24e45bdbe94017c95 Mon Sep 17 00:00:00 2001 From: zhiqiang-hhhh Date: Thu, 17 Oct 2024 17:45:18 +0800 Subject: [PATCH 2/2] FMT --- be/src/vec/exec/scan/pip_scanner_context.h | 6 ++---- be/src/vec/exec/scan/scanner_context.h | 3 +-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 96579d67b26ab1..b7684ac5fe3750 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -44,12 +44,10 @@ class PipXScannerContext final : public vectorized::ScannerContext { const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners, - int64_t limit_, - std::shared_ptr dependency, + int64_t limit_, std::shared_ptr dependency, bool ignore_data_distribution) : vectorized::ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners, - limit_, ignore_data_distribution, - local_state) { + limit_, ignore_data_distribution, local_state) { _dependency = dependency; } diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index cb374bdcb8deb2..449a5b1b470de0 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -183,8 +183,7 @@ class ScannerContext : public std::enable_shared_from_this, ScannerContext(RuntimeState* state_, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners_, int64_t limit_, - bool ignore_data_distribution, - pipeline::ScanLocalStateBase* local_state); + bool ignore_data_distribution, pipeline::ScanLocalStateBase* local_state); /// Four criteria to determine whether to increase the parallelism of the scanners /// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
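
The sizing logic that the scanner_context.cpp hunk introduces in ScannerContext::init() can be sanity-checked with a standalone model. The sketch below is illustrative only and not part of the patch; compute_max_thread_num and its parameters are placeholder names, with pool_size standing in for config::doris_scanner_thread_pool_thread_num and submit_many_tasks for the submit_many_scan_tasks_for_potential_performance_issue flag.

    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    // Standalone model of the _max_thread_num computation; all names are placeholders.
    int32_t compute_max_thread_num(int32_t user_scan_threads, int32_t pool_size,
                                   int32_t num_parallel_instances, bool submit_many_tasks,
                                   bool ignore_data_distribution, int32_t num_scanners,
                                   bool run_serial, int32_t scan_column_num,
                                   int32_t max_column_reader_num) {
        int32_t n = user_scan_threads > 0 ? user_scan_threads : 0;
        if (n == 0) {
            if (submit_many_tasks || ignore_data_distribution) {
                n = pool_size; // one potential task slot per pool thread
            } else {
                n = std::min(4 * (pool_size / num_parallel_instances), pool_size);
            }
        }
        n = n == 0 ? 1 : n;
        n = std::min(n, num_scanners); // never more threads than scanners
        if (run_serial) {
            n = 1; // e.g. select * from t limit 10
        }
        // Wide-table downgrade: cap the total number of column readers.
        if (n != 1 && max_column_reader_num > 0 &&
            scan_column_num * n > max_column_reader_num) {
            n = std::max(max_column_reader_num / scan_column_num, 1);
        }
        return n;
    }

    int main() {
        // 64-core defaults from the comments: pool_size = 128, 32 instances,
        // 100 scanners, scheduler queue already half full, 10 scan columns.
        std::cout << compute_max_thread_num(0, 128, 32, false, false, 100, false, 10, 0)
                  << std::endl; // prints 16 = 4 * 128 / 32
    }

With a 5000-column table and, purely for illustration, max_column_reader_num = 16000, the same call would downgrade to 16000 / 5000 = 3 threads, which is the behavior motivated by issue #35340.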
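The same kind of check applies to the _max_bytes_in_queue formula (10MB floor, one extra multiple per 300 columns). Again an illustrative sketch with placeholder names, not part of the patch:

    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    // Standalone model of the _max_bytes_in_queue computation.
    int64_t compute_max_bytes_in_queue(int64_t scan_queue_mem_limit, int64_t num_slots) {
        // Backend floor of 10MB, regardless of the FE session variable.
        int64_t max_bytes = std::max(scan_queue_mem_limit, (int64_t)1024 * 1024 * 10);
        // Wide tables get more queue memory, stepped once per 300 columns.
        max_bytes *= num_slots / 300 + 1;
        return max_bytes;
    }

    int main() {
        const int64_t fe_default = 2147483648LL / 20; // FE default, about 100MB
        std::cout << compute_max_bytes_in_queue(fe_default, 50) << std::endl;   // multiplier 1
        std::cout << compute_max_bytes_in_queue(fe_default, 5000) << std::endl; // multiplier 17
    }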
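Finally, the scheduler-selection change in the scanner_scheduler.cpp hunk: the local-vs-remote decision that used to run on every ScannerScheduler::submit() call is now made once per context in ScannerContext::init(). The decision itself reduces to the sketch below (Scheduler and the helper name are placeholders, not Doris types):

    // Workload-group scheduler wins when present; otherwise fall back to the
    // process-wide local or remote scan thread pool.
    struct Scheduler {};

    Scheduler* pick_scheduler(bool is_local_storage, Scheduler* wg_local,
                              Scheduler* wg_remote, Scheduler* global_local_pool,
                              Scheduler* global_remote_pool) {
        if (is_local_storage) {
            return wg_local != nullptr ? wg_local : global_local_pool;
        }
        return wg_remote != nullptr ? wg_remote : global_remote_pool;
    }

Caching the result on the context is what lets ScannerScheduler::submit() shrink to a single ctx->get_scan_scheduler() call in the scanner_scheduler.cpp hunk above.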