From b23851315c7abfb22dbbccdb924e3373c8a115af Mon Sep 17 00:00:00 2001 From: zhangstar333 <2561612514@qq.com> Date: Mon, 24 Jun 2024 14:54:09 +0800 Subject: [PATCH] [cherry-pick](scan)scanner could eos early when reached limit (#36535) --- be/src/vec/exec/scan/new_es_scan_node.cpp | 4 ++-- be/src/vec/exec/scan/new_file_scan_node.cpp | 7 +++---- be/src/vec/exec/scan/new_jdbc_scan_node.cpp | 5 ++--- be/src/vec/exec/scan/new_odbc_scan_node.cpp | 2 +- be/src/vec/exec/scan/new_olap_scan_node.cpp | 8 ++++---- be/src/vec/exec/scan/vmeta_scan_node.cpp | 5 ++--- 6 files changed, 14 insertions(+), 17 deletions(-) diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp b/be/src/vec/exec/scan/new_es_scan_node.cpp index 4704f5eb8c6aa3..ab0b565a27a92e 100644 --- a/be/src/vec/exec/scan/new_es_scan_node.cpp +++ b/be/src/vec/exec/scan/new_es_scan_node.cpp @@ -165,8 +165,8 @@ Status NewEsScanNode::_init_scanners(std::list* scanners) { properties, _column_names, _docvalue_context, &doc_value_mode); std::shared_ptr scanner = NewEsScanner::create_shared( - _state, this, _limit_per_scanner, _tuple_id, properties, _docvalue_context, - doc_value_mode, _state->runtime_profile()); + _state, this, _limit, _tuple_id, properties, _docvalue_context, doc_value_mode, + _state->runtime_profile()); RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts)); scanners->push_back(scanner); diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp index da33538b8c3ecf..2cc34f65f1879d 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.cpp +++ b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -121,10 +121,9 @@ Status NewFileScanNode::_init_scanners(std::list* scanners) { std::min(config::doris_scanner_thread_pool_thread_num, _scan_ranges.size()); _kv_cache.reset(new ShardedKVCache(shard_num)); for (auto& scan_range : _scan_ranges) { - std::unique_ptr scanner = - VFileScanner::create_unique(_state, this, _limit_per_scanner, - scan_range.scan_range.ext_scan_range.file_scan_range, - runtime_profile(), _kv_cache.get()); + std::unique_ptr scanner = VFileScanner::create_unique( + _state, this, _limit, scan_range.scan_range.ext_scan_range.file_scan_range, + runtime_profile(), _kv_cache.get()); RETURN_IF_ERROR( scanner->prepare(_conjuncts, &_colname_to_value_range, &_colname_to_slot_id)); scanners->push_back(std::move(scanner)); diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp index f8219b4337ef6d..deb3f6368142d8 100644 --- a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp @@ -65,9 +65,8 @@ Status NewJdbcScanNode::_init_scanners(std::list* scanners) { if (_eos == true) { return Status::OK(); } - std::unique_ptr scanner = - NewJdbcScanner::create_unique(_state, this, _limit_per_scanner, _tuple_id, - _query_string, _table_type, _state->runtime_profile()); + std::unique_ptr scanner = NewJdbcScanner::create_unique( + _state, this, _limit, _tuple_id, _query_string, _table_type, _state->runtime_profile()); RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts)); scanners->push_back(std::move(scanner)); return Status::OK(); diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.cpp b/be/src/vec/exec/scan/new_odbc_scan_node.cpp index eafad1659680a1..c6361b49bab442 100644 --- a/be/src/vec/exec/scan/new_odbc_scan_node.cpp +++ b/be/src/vec/exec/scan/new_odbc_scan_node.cpp @@ -66,7 +66,7 @@ Status NewOdbcScanNode::_init_scanners(std::list* scanners) { return Status::OK(); } std::shared_ptr scanner = NewOdbcScanner::create_shared( - _state, this, _limit_per_scanner, _odbc_scan_node, _state->runtime_profile()); + _state, this, _limit, _odbc_scan_node, _state->runtime_profile()); RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts)); scanners->push_back(scanner); return Status::OK(); diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 4dc56c3f44c23f..fce36992761844 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -505,8 +505,8 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { const std::vector& key_ranges, TabletReader::ReadSource read_source) { std::shared_ptr scanner = NewOlapScanner::create_shared( - _state, this, _limit_per_scanner, _olap_scan_node.is_preaggregation, scan_range, - key_ranges, std::move(read_source), _scanner_profile.get()); + _state, this, _limit, _olap_scan_node.is_preaggregation, scan_range, key_ranges, + std::move(read_source), _scanner_profile.get()); RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts)); scanner->set_compound_filters(_compound_filters); @@ -597,8 +597,8 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { auto build_new_scanner = [&](const TPaloScanRange& scan_range, const std::vector& key_ranges) { std::shared_ptr scanner = NewOlapScanner::create_shared( - _state, this, _limit_per_scanner, _olap_scan_node.is_preaggregation, scan_range, - key_ranges, _scanner_profile.get()); + _state, this, _limit, _olap_scan_node.is_preaggregation, scan_range, key_ranges, + _scanner_profile.get()); RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts)); scanner->set_compound_filters(_compound_filters); diff --git a/be/src/vec/exec/scan/vmeta_scan_node.cpp b/be/src/vec/exec/scan/vmeta_scan_node.cpp index 5ba559466efa67..23aecf447afa70 100644 --- a/be/src/vec/exec/scan/vmeta_scan_node.cpp +++ b/be/src/vec/exec/scan/vmeta_scan_node.cpp @@ -68,9 +68,8 @@ Status VMetaScanNode::_init_scanners(std::list* scanners) { } for (auto& scan_range : _scan_ranges) { - std::shared_ptr scanner = - VMetaScanner::create_shared(_state, this, _tuple_id, scan_range, _limit_per_scanner, - runtime_profile(), _user_identity); + std::shared_ptr scanner = VMetaScanner::create_shared( + _state, this, _tuple_id, scan_range, _limit, runtime_profile(), _user_identity); RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts)); scanners->push_back(scanner); }