From a382464bb7a2fda596e34baef2229e3c14ef47eb Mon Sep 17 00:00:00 2001 From: Jerry Hu Date: Thu, 7 Dec 2023 11:16:25 +0800 Subject: [PATCH 1/2] [feature](scan) parallel scann on dup/mow mode --- be/src/olap/iterators.h | 2 + be/src/olap/parallel_scanner_builder.cpp | 251 ++++++++++++++++++ be/src/olap/parallel_scanner_builder.h | 105 ++++++++ be/src/olap/rowset/beta_rowset_reader.cpp | 20 +- be/src/olap/rowset/beta_rowset_reader.h | 1 + be/src/olap/rowset/rowset_reader.h | 4 + .../rowset/segment_v2/segment_iterator.cpp | 4 + be/src/vec/columns/column_dictionary.h | 2 +- be/src/vec/exec/scan/new_olap_scan_node.cpp | 29 +- 9 files changed, 411 insertions(+), 7 deletions(-) create mode 100644 be/src/olap/parallel_scanner_builder.cpp create mode 100644 be/src/olap/parallel_scanner_builder.h diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index 1f381ac4d7143e..8b61a35c2e5b2f 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -24,6 +24,7 @@ #include "olap/block_column_predicate.h" #include "olap/column_predicate.h" #include "olap/olap_common.h" +#include "olap/rowset/segment_v2/row_ranges.h" #include "olap/tablet_schema.h" #include "runtime/runtime_state.h" #include "vec/core/block.h" @@ -115,6 +116,7 @@ class StorageReadOptions { int32_t tablet_id = 0; // slots that cast may be eliminated in storage layer std::map target_cast_type_for_variants; + RowRanges row_ranges; }; class RowwiseIterator; diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp new file mode 100644 index 00000000000000..c95dcfb8d2277e --- /dev/null +++ b/be/src/olap/parallel_scanner_builder.cpp @@ -0,0 +1,251 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parallel_scanner_builder.h" + +#include "olap/rowset/beta_rowset.h" +#include "vec/exec/scan/new_olap_scanner.h" + +namespace doris { + +using namespace vectorized; + +template +Status ParallelScannerBuilder::build_scanners(std::list& scanners) { + return _build_scanners_by_rowid(scanners); +} + +template +Status ParallelScannerBuilder::_build_scanners_by_rowid( + std::list& scanners) { + size_t total_rows = 0; + for (auto&& [tablet, _] : _tablets) { + total_rows += tablet->num_rows(); + } + + size_t rows_per_scanner = total_rows / _max_scanners_count; + rows_per_scanner = std::max(rows_per_scanner, _min_rows_per_scanner); + + for (auto&& [tablet, version] : _tablets) { + std::vector rowsets; + RETURN_IF_ERROR(tablet->capture_consistent_rowsets({0, version}, &rowsets)); + + TabletReader::ReadSource read_source; + + int64_t rows_collected = 0; + for (auto& rowset : rowsets) { + auto beta_rowset = std::dynamic_pointer_cast(rowset); + RowsetReaderSharedPtr reader; + RETURN_IF_ERROR(beta_rowset->create_reader(&reader)); + const auto rowset_id = beta_rowset->rowset_id(); + auto& segment_cache_handle = _segment_cache_handles[rowset_id]; + + RETURN_IF_ERROR(beta_rowset->load()); + RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(beta_rowset, + &segment_cache_handle, true)); + + if (beta_rowset->num_rows() == 0) { + continue; + } + + const auto& segments = segment_cache_handle.get_segments(); + int segment_start = 0; + auto split = RowSetSplits(reader->clone()); + size_t total_collected = 0; + + for (size_t i = 0; i != segments.size(); ++i) { + const auto& segment = segments[i]; + RowRanges row_ranges; + const size_t rows_of_segment = segment->num_rows(); + int64_t offset_in_segment = 0; + + while (offset_in_segment < rows_of_segment) { + const int64_t remaining_rows = rows_of_segment - offset_in_segment; + auto rows_need = rows_per_scanner - rows_collected; + if (rows_need >= remaining_rows * 0.9) { + rows_need = remaining_rows; + } + DCHECK_LE(rows_need, remaining_rows); + + auto old_count = row_ranges.count(); + row_ranges.add({offset_in_segment, + offset_in_segment + static_cast(rows_need)}); + DCHECK_EQ(rows_need + old_count, row_ranges.count()); + rows_collected += rows_need; + offset_in_segment += rows_need; + total_collected += rows_need; + + if (rows_collected >= rows_per_scanner) { // build a new scanner + split.segment_offsets.first = segment_start, + split.segment_offsets.second = i + 1; + split.segment_row_ranges.emplace_back(std::move(row_ranges)); + + DCHECK_EQ(split.segment_offsets.second - split.segment_offsets.first, + split.segment_row_ranges.size()); + + read_source.rs_splits.emplace_back(std::move(split)); + + scanners.emplace_back(_build_scanner(tablet, version, _key_ranges, + std::move(read_source))); + + read_source = TabletReader::ReadSource(); + split = RowSetSplits(reader->clone()); + row_ranges = RowRanges(); + DCHECK(row_ranges.is_empty()); + DCHECK_EQ(row_ranges.range_size(), 0); + + segment_start = offset_in_segment < rows_of_segment ? i : i + 1; + rows_collected = 0; + } + } + + if (!row_ranges.is_empty()) { + DCHECK_GT(rows_collected, 0); + DCHECK_EQ(row_ranges.to(), segment->num_rows()); + split.segment_row_ranges.emplace_back(std::move(row_ranges)); + } + } + DCHECK_EQ(total_collected, beta_rowset->num_rows()); + + DCHECK_LE(rows_collected, rows_per_scanner); + if (rows_collected > 0) { + split.segment_offsets.first = segment_start; + split.segment_offsets.second = segments.size(); + DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first); + DCHECK_EQ(split.segment_row_ranges.size(), + split.segment_offsets.second - split.segment_offsets.first); + read_source.rs_splits.emplace_back(std::move(split)); + } + } // end `for (auto& rowset : rowsets)` + + DCHECK_LE(rows_collected, rows_per_scanner); + if (rows_collected > 0) { + DCHECK_GT(read_source.rs_splits.size(), 0); + for (auto& split : read_source.rs_splits) { + DCHECK(split.rs_reader != nullptr); + DCHECK_LT(split.segment_offsets.first, split.segment_offsets.second); + DCHECK_EQ(split.segment_row_ranges.size(), + split.segment_offsets.second - split.segment_offsets.first); + } + scanners.emplace_back( + _build_scanner(tablet, version, _key_ranges, std::move(read_source))); + } + } + + return Status::OK(); +} +template +Status ParallelScannerBuilder::_load() { + for (auto&& [tablet, version] : _tablets) { + const auto tablet_id = tablet->table_id(); + auto& rowsets = _all_rowsets[tablet_id]; + RETURN_IF_ERROR(tablet->capture_consistent_rowsets({0, version}, &rowsets)); + + for (auto& rowset : rowsets) { + RETURN_IF_ERROR(rowset->load()); + const auto rowset_id = rowset->rowset_id(); + auto& segment_cache_handle = _segment_cache_handles[rowset_id]; + + RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( + std::dynamic_pointer_cast(rowset), &segment_cache_handle, true)); + } + } +} + +template +std::unique_ptr +ParallelScannerBuilder::_create_segment_group_from_rowsets( + const std::vector& rowsets) { + if (rowsets.empty()) { + return {}; + } + + size_t max_rows = 0; + for (const auto& rowset : rowsets) { + const auto rowset_id = rowset->rowset_id(); + DCHECK(_segment_cache_handles.contains(rowset_id)); + auto& cache_handle = _segment_cache_handles[rowset_id]; + auto& segments = cache_handle.get_segments(); + + if (segments.empty()) { + continue; + } + + if (rowset->is_segments_overlapping()) { + auto* largest_segment = segments[0].get(); + for (size_t i = 1; i != segments.size(); ++i) { + if (segments[i]->num_rows() > largest_segment->num_rows()) { + largest_segment = segments[i].get(); + } + } + + if (largest_segment->num_rows() > max_rows) { + + } + } + } + + auto get_max_rows = [this](Rowset* rowset) -> uint32_t { + const auto rowset_id = rowset->rowset_id(); + DCHECK(_segment_cache_handles.contains(rowset_id)); + auto& cache_handle = _segment_cache_handles[rowset_id]; + + auto& segments = cache_handle.get_segments(); + if (segments.empty()) { + return 0; + } + + if (rowset->is_segments_overlapping()) { + auto* largest_segment = segments[0].get(); + for (size_t i = 1; i != segments.size(); ++i) { + if (segments[i]->num_rows() > largest_segment->num_rows()) { + largest_segment = segments[i].get(); + } + } + return largest_segment->num_rows(); + } + + return rowset->num_rows(); + }; + + auto* biggest_rowset = rowsets[0].get(); + for (size_t i = 1; i != rowsets.size(); ++i) { + if (rowsets[i]->num_rows() > biggest_rowset->num_rows()) { + biggest_rowset = rowsets[i].get(); + } + } +} + + template +std::shared_ptr ParallelScannerBuilder::_build_scanner( + BaseTabletSPtr tablet, int64_t version, const std::vector& key_ranges, + TabletReader::ReadSource&& read_source) { + NewOlapScanner::Params params { + _state, _scanner_profile.get(), key_ranges, std::move(tablet), + version, std::move(read_source), _limit_per_scanner, _is_preaggregation}; + return NewOlapScanner::create_shared(_parent, std::move(params)); +} + +template +Status ParallelScannerBuilder::_build_scanners_by_key_range( + std::list& scanners) { + return Status::OK(); +} + +template class ParallelScannerBuilder; + +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/parallel_scanner_builder.h b/be/src/olap/parallel_scanner_builder.h new file mode 100644 index 00000000000000..ea8931d7ab682e --- /dev/null +++ b/be/src/olap/parallel_scanner_builder.h @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "olap/reader.h" +#include "olap/rowset/segment_v2/row_ranges.h" +#include "olap/segment_loader.h" +#include "olap/tablet.h" +#include "vec/exec/scan/new_olap_scanner.h" + +namespace doris { + +namespace vectorized { +class VScanner; +} + +using TabletSPtr = std::shared_ptr; +using VScannerSPtr = std::shared_ptr; + +struct TabletWithVersion { + TabletSPtr tablet; + int64_t version; +}; + +struct SegmentGroup { +private: + std::vector _segments; +}; + +template +class ParallelScannerBuilder { +public: + ParallelScannerBuilder(ParentType* parent, const std::vector& tablets, + const std::shared_ptr& profile, + const std::vector& key_ranges, RuntimeState* state, + int64_t limit_per_scanner, bool is_preaggregation) + : _parent(parent), + _scanner_profile(profile), + _state(state), + _limit_per_scanner(limit_per_scanner), + _is_preaggregation(is_preaggregation), + _tablets(tablets.cbegin(), tablets.cend()), + _key_ranges(key_ranges.cbegin(), key_ranges.cend()) {} + + Status build_scanners(std::list& scanners); + + void set_max_scanners_count(size_t count) { _max_scanners_count = count; } + +private: + Status _load(); + + Status _build_scanners_by_rowid(std::list& scanners); + + Status _build_scanners_by_key_range(std::list& scanners); + + std::unique_ptr _create_segment_group_from_rowsets(const std::vector& rowsets); + + std::vector _load_segments_of_rowset(const RowsetSharedPtr& rowset); + + Status _parse_segment_group(const SegmentGroup& group, size_t& index_count, + std::string& min_value, std::string& max_value); + + std::shared_ptr _build_scanner( + BaseTabletSPtr tablet, int64_t version, const std::vector& key_ranges, + TabletReader::ReadSource&& read_source); + + ParentType* _parent; + + /// Max scanners count limit to build + size_t _max_scanners_count {16}; + + /// Min rows per scanner + size_t _min_rows_per_scanner {16384}; + + std::map _segment_cache_handles; + + std::shared_ptr _scanner_profile; + RuntimeState* _state; + int64_t _limit_per_scanner; + bool _is_preaggregation; + std::vector _tablets; + std::vector _key_ranges; + std::unordered_map> _all_rowsets; +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 18725b116d1934..81d36b6dbb537e 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -237,10 +237,21 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context for (int i = seg_start; i < seg_end; i++) { auto& seg_ptr = segments[i]; std::unique_ptr iter; - auto s = seg_ptr->new_iterator(_input_schema, _read_options, &iter); - if (!s.ok()) { - LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); - return Status::Error(s.to_string()); + Status status; + if (_segment_row_ranges.empty()) { + _read_options.row_ranges = RowRanges::create_single(seg_ptr->num_rows()); + status = seg_ptr->new_iterator(_input_schema, _read_options, &iter); + } else { + DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size()); + auto local_options = _read_options; + local_options.row_ranges = _segment_row_ranges[i - seg_start]; + status = seg_ptr->new_iterator(_input_schema, std::move(local_options), &iter); + } + + if (!status.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() + << "]: " << status.to_string(); + return Status::Error(status.to_string()); } if (iter->empty()) { continue; @@ -255,6 +266,7 @@ Status BetaRowsetReader::init(RowsetReaderContext* read_context, const RowSetSpl _read_context = read_context; _read_context->rowset_id = _rowset->rowset_id(); _segment_offsets = rs_splits.segment_offsets; + _segment_row_ranges = rs_splits.segment_row_ranges; return Status::OK(); } diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h index 45aef035950278..3834f2f10bb946 100644 --- a/be/src/olap/rowset/beta_rowset_reader.h +++ b/be/src/olap/rowset/beta_rowset_reader.h @@ -96,6 +96,7 @@ class BetaRowsetReader : public RowsetReader { DorisCallOnce _init_iter_once; std::pair _segment_offsets; + std::vector _segment_row_ranges; SchemaSPtr _input_schema; RowsetReaderContext* _read_context = nullptr; diff --git a/be/src/olap/rowset/rowset_reader.h b/be/src/olap/rowset/rowset_reader.h index 3a93888be90419..45449952431bd6 100644 --- a/be/src/olap/rowset/rowset_reader.h +++ b/be/src/olap/rowset/rowset_reader.h @@ -25,6 +25,7 @@ #include "olap/iterators.h" #include "olap/rowset/rowset_fwd.h" #include "olap/rowset/rowset_reader_context.h" +#include "olap/rowset/segment_v2/row_ranges.h" #include "vec/core/block.h" namespace doris { @@ -41,6 +42,9 @@ struct RowSetSplits { // and pipeline std::pair segment_offsets; + // RowRanges of each segment. + std::vector segment_row_ranges; + RowSetSplits(RowsetReaderSharedPtr rs_reader_) : rs_reader(rs_reader_), segment_offsets({0, 0}) {} RowSetSplits() = default; diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index f85ed5abce5982..ae9433bcde5bcf 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -381,6 +381,10 @@ Status SegmentIterator::_lazy_init() { << _opts.delete_bitmap.at(segment_id())->cardinality() << ", " << _opts.stats->rows_del_by_bitmap << " rows deleted by bitmap"; } + + if (!_opts.row_ranges.is_empty()) { + _row_bitmap &= RowRanges::ranges_to_roaring(_opts.row_ranges); + } if (_opts.read_orderby_key_reverse) { _range_iter.reset(new BackwardBitmapRangeIterator(_row_bitmap)); } else { diff --git a/be/src/vec/columns/column_dictionary.h b/be/src/vec/columns/column_dictionary.h index 06e0a352be2c1a..6de729fd2de7e0 100644 --- a/be/src/vec/columns/column_dictionary.h +++ b/be/src/vec/columns/column_dictionary.h @@ -207,7 +207,7 @@ class ColumnDictionary final : public COWHelper> { } Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) override { - auto* res_col = reinterpret_cast(col_ptr); + auto* res_col = assert_cast(col_ptr); StringRef strings[sel_size]; size_t length = 0; for (size_t i = 0; i != sel_size; ++i) { diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 38fd97f2413688..b511e12ec8235b 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -40,9 +40,10 @@ #include "common/status.h" #include "exec/exec_node.h" #include "olap/rowset/rowset.h" +#include "olap/parallel_scanner_builder.h" +#include "olap/reader.h" #include "olap/rowset/rowset_reader.h" #include "olap/storage_engine.h" -#include "olap/tablet.h" #include "olap/tablet_manager.h" #include "olap/tablet_reader.h" #include "runtime/decimalv2_value.h" @@ -50,7 +51,6 @@ #include "runtime/query_statistics.h" #include "runtime/runtime_state.h" #include "runtime/types.h" -#include "service/backend_options.h" #include "util/time.h" #include "util/to_string.h" #include "vec/columns/column.h" @@ -518,14 +518,30 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { std::vector> tablets_to_scan; tablets_to_scan.reserve(_scan_ranges.size()); + std::vector tablets; + for (auto&& scan_range : _scan_ranges) { auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id)); int64_t version = 0; std::from_chars(scan_range->version.data(), scan_range->version.data() + scan_range->version.size(), version); + tablets.emplace_back( + TabletWithVersion {std::dynamic_pointer_cast(tablet), version}); tablets_to_scan.emplace_back(std::move(tablet), version); } + std::vector key_ranges; + for (auto& range : _cond_ranges) { + key_ranges.emplace_back(range.get()); + } + ParallelScannerBuilder scanner_builder(this, tablets, _scanner_profile, + key_ranges, _state, _limit_per_scanner, + _olap_scan_node.is_preaggregation); + + if (_shared_scan_opt) { + scanner_builder.set_max_scanners_count(config::doris_scanner_thread_pool_thread_num); + } + // Split tablet segment by scanner, only use in pipeline in duplicate key // 1. if tablet count lower than scanner thread num, count segment num of all tablet ready for scan // TODO: some tablet may do not have segment, may need split segment all case @@ -583,6 +599,14 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { }; if (is_dup_mow_key) { + RETURN_IF_ERROR(scanner_builder.build_scanners(*scanners)); + for (auto& scanner : *scanners) { + auto* olap_scanner = assert_cast(scanner.get()); + RETURN_IF_ERROR(olap_scanner->prepare(_state, _conjuncts)); + olap_scanner->set_compound_filters(_compound_filters); + } + LOG(INFO) << "segment count: " << segment_count << ", scanners count: " << scanners->size(); +#if 0 // 2. Split segment evenly to each scanner (e.g. each scanner need to scan `avg_segment_count_per_scanner` segments) const auto avg_segment_count_by_scanner = std::max(segment_count / config::doris_scanner_thread_pool_thread_num, (size_t)1); @@ -664,6 +688,7 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { {std::move(rs_splits), read_source.delete_predicates})); } } +#endif } else { for (auto&& [tablet, version] : tablets_to_scan) { std::vector>* ranges = &_cond_ranges; From 7ef1f9ec7d7d7a26222216e9aea30db88bb785b1 Mon Sep 17 00:00:00 2001 From: Jerry Hu Date: Mon, 25 Dec 2023 15:16:22 +0800 Subject: [PATCH 2/2] fix bugs --- be/src/olap/parallel_scanner_builder.cpp | 148 ++++++------------ be/src/olap/parallel_scanner_builder.h | 27 ++-- be/src/olap/rowset/beta_rowset_reader.cpp | 6 +- be/src/runtime/runtime_state.h | 16 ++ be/src/vec/exec/scan/new_olap_scan_node.cpp | 66 +++++--- .../org/apache/doris/qe/SessionVariable.java | 34 +++- .../doris/planner/ColocatePlanTest.java | 1 + gensrc/thrift/PaloInternalService.thrift | 6 + .../performance_p0/redundant_conjuncts.groovy | 4 +- 9 files changed, 164 insertions(+), 144 deletions(-) diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp index c95dcfb8d2277e..32b163b7a1a29a 100644 --- a/be/src/olap/parallel_scanner_builder.cpp +++ b/be/src/olap/parallel_scanner_builder.cpp @@ -26,23 +26,29 @@ using namespace vectorized; template Status ParallelScannerBuilder::build_scanners(std::list& scanners) { - return _build_scanners_by_rowid(scanners); + RETURN_IF_ERROR(_load()); + if (_is_dup_mow_key) { + return _build_scanners_by_rowid(scanners); + } else { + // TODO: support to split by key range + return Status::NotSupported("split by key range not supported yet."); + } } template Status ParallelScannerBuilder::_build_scanners_by_rowid( std::list& scanners) { - size_t total_rows = 0; - for (auto&& [tablet, _] : _tablets) { - total_rows += tablet->num_rows(); - } - - size_t rows_per_scanner = total_rows / _max_scanners_count; - rows_per_scanner = std::max(rows_per_scanner, _min_rows_per_scanner); - + DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner); for (auto&& [tablet, version] : _tablets) { - std::vector rowsets; - RETURN_IF_ERROR(tablet->capture_consistent_rowsets({0, version}, &rowsets)); + DCHECK(_all_rowsets.contains(tablet->tablet_id())); + auto& rowsets = _all_rowsets[tablet->tablet_id()]; + + TabletReader::ReadSource reade_source_with_delete_info; + if (!_state->skip_delete_predicate()) { + RETURN_IF_ERROR(tablet->capture_rs_readers( + {0, version}, &reade_source_with_delete_info.rs_splits, false)); + reade_source_with_delete_info.fill_delete_predicates(); + } TabletReader::ReadSource read_source; @@ -52,11 +58,9 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid( RowsetReaderSharedPtr reader; RETURN_IF_ERROR(beta_rowset->create_reader(&reader)); const auto rowset_id = beta_rowset->rowset_id(); - auto& segment_cache_handle = _segment_cache_handles[rowset_id]; - RETURN_IF_ERROR(beta_rowset->load()); - RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(beta_rowset, - &segment_cache_handle, true)); + DCHECK(_segment_cache_handles.contains(rowset_id)); + auto& segment_cache_handle = _segment_cache_handles[rowset_id]; if (beta_rowset->num_rows() == 0) { continue; @@ -65,7 +69,6 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid( const auto& segments = segment_cache_handle.get_segments(); int segment_start = 0; auto split = RowSetSplits(reader->clone()); - size_t total_collected = 0; for (size_t i = 0; i != segments.size(); ++i) { const auto& segment = segments[i]; @@ -73,23 +76,25 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid( const size_t rows_of_segment = segment->num_rows(); int64_t offset_in_segment = 0; + // try to split large segments into RowRanges while (offset_in_segment < rows_of_segment) { const int64_t remaining_rows = rows_of_segment - offset_in_segment; - auto rows_need = rows_per_scanner - rows_collected; + auto rows_need = _rows_per_scanner - rows_collected; + + // 0.9: try to avoid splitting the segments into excessively small parts. if (rows_need >= remaining_rows * 0.9) { rows_need = remaining_rows; } DCHECK_LE(rows_need, remaining_rows); - auto old_count = row_ranges.count(); + // RowRange stands for range: [From, To), From is inclusive, To is exclusive. row_ranges.add({offset_in_segment, offset_in_segment + static_cast(rows_need)}); - DCHECK_EQ(rows_need + old_count, row_ranges.count()); rows_collected += rows_need; offset_in_segment += rows_need; - total_collected += rows_need; - if (rows_collected >= rows_per_scanner) { // build a new scanner + // If collected enough rows, build a new scanner + if (rows_collected >= _rows_per_scanner) { split.segment_offsets.first = segment_start, split.segment_offsets.second = i + 1; split.segment_row_ranges.emplace_back(std::move(row_ranges)); @@ -99,29 +104,29 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid( read_source.rs_splits.emplace_back(std::move(split)); - scanners.emplace_back(_build_scanner(tablet, version, _key_ranges, - std::move(read_source))); + scanners.emplace_back( + _build_scanner(tablet, version, _key_ranges, + {std::move(read_source.rs_splits), + reade_source_with_delete_info.delete_predicates})); read_source = TabletReader::ReadSource(); split = RowSetSplits(reader->clone()); row_ranges = RowRanges(); - DCHECK(row_ranges.is_empty()); - DCHECK_EQ(row_ranges.range_size(), 0); segment_start = offset_in_segment < rows_of_segment ? i : i + 1; rows_collected = 0; } } + // The non-empty `row_ranges` means there are some rows left in this segment not added into `split`. if (!row_ranges.is_empty()) { DCHECK_GT(rows_collected, 0); DCHECK_EQ(row_ranges.to(), segment->num_rows()); split.segment_row_ranges.emplace_back(std::move(row_ranges)); } } - DCHECK_EQ(total_collected, beta_rowset->num_rows()); - DCHECK_LE(rows_collected, rows_per_scanner); + DCHECK_LE(rows_collected, _rows_per_scanner); if (rows_collected > 0) { split.segment_offsets.first = segment_start; split.segment_offsets.second = segments.size(); @@ -132,26 +137,35 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid( } } // end `for (auto& rowset : rowsets)` - DCHECK_LE(rows_collected, rows_per_scanner); + DCHECK_LE(rows_collected, _rows_per_scanner); if (rows_collected > 0) { DCHECK_GT(read_source.rs_splits.size(), 0); +#ifndef NDEBUG for (auto& split : read_source.rs_splits) { DCHECK(split.rs_reader != nullptr); DCHECK_LT(split.segment_offsets.first, split.segment_offsets.second); DCHECK_EQ(split.segment_row_ranges.size(), split.segment_offsets.second - split.segment_offsets.first); } +#endif scanners.emplace_back( - _build_scanner(tablet, version, _key_ranges, std::move(read_source))); + _build_scanner(tablet, version, _key_ranges, + {std::move(read_source.rs_splits), + reade_source_with_delete_info.delete_predicates})); } } return Status::OK(); } + +/** + * Load rowsets of each tablet with specified version, segments of each rowset. + */ template Status ParallelScannerBuilder::_load() { + _total_rows = 0; for (auto&& [tablet, version] : _tablets) { - const auto tablet_id = tablet->table_id(); + const auto tablet_id = tablet->tablet_id(); auto& rowsets = _all_rowsets[tablet_id]; RETURN_IF_ERROR(tablet->capture_consistent_rowsets({0, version}, &rowsets)); @@ -162,75 +176,17 @@ Status ParallelScannerBuilder::_load() { RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( std::dynamic_pointer_cast(rowset), &segment_cache_handle, true)); + _total_rows += rowset->num_rows(); } } -} -template -std::unique_ptr -ParallelScannerBuilder::_create_segment_group_from_rowsets( - const std::vector& rowsets) { - if (rowsets.empty()) { - return {}; - } - - size_t max_rows = 0; - for (const auto& rowset : rowsets) { - const auto rowset_id = rowset->rowset_id(); - DCHECK(_segment_cache_handles.contains(rowset_id)); - auto& cache_handle = _segment_cache_handles[rowset_id]; - auto& segments = cache_handle.get_segments(); - - if (segments.empty()) { - continue; - } + _rows_per_scanner = _total_rows / _max_scanners_count; + _rows_per_scanner = std::max(_rows_per_scanner, _min_rows_per_scanner); - if (rowset->is_segments_overlapping()) { - auto* largest_segment = segments[0].get(); - for (size_t i = 1; i != segments.size(); ++i) { - if (segments[i]->num_rows() > largest_segment->num_rows()) { - largest_segment = segments[i].get(); - } - } - - if (largest_segment->num_rows() > max_rows) { - - } - } - } - - auto get_max_rows = [this](Rowset* rowset) -> uint32_t { - const auto rowset_id = rowset->rowset_id(); - DCHECK(_segment_cache_handles.contains(rowset_id)); - auto& cache_handle = _segment_cache_handles[rowset_id]; - - auto& segments = cache_handle.get_segments(); - if (segments.empty()) { - return 0; - } - - if (rowset->is_segments_overlapping()) { - auto* largest_segment = segments[0].get(); - for (size_t i = 1; i != segments.size(); ++i) { - if (segments[i]->num_rows() > largest_segment->num_rows()) { - largest_segment = segments[i].get(); - } - } - return largest_segment->num_rows(); - } - - return rowset->num_rows(); - }; - - auto* biggest_rowset = rowsets[0].get(); - for (size_t i = 1; i != rowsets.size(); ++i) { - if (rowsets[i]->num_rows() > biggest_rowset->num_rows()) { - biggest_rowset = rowsets[i].get(); - } - } + return Status::OK(); } - template +template std::shared_ptr ParallelScannerBuilder::_build_scanner( BaseTabletSPtr tablet, int64_t version, const std::vector& key_ranges, TabletReader::ReadSource&& read_source) { @@ -240,12 +196,6 @@ std::shared_ptr ParallelScannerBuilder::_build_scann return NewOlapScanner::create_shared(_parent, std::move(params)); } -template -Status ParallelScannerBuilder::_build_scanners_by_key_range( - std::list& scanners) { - return Status::OK(); -} - template class ParallelScannerBuilder; } // namespace doris \ No newline at end of file diff --git a/be/src/olap/parallel_scanner_builder.h b/be/src/olap/parallel_scanner_builder.h index ea8931d7ab682e..b9d659abc27e30 100644 --- a/be/src/olap/parallel_scanner_builder.h +++ b/be/src/olap/parallel_scanner_builder.h @@ -21,7 +21,6 @@ #include #include -#include "olap/reader.h" #include "olap/rowset/segment_v2/row_ranges.h" #include "olap/segment_loader.h" #include "olap/tablet.h" @@ -41,22 +40,18 @@ struct TabletWithVersion { int64_t version; }; -struct SegmentGroup { -private: - std::vector _segments; -}; - template class ParallelScannerBuilder { public: ParallelScannerBuilder(ParentType* parent, const std::vector& tablets, const std::shared_ptr& profile, const std::vector& key_ranges, RuntimeState* state, - int64_t limit_per_scanner, bool is_preaggregation) + int64_t limit_per_scanner, bool is_dup_mow_key, bool is_preaggregation) : _parent(parent), _scanner_profile(profile), _state(state), _limit_per_scanner(limit_per_scanner), + _is_dup_mow_key(is_dup_mow_key), _is_preaggregation(is_preaggregation), _tablets(tablets.cbegin(), tablets.cend()), _key_ranges(key_ranges.cbegin(), key_ranges.cend()) {} @@ -65,20 +60,13 @@ class ParallelScannerBuilder { void set_max_scanners_count(size_t count) { _max_scanners_count = count; } + void set_min_rows_per_scanner(int64_t size) { _min_rows_per_scanner = size; } + private: Status _load(); Status _build_scanners_by_rowid(std::list& scanners); - Status _build_scanners_by_key_range(std::list& scanners); - - std::unique_ptr _create_segment_group_from_rowsets(const std::vector& rowsets); - - std::vector _load_segments_of_rowset(const RowsetSharedPtr& rowset); - - Status _parse_segment_group(const SegmentGroup& group, size_t& index_count, - std::string& min_value, std::string& max_value); - std::shared_ptr _build_scanner( BaseTabletSPtr tablet, int64_t version, const std::vector& key_ranges, TabletReader::ReadSource&& read_source); @@ -89,13 +77,18 @@ class ParallelScannerBuilder { size_t _max_scanners_count {16}; /// Min rows per scanner - size_t _min_rows_per_scanner {16384}; + size_t _min_rows_per_scanner {2 * 1024 * 1024}; + + size_t _total_rows {}; + + size_t _rows_per_scanner {_min_rows_per_scanner}; std::map _segment_cache_handles; std::shared_ptr _scanner_profile; RuntimeState* _state; int64_t _limit_per_scanner; + bool _is_dup_mow_key; bool _is_preaggregation; std::vector _tablets; std::vector _key_ranges; diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 81d36b6dbb537e..568fa89d848b81 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -238,14 +238,16 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context auto& seg_ptr = segments[i]; std::unique_ptr iter; Status status; + + /// If `_segment_row_ranges` is empty, the segment is not split. if (_segment_row_ranges.empty()) { - _read_options.row_ranges = RowRanges::create_single(seg_ptr->num_rows()); + _read_options.row_ranges.clear(); status = seg_ptr->new_iterator(_input_schema, _read_options, &iter); } else { DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size()); auto local_options = _read_options; local_options.row_ranges = _segment_row_ranges[i - seg_start]; - status = seg_ptr->new_iterator(_input_schema, std::move(local_options), &iter); + status = seg_ptr->new_iterator(_input_schema, local_options, &iter); } if (!status.ok()) { diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 6ced139eb4c8b8..ee515007eb604c 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -488,6 +488,22 @@ class RuntimeState { _query_options.enable_hash_join_early_start_probe; } + bool enable_parallel_scan() const { + return _query_options.__isset.enable_parallel_scan && _query_options.enable_parallel_scan; + } + + int parallel_scan_max_scanners_count() const { + return _query_options.__isset.parallel_scan_max_scanners_count + ? _query_options.parallel_scan_max_scanners_count + : 0; + } + + int64_t parallel_scan_min_rows_per_scanner() const { + return _query_options.__isset.parallel_scan_min_rows_per_scanner + ? _query_options.parallel_scan_min_rows_per_scanner + : 0; + } + int repeat_max_num() const { #ifndef BE_TEST if (!_query_options.__isset.repeat_max_num) { diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index b511e12ec8235b..b33c656a8df409 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -39,14 +39,11 @@ #include "common/object_pool.h" #include "common/status.h" #include "exec/exec_node.h" -#include "olap/rowset/rowset.h" #include "olap/parallel_scanner_builder.h" -#include "olap/reader.h" #include "olap/rowset/rowset_reader.h" #include "olap/storage_engine.h" #include "olap/tablet_manager.h" #include "olap/tablet_reader.h" -#include "runtime/decimalv2_value.h" #include "runtime/define_primitive_type.h" #include "runtime/query_statistics.h" #include "runtime/runtime_state.h" @@ -530,17 +527,7 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { tablets_to_scan.emplace_back(std::move(tablet), version); } - std::vector key_ranges; - for (auto& range : _cond_ranges) { - key_ranges.emplace_back(range.get()); - } - ParallelScannerBuilder scanner_builder(this, tablets, _scanner_profile, - key_ranges, _state, _limit_per_scanner, - _olap_scan_node.is_preaggregation); - - if (_shared_scan_opt) { - scanner_builder.set_max_scanners_count(config::doris_scanner_thread_pool_thread_num); - } + bool enable_parallel_scan = _state->enable_parallel_scan(); // Split tablet segment by scanner, only use in pipeline in duplicate key // 1. if tablet count lower than scanner thread num, count segment num of all tablet ready for scan @@ -578,6 +565,48 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { } } + bool has_cpu_limit = _state->query_options().__isset.resource_limit && + _state->query_options().resource_limit.__isset.cpu_limit; + + if (enable_parallel_scan && is_dup_mow_key && !has_cpu_limit && !should_run_serial() && + _push_down_agg_type == TPushAggOp::NONE) { + std::vector key_ranges; + for (auto& range : _cond_ranges) { + if (range->begin_scan_range.size() == 1 && + range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY) { + continue; + } + key_ranges.emplace_back(range.get()); + } + + ParallelScannerBuilder scanner_builder( + this, tablets, _scanner_profile, key_ranges, _state, _limit_per_scanner, + is_dup_mow_key, _olap_scan_node.is_preaggregation); + + int max_scanners_count = _state->parallel_scan_max_scanners_count(); + + // If the `max_scanners_count` was not set, + // use `config::doris_scanner_thread_pool_thread_num` as the default value. + if (max_scanners_count <= 0) { + max_scanners_count = config::doris_scanner_thread_pool_thread_num; + } + + // Too small value of `min_rows_per_scanner` is meaningless. + auto min_rows_per_scanner = + std::max(1024, _state->parallel_scan_min_rows_per_scanner()); + scanner_builder.set_max_scanners_count(max_scanners_count); + scanner_builder.set_min_rows_per_scanner(min_rows_per_scanner); + + RETURN_IF_ERROR(scanner_builder.build_scanners(*scanners)); + for (auto& scanner : *scanners) { + auto* olap_scanner = assert_cast(scanner.get()); + RETURN_IF_ERROR(olap_scanner->prepare(_state, _conjuncts)); + olap_scanner->set_compound_filters(_compound_filters); + } + LOG(INFO) << "segment count: " << segment_count << ", scanners count: " << scanners->size(); + return Status::OK(); + } + auto build_new_scanner = [&](BaseTabletSPtr tablet, int64_t version, const std::vector& key_ranges, TabletReader::ReadSource read_source) { @@ -599,14 +628,6 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { }; if (is_dup_mow_key) { - RETURN_IF_ERROR(scanner_builder.build_scanners(*scanners)); - for (auto& scanner : *scanners) { - auto* olap_scanner = assert_cast(scanner.get()); - RETURN_IF_ERROR(olap_scanner->prepare(_state, _conjuncts)); - olap_scanner->set_compound_filters(_compound_filters); - } - LOG(INFO) << "segment count: " << segment_count << ", scanners count: " << scanners->size(); -#if 0 // 2. Split segment evenly to each scanner (e.g. each scanner need to scan `avg_segment_count_per_scanner` segments) const auto avg_segment_count_by_scanner = std::max(segment_count / config::doris_scanner_thread_pool_thread_num, (size_t)1); @@ -688,7 +709,6 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { {std::move(rs_splits), read_source.delete_predicates})); } } -#endif } else { for (auto&& [tablet, version] : tablets_to_scan) { std::vector>* ranges = &_cond_ranges; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 332e0f5bfaa8f7..55e8bf2127bc3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -224,6 +224,14 @@ public class SessionVariable implements Serializable, Writable { public static final String IGNORE_STORAGE_DATA_DISTRIBUTION = "ignore_storage_data_distribution"; + public static final String ENABLE_PARALLEL_SCAN = "enable_parallel_scan"; + + // Limit the max count of scanners to prevent generate too many scanners. + public static final String PARALLEL_SCAN_MAX_SCANNERS_COUNT = "parallel_scan_max_scanners_count"; + + // Avoid splitting small segments, each scanner should scan `parallel_scan_min_rows_per_scanner` rows. + public static final String PARALLEL_SCAN_MIN_ROWS_PER_SCANNER = "parallel_scan_min_rows_per_scanner"; + public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle"; public static final String ENABLE_AGG_STATE = "enable_agg_state"; @@ -784,7 +792,19 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_SHARED_SCAN, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, needForward = true) - private boolean enableSharedScan = false; + private boolean enableSharedScan = true; + + @VariableMgr.VarAttr(name = ENABLE_PARALLEL_SCAN, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, + needForward = true) + private boolean enableParallelScan = true; + + @VariableMgr.VarAttr(name = PARALLEL_SCAN_MAX_SCANNERS_COUNT, fuzzy = false, + varType = VariableAnnotation.EXPERIMENTAL, needForward = true) + private int parallelScanMaxScannersCount = 48; + + @VariableMgr.VarAttr(name = PARALLEL_SCAN_MIN_ROWS_PER_SCANNER, fuzzy = false, + varType = VariableAnnotation.EXPERIMENTAL, needForward = true) + private long parallelScanMinRowsPerScanner = 65536; // 2MB @VariableMgr.VarAttr(name = IGNORE_STORAGE_DATA_DISTRIBUTION, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, needForward = true) @@ -2746,6 +2766,10 @@ public TQueryOptions toThrift() { tResult.setInvertedIndexSkipThreshold(invertedIndexSkipThreshold); + tResult.setEnableParallelScan(enableParallelScan); + tResult.setParallelScanMaxScannersCount(parallelScanMaxScannersCount); + tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner); + return tResult; } @@ -3059,6 +3083,14 @@ public boolean getEnableSharedScan() { return enableSharedScan; } + public void setEnableSharedScan(boolean value) { + enableSharedScan = value; + } + + public boolean getEnableParallelScan() { + return enableParallelScan; + } + public boolean getEnablePipelineXEngine() { return enablePipelineXEngine; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java index 6b976e1f273d8d..c5275c20f0d6f7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java @@ -169,6 +169,7 @@ public void sqlAggWithColocateTable() throws Exception { @Test public void checkColocatePlanFragment() throws Exception { + connectContext.getSessionVariable().setEnableSharedScan(false); String sql = "select /*+ SET_VAR(enable_nereids_planner=false) */ a.k1 from db1.test_colocate a, db1.test_colocate b where a.k1=b.k1 and a.k2=b.k2 group by a.k1;"; StmtExecutor executor = getSqlStmtExecutor(sql); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index b1ce326c826db0..e07797b593d06c 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -263,6 +263,12 @@ struct TQueryOptions { 93: optional i32 inverted_index_max_expansions = 50; 94: optional i32 inverted_index_skip_threshold = 50; + + 95: optional bool enable_parallel_scan = false; + + 96: optional i32 parallel_scan_max_scanners_count = 0; + + 97: optional i64 parallel_scan_min_rows_per_scanner = 0; } diff --git a/regression-test/suites/performance_p0/redundant_conjuncts.groovy b/regression-test/suites/performance_p0/redundant_conjuncts.groovy index 9d81b5f91ba5b9..4aec6275dc356c 100644 --- a/regression-test/suites/performance_p0/redundant_conjuncts.groovy +++ b/regression-test/suites/performance_p0/redundant_conjuncts.groovy @@ -32,10 +32,10 @@ suite("redundant_conjuncts") { """ qt_redundant_conjuncts """ - EXPLAIN SELECT /*+SET_VAR(enable_nereids_planner=false, REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=2, parallel_fragment_exec_instance_num = 1) */ v1 FROM redundant_conjuncts WHERE k1 = 1 AND k1 = 1; + EXPLAIN SELECT /*+SET_VAR(enable_nereids_planner=false, REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=2, parallel_fragment_exec_instance_num = 1, enable_shared_scan = false) */ v1 FROM redundant_conjuncts WHERE k1 = 1 AND k1 = 1; """ qt_redundant_conjuncts_gnerated_by_extract_common_filter """ - EXPLAIN SELECT /*+SET_VAR(enable_nereids_planner=false, REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=100, parallel_fragment_exec_instance_num = 1) */ v1 FROM redundant_conjuncts WHERE k1 = 1 OR k1 = 2; + EXPLAIN SELECT /*+SET_VAR(enable_nereids_planner=false, REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=100, parallel_fragment_exec_instance_num = 1, enable_shared_scan = false) */ v1 FROM redundant_conjuncts WHERE k1 = 1 OR k1 = 2; """ }