diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index 1f381ac4d7143e..8b61a35c2e5b2f 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -24,6 +24,7 @@ #include "olap/block_column_predicate.h" #include "olap/column_predicate.h" #include "olap/olap_common.h" +#include "olap/rowset/segment_v2/row_ranges.h" #include "olap/tablet_schema.h" #include "runtime/runtime_state.h" #include "vec/core/block.h" @@ -115,6 +116,7 @@ class StorageReadOptions { int32_t tablet_id = 0; // slots that cast may be eliminated in storage layer std::map target_cast_type_for_variants; + RowRanges row_ranges; }; class RowwiseIterator; diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp new file mode 100644 index 00000000000000..32b163b7a1a29a --- /dev/null +++ b/be/src/olap/parallel_scanner_builder.cpp @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parallel_scanner_builder.h" + +#include "olap/rowset/beta_rowset.h" +#include "vec/exec/scan/new_olap_scanner.h" + +namespace doris { + +using namespace vectorized; + +template +Status ParallelScannerBuilder::build_scanners(std::list& scanners) { + RETURN_IF_ERROR(_load()); + if (_is_dup_mow_key) { + return _build_scanners_by_rowid(scanners); + } else { + // TODO: support to split by key range + return Status::NotSupported("split by key range not supported yet."); + } +} + +template +Status ParallelScannerBuilder::_build_scanners_by_rowid( + std::list& scanners) { + DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner); + for (auto&& [tablet, version] : _tablets) { + DCHECK(_all_rowsets.contains(tablet->tablet_id())); + auto& rowsets = _all_rowsets[tablet->tablet_id()]; + + TabletReader::ReadSource reade_source_with_delete_info; + if (!_state->skip_delete_predicate()) { + RETURN_IF_ERROR(tablet->capture_rs_readers( + {0, version}, &reade_source_with_delete_info.rs_splits, false)); + reade_source_with_delete_info.fill_delete_predicates(); + } + + TabletReader::ReadSource read_source; + + int64_t rows_collected = 0; + for (auto& rowset : rowsets) { + auto beta_rowset = std::dynamic_pointer_cast(rowset); + RowsetReaderSharedPtr reader; + RETURN_IF_ERROR(beta_rowset->create_reader(&reader)); + const auto rowset_id = beta_rowset->rowset_id(); + + DCHECK(_segment_cache_handles.contains(rowset_id)); + auto& segment_cache_handle = _segment_cache_handles[rowset_id]; + + if (beta_rowset->num_rows() == 0) { + continue; + } + + const auto& segments = segment_cache_handle.get_segments(); + int segment_start = 0; + auto split = RowSetSplits(reader->clone()); + + for (size_t i = 0; i != segments.size(); ++i) { + const auto& segment = segments[i]; + RowRanges row_ranges; + const size_t rows_of_segment = segment->num_rows(); + int64_t offset_in_segment = 0; + + // try to split large segments into RowRanges + while (offset_in_segment < rows_of_segment) { + const int64_t remaining_rows = rows_of_segment - offset_in_segment; + auto rows_need = _rows_per_scanner - rows_collected; + + // 0.9: try to avoid splitting the segments into excessively small parts. + if (rows_need >= remaining_rows * 0.9) { + rows_need = remaining_rows; + } + DCHECK_LE(rows_need, remaining_rows); + + // RowRange stands for range: [From, To), From is inclusive, To is exclusive. + row_ranges.add({offset_in_segment, + offset_in_segment + static_cast(rows_need)}); + rows_collected += rows_need; + offset_in_segment += rows_need; + + // If collected enough rows, build a new scanner + if (rows_collected >= _rows_per_scanner) { + split.segment_offsets.first = segment_start, + split.segment_offsets.second = i + 1; + split.segment_row_ranges.emplace_back(std::move(row_ranges)); + + DCHECK_EQ(split.segment_offsets.second - split.segment_offsets.first, + split.segment_row_ranges.size()); + + read_source.rs_splits.emplace_back(std::move(split)); + + scanners.emplace_back( + _build_scanner(tablet, version, _key_ranges, + {std::move(read_source.rs_splits), + reade_source_with_delete_info.delete_predicates})); + + read_source = TabletReader::ReadSource(); + split = RowSetSplits(reader->clone()); + row_ranges = RowRanges(); + + segment_start = offset_in_segment < rows_of_segment ? i : i + 1; + rows_collected = 0; + } + } + + // The non-empty `row_ranges` means there are some rows left in this segment not added into `split`. + if (!row_ranges.is_empty()) { + DCHECK_GT(rows_collected, 0); + DCHECK_EQ(row_ranges.to(), segment->num_rows()); + split.segment_row_ranges.emplace_back(std::move(row_ranges)); + } + } + + DCHECK_LE(rows_collected, _rows_per_scanner); + if (rows_collected > 0) { + split.segment_offsets.first = segment_start; + split.segment_offsets.second = segments.size(); + DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first); + DCHECK_EQ(split.segment_row_ranges.size(), + split.segment_offsets.second - split.segment_offsets.first); + read_source.rs_splits.emplace_back(std::move(split)); + } + } // end `for (auto& rowset : rowsets)` + + DCHECK_LE(rows_collected, _rows_per_scanner); + if (rows_collected > 0) { + DCHECK_GT(read_source.rs_splits.size(), 0); +#ifndef NDEBUG + for (auto& split : read_source.rs_splits) { + DCHECK(split.rs_reader != nullptr); + DCHECK_LT(split.segment_offsets.first, split.segment_offsets.second); + DCHECK_EQ(split.segment_row_ranges.size(), + split.segment_offsets.second - split.segment_offsets.first); + } +#endif + scanners.emplace_back( + _build_scanner(tablet, version, _key_ranges, + {std::move(read_source.rs_splits), + reade_source_with_delete_info.delete_predicates})); + } + } + + return Status::OK(); +} + +/** + * Load rowsets of each tablet with specified version, segments of each rowset. + */ +template +Status ParallelScannerBuilder::_load() { + _total_rows = 0; + for (auto&& [tablet, version] : _tablets) { + const auto tablet_id = tablet->tablet_id(); + auto& rowsets = _all_rowsets[tablet_id]; + RETURN_IF_ERROR(tablet->capture_consistent_rowsets({0, version}, &rowsets)); + + for (auto& rowset : rowsets) { + RETURN_IF_ERROR(rowset->load()); + const auto rowset_id = rowset->rowset_id(); + auto& segment_cache_handle = _segment_cache_handles[rowset_id]; + + RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( + std::dynamic_pointer_cast(rowset), &segment_cache_handle, true)); + _total_rows += rowset->num_rows(); + } + } + + _rows_per_scanner = _total_rows / _max_scanners_count; + _rows_per_scanner = std::max(_rows_per_scanner, _min_rows_per_scanner); + + return Status::OK(); +} + +template +std::shared_ptr ParallelScannerBuilder::_build_scanner( + BaseTabletSPtr tablet, int64_t version, const std::vector& key_ranges, + TabletReader::ReadSource&& read_source) { + NewOlapScanner::Params params { + _state, _scanner_profile.get(), key_ranges, std::move(tablet), + version, std::move(read_source), _limit_per_scanner, _is_preaggregation}; + return NewOlapScanner::create_shared(_parent, std::move(params)); +} + +template class ParallelScannerBuilder; + +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/parallel_scanner_builder.h b/be/src/olap/parallel_scanner_builder.h new file mode 100644 index 00000000000000..b9d659abc27e30 --- /dev/null +++ b/be/src/olap/parallel_scanner_builder.h @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "olap/rowset/segment_v2/row_ranges.h" +#include "olap/segment_loader.h" +#include "olap/tablet.h" +#include "vec/exec/scan/new_olap_scanner.h" + +namespace doris { + +namespace vectorized { +class VScanner; +} + +using TabletSPtr = std::shared_ptr; +using VScannerSPtr = std::shared_ptr; + +struct TabletWithVersion { + TabletSPtr tablet; + int64_t version; +}; + +template +class ParallelScannerBuilder { +public: + ParallelScannerBuilder(ParentType* parent, const std::vector& tablets, + const std::shared_ptr& profile, + const std::vector& key_ranges, RuntimeState* state, + int64_t limit_per_scanner, bool is_dup_mow_key, bool is_preaggregation) + : _parent(parent), + _scanner_profile(profile), + _state(state), + _limit_per_scanner(limit_per_scanner), + _is_dup_mow_key(is_dup_mow_key), + _is_preaggregation(is_preaggregation), + _tablets(tablets.cbegin(), tablets.cend()), + _key_ranges(key_ranges.cbegin(), key_ranges.cend()) {} + + Status build_scanners(std::list& scanners); + + void set_max_scanners_count(size_t count) { _max_scanners_count = count; } + + void set_min_rows_per_scanner(int64_t size) { _min_rows_per_scanner = size; } + +private: + Status _load(); + + Status _build_scanners_by_rowid(std::list& scanners); + + std::shared_ptr _build_scanner( + BaseTabletSPtr tablet, int64_t version, const std::vector& key_ranges, + TabletReader::ReadSource&& read_source); + + ParentType* _parent; + + /// Max scanners count limit to build + size_t _max_scanners_count {16}; + + /// Min rows per scanner + size_t _min_rows_per_scanner {2 * 1024 * 1024}; + + size_t _total_rows {}; + + size_t _rows_per_scanner {_min_rows_per_scanner}; + + std::map _segment_cache_handles; + + std::shared_ptr _scanner_profile; + RuntimeState* _state; + int64_t _limit_per_scanner; + bool _is_dup_mow_key; + bool _is_preaggregation; + std::vector _tablets; + std::vector _key_ranges; + std::unordered_map> _all_rowsets; +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 18725b116d1934..568fa89d848b81 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -237,10 +237,23 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context for (int i = seg_start; i < seg_end; i++) { auto& seg_ptr = segments[i]; std::unique_ptr iter; - auto s = seg_ptr->new_iterator(_input_schema, _read_options, &iter); - if (!s.ok()) { - LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string(); - return Status::Error(s.to_string()); + Status status; + + /// If `_segment_row_ranges` is empty, the segment is not split. + if (_segment_row_ranges.empty()) { + _read_options.row_ranges.clear(); + status = seg_ptr->new_iterator(_input_schema, _read_options, &iter); + } else { + DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size()); + auto local_options = _read_options; + local_options.row_ranges = _segment_row_ranges[i - seg_start]; + status = seg_ptr->new_iterator(_input_schema, local_options, &iter); + } + + if (!status.ok()) { + LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() + << "]: " << status.to_string(); + return Status::Error(status.to_string()); } if (iter->empty()) { continue; @@ -255,6 +268,7 @@ Status BetaRowsetReader::init(RowsetReaderContext* read_context, const RowSetSpl _read_context = read_context; _read_context->rowset_id = _rowset->rowset_id(); _segment_offsets = rs_splits.segment_offsets; + _segment_row_ranges = rs_splits.segment_row_ranges; return Status::OK(); } diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h index 45aef035950278..3834f2f10bb946 100644 --- a/be/src/olap/rowset/beta_rowset_reader.h +++ b/be/src/olap/rowset/beta_rowset_reader.h @@ -96,6 +96,7 @@ class BetaRowsetReader : public RowsetReader { DorisCallOnce _init_iter_once; std::pair _segment_offsets; + std::vector _segment_row_ranges; SchemaSPtr _input_schema; RowsetReaderContext* _read_context = nullptr; diff --git a/be/src/olap/rowset/rowset_reader.h b/be/src/olap/rowset/rowset_reader.h index 3a93888be90419..45449952431bd6 100644 --- a/be/src/olap/rowset/rowset_reader.h +++ b/be/src/olap/rowset/rowset_reader.h @@ -25,6 +25,7 @@ #include "olap/iterators.h" #include "olap/rowset/rowset_fwd.h" #include "olap/rowset/rowset_reader_context.h" +#include "olap/rowset/segment_v2/row_ranges.h" #include "vec/core/block.h" namespace doris { @@ -41,6 +42,9 @@ struct RowSetSplits { // and pipeline std::pair segment_offsets; + // RowRanges of each segment. + std::vector segment_row_ranges; + RowSetSplits(RowsetReaderSharedPtr rs_reader_) : rs_reader(rs_reader_), segment_offsets({0, 0}) {} RowSetSplits() = default; diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index f85ed5abce5982..ae9433bcde5bcf 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -381,6 +381,10 @@ Status SegmentIterator::_lazy_init() { << _opts.delete_bitmap.at(segment_id())->cardinality() << ", " << _opts.stats->rows_del_by_bitmap << " rows deleted by bitmap"; } + + if (!_opts.row_ranges.is_empty()) { + _row_bitmap &= RowRanges::ranges_to_roaring(_opts.row_ranges); + } if (_opts.read_orderby_key_reverse) { _range_iter.reset(new BackwardBitmapRangeIterator(_row_bitmap)); } else { diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 6ced139eb4c8b8..ee515007eb604c 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -488,6 +488,22 @@ class RuntimeState { _query_options.enable_hash_join_early_start_probe; } + bool enable_parallel_scan() const { + return _query_options.__isset.enable_parallel_scan && _query_options.enable_parallel_scan; + } + + int parallel_scan_max_scanners_count() const { + return _query_options.__isset.parallel_scan_max_scanners_count + ? _query_options.parallel_scan_max_scanners_count + : 0; + } + + int64_t parallel_scan_min_rows_per_scanner() const { + return _query_options.__isset.parallel_scan_min_rows_per_scanner + ? _query_options.parallel_scan_min_rows_per_scanner + : 0; + } + int repeat_max_num() const { #ifndef BE_TEST if (!_query_options.__isset.repeat_max_num) { diff --git a/be/src/vec/columns/column_dictionary.h b/be/src/vec/columns/column_dictionary.h index 06e0a352be2c1a..6de729fd2de7e0 100644 --- a/be/src/vec/columns/column_dictionary.h +++ b/be/src/vec/columns/column_dictionary.h @@ -207,7 +207,7 @@ class ColumnDictionary final : public COWHelper> { } Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) override { - auto* res_col = reinterpret_cast(col_ptr); + auto* res_col = assert_cast(col_ptr); StringRef strings[sel_size]; size_t length = 0; for (size_t i = 0; i != sel_size; ++i) { diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 38fd97f2413688..b33c656a8df409 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -39,18 +39,15 @@ #include "common/object_pool.h" #include "common/status.h" #include "exec/exec_node.h" -#include "olap/rowset/rowset.h" +#include "olap/parallel_scanner_builder.h" #include "olap/rowset/rowset_reader.h" #include "olap/storage_engine.h" -#include "olap/tablet.h" #include "olap/tablet_manager.h" #include "olap/tablet_reader.h" -#include "runtime/decimalv2_value.h" #include "runtime/define_primitive_type.h" #include "runtime/query_statistics.h" #include "runtime/runtime_state.h" #include "runtime/types.h" -#include "service/backend_options.h" #include "util/time.h" #include "util/to_string.h" #include "vec/columns/column.h" @@ -518,14 +515,20 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { std::vector> tablets_to_scan; tablets_to_scan.reserve(_scan_ranges.size()); + std::vector tablets; + for (auto&& scan_range : _scan_ranges) { auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id)); int64_t version = 0; std::from_chars(scan_range->version.data(), scan_range->version.data() + scan_range->version.size(), version); + tablets.emplace_back( + TabletWithVersion {std::dynamic_pointer_cast(tablet), version}); tablets_to_scan.emplace_back(std::move(tablet), version); } + bool enable_parallel_scan = _state->enable_parallel_scan(); + // Split tablet segment by scanner, only use in pipeline in duplicate key // 1. if tablet count lower than scanner thread num, count segment num of all tablet ready for scan // TODO: some tablet may do not have segment, may need split segment all case @@ -562,6 +565,48 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { } } + bool has_cpu_limit = _state->query_options().__isset.resource_limit && + _state->query_options().resource_limit.__isset.cpu_limit; + + if (enable_parallel_scan && is_dup_mow_key && !has_cpu_limit && !should_run_serial() && + _push_down_agg_type == TPushAggOp::NONE) { + std::vector key_ranges; + for (auto& range : _cond_ranges) { + if (range->begin_scan_range.size() == 1 && + range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY) { + continue; + } + key_ranges.emplace_back(range.get()); + } + + ParallelScannerBuilder scanner_builder( + this, tablets, _scanner_profile, key_ranges, _state, _limit_per_scanner, + is_dup_mow_key, _olap_scan_node.is_preaggregation); + + int max_scanners_count = _state->parallel_scan_max_scanners_count(); + + // If the `max_scanners_count` was not set, + // use `config::doris_scanner_thread_pool_thread_num` as the default value. + if (max_scanners_count <= 0) { + max_scanners_count = config::doris_scanner_thread_pool_thread_num; + } + + // Too small value of `min_rows_per_scanner` is meaningless. + auto min_rows_per_scanner = + std::max(1024, _state->parallel_scan_min_rows_per_scanner()); + scanner_builder.set_max_scanners_count(max_scanners_count); + scanner_builder.set_min_rows_per_scanner(min_rows_per_scanner); + + RETURN_IF_ERROR(scanner_builder.build_scanners(*scanners)); + for (auto& scanner : *scanners) { + auto* olap_scanner = assert_cast(scanner.get()); + RETURN_IF_ERROR(olap_scanner->prepare(_state, _conjuncts)); + olap_scanner->set_compound_filters(_compound_filters); + } + LOG(INFO) << "segment count: " << segment_count << ", scanners count: " << scanners->size(); + return Status::OK(); + } + auto build_new_scanner = [&](BaseTabletSPtr tablet, int64_t version, const std::vector& key_ranges, TabletReader::ReadSource read_source) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 332e0f5bfaa8f7..55e8bf2127bc3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -224,6 +224,14 @@ public class SessionVariable implements Serializable, Writable { public static final String IGNORE_STORAGE_DATA_DISTRIBUTION = "ignore_storage_data_distribution"; + public static final String ENABLE_PARALLEL_SCAN = "enable_parallel_scan"; + + // Limit the max count of scanners to prevent generate too many scanners. + public static final String PARALLEL_SCAN_MAX_SCANNERS_COUNT = "parallel_scan_max_scanners_count"; + + // Avoid splitting small segments, each scanner should scan `parallel_scan_min_rows_per_scanner` rows. + public static final String PARALLEL_SCAN_MIN_ROWS_PER_SCANNER = "parallel_scan_min_rows_per_scanner"; + public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle"; public static final String ENABLE_AGG_STATE = "enable_agg_state"; @@ -784,7 +792,19 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_SHARED_SCAN, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, needForward = true) - private boolean enableSharedScan = false; + private boolean enableSharedScan = true; + + @VariableMgr.VarAttr(name = ENABLE_PARALLEL_SCAN, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, + needForward = true) + private boolean enableParallelScan = true; + + @VariableMgr.VarAttr(name = PARALLEL_SCAN_MAX_SCANNERS_COUNT, fuzzy = false, + varType = VariableAnnotation.EXPERIMENTAL, needForward = true) + private int parallelScanMaxScannersCount = 48; + + @VariableMgr.VarAttr(name = PARALLEL_SCAN_MIN_ROWS_PER_SCANNER, fuzzy = false, + varType = VariableAnnotation.EXPERIMENTAL, needForward = true) + private long parallelScanMinRowsPerScanner = 65536; // 2MB @VariableMgr.VarAttr(name = IGNORE_STORAGE_DATA_DISTRIBUTION, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, needForward = true) @@ -2746,6 +2766,10 @@ public TQueryOptions toThrift() { tResult.setInvertedIndexSkipThreshold(invertedIndexSkipThreshold); + tResult.setEnableParallelScan(enableParallelScan); + tResult.setParallelScanMaxScannersCount(parallelScanMaxScannersCount); + tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner); + return tResult; } @@ -3059,6 +3083,14 @@ public boolean getEnableSharedScan() { return enableSharedScan; } + public void setEnableSharedScan(boolean value) { + enableSharedScan = value; + } + + public boolean getEnableParallelScan() { + return enableParallelScan; + } + public boolean getEnablePipelineXEngine() { return enablePipelineXEngine; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java index 6b976e1f273d8d..c5275c20f0d6f7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java @@ -169,6 +169,7 @@ public void sqlAggWithColocateTable() throws Exception { @Test public void checkColocatePlanFragment() throws Exception { + connectContext.getSessionVariable().setEnableSharedScan(false); String sql = "select /*+ SET_VAR(enable_nereids_planner=false) */ a.k1 from db1.test_colocate a, db1.test_colocate b where a.k1=b.k1 and a.k2=b.k2 group by a.k1;"; StmtExecutor executor = getSqlStmtExecutor(sql); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index b1ce326c826db0..e07797b593d06c 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -263,6 +263,12 @@ struct TQueryOptions { 93: optional i32 inverted_index_max_expansions = 50; 94: optional i32 inverted_index_skip_threshold = 50; + + 95: optional bool enable_parallel_scan = false; + + 96: optional i32 parallel_scan_max_scanners_count = 0; + + 97: optional i64 parallel_scan_min_rows_per_scanner = 0; } diff --git a/regression-test/suites/performance_p0/redundant_conjuncts.groovy b/regression-test/suites/performance_p0/redundant_conjuncts.groovy index 9d81b5f91ba5b9..4aec6275dc356c 100644 --- a/regression-test/suites/performance_p0/redundant_conjuncts.groovy +++ b/regression-test/suites/performance_p0/redundant_conjuncts.groovy @@ -32,10 +32,10 @@ suite("redundant_conjuncts") { """ qt_redundant_conjuncts """ - EXPLAIN SELECT /*+SET_VAR(enable_nereids_planner=false, REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=2, parallel_fragment_exec_instance_num = 1) */ v1 FROM redundant_conjuncts WHERE k1 = 1 AND k1 = 1; + EXPLAIN SELECT /*+SET_VAR(enable_nereids_planner=false, REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=2, parallel_fragment_exec_instance_num = 1, enable_shared_scan = false) */ v1 FROM redundant_conjuncts WHERE k1 = 1 AND k1 = 1; """ qt_redundant_conjuncts_gnerated_by_extract_common_filter """ - EXPLAIN SELECT /*+SET_VAR(enable_nereids_planner=false, REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=100, parallel_fragment_exec_instance_num = 1) */ v1 FROM redundant_conjuncts WHERE k1 = 1 OR k1 = 2; + EXPLAIN SELECT /*+SET_VAR(enable_nereids_planner=false, REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=100, parallel_fragment_exec_instance_num = 1, enable_shared_scan = false) */ v1 FROM redundant_conjuncts WHERE k1 = 1 OR k1 = 2; """ }