diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 4ec90cfa88619a..00ea1aa6363028 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -244,6 +244,7 @@ DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) -> b } return true; }); +DEFINE_Int32(remote_split_source_batch_size, "1024"); DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1"); // number of olap scanner thread pool queue size DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 7ff833513eb63e..e1c2ed305d9b76 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -286,6 +286,8 @@ DECLARE_mInt64(doris_blocking_priority_queue_wait_timeout_ms); // number of scanner thread pool size for olap table // and the min thread num of remote scanner thread pool DECLARE_mInt32(doris_scanner_thread_pool_thread_num); +// number of batch size to fetch the remote split source +DECLARE_mInt32(remote_split_source_batch_size); // max number of remote scanner thread pool size // if equal to -1, value is std::max(512, CpuInfo::num_cores() * 10) DECLARE_Int32(doris_max_remote_scanner_thread_pool_thread_num); diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index 392179f2dd8d4d..2484737af897d5 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -31,22 +31,20 @@ namespace doris::pipeline { Status FileScanLocalState::_init_scanners(std::list* scanners) { - if (_scan_ranges.empty()) { + if (_split_source->num_scan_ranges() == 0) { _eos = true; - _scan_dependency->set_ready(); return Status::OK(); } auto& p = _parent->cast(); size_t shard_num = std::min( config::doris_scanner_thread_pool_thread_num / state()->query_parallel_instance_num(), - _scan_ranges.size()); + _max_scanners); shard_num = std::max(shard_num, (size_t)1); _kv_cache.reset(new vectorized::ShardedKVCache(shard_num)); - for (auto& scan_range : _scan_ranges) { + for (int i = 0; i < _max_scanners; ++i) { std::unique_ptr scanner = vectorized::VFileScanner::create_unique( - state(), this, p._limit_per_scanner, - scan_range.scan_range.ext_scan_range.file_scan_range, _scanner_profile.get(), + state(), this, p._limit_per_scanner, _split_source, _scanner_profile.get(), _kv_cache.get()); RETURN_IF_ERROR( scanner->prepare(_conjuncts, &_colname_to_value_range, &_colname_to_slot_id)); @@ -62,47 +60,25 @@ std::string FileScanLocalState::name_suffix() const { void FileScanLocalState::set_scan_ranges(RuntimeState* state, const std::vector& scan_ranges) { - int max_scanners = + _max_scanners = config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num(); - max_scanners = std::max(std::max(max_scanners, state->parallel_scan_max_scanners_count()), 1); + _max_scanners = std::max(std::max(_max_scanners, state->parallel_scan_max_scanners_count()), 1); // For select * from table limit 10; should just use one thread. if (should_run_serial()) { - max_scanners = 1; + _max_scanners = 1; } - if (scan_ranges.size() <= max_scanners) { - _scan_ranges = scan_ranges; - } else { - // There is no need for the number of scanners to exceed the number of threads in thread pool. - // scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order. - // In the insert statement, reading data in partition order can reduce the memory usage of BE - // and prevent the generation of smaller tables. 
- _scan_ranges.resize(max_scanners); - int num_ranges = scan_ranges.size() / max_scanners; - int num_add_one = scan_ranges.size() - num_ranges * max_scanners; - int scan_index = 0; - int range_index = 0; - for (int i = 0; i < num_add_one; ++i) { - _scan_ranges[scan_index] = scan_ranges[range_index++]; - auto& ranges = - _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; - for (int j = 0; j < num_ranges; j++) { - auto& merged_ranges = - scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges; - ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); - } + if (scan_ranges.size() == 1) { + auto scan_range = scan_ranges[0].scan_range.ext_scan_range.file_scan_range; + if (scan_range.__isset.split_source) { + auto split_source = scan_range.split_source; + _split_source = std::make_shared( + state, split_source.split_source_id, split_source.num_splits); } - for (int i = num_add_one; i < max_scanners; ++i) { - _scan_ranges[scan_index] = scan_ranges[range_index++]; - auto& ranges = - _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; - for (int j = 0; j < num_ranges - 1; j++) { - auto& merged_ranges = - scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges; - ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); - } - } - LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size(); } + if (_split_source == nullptr) { + _split_source = std::make_shared(scan_ranges); + } + _max_scanners = std::min(_max_scanners, _split_source->num_scan_ranges()); if (scan_ranges.size() > 0 && scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) { // for compatibility. diff --git a/be/src/pipeline/exec/file_scan_operator.h b/be/src/pipeline/exec/file_scan_operator.h index 9a4c82f11e6556..a9a25cf6c31c23 100644 --- a/be/src/pipeline/exec/file_scan_operator.h +++ b/be/src/pipeline/exec/file_scan_operator.h @@ -26,6 +26,7 @@ #include "operator.h" #include "pipeline/exec/scan_operator.h" #include "vec/exec/format/format_common.h" +#include "vec/exec/scan/split_source_connector.h" namespace doris { namespace vectorized { @@ -54,7 +55,8 @@ class FileScanLocalState final : public ScanLocalState { std::string name_suffix() const override; private: - std::vector _scan_ranges; + std::shared_ptr _split_source = nullptr; + int _max_scanners; // A in memory cache to save some common components // of the this scan node. eg: // 1. 
iceberg delete file diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp index eed7cfaaec6f1e..d0b0f3cce9b254 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.cpp +++ b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -26,6 +26,7 @@ #include "common/config.h" #include "common/object_pool.h" +#include "runtime/client_cache.h" #include "vec/exec/scan/vfile_scanner.h" #include "vec/exec/scan/vscanner.h" @@ -36,6 +37,8 @@ class RuntimeState; namespace doris::vectorized { +using apache::thrift::transport::TTransportException; + NewFileScanNode::NewFileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : VScanNode(pool, tnode, descs) { @@ -60,47 +63,25 @@ Status NewFileScanNode::prepare(RuntimeState* state) { void NewFileScanNode::set_scan_ranges(RuntimeState* state, const std::vector& scan_ranges) { - int max_scanners = + _max_scanners = config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num(); - max_scanners = std::max(std::max(max_scanners, state->parallel_scan_max_scanners_count()), 1); + _max_scanners = std::max(std::max(_max_scanners, state->parallel_scan_max_scanners_count()), 1); // For select * from table limit 10; should just use one thread. if (should_run_serial()) { - max_scanners = 1; + _max_scanners = 1; } - if (scan_ranges.size() <= max_scanners) { - _scan_ranges = scan_ranges; - } else { - // There is no need for the number of scanners to exceed the number of threads in thread pool. - // scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order. - // In the insert statement, reading data in partition order can reduce the memory usage of BE - // and prevent the generation of smaller tables. - _scan_ranges.resize(max_scanners); - int num_ranges = scan_ranges.size() / max_scanners; - int num_add_one = scan_ranges.size() - num_ranges * max_scanners; - int scan_index = 0; - int range_index = 0; - for (int i = 0; i < num_add_one; ++i) { - _scan_ranges[scan_index] = scan_ranges[range_index++]; - auto& ranges = - _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; - for (int j = 0; j < num_ranges; j++) { - auto& merged_ranges = - scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges; - ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); - } - } - for (int i = num_add_one; i < max_scanners; ++i) { - _scan_ranges[scan_index] = scan_ranges[range_index++]; - auto& ranges = - _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; - for (int j = 0; j < num_ranges - 1; j++) { - auto& merged_ranges = - scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges; - ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); - } + if (scan_ranges.size() == 1) { + auto scan_range = scan_ranges[0].scan_range.ext_scan_range.file_scan_range; + if (scan_range.__isset.split_source) { + auto split_source = scan_range.split_source; + _split_source = std::make_shared( + state, split_source.split_source_id, split_source.num_splits); } - LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size(); } + if (_split_source == nullptr) { + _split_source = std::make_shared(scan_ranges); + } + _max_scanners = std::min(_max_scanners, _split_source->num_scan_ranges()); if (scan_ranges.size() > 0 && scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) { // for compatibility. 
@@ -125,20 +106,19 @@ Status NewFileScanNode::_process_conjuncts() { } Status NewFileScanNode::_init_scanners(std::list* scanners) { - if (_scan_ranges.empty()) { + if (_split_source->num_scan_ranges() == 0) { _eos = true; return Status::OK(); } size_t shard_num = std::min( config::doris_scanner_thread_pool_thread_num / _state->query_parallel_instance_num(), - _scan_ranges.size()); + _max_scanners); shard_num = std::max(shard_num, (size_t)1); _kv_cache.reset(new ShardedKVCache(shard_num)); - for (auto& scan_range : _scan_ranges) { + for (int i = 0; i < _max_scanners; ++i) { std::unique_ptr scanner = - VFileScanner::create_unique(_state, this, _limit_per_scanner, - scan_range.scan_range.ext_scan_range.file_scan_range, + VFileScanner::create_unique(_state, this, _limit_per_scanner, _split_source, runtime_profile(), _kv_cache.get()); RETURN_IF_ERROR( scanner->prepare(_conjuncts, &_colname_to_value_range, &_colname_to_slot_id)); diff --git a/be/src/vec/exec/scan/new_file_scan_node.h b/be/src/vec/exec/scan/new_file_scan_node.h index e6d9fc1e621d98..24af199c96030c 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.h +++ b/be/src/vec/exec/scan/new_file_scan_node.h @@ -25,6 +25,7 @@ #include "common/status.h" #include "vec/exec/format/format_common.h" +#include "vec/exec/scan/split_source_connector.h" #include "vec/exec/scan/vscan_node.h" namespace doris { @@ -58,7 +59,8 @@ class NewFileScanNode : public VScanNode { Status _init_scanners(std::list* scanners) override; private: - std::vector _scan_ranges; + std::shared_ptr _split_source = nullptr; + int _max_scanners; // A in memory cache to save some common components // of the this scan node. eg: // 1. iceberg delete file diff --git a/be/src/vec/exec/scan/split_source_connector.cpp b/be/src/vec/exec/scan/split_source_connector.cpp new file mode 100644 index 00000000000000..2d35d3796bcc7a --- /dev/null +++ b/be/src/vec/exec/scan/split_source_connector.cpp @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
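The connector implementation that follows realizes a pull model: all scanners of a scan node share one split source, and each scanner pulls its next range on demand, so a fast scanner simply pulls more ranges and no scanner is stuck with a skewed share of the work. Below is a minimal, self-contained sketch of that pull model — illustrative Java with hypothetical stand-in names, not part of this patch; the real contract is the C++ SplitSourceConnector defined in the new files below.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the shared split source; scanners pull ranges on demand.
class SharedSplitSource {
    private final Queue<String> ranges;

    SharedSplitSource(List<String> ranges) {
        this.ranges = new ArrayDeque<>(ranges);
    }

    // Mirrors get_next(): hands out the next range, or null once the source is drained.
    synchronized String nextRange() {
        return ranges.poll();
    }
}

public class PullModelDemo {
    // Each scanner loops on the shared source; a faster scanner simply pulls more ranges.
    static Thread scanner(SharedSplitSource source, String name, long delayMs) {
        return new Thread(() -> {
            for (String range = source.nextRange(); range != null; range = source.nextRange()) {
                System.out.println(name + " scans " + range);
                try {
                    Thread.sleep(delayMs); // simulate per-range scan cost
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, name);
    }

    public static void main(String[] args) throws InterruptedException {
        SharedSplitSource source = new SharedSplitSource(
                List.of("r1", "r2", "r3", "r4", "r5", "r6", "r7", "r8"));
        Thread fast = scanner(source, "fast-scanner", 1);
        Thread slow = scanner(source, "slow-scanner", 50);
        fast.start();
        slow.start();
        fast.join();
        slow.join();
    }
}
```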
+ +#include "vec/exec/scan/split_source_connector.h" + +#include "runtime/exec_env.h" +#include "runtime/query_context.h" + +namespace doris::vectorized { + +using apache::thrift::transport::TTransportException; + +Status LocalSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* range) { + std::lock_guard l(_range_lock); + *has_next = false; + if (_scan_index < _scan_ranges.size()) { + auto& ranges = _scan_ranges[_scan_index].scan_range.ext_scan_range.file_scan_range.ranges; + if (_range_index < ranges.size()) { + *has_next = true; + *range = ranges[_range_index++]; + if (_range_index == ranges.size()) { + _scan_index++; + _range_index = 0; + } + } + } + return Status::OK(); +} + +Status RemoteSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* range) { + std::lock_guard l(_range_lock); + *has_next = false; + if (_scan_index == _scan_ranges.size() && !_last_batch) { + Status coord_status; + FrontendServiceConnection coord(_state->exec_env()->frontend_client_cache(), + _state->get_query_ctx()->coord_addr, &coord_status); + RETURN_IF_ERROR(coord_status); + TFetchSplitBatchRequest request; + request.__set_split_source_id(_split_source_id); + request.__set_max_num_splits(config::remote_split_source_batch_size); + TFetchSplitBatchResult result; + try { + coord->fetchSplitBatch(result, request); + } catch (std::exception& e1) { + LOG(WARNING) << "Failed to get batch of split source: {}, try to reopen" << e1.what(); + RETURN_IF_ERROR(coord.reopen()); + try { + coord->fetchSplitBatch(result, request); + } catch (std::exception& e2) { + return Status::IOError("Failed to get batch of split source: {}", e2.what()); + } + } + _last_batch = result.splits.empty(); + _scan_ranges = result.splits; + _scan_index = 0; + _range_index = 0; + } + if (_scan_index < _scan_ranges.size()) { + auto& ranges = _scan_ranges[_scan_index].scan_range.ext_scan_range.file_scan_range.ranges; + if (_range_index < ranges.size()) { + *has_next = true; + *range = ranges[_range_index++]; + if (_range_index == ranges.size()) { + _scan_index++; + _range_index = 0; + } + } + } + return Status::OK(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/split_source_connector.h b/be/src/vec/exec/scan/split_source_connector.h new file mode 100644 index 00000000000000..cf358846b30f34 --- /dev/null +++ b/be/src/vec/exec/scan/split_source_connector.h @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "common/config.h" +#include "runtime/client_cache.h" +#include "runtime/runtime_state.h" + +namespace doris::vectorized { + +/* + * Multiple scanners within a scan node share a split source. + * Each scanner call `get_next` to get the next scan range. 
A fast scanner will immediately obtain + * the next scan range, so there is no data skew among scanners. + */ +class SplitSourceConnector { +public: + SplitSourceConnector() = default; + virtual ~SplitSourceConnector() = default; + + /** + * Get the next scan range. has_next is set to true if there is a next scan range. + * @param has_next whether the next scan range exists + * @param range the obtained next scan range + */ + virtual Status get_next(bool* has_next, TFileRangeDesc* range) = 0; + + virtual int num_scan_ranges() = 0; + + virtual TFileScanRangeParams* get_params() = 0; +}; + +/** + * The file splits are already assigned in `TFileScanRange.ranges`. The scan node has no need to + * fetch the scan ranges from the frontend. + * + * In cases where the number of files is small, the splits are directly transmitted to the backend. + */ +class LocalSplitSourceConnector : public SplitSourceConnector { +private: + std::mutex _range_lock; + std::vector<TScanRangeParams> _scan_ranges; + int _scan_index = 0; + int _range_index = 0; + +public: + LocalSplitSourceConnector(const std::vector<TScanRangeParams>& scan_ranges) + : _scan_ranges(scan_ranges) {} + + Status get_next(bool* has_next, TFileRangeDesc* range) override; + + int num_scan_ranges() override { return _scan_ranges.size(); } + + TFileScanRangeParams* get_params() override { + if (_scan_ranges.size() > 0 && + _scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) { + // for compatibility. + return &_scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params; + } + LOG(FATAL) << "Unreachable, params is obtained from file_scan_range_params_map"; + } +}; + +/** + * The file splits are lazily generated in the frontend and saved as a split source in the frontend. + * The scan node needs to fetch file splits from the frontend service. Each split source is identified by + * a unique ID, and the ID is stored in `TFileScanRange.split_source.split_source_id`. + * + * In the case of a large number of files, the backend can scan data while obtaining split information. + */ +class RemoteSplitSourceConnector : public SplitSourceConnector { +private: + std::mutex _range_lock; + RuntimeState* _state; + int64 _split_source_id; + int _num_splits; + + std::vector<TScanRangeLocations> _scan_ranges; + bool _last_batch = false; + int _scan_index = 0; + int _range_index = 0; + +public: + RemoteSplitSourceConnector(RuntimeState* state, int64 split_source_id, int num_splits) + : _state(state), _split_source_id(split_source_id), _num_splits(num_splits) {} + + Status get_next(bool* has_next, TFileRangeDesc* range) override; + + /* + * The remote split source is fetched in batch mode, and the splits are generated while scanning, + * so the number of scan ranges may not be accurate.
+ */ + int num_scan_ranges() override { return _num_splits; } + + TFileScanRangeParams* get_params() override { + LOG(FATAL) << "Unreachable, params is got by file_scan_range_params_map"; + } +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 64d8f85560c880..15e250b3871b1c 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -88,25 +88,23 @@ namespace doris::vectorized { using namespace ErrorCode; VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, RuntimeProfile* profile, - ShardedKVCache* kv_cache) + std::shared_ptr split_source, + RuntimeProfile* profile, ShardedKVCache* kv_cache) : VScanner(state, static_cast(parent), limit, profile), - _ranges(scan_range.ranges), - _next_range(0), + _split_source(split_source), _cur_reader(nullptr), _cur_reader_eof(false), _kv_cache(kv_cache), _strict_mode(false) { - if (scan_range.params.__isset.strict_mode) { - _strict_mode = scan_range.params.strict_mode; - } - if (state->get_query_ctx() != nullptr && state->get_query_ctx()->file_scan_range_params_map.count(parent->id()) > 0) { _params = &(state->get_query_ctx()->file_scan_range_params_map[parent->id()]); } else { - CHECK(scan_range.__isset.params); - _params = &(scan_range.params); + // old fe thrift protocol + _params = _split_source->get_params(); + } + if (_params->__isset.strict_mode) { + _strict_mode = _params->strict_mode; } // For load scanner, there are input and output tuple. @@ -117,25 +115,24 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t } VFileScanner::VFileScanner(RuntimeState* state, pipeline::FileScanLocalState* local_state, - int64_t limit, const TFileScanRange& scan_range, RuntimeProfile* profile, - ShardedKVCache* kv_cache) + int64_t limit, + std::shared_ptr split_source, + RuntimeProfile* profile, ShardedKVCache* kv_cache) : VScanner(state, local_state, limit, profile), - _ranges(scan_range.ranges), - _next_range(0), + _split_source(split_source), _cur_reader(nullptr), _cur_reader_eof(false), _kv_cache(kv_cache), _strict_mode(false) { - if (scan_range.params.__isset.strict_mode) { - _strict_mode = scan_range.params.strict_mode; - } - if (state->get_query_ctx() != nullptr && state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) { _params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]); } else { - CHECK(scan_range.__isset.params); - _params = &(scan_range.params); + // old fe thrift protocol + _params = _split_source->get_params(); + } + if (_params->__isset.strict_mode) { + _strict_mode = _params->strict_mode; } // For load scanner, there are input and output tuple. @@ -285,7 +282,13 @@ void VFileScanner::_get_slot_ids(VExpr* expr, std::vector* slot_ids) { Status VFileScanner::open(RuntimeState* state) { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(VScanner::open(state)); - RETURN_IF_ERROR(_init_expr_ctxes()); + RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range)); + if (_first_scan_range) { + RETURN_IF_ERROR(_init_expr_ctxes()); + } else { + // there's no scan range in split source. stop scanner directly. 
+ _scanner_eof = true; + } return Status::OK(); } @@ -736,19 +739,21 @@ Status VFileScanner::_get_next_reader() { if (_cur_reader) { _cur_reader->collect_profile_before_close(); RETURN_IF_ERROR(_cur_reader->close()); + _state->update_num_finished_scan_range(1); } _cur_reader.reset(nullptr); _src_block_init = false; - if (_next_range >= _ranges.size() || _should_stop) { + bool has_next = _first_scan_range; + if (!_first_scan_range) { + RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range)); + } + _first_scan_range = false; + if (!has_next || _should_stop) { _scanner_eof = true; - _state->update_num_finished_scan_range(1); return Status::OK(); } - if (_next_range != 0) { - _state->update_num_finished_scan_range(1); - } - const TFileRangeDesc& range = _ranges[_next_range++]; + const TFileRangeDesc& range = _current_range; _current_range_path = range.path; // create reader for specific format @@ -1006,7 +1011,7 @@ Status VFileScanner::_generate_fill_columns() { _partition_col_descs.clear(); _missing_col_descs.clear(); - const TFileRangeDesc& range = _ranges.at(_next_range - 1); + const TFileRangeDesc& range = _current_range; if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) { for (const auto& slot_desc : _partition_slot_descs) { if (slot_desc) { @@ -1049,8 +1054,6 @@ Status VFileScanner::_generate_fill_columns() { } Status VFileScanner::_init_expr_ctxes() { - DCHECK(!_ranges.empty()); - std::map full_src_index_map; std::map full_src_slot_map; std::map partition_name_to_key_index_map; @@ -1066,8 +1069,8 @@ Status VFileScanner::_init_expr_ctxes() { // All ranges in _ranges vector should have identical columns_from_path_keys // because they are all file splits for the same external table. // So here use the first element of _ranges to fill the partition_name_to_key_index_map - if (_ranges[0].__isset.columns_from_path_keys) { - std::vector key_map = _ranges[0].columns_from_path_keys; + if (_current_range.__isset.columns_from_path_keys) { + std::vector key_map = _current_range.columns_from_path_keys; if (!key_map.empty()) { for (size_t i = 0; i < key_map.size(); i++) { partition_name_to_key_index_map.emplace(key_map[i], i); diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index bdaefc595e3a2f..43c1a8b13da51a 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -64,12 +64,12 @@ class VFileScanner : public VScanner { static constexpr const char* NAME = "VFileScanner"; VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, - const TFileScanRange& scan_range, RuntimeProfile* profile, - ShardedKVCache* kv_cache); + std::shared_ptr split_source, + RuntimeProfile* profile, ShardedKVCache* kv_cache); VFileScanner(RuntimeState* state, pipeline::FileScanLocalState* parent, int64_t limit, - const TFileScanRange& scan_range, RuntimeProfile* profile, - ShardedKVCache* kv_cache); + std::shared_ptr split_source, + RuntimeProfile* profile, ShardedKVCache* kv_cache); Status open(RuntimeState* state) override; @@ -99,8 +99,9 @@ class VFileScanner : public VScanner { protected: const TFileScanRangeParams* _params = nullptr; - const std::vector& _ranges; - int _next_range; + std::shared_ptr _split_source; + bool _first_scan_range = false; + TFileRangeDesc _current_range; std::unique_ptr _cur_reader; bool _cur_reader_eof; @@ -243,7 +244,7 @@ class VFileScanner : public VScanner { // Otherwise, the cache miss rate will be high bool _shoudl_enable_file_meta_cache() { return 
config::max_external_file_meta_cache_num > 0 && - _ranges.size() < config::max_external_file_meta_cache_num / 3; + _split_source->num_scan_ranges() < config::max_external_file_meta_cache_num / 3; } }; } // namespace doris::vectorized diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp index 4455126790bfe7..d5eac0e162d529 100644 --- a/be/test/vec/exec/vwal_scanner_test.cpp +++ b/be/test/vec/exec/vwal_scanner_test.cpp @@ -35,6 +35,32 @@ namespace doris { namespace vectorized { + +class TestSplitSourceConnector : public SplitSourceConnector { +private: + std::mutex _range_lock; + TFileScanRange _scan_range; + int _range_index = 0; + +public: + TestSplitSourceConnector(const TFileScanRange& scan_range) : _scan_range(scan_range) {} + + Status get_next(bool* has_next, TFileRangeDesc* range) override { + std::lock_guard l(_range_lock); + if (_range_index < _scan_range.ranges.size()) { + *has_next = true; + *range = _scan_range.ranges[_range_index++]; + } else { + *has_next = false; + } + return Status::OK(); + } + + int num_scan_ranges() override { return _scan_range.ranges.size(); } + + TFileScanRangeParams* get_params() override { return &_scan_range.params; } +}; + class VWalScannerTest : public testing::Test { public: VWalScannerTest() : _runtime_state(TQueryGlobals()) { @@ -266,7 +292,8 @@ void VWalScannerTest::init() { } void VWalScannerTest::generate_scanner(std::shared_ptr& scanner) { - scanner = std::make_shared(&_runtime_state, _scan_node.get(), -1, _scan_range, + auto split_source = std::make_shared(_scan_range); + scanner = std::make_shared(&_runtime_state, _scan_node.get(), -1, split_source, _profile, _kv_cache.get()); scanner->_is_load = false; vectorized::VExprContextSPtrs _conjuncts; diff --git a/docker/thirdparties/docker-compose/hive/scripts/hive-metastore.sh b/docker/thirdparties/docker-compose/hive/scripts/hive-metastore.sh index 0157dc26d40581..09658778fdbc34 100755 --- a/docker/thirdparties/docker-compose/hive/scripts/hive-metastore.sh +++ b/docker/thirdparties/docker-compose/hive/scripts/hive-metastore.sh @@ -82,3 +82,10 @@ touch /mnt/SUCCESS while true; do sleep 1 done + +create catalog if not exists hive2_docker properties ( + 'type'='hms', + 'hive.metastore.uris' = 'thrift://172.21.0.101:9083', + 'fs.defaultFS' = 'hdfs://172.21.0.101:8020', + 'hadoop.username' = 'hadoop' +); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index d8f7eaa36aefb1..cbf43b60913741 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -132,6 +132,7 @@ import org.apache.doris.datasource.ExternalMetaCacheMgr; import org.apache.doris.datasource.ExternalMetaIdMgr; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.datasource.SplitSourceManager; import org.apache.doris.datasource.es.EsExternalCatalog; import org.apache.doris.datasource.es.EsRepository; import org.apache.doris.datasource.hive.HiveTransactionMgr; @@ -543,6 +544,8 @@ public class Env { private final NereidsSqlCacheManager sqlCacheManager; + private final SplitSourceManager splitSourceManager; + public List getFrontendInfos() { List res = new ArrayList<>(); @@ -782,6 +785,7 @@ public Env(boolean isCheckpointCatalog) { this.insertOverwriteManager = new InsertOverwriteManager(); this.dnsCache = new DNSCache(); this.sqlCacheManager = new NereidsSqlCacheManager(); + this.splitSourceManager = new 
SplitSourceManager(); } public static void destroyCheckpoint() { @@ -1743,7 +1747,7 @@ protected void startNonMasterDaemonThreads() { workloadGroupMgr.start(); workloadSchedPolicyMgr.start(); workloadRuntimeStatusMgr.start(); - + splitSourceManager.start(); } private void transferToNonMaster(FrontendNodeType newType) { @@ -6142,6 +6146,10 @@ public NereidsSqlCacheManager getSqlCacheManager() { return sqlCacheManager; } + public SplitSourceManager getSplitSourceManager() { + return splitSourceManager; + } + public StatisticsJobAppender getStatisticsJobAppender() { return statisticsJobAppender; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index 7afb04831ce029..ff25464b4c9522 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -60,12 +60,12 @@ import org.apache.doris.thrift.TScanRange; import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; +import org.apache.doris.thrift.TSplitSource; import org.apache.doris.thrift.TTableFormatFileDesc; import org.apache.doris.thrift.TTextSerdeType; import org.apache.doris.thrift.TTransactionalHiveDeleteDeltaDesc; import org.apache.doris.thrift.TTransactionalHiveDesc; -import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -124,6 +124,7 @@ public void init(Analyzer analyzer) throws UserException { */ @Override public void init() throws UserException { + super.init(); if (ConnectContext.get().getExecutor() != null) { ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeStartTime(); } @@ -268,14 +269,6 @@ public void createScanRangeLocations() throws UserException { if (ConnectContext.get().getExecutor() != null) { ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsStartTime(); } - List inputSplits = getSplits(); - if (ConnectContext.get().getExecutor() != null) { - ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime(); - } - this.inputSplitsNum = inputSplits.size(); - if (inputSplits.isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) { - return; - } TFileFormatType fileFormatType = getFileFormatType(); if (fileFormatType == TFileFormatType.FORMAT_ORC) { genSlotToSchemaIdMapForOrc(); @@ -317,72 +310,66 @@ public void createScanRangeLocations() throws UserException { } List pathPartitionKeys = getPathPartitionKeys(); - - Multimap assignment = backendPolicy.computeScanRangeAssignment(inputSplits); - for (Backend backend : assignment.keySet()) { - Collection splits = assignment.get(backend); - for (Split split : splits) { - FileSplit fileSplit = (FileSplit) split; - TFileType locationType; - if (fileSplit instanceof IcebergSplit - && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) { - locationType = TFileType.FILE_BROKER; - } else { - locationType = getLocationType(fileSplit.getPath().toString()); - } - + if (isBatchMode()) { + // File splits are generated lazily, and fetched by backends while scanning. + // Only provide the unique ID of split source to backend. 
+ SplitAssignment splitAssignment = new SplitAssignment(backendPolicy, this); + splitAssignment.init(); + if (ConnectContext.get().getExecutor() != null) { + ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime(); + } + if (splitAssignment.getCurrentAssignment().isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) { + return; + } + inputSplitsNum = splitAssignment.numApproximateSplits(); + + TFileType locationType; + FileSplit fileSplit = (FileSplit) splitAssignment.getCurrentAssignment().values().iterator().next(); + if (fileSplit instanceof IcebergSplit + && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) { + locationType = TFileType.FILE_BROKER; + } else { + locationType = getLocationType(fileSplit.getPath().toString()); + } + totalFileSize = fileSplit.getLength() * inputSplitsNum; + // Not accurate, only used to estimate concurrency. + int numSplitsPerBE = splitAssignment.numApproximateSplits() / backendPolicy.numBackends(); + for (Backend backend : backendPolicy.getBackends()) { + SplitSource splitSource = new SplitSource( + this::splitToScanRange, backend, locationProperties, splitAssignment, pathPartitionKeys); + splitSources.add(splitSource); + Env.getCurrentEnv().getSplitSourceManager().registerSplitSource(splitSource); TScanRangeLocations curLocations = newLocations(); - // If fileSplit has partition values, use the values collected from hive partitions. - // Otherwise, use the values in file path. - boolean isACID = false; - if (fileSplit instanceof HiveSplit) { - HiveSplit hiveSplit = (HiveSplit) fileSplit; - isACID = hiveSplit.isACID(); - } - List partitionValuesFromPath = fileSplit.getPartitionValues() == null - ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, - false, isACID) : fileSplit.getPartitionValues(); - - TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys, - locationType); - TFileCompressType fileCompressType = getFileCompressType(fileSplit); - rangeDesc.setCompressType(fileCompressType); - if (isACID) { - HiveSplit hiveSplit = (HiveSplit) fileSplit; - hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE); - TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); - tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value()); - AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo(); - TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc(); - transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation()); - List deleteDeltaDescs = new ArrayList<>(); - for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) { - TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc(); - deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation()); - deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames()); - deleteDeltaDescs.add(deleteDeltaDesc); - } - transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs); - tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc); - rangeDesc.setTableFormatParams(tableFormatFileDesc); - } - - setScanParams(rangeDesc, fileSplit); - - curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + TSplitSource tSource = new TSplitSource(); + tSource.setSplitSourceId(splitSource.getUniqueId()); + tSource.setNumSplits(numSplitsPerBE); + curLocations.getScanRange().getExtScanRange().getFileScanRange().setSplitSource(tSource); 
TScanRangeLocation location = new TScanRangeLocation(); - setLocationPropertiesIfNecessary(backend, locationType, locationProperties); location.setBackendId(backend.getId()); location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort())); curLocations.addToLocations(location); - if (LOG.isDebugEnabled()) { - LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}", - curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), - fileSplit.getStart(), fileSplit.getLength(), - Joiner.on("|").join(fileSplit.getHosts())); - } + // So there's only one scan range for each backend. + // Each backend only starts up one ScanNode instance. + // However, even one ScanNode instance can provide maximum scanning concurrency. scanRangeLocations.add(curLocations); - this.totalFileSize += fileSplit.getLength(); + setLocationPropertiesIfNecessary(backend, locationType, locationProperties); + } + } else { + List inputSplits = getSplits(); + if (ConnectContext.get().getExecutor() != null) { + ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime(); + } + inputSplitsNum = inputSplits.size(); + if (inputSplits.isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) { + return; + } + Multimap assignment = backendPolicy.computeScanRangeAssignment(inputSplits); + for (Backend backend : assignment.keySet()) { + Collection splits = assignment.get(backend); + for (Split split : splits) { + scanRangeLocations.add(splitToScanRange(backend, locationProperties, split, pathPartitionKeys)); + totalFileSize += split.getLength(); + } } } @@ -395,6 +382,66 @@ public void createScanRangeLocations() throws UserException { } } + private TScanRangeLocations splitToScanRange( + Backend backend, + Map locationProperties, + Split split, + List pathPartitionKeys) throws UserException { + FileSplit fileSplit = (FileSplit) split; + TFileType locationType; + if (fileSplit instanceof IcebergSplit + && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) { + locationType = TFileType.FILE_BROKER; + } else { + locationType = getLocationType(fileSplit.getPath().toString()); + } + + TScanRangeLocations curLocations = newLocations(); + // If fileSplit has partition values, use the values collected from hive partitions. + // Otherwise, use the values in file path. + boolean isACID = false; + if (fileSplit instanceof HiveSplit) { + HiveSplit hiveSplit = (HiveSplit) fileSplit; + isACID = hiveSplit.isACID(); + } + List partitionValuesFromPath = fileSplit.getPartitionValues() == null + ? 
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, + false, isACID) : fileSplit.getPartitionValues(); + + TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys, + locationType); + TFileCompressType fileCompressType = getFileCompressType(fileSplit); + rangeDesc.setCompressType(fileCompressType); + if (isACID) { + HiveSplit hiveSplit = (HiveSplit) fileSplit; + hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE); + TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); + tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value()); + AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo(); + TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc(); + transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation()); + List deleteDeltaDescs = new ArrayList<>(); + for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) { + TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc(); + deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation()); + deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames()); + deleteDeltaDescs.add(deleteDeltaDesc); + } + transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs); + tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc); + rangeDesc.setTableFormatParams(tableFormatFileDesc); + } + + setScanParams(rangeDesc, fileSplit); + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + TScanRangeLocation location = new TScanRangeLocation(); + setLocationPropertiesIfNecessary(backend, locationType, locationProperties); + location.setBackendId(backend.getId()); + location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort())); + curLocations.addToLocations(location); + return curLocations; + } + private void setLocationPropertiesIfNecessary(Backend selectedBackend, TFileType locationType, Map locationProperties) throws UserException { if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java index 71008e21088a95..227f636e67b9cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java @@ -69,6 +69,7 @@ public abstract class FileScanNode extends ExternalScanNode { protected long totalFileSize = 0; protected long totalPartitionNum = 0; protected long readPartitionNum = 0; + protected long fileSplitSize; public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) { @@ -76,6 +77,11 @@ public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, St this.needCheckColumnPriv = needCheckColumnPriv; } + @Override + public void init() throws UserException { + this.fileSplitSize = ConnectContext.get().getSessionVariable().getFileSplitSize(); + } + @Override protected void toThrift(TPlanNode planNode) { planNode.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); @@ -233,18 +239,17 @@ protected List splitFile(Path path, long blockSize, BlockLocation[] block result.add(splitCreator.create(path, 0, length, length, modificationTime, hosts, partitionValues)); return result; } - long splitSize = 
ConnectContext.get().getSessionVariable().getFileSplitSize(); - if (splitSize <= 0) { - splitSize = blockSize; + if (fileSplitSize <= 0) { + fileSplitSize = blockSize; } // Min split size is DEFAULT_SPLIT_SIZE(128MB). - splitSize = Math.max(splitSize, DEFAULT_SPLIT_SIZE); + fileSplitSize = Math.max(fileSplitSize, DEFAULT_SPLIT_SIZE); long bytesRemaining; - for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D; - bytesRemaining -= splitSize) { + for (bytesRemaining = length; (double) bytesRemaining / (double) fileSplitSize > 1.1D; + bytesRemaining -= fileSplitSize) { int location = getBlockIndex(blockLocations, length - bytesRemaining); String[] hosts = location == -1 ? null : blockLocations[location].getHosts(); - result.add(splitCreator.create(path, length - bytesRemaining, splitSize, + result.add(splitCreator.create(path, length - bytesRemaining, fileSplitSize, length, modificationTime, hosts, partitionValues)); } if (bytesRemaining != 0L) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java new file mode 100644 index 00000000000000..f41eaba7dd8820 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.spi.Split; +import org.apache.doris.system.Backend; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; + +import java.util.Collection; + +/** + * When file splits are supplied in batch mode, splits are generated lazily and assigned in each call of `getNextBatch`. + * `SplitGenerator` provides the file splits, and `FederationBackendPolicy` assigns these splits to backends. + */ +public class SplitAssignment { + // magic number to estimate how many splits are allocated to BE in each batch + private static final int NUM_SPLITS_PER_BE = 1024; + // magic number to estimate how many splits are generated of each partition in each batch. 
+ private static final int NUM_SPLITS_PER_PARTITION = 10; + + private final FederationBackendPolicy backendPolicy; + private final SplitGenerator splitGenerator; + // Store the current assignment of file splits + private final Multimap assignment; + private final int maxBatchSize; + + public SplitAssignment(FederationBackendPolicy backendPolicy, SplitGenerator splitGenerator) { + this.backendPolicy = backendPolicy; + this.splitGenerator = splitGenerator; + this.assignment = ArrayListMultimap.create(); + int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode(); + maxBatchSize = Math.min(NUM_SPLITS_PER_PARTITION * numPartitions, + NUM_SPLITS_PER_BE * backendPolicy.numBackends()); + } + + public void init() throws UserException { + if (assignment.isEmpty() && splitGenerator.hasNext()) { + assignment.putAll(backendPolicy.computeScanRangeAssignment(splitGenerator.getNextBatch(maxBatchSize))); + } + } + + public Multimap getCurrentAssignment() { + return assignment; + } + + public int numApproximateSplits() { + return splitGenerator.numApproximateSplits(); + } + + public synchronized Collection getNextBatch(Backend backend) throws UserException { + // Each call should consume all splits + Collection splits = assignment.removeAll(backend); + while (splits.isEmpty()) { + // Get the next batch of splits, and assign to backends + // If there is data skewing, it maybe causes splits to accumulate on some BE + if (!splitGenerator.hasNext()) { + return splits; + } + // todo: In each batch, it's to find the optimal assignment for partial splits, + // how to solve the global data skew? + assignment.putAll(backendPolicy.computeScanRangeAssignment(splitGenerator.getNextBatch(maxBatchSize))); + splits = assignment.removeAll(backend); + } + return splits; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java new file mode 100644 index 00000000000000..b819c7f9ef20f5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.common.NotImplementedException; +import org.apache.doris.common.UserException; +import org.apache.doris.spi.Split; + +import java.util.List; + +/** + * The Producer(e.g. ScanNode) that provides the file splits in lazy and batch mode. + * The consumer should call `getNextBatch` to fetch the next batch of splits. + */ +public interface SplitGenerator { + /** + * Get the next batch of splits. If the producer(e.g. ScanNode) doesn't support batch mode, + * should throw user exceptions. 
+ */ + default List<Split> getNextBatch(int maxBatchSize) throws UserException { + throw new NotImplementedException("Should implement getNextBatch if in batch mode."); + } + + /** + * Get all file splits if the producer doesn't support batch mode. + */ + default List<Split> getSplits() throws UserException { + // todo: remove this interface if batch mode is stable + throw new NotImplementedException("Scan node subclasses need to implement the getSplits interface."); + } + + /** + * `getNextBatch` should return an empty list even if `hasNext` returns false. + */ + default boolean hasNext() { + return false; + } + + /** + * Whether the producer (e.g. ScanNode) supports batch mode. + */ + default boolean isBatchMode() { + return false; + } + + /** + * Because file splits are generated lazily, the exact number of splits may not be known, + * so provide an estimated value to show in the describe statement. + */ + default int numApproximateSplits() { + return -1; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java new file mode 100644 index 00000000000000..74e6aa88ba3232 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.common.UserException; +import org.apache.doris.spi.Split; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TScanRangeLocations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * If there are many files, splitting these files into scan ranges will consume a lot of time. + * Even the simplest queries (e.g. select * from large_table limit 1) can get stuck or crash due to the split process. + * Furthermore, during the splitting process, the backend does nothing. + * It is entirely possible to split files while scanning data on the splits that are already available. + * `SplitSource` introduces a lazy and batched mode to provide the file splits. Each `SplitSource` has a unique ID, + * which is used by backends to call `FrontendServiceImpl#fetchSplitBatch` to fetch splits batch by batch. + * `SplitSource`s are managed by `SplitSourceManager`, which stores each `SplitSource` as a weak reference and cleans + * up the split source when its related scan node is GCed.
+ */ +public class SplitSource { + private static final AtomicLong UNIQUE_ID_GENERATOR = new AtomicLong(0); + + private final long uniqueId; + private final SplitToScanRange splitToScanRange; + private final Backend backend; + private final Map locationProperties; + private final List pathPartitionKeys; + private final SplitAssignment splitAssignment; + private Iterator splitIterator = null; + private boolean isLastBatch = false; + + public SplitSource( + SplitToScanRange splitToScanRange, + Backend backend, + Map locationProperties, + SplitAssignment splitAssignment, + List pathPartitionKeys) { + this.uniqueId = UNIQUE_ID_GENERATOR.getAndIncrement(); + this.splitToScanRange = splitToScanRange; + this.backend = backend; + this.locationProperties = locationProperties; + this.pathPartitionKeys = pathPartitionKeys; + this.splitAssignment = splitAssignment; + } + + public long getUniqueId() { + return uniqueId; + } + + /** + * Get the next batch of file splits. If there's no more split, return empty list. + */ + public synchronized List getNextBatch(int maxBatchSize) throws UserException { + if (isLastBatch) { + return Collections.emptyList(); + } + List scanRanges = new ArrayList<>(maxBatchSize); + for (int i = 0; i < maxBatchSize; i++) { + if (splitIterator == null || !splitIterator.hasNext()) { + Collection splits = splitAssignment.getNextBatch(backend); + if (splits.isEmpty()) { + isLastBatch = true; + return scanRanges; + } + splitIterator = splits.iterator(); + } + scanRanges.add(splitToScanRange.getScanRange( + backend, locationProperties, splitIterator.next(), pathPartitionKeys)); + } + return scanRanges; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSourceManager.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSourceManager.java new file mode 100644 index 00000000000000..83a7436df9ab82 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSourceManager.java @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.common.util.MasterDaemon; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * `SplitSource` is obtained by RPC call of `FrontendServiceImpl#fetchSplitBatch`. + * Each `SplitSource` is reference by its unique ID. `SplitSourceManager` provides the register, get, and remove + * function to manage the split sources. In order to clean the split source when the query finished, + * `SplitSource` is stored as a weak reference, and use `ReferenceQueue` to remove split source when GC. 
+ */ +public class SplitSourceManager extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(SplitSourceManager.class); + + public static class SplitSourceReference extends WeakReference { + private final long uniqueId; + + public SplitSourceReference(SplitSource splitSource, ReferenceQueue queue) { + super(splitSource, queue); + uniqueId = splitSource.getUniqueId(); + } + + public long getUniqueId() { + return uniqueId; + } + } + + private final ReferenceQueue splitsRefQueue = new ReferenceQueue<>(); + private final Map> splits = new ConcurrentHashMap<>(); + + public void registerSplitSource(SplitSource splitSource) { + splits.put(splitSource.getUniqueId(), new SplitSourceReference(splitSource, splitsRefQueue)); + } + + public void removeSplitSource(long uniqueId) { + splits.remove(uniqueId); + } + + public SplitSource getSplitSource(long uniqueId) { + return splits.get(uniqueId).get(); + } + + @Override + protected void runAfterCatalogReady() { + while (true) { + try { + SplitSourceReference reference = (SplitSourceReference) splitsRefQueue.remove(); + removeSplitSource(reference.getUniqueId()); + } catch (Exception e) { + LOG.warn("Failed to clean split source", e); + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java new file mode 100644 index 00000000000000..0e890252857583 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
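SplitSourceManager above holds only weak references to its registered SplitSources and relies on a ReferenceQueue to learn when a SplitSource has been garbage collected after its query finishes, at which point the map entry is removed. A self-contained sketch of that cleanup pattern follows — hypothetical names and a plain Object as the referent, not Doris code; System.gc() is only a hint, so the sketch loops until the collector has actually run.

```java
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WeakCleanupDemo {
    // Weak reference that remembers the key it was registered under,
    // so the map entry can be removed after the referent is collected.
    static class Entry extends WeakReference<Object> {
        final long id;

        Entry(long id, Object referent, ReferenceQueue<Object> queue) {
            super(referent, queue);
            this.id = id;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReferenceQueue<Object> queue = new ReferenceQueue<>();
        Map<Long, Entry> registry = new ConcurrentHashMap<>();

        Object source = new Object();          // stands in for a SplitSource
        registry.put(1L, new Entry(1L, source, queue));

        source = null;                         // query finished: last strong reference dropped

        // A real daemon would just block on remove(); here we nudge the collector until
        // the weak reference shows up on the queue.
        Entry collected;
        while ((collected = (Entry) queue.remove(1_000)) == null) {
            System.gc();
        }
        registry.remove(collected.id);
        System.out.println("cleaned split source " + collected.id
                + ", registry size = " + registry.size());
    }
}
```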
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java
new file mode 100644
index 00000000000000..0e890252857583
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.spi.Split;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TScanRangeLocations;
+
+import java.util.List;
+import java.util.Map;
+
+public interface SplitToScanRange {
+    TScanRangeLocations getScanRange(
+            Backend backend,
+            Map<String, String> locationProperties,
+            Split split,
+            List<String> pathPartitionKeys) throws UserException;
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 0dbaa8b462b8ff..12024c2561642d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -63,8 +63,10 @@
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -95,6 +97,11 @@ public class HiveScanNode extends FileQueryScanNode {
     @Setter
     private SelectedPartitions selectedPartitions = null;
 
+    private boolean partitionInit = false;
+    private List<HivePartition> prunedPartitions;
+    private Iterator<HivePartition> prunedPartitionsIter;
+    private int numSplitsPerPartition = NUM_SPLITS_PER_PARTITION;
+
     /**
      * * External file scan node for Query Hive table
      * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
@@ -195,14 +202,21 @@ protected List<HivePartition> getPartitions() throws AnalysisException {
     }
 
     @Override
-    protected List<Split> getSplits() throws UserException {
+    public List<Split> getSplits() throws UserException {
         long start = System.currentTimeMillis();
         try {
+            if (!partitionInit) {
+                prunedPartitions = getPartitions();
+                partitionInit = true;
+            }
             HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
                     .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
             String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
             List<Split> allFiles = Lists.newArrayList();
-            getFileSplitByPartitions(cache, getPartitions(), allFiles, bindBrokerName);
+            getFileSplitByPartitions(cache, prunedPartitions, allFiles, bindBrokerName);
+            if (ConnectContext.get().getExecutor() != null) {
+                ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
+            }
             if (LOG.isDebugEnabled()) {
                 LOG.debug("get #{} files for table: {}.{}, cost: {} ms",
                         allFiles.size(), hmsTable.getDbName(), hmsTable.getName(),
@@ -217,6 +231,59 @@ protected List<Split> getSplits() throws UserException {
         }
     }
 
+    @Override
+    public List<Split> getNextBatch(int maxBatchSize) throws UserException {
+        try {
+            HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                    .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
+            String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
+            List<Split> allFiles = Lists.newArrayList();
+            int numPartitions = 0;
+            while (allFiles.size() < maxBatchSize && prunedPartitionsIter.hasNext()) {
+                List<HivePartition> partitions = new ArrayList<>(NUM_PARTITIONS_PER_LOOP);
+                for (int i = 0; i < NUM_PARTITIONS_PER_LOOP && prunedPartitionsIter.hasNext(); ++i) {
+                    partitions.add(prunedPartitionsIter.next());
+                    numPartitions++;
+                }
+                getFileSplitByPartitions(cache, partitions, allFiles, bindBrokerName);
+            }
+            if (allFiles.size() / numPartitions > numSplitsPerPartition) {
+                numSplitsPerPartition = allFiles.size() / numPartitions;
+            }
+            return allFiles;
+        } catch (Throwable t) {
+            LOG.warn("get file split failed for table: {}", hmsTable.getName(), t);
+            throw new UserException(
+                    "get file split failed for table: " + hmsTable.getName() + ", err: " + Util.getRootCauseMessage(t),
+                    t);
+        }
+    }
+
+    @Override
+    public boolean hasNext() {
+        return prunedPartitionsIter.hasNext();
+    }
+
+    @Override
+    public boolean isBatchMode() {
+        if (!partitionInit) {
+            try {
+                prunedPartitions = getPartitions();
+            } catch (Exception e) {
+                return false;
+            }
+            prunedPartitionsIter = prunedPartitions.iterator();
+            partitionInit = true;
+        }
+        int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
+        return numPartitions >= 0 && prunedPartitions.size() >= numPartitions;
+    }
+
+    @Override
+    public int numApproximateSplits() {
+        return numSplitsPerPartition * prunedPartitions.size();
+    }
+
     private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
                                           List<Split> allFiles, String bindBrokerName) throws IOException {
         List<FileCacheValue> fileCaches;
@@ -225,9 +292,6 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
-        if (ConnectContext.get().getExecutor() != null) {
-            ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
-        }
         List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses = selectFiles(fileCaches);
         splitAllFiles(allFiles, hiveFileStatuses);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 94748e7e4273fa..8dd853a48f373b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -35,6 +35,7 @@
 import org.apache.doris.datasource.hudi.HudiUtils;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TExplainLevel;
@@ -68,6 +69,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -97,6 +99,14 @@ public class HudiScanNode extends HiveScanNode {
     private boolean incrementalRead = false;
     private IncrementalRelation incrementalRelation;
 
+    private boolean partitionInit = false;
+    private HoodieTimeline timeline;
+    private Option<String> snapshotTimestamp;
+    private String queryInstant;
+    private List<HivePartition> prunedPartitions;
+    private Iterator<HivePartition> prunedPartitionsIter;
+    private int numSplitsPerPartition = NUM_SPLITS_PER_PARTITION;
+
     /**
      * External file scan node for Query Hudi table
      * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
@@ -193,6 +203,22 @@ protected void doInitialize() throws UserException {
         } else {
             incrementalRelation = null;
         }
+
+        timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
+        if (desc.getRef().getTableSnapshot() != null) {
+            queryInstant = desc.getRef().getTableSnapshot().getTime();
+            snapshotTimestamp = Option.of(queryInstant);
+        } else {
+            Option<HoodieInstant> snapshotInstant = timeline.lastInstant();
+            if (!snapshotInstant.isPresent()) {
+                prunedPartitions = Collections.emptyList();
+                prunedPartitionsIter = prunedPartitions.iterator();
+                partitionInit = true;
+                return;
+            }
+            queryInstant = snapshotInstant.get().getTimestamp();
+            snapshotTimestamp = Option.empty();
+        }
     }
 
     @Override
@@ -300,32 +326,8 @@ private List<Split> getIncrementalSplits() {
                 incrementalRelation.getEndTs())).collect(Collectors.toList());
     }
 
-    @Override
-    public List<Split> getSplits() throws UserException {
-        if (incrementalRead && !incrementalRelation.fallbackFullTableScan()) {
-            return getIncrementalSplits();
-        }
-
-        HoodieTimeline timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
-        String queryInstant;
-        Option<String> snapshotTimestamp;
-        if (desc.getRef().getTableSnapshot() != null) {
-            queryInstant = desc.getRef().getTableSnapshot().getTime();
-            snapshotTimestamp = Option.of(queryInstant);
-        } else {
-            Option<HoodieInstant> snapshotInstant = timeline.lastInstant();
-            if (!snapshotInstant.isPresent()) {
-                return Collections.emptyList();
-            }
-            queryInstant = snapshotInstant.get().getTimestamp();
-            snapshotTimestamp = Option.empty();
-        }
-        // Non partition table will get one dummy partition
-        List<HivePartition> partitions = HiveMetaStoreClientHelper.ugiDoAs(
-                HiveMetaStoreClientHelper.getConfiguration(hmsTable),
-                () -> getPrunedPartitions(hudiClient, snapshotTimestamp));
+    private void getPartitionSplits(List<HivePartition> partitions, List<Split> splits) {
         Executor executor = Env.getCurrentEnv().getExtMetaCacheMgr().getFileListingExecutor();
-        List<Split> splits = Collections.synchronizedList(new ArrayList<>());
         CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
         partitions.forEach(partition -> executor.execute(() -> {
             String globPath;
@@ -370,9 +372,69 @@ public List<Split> getSplits() throws UserException {
         } catch (InterruptedException e) {
             throw new RuntimeException(e.getMessage(), e);
         }
+    }
+
+    @Override
+    public List<Split> getSplits() throws UserException {
+        if (incrementalRead && !incrementalRelation.fallbackFullTableScan()) {
+            return getIncrementalSplits();
+        }
+        if (!partitionInit) {
+            prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
+                    HiveMetaStoreClientHelper.getConfiguration(hmsTable),
+                    () -> getPrunedPartitions(hudiClient, snapshotTimestamp));
+            partitionInit = true;
+        }
+        List<Split> splits = Collections.synchronizedList(new ArrayList<>());
+        getPartitionSplits(prunedPartitions, splits);
+        return splits;
+    }
+
+    @Override
+    public List<Split> getNextBatch(int maxBatchSize) throws UserException {
+        List<Split> splits = Collections.synchronizedList(new ArrayList<>());
+        int numPartitions = 0;
+        while (splits.size() < maxBatchSize && prunedPartitionsIter.hasNext()) {
+            List<HivePartition> partitions = new ArrayList<>(NUM_PARTITIONS_PER_LOOP);
+            for (int i = 0; i < NUM_PARTITIONS_PER_LOOP && prunedPartitionsIter.hasNext(); ++i) {
+                partitions.add(prunedPartitionsIter.next());
+                numPartitions++;
+            }
+            getPartitionSplits(partitions, splits);
+        }
+        if (splits.size() / numPartitions > numSplitsPerPartition) {
+            numSplitsPerPartition = splits.size() / numPartitions;
+        }
         return splits;
     }
 
+    @Override
+    public boolean hasNext() {
+        return prunedPartitionsIter.hasNext();
+    }
+
+    @Override
+    public boolean isBatchMode() {
+        if (incrementalRead && !incrementalRelation.fallbackFullTableScan()) {
+            return false;
+        }
+        if (!partitionInit) {
+            // Non partition table will get one dummy partition
+            prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
+                    HiveMetaStoreClientHelper.getConfiguration(hmsTable),
+                    () -> getPrunedPartitions(hudiClient, snapshotTimestamp));
+            prunedPartitionsIter = prunedPartitions.iterator();
+            partitionInit = true;
+        }
+        int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
+        return numPartitions >= 0 && prunedPartitions.size() >= numPartitions;
+    }
+
+    @Override
+    public int numApproximateSplits() {
+        return numSplitsPerPartition * prunedPartitions.size();
+    }
+
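Both scan nodes above implement the same batch-mode contract on top of SplitGenerator: isBatchMode() decides whether splits are produced lazily, numApproximateSplits() gives a rough size estimate before anything is fetched, and hasNext()/getNextBatch() drive the lazy path. The sketch below shows how a caller of that contract might look; the real planner/coordinator wiring is not part of this excerpt, so the class and method names of the caller are assumptions:

import org.apache.doris.common.UserException;
import org.apache.doris.datasource.SplitGenerator;
import org.apache.doris.spi.Split;

import java.util.ArrayList;
import java.util.List;

public class SplitGeneratorCallerSketch {
    // Consume a SplitGenerator either eagerly (legacy path) or batch by batch.
    static List<Split> collect(SplitGenerator generator, int batchSize) throws UserException {
        if (!generator.isBatchMode()) {
            // Eager path: all splits are produced up front, exactly as before this patch.
            return generator.getSplits();
        }
        // Batch mode: size the buffer from the estimate, then pull splits lazily.
        List<Split> buffer = new ArrayList<>(Math.max(generator.numApproximateSplits(), batchSize));
        while (generator.hasNext()) {
            buffer.addAll(generator.getNextBatch(batchSize));
        }
        return buffer;
    }
}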
     private HudiSplit generateHudiSplit(FileSlice fileSlice, List<String> partitionValues, String queryInstant) {
         Optional<HoodieBaseFile> baseFile = fileSlice.getBaseFile().toJavaOptional();
         String filePath = baseFile.map(BaseFile::getPath).orElse("");
@@ -404,7 +466,11 @@ private HudiSplit generateHudiSplit(FileSlice fileSlice, List<String> partitionV
 
     @Override
     public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
-        return super.getNodeExplainString(prefix, detailLevel)
-                + String.format("%shudiNativeReadSplits=%d/%d\n", prefix, noLogsSplitNum.get(), inputSplitsNum);
+        if (isBatchMode()) {
+            return super.getNodeExplainString(prefix, detailLevel);
+        } else {
+            return super.getNodeExplainString(prefix, detailLevel)
+                    + String.format("%shudiNativeReadSplits=%d/%d\n", prefix, noLogsSplitNum.get(), inputSplitsNum);
+        }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
index 279031d7f9b773..c17a1d3e3f0d48 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
@@ -116,7 +116,7 @@ protected Map<String, String> getLocationProperties() throws UserException {
     }
 
     @Override
-    protected List<Split> getSplits() throws UserException {
+    public List<Split> getSplits() throws UserException {
         List<Split> result = new ArrayList<>();
         com.aliyun.odps.Table odpsTable = table.getOdpsTable();
         if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 3e25bd6e7f8d4a..b88a2438d9a574 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -42,14 +42,14 @@
 import org.apache.doris.cloud.catalog.CloudPartition;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.NotImplementedException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.FederationBackendPolicy;
 import org.apache.doris.datasource.FileScanNode;
+import org.apache.doris.datasource.SplitGenerator;
+import org.apache.doris.datasource.SplitSource;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.rpc.RpcException;
-import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.statistics.query.StatsDelta;
 import org.apache.doris.system.Backend;
@@ -81,8 +81,10 @@
 /**
  * Representation of the common elements of all scan nodes.
 */
-public abstract class ScanNode extends PlanNode {
+public abstract class ScanNode extends PlanNode implements SplitGenerator {
     private static final Logger LOG = LogManager.getLogger(ScanNode.class);
+    protected static final int NUM_SPLITS_PER_PARTITION = 10;
+    protected static final int NUM_PARTITIONS_PER_LOOP = 100;
     protected final TupleDescriptor desc;
     // for distribution prunner
     protected Map<String, PartitionColumnFilter> columnFilters = Maps.newHashMap();
@@ -91,6 +93,7 @@ public abstract class ScanNode extends PlanNode {
     protected String sortColumn = null;
     protected Analyzer analyzer;
     protected List<TScanRangeLocations> scanRangeLocations = Lists.newArrayList();
+    protected List<SplitSource> splitSources = Lists.newArrayList();
     protected PartitionInfo partitionsInfo = null;
 
     // create a mapping between output slot's id and project expr
@@ -129,10 +132,6 @@ public void setSortColumn(String column) {
         sortColumn = column;
     }
 
-    protected List<Split> getSplits() throws UserException {
-        throw new NotImplementedException("Scan node sub class need to implement getSplits interface.");
-    }
-
     /**
      * cast expr to SlotDescriptor type
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 2d0628c1b6bdea..5ea41866b193af 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1487,7 +1487,8 @@ public void cancel(Status cancelReason) {
             // Print an error stack here to know why send cancel again.
             LOG.warn("Query {} already in abnormal status {}, but received cancel again,"
                             + "so that send cancel to BE again",
-                    DebugUtil.printId(queryId), queryStatus.toString(), new Exception());
+                    DebugUtil.printId(queryId), queryStatus.toString(),
+                    new Exception("cancel failed"));
         } else {
             queryStatus.updateStatus(cancelReason.getErrorCode(), cancelReason.getErrorMsg());
         }
@@ -2477,12 +2478,16 @@ private void updateScanRangeNumByScanRange(TScanRangeParams param) {
         if (externalScanRange != null) {
             TFileScanRange fileScanRange = externalScanRange.getFileScanRange();
             if (fileScanRange != null) {
-                scanRangeNum += fileScanRange.getRanges().size();
+                if (fileScanRange.isSetRanges()) {
+                    scanRangeNum += fileScanRange.getRanges().size();
+                } else if (fileScanRange.isSetSplitSource()) {
+                    scanRangeNum += fileScanRange.getSplitSource().getNumSplits();
+                }
             }
         }
         TPaloScanRange paloScanRange = scanRange.getPaloScanRange();
         if (paloScanRange != null) {
-            scanRangeNum = scanRangeNum + 1;
+            scanRangeNum += 1;
         }
         // TODO: more ranges?
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 768f54e3d7df59..af2b8f971484f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -422,6 +422,8 @@ public class SessionVariable implements Serializable, Writable {
     // Split size for ExternalFileScanNode. Default value 0 means use the block size of HDFS/S3.
     public static final String FILE_SPLIT_SIZE = "file_split_size";
 
+    public static final String NUM_PARTITIONS_IN_BATCH_MODE = "num_partitions_in_batch_mode";
+
     /**
      * use insert stmt as the unified backend for all loads
      */
@@ -1460,6 +1462,13 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
     @VariableMgr.VarAttr(name = FILE_SPLIT_SIZE, needForward = true)
     public long fileSplitSize = 0;
 
+    @VariableMgr.VarAttr(
+            name = NUM_PARTITIONS_IN_BATCH_MODE,
+            description = {"如果分区数量超过阈值,BE将通过batch方式获取scan ranges",
+                    "If the number of partitions exceeds the threshold, scan ranges will be fetched in batch mode."},
+            needForward = true)
+    public int numPartitionsInBatchMode = 1024;
+
     @VariableMgr.VarAttr(
             name = ENABLE_PARQUET_LAZY_MAT,
             description = {"控制 parquet reader 是否启用延迟物化技术。默认为 true。",
@@ -2704,6 +2713,14 @@ public void setFileSplitSize(long fileSplitSize) {
         this.fileSplitSize = fileSplitSize;
     }
 
+    public int getNumPartitionsInBatchMode() {
+        return numPartitionsInBatchMode;
+    }
+
+    public void setNumSplitsInBatchMode(int numPartitionsInBatchMode) {
+        this.numPartitionsInBatchMode = numPartitionsInBatchMode;
+    }
+
     public boolean isEnableParquetLazyMat() {
         return enableParquetLazyMat;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 4948ea1f0ec4b7..927305e2b85dc1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -71,6 +71,7 @@
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalDatabase;
 import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.datasource.SplitSource;
 import org.apache.doris.insertoverwrite.InsertOverwriteManager;
 import org.apache.doris.insertoverwrite.InsertOverwriteUtil;
 import org.apache.doris.load.StreamLoadHandler;
@@ -142,6 +143,8 @@
 import org.apache.doris.thrift.TFetchResourceResult;
 import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
 import org.apache.doris.thrift.TFetchSchemaTableDataResult;
+import org.apache.doris.thrift.TFetchSplitBatchRequest;
+import org.apache.doris.thrift.TFetchSplitBatchResult;
 import org.apache.doris.thrift.TFinishTaskRequest;
 import org.apache.doris.thrift.TFrontendPingFrontendRequest;
 import org.apache.doris.thrift.TFrontendPingFrontendResult;
@@ -212,6 +215,7 @@
 import org.apache.doris.thrift.TRestoreSnapshotResult;
 import org.apache.doris.thrift.TRollbackTxnRequest;
 import org.apache.doris.thrift.TRollbackTxnResult;
+import org.apache.doris.thrift.TScanRangeLocations;
 import org.apache.doris.thrift.TSchemaTableName;
 import org.apache.doris.thrift.TShowProcessListRequest;
 import org.apache.doris.thrift.TShowProcessListResult;
@@ -967,6 +971,23 @@ public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params)
         return QeProcessorImpl.INSTANCE.reportExecStatus(params, getClientAddr());
     }
 
+    @Override
+    public TFetchSplitBatchResult fetchSplitBatch(TFetchSplitBatchRequest request) throws TException {
+        TFetchSplitBatchResult result = new TFetchSplitBatchResult();
+        SplitSource splitSource =
+                Env.getCurrentEnv().getSplitSourceManager().getSplitSource(request.getSplitSourceId());
+        if (splitSource == null) {
+            throw new TException("Split source " + request.getSplitSourceId() + " is released");
+        }
+        try {
+            List<TScanRangeLocations> locations = splitSource.getNextBatch(request.getMaxNumSplits());
+            result.setSplits(locations);
+            return result;
+        } catch (Exception e) {
+            throw new TException("Failed to get split source " + request.getSplitSourceId(), e);
+        }
+    }
+
     @Override
     public TMasterResult finishTask(TFinishTaskRequest request) throws TException {
         return masterImpl.finishTask(request);
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 23cc6c19e7bd57..bd095db9dbca45 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1482,6 +1482,15 @@ struct TSyncQueryColumns {
    2: optional list<string> midPriorityColumns;
 }
 
+struct TFetchSplitBatchRequest {
+    1: optional i64 split_source_id
+    2: optional i32 max_num_splits
+}
+
+struct TFetchSplitBatchResult {
+    1: optional list<Planner.TScanRangeLocations> splits
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1: TGetDbsParams params)
     TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1573,4 +1582,6 @@ service FrontendService {
     Status.TStatus reportCommitTxnResult(1: TReportCommitTxnResultRequest request)
     TShowUserResult showUser(1: TShowUserRequest request)
     Status.TStatus syncQueryColumns(1: TSyncQueryColumns request)
+
+    TFetchSplitBatchResult fetchSplitBatch(1: TFetchSplitBatchRequest request)
 }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index ba7e69a872a44f..6b0cecb914494b 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -451,6 +451,11 @@ struct TFileRangeDesc {
     12: optional string fs_name
 }
 
+struct TSplitSource {
+    1: optional i64 split_source_id
+    2: optional i32 num_splits
+}
+
 // TFileScanRange represents a set of descriptions of a file and the rules for reading and converting it.
 //     TFileScanRangeParams: describe how to read and convert file
 //     list<TFileRangeDesc>: file location and range
@@ -461,12 +466,12 @@ struct TFileScanRange {
     // file_scan_params in TExecPlanFragmentParams will always be set in query request,
     // and TFileScanRangeParams here is used for some other request such as fetch table schema for tvf.
     2: optional TFileScanRangeParams params
+    3: optional TSplitSource split_source
 }
 
 // Scan range for external datasource, such as file on hdfs, es datanode, etc.
 struct TExternalScanRange {
     1: optional TFileScanRange file_scan_range
-    // TODO: add more scan range type?
 }
 
 enum TDataGenFunctionName {
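Taken together, the Thrift additions define the fetch protocol: the plan carries a TSplitSource (an id plus a split count) instead of concrete ranges, and the scanner asks the frontend for more scan ranges via fetchSplitBatch until an empty batch comes back. A hedged Java sketch of that exchange using the generated Thrift types; the real consumer is the BE-side C++ connector, which is not part of this excerpt, so the helper below is purely illustrative:

import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TFetchSplitBatchRequest;
import org.apache.doris.thrift.TFetchSplitBatchResult;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TSplitSource;

import org.apache.thrift.TException;

import java.util.ArrayList;
import java.util.List;

public class FetchSplitBatchSketch {
    // Pull every scan range behind a TSplitSource by repeatedly calling fetchSplitBatch
    // until the frontend returns an empty batch.
    static List<TScanRangeLocations> fetchAll(FrontendService.Iface frontend, TSplitSource source,
            int batchSize) throws TException {
        List<TScanRangeLocations> scanRanges = new ArrayList<>(source.getNumSplits());
        while (true) {
            TFetchSplitBatchRequest request = new TFetchSplitBatchRequest();
            request.setSplitSourceId(source.getSplitSourceId());
            request.setMaxNumSplits(batchSize);
            TFetchSplitBatchResult result = frontend.fetchSplitBatch(request);
            if (result.getSplits() == null || result.getSplits().isEmpty()) {
                return scanRanges;
            }
            scanRanges.addAll(result.getSplits());
        }
    }
}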