From 08f410e72e07bc18fe55dd0dd376ff3a42c24060 Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Mon, 9 Jun 2025 21:29:09 +0800
Subject: [PATCH 1/3] [opt](rowset meta) truncate segments key bounds if too large to avoid `RowsetMetaCloudPB` exceeding fdb's 100KB limit (#45287)

In cloud mode, `RowsetMetaCloudPB` is stored as a value in fdb. The segments
key bounds are a member of `RowsetMetaCloudPB`. When the key columns contain
long data, the segments key bounds may become very large, causing the size of
`RowsetMetaCloudPB` to exceed fdb's 100KB limit, which makes the load txn fail
when the `RowsetMetaCloudPB` KV is put into fdb.

This PR adds a config `segments_key_bounds_truncation_threshold` to limit the
max length of segments key bounds and avoid this problem. Segments key bounds
larger than the value of this config will be truncated. The config takes
effect when a rowset is generated (load/schema change/compaction/snapshot).

**Note that as long as the config `segments_key_bounds_truncation_threshold`
has ever been turned on, cluster downgrade and backup/restore are no longer
supported.**
---
 be/src/cloud/pb_convert.cpp                   |   4 +
 be/src/common/config.cpp                      |  10 +
 be/src/common/config.h                        |   6 +
 be/src/olap/base_tablet.cpp                   |  14 +-
 be/src/olap/base_tablet.h                     |   1 -
 be/src/olap/compaction.cpp                    |  17 +-
 be/src/olap/rowset/beta_rowset_writer.cpp     |   7 +-
 be/src/olap/rowset/beta_rowset_writer.h       |   2 +
 be/src/olap/rowset/rowset.h                   |   4 +
 be/src/olap/rowset/rowset_meta.cpp            |  31 +
 be/src/olap/rowset/rowset_meta.h              |  17 +-
 be/src/olap/task/index_builder.cpp            |   2 +
 be/src/util/key_util.cpp                      |  32 +
 be/src/util/key_util.h                        |  11 +-
 be/src/util/slice.cpp                         |  19 +
 be/src/util/slice.h                           |   7 +
 be/src/vec/olap/block_reader.cpp              |  17 +-
 be/src/vec/olap/block_reader.h                |   4 +-
 be/test/olap/ordered_data_compaction_test.cpp |   1 +
 .../segments_key_bounds_truncation_test.cpp   | 788 ++++++++++++++++++
 gensrc/proto/olap_file.proto                  |  10 +-
 ...t_key_bounds_truncation_read_scenarios.out |  57 ++
 ..._key_bounds_truncation_write_scenarios.out |   4 +
 .../apache/doris/regression/util/Http.groovy  |   8 +-
 .../test_key_bounds_truncation_basic.groovy   | 122 +++
 ...ey_bounds_truncation_read_scenarios.groovy | 100 +++
 ...y_bounds_truncation_write_scenarios.groovy | 284 +++++++
 27 files changed, 1542 insertions(+), 37 deletions(-)
 create mode 100644 be/src/util/key_util.cpp
 create mode 100644 be/test/olap/segments_key_bounds_truncation_test.cpp
 create mode 100644 regression-test/data/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_read_scenarios.out
 create mode 100644 regression-test/data/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_write_scenarios.out
 create mode 100644 regression-test/suites/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_basic.groovy
 create mode 100644 regression-test/suites/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_read_scenarios.groovy
 create mode 100644 regression-test/suites/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_write_scenarios.groovy

diff --git a/be/src/cloud/pb_convert.cpp b/be/src/cloud/pb_convert.cpp
index ec483ba682ce3a..0d24192a758ae7 100644
--- a/be/src/cloud/pb_convert.cpp
+++ b/be/src/cloud/pb_convert.cpp
@@ -75,6 +75,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in)
     }
     out->set_txn_expiration(in.txn_expiration());
     out->set_segments_overlap_pb(in.segments_overlap_pb());
+    out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
     out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
     out->set_index_id(in.index_id());
if (in.has_schema_version()) { @@ -129,6 +130,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) { } out->set_txn_expiration(in.txn_expiration()); out->set_segments_overlap_pb(in.segments_overlap_pb()); + out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated()); out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size()); out->set_index_id(in.index_id()); if (in.has_schema_version()) { @@ -230,6 +232,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in, } out->set_txn_expiration(in.txn_expiration()); out->set_segments_overlap_pb(in.segments_overlap_pb()); + out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated()); out->mutable_segments_file_size()->CopyFrom(in.segments_file_size()); out->set_index_id(in.index_id()); if (in.has_schema_version()) { @@ -285,6 +288,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in, } out->set_txn_expiration(in.txn_expiration()); out->set_segments_overlap_pb(in.segments_overlap_pb()); + out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated()); out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size()); out->set_index_id(in.index_id()); if (in.has_schema_version()) { diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 61758a0b4fe2a6..2fc927dcbfde0e 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1495,6 +1495,12 @@ DEFINE_mBool(enable_compaction_pause_on_high_memory, "true"); DEFINE_mBool(enable_calc_delete_bitmap_between_segments_concurrently, "false"); +// the max length of segments key bounds, in bytes +// ATTENTION: as long as this conf has ever been enabled, cluster downgrade and backup recovery will no longer be supported. +DEFINE_mInt32(segments_key_bounds_truncation_threshold, "-1"); +// ATTENTION: for test only, use random segments key bounds truncation threshold every time +DEFINE_mBool(random_segments_key_bounds_truncation, "false"); + // clang-format off #ifdef BE_TEST // test s3 @@ -1943,6 +1949,10 @@ Status set_fuzzy_configs() { fuzzy_field_and_value["string_overflow_size"] = ((distribution(*generator) % 2) == 0) ? "10" : "4294967295"; + std::uniform_int_distribution distribution2(-2, 10); + fuzzy_field_and_value["segments_key_bounds_truncation_threshold"] = + std::to_string(distribution2(*generator)); + fmt::memory_buffer buf; for (auto& it : fuzzy_field_and_value) { const auto& field = it.first; diff --git a/be/src/common/config.h b/be/src/common/config.h index a7e16c53d5e992..c342b260288f61 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1573,6 +1573,12 @@ DECLARE_mBool(enable_compaction_pause_on_high_memory); DECLARE_mBool(enable_calc_delete_bitmap_between_segments_concurrently); +// the max length of segments key bounds, in bytes +// ATTENTION: as long as this conf has ever been enabled, cluster downgrade and backup recovery will no longer be supported. 
+DECLARE_mInt32(segments_key_bounds_truncation_threshold);
+// ATTENTION: for test only, use random segments key bounds truncation threshold every time
+DECLARE_mBool(random_segments_key_bounds_truncation);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 54fcb0edd1f3a8..5b0fe1b881ee45 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -46,6 +46,7 @@
 #include "util/crc32c.h"
 #include "util/debug_points.h"
 #include "util/doris_metrics.h"
+#include "util/key_util.h"
 #include "vec/common/assert_cast.h"
 #include "vec/common/schema_util.h"
 #include "vec/data_types/data_type_factory.hpp"
@@ -476,17 +477,18 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest
     RowLocation loc;
 
     for (size_t i = 0; i < specified_rowsets.size(); i++) {
-        auto& rs = specified_rowsets[i];
-        auto& segments_key_bounds = rs->rowset_meta()->get_segments_key_bounds();
-        int num_segments = rs->num_segments();
+        const auto& rs = specified_rowsets[i];
+        std::vector<KeyBoundsPB> segments_key_bounds;
+        rs->rowset_meta()->get_segments_key_bounds(&segments_key_bounds);
+        int num_segments = static_cast<int>(rs->num_segments());
         DCHECK_EQ(segments_key_bounds.size(), num_segments);
         std::vector<int> picked_segments;
-        for (int i = num_segments - 1; i >= 0; i--) {
+        for (int j = num_segments - 1; j >= 0; j--) {
             // If mow table has cluster keys, the key bounds is short keys, not primary keys
             // use PrimaryKeyIndexMetaPB in primary key index?
             if (schema->cluster_key_idxes().empty()) {
-                if (key_without_seq.compare(segments_key_bounds[i].max_key()) > 0 ||
-                    key_without_seq.compare(segments_key_bounds[i].min_key()) < 0) {
+                if (key_is_not_in_segment(key_without_seq, segments_key_bounds[j],
+                                          rs->rowset_meta()->is_segments_key_bounds_truncated())) {
                     continue;
                 }
             }
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 4f4b41f11abbd8..4e9ddaac1b5e0d 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -145,7 +145,6 @@ class BaseTablet {
                                       RowsetSharedPtr rowset, const TupleDescriptor* desc,
                                       OlapReaderStatistics& stats, std::string& values,
                                       bool write_to_cache = false);
-
     // Lookup the row location of `encoded_key`, the function sets `row_location` on success.
     // NOTE: the method only works in unique key model with primary key index, you will got a
     // not supported error in other data model.
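The `lookup_row_key` change above may skip a segment only when the key is provably outside the segment's possibly-truncated bounds; the rule it relies on is implemented later in this patch by `key_is_not_in_segment` and `Slice::lhs_is_strictly_less_than_rhs`. A minimal standalone sketch of that prefix-comparison argument follows; `provably_less` is an illustrative name, not a function from the patch:

#include <algorithm>
#include <cassert>
#include <string>

// Given x, a (possibly truncated) prefix of the real key x', and y, a
// (possibly truncated) prefix of the real key y', return true only when
// x' < y' can be proven from the prefixes alone.
bool provably_less(const std::string& x, bool x_truncated, const std::string& y) {
    if (!x_truncated) {
        // x == x', and y <= y' (a prefix never compares greater than the
        // full string), so x < y implies x' = x < y <= y'.
        return x < y;
    }
    // x is a prefix of x'. Compare on the common length m: if
    // x'[0..m) < y'[0..m), then x' < y' regardless of the truncated tails.
    std::size_t m = std::min(x.size(), y.size());
    return x.compare(y.substr(0, m)) < 0; // strict on the first m bytes
}

int main() {
    // a truncated "bbbbbb" is still provably below "cccccc"
    assert(provably_less("bbbbbb", /*x_truncated=*/true, "cccccc"));
    // a truncated "bb" might really be "bbzz...", so "bb" < "bbbb" proves nothing
    assert(!provably_less("bb", /*x_truncated=*/true, "bbbb"));
    // untruncated bounds degenerate to plain lexicographic comparison
    assert(provably_less("bb", /*x_truncated=*/false, "bbbb"));
    return 0;
}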
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index e5012a36eb4dee..f3f19791943115 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -84,7 +84,8 @@ using namespace ErrorCode;
 
 namespace {
 
-bool is_rowset_tidy(std::string& pre_max_key, const RowsetSharedPtr& rhs) {
+bool is_rowset_tidy(std::string& pre_max_key, bool& pre_rs_key_bounds_truncated,
+                    const RowsetSharedPtr& rhs) {
     size_t min_tidy_size = config::ordered_data_compaction_min_segment_size;
     if (rhs->num_segments() == 0) {
         return true;
     }
@@ -107,11 +108,13 @@ bool is_rowset_tidy(std::string& pre_max_key, const RowsetSharedPtr& rhs) {
     if (!ret) {
         return false;
     }
-    if (min_key <= pre_max_key) {
+    bool cur_rs_key_bounds_truncated {rhs->is_segments_key_bounds_truncated()};
+    if (!Slice::lhs_is_strictly_less_than_rhs(Slice {pre_max_key}, pre_rs_key_bounds_truncated,
+                                              Slice {min_key}, cur_rs_key_bounds_truncated)) {
         return false;
     }
     CHECK(rhs->last_key(&pre_max_key));
-
+    pre_rs_key_bounds_truncated = cur_rs_key_bounds_truncated;
     return true;
 }
 
@@ -292,12 +295,13 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
              << ", output_version=" << _output_version;
     // link data to new rowset
     auto seg_id = 0;
+    bool segments_key_bounds_truncated {false};
     std::vector<KeyBoundsPB> segment_key_bounds;
     for (auto rowset : _input_rowsets) {
         RETURN_IF_ERROR(rowset->link_files_to(tablet()->tablet_path(),
                                               _output_rs_writer->rowset_id(), seg_id));
         seg_id += rowset->num_segments();
-
+        segments_key_bounds_truncated |= rowset->is_segments_key_bounds_truncated();
         std::vector<KeyBoundsPB> key_bounds;
         RETURN_IF_ERROR(rowset->get_segments_key_bounds(&key_bounds));
         segment_key_bounds.insert(segment_key_bounds.end(), key_bounds.begin(), key_bounds.end());
@@ -312,7 +316,7 @@
     rowset_meta->set_num_segments(_input_num_segments);
     rowset_meta->set_segments_overlap(NONOVERLAPPING);
     rowset_meta->set_rowset_state(VISIBLE);
-
+    rowset_meta->set_segments_key_bounds_truncated(segments_key_bounds_truncated);
     rowset_meta->set_segments_key_bounds(segment_key_bounds);
     _output_rowset = _output_rs_writer->manual_build(rowset_meta);
     return Status::OK();
 }
@@ -392,8 +396,9 @@ bool CompactionMixin::handle_ordered_data_compaction() {
     // files to handle compaction
     auto input_size = _input_rowsets.size();
     std::string pre_max_key;
+    bool pre_rs_key_bounds_truncated {false};
     for (auto i = 0; i < input_size; ++i) {
-        if (!is_rowset_tidy(pre_max_key, _input_rowsets[i])) {
+        if (!is_rowset_tidy(pre_max_key, pre_rs_key_bounds_truncated, _input_rowsets[i])) {
             if (i <= input_size / 2) {
                 return false;
             } else {
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index eb8a6a3bfb6be7..6f14f45138d32d 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -90,7 +90,8 @@ void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta,
     rowset_meta.set_num_segments(spec_rowset_meta.num_segments());
     rowset_meta.set_segments_overlap(spec_rowset_meta.segments_overlap());
     rowset_meta.set_rowset_state(spec_rowset_meta.rowset_state());
-
+    rowset_meta.set_segments_key_bounds_truncated(
+            spec_rowset_meta.is_segments_key_bounds_truncated());
     std::vector<KeyBoundsPB> segments_key_bounds;
     spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds);
     rowset_meta.set_segments_key_bounds(segments_key_bounds);
@@ -679,6 +680,7 @@ Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
     _num_segment += static_cast<int32_t>(rowset->num_segments());
     // append key_bounds to current rowset
     RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds));
+    _segments_key_bounds_truncated = rowset->rowset_meta()->is_segments_key_bounds_truncated();
 
     // TODO update zonemap
     if (rowset->rowset_meta()->has_delete_predicate()) {
@@ -888,6 +890,9 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch
     for (auto& key_bound : _segments_encoded_key_bounds) {
         segments_encoded_key_bounds.push_back(key_bound);
     }
+    if (_segments_key_bounds_truncated.has_value()) {
+        rowset_meta->set_segments_key_bounds_truncated(_segments_key_bounds_truncated.value());
+    }
     // segment key bounds are empty in old version(before version 1.2.x). So we should not modify
     // the overlap property when key bounds are empty.
     if (!segments_encoded_key_bounds.empty() &&
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index 2ae999eae202fe..19e3c4da31db41 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -236,8 +236,10 @@ class BaseBetaRowsetWriter : public RowsetWriter {
     // record rows number of every segment already written, using for rowid
     // conversion when compaction in unique key with MoW model
     std::vector<uint32_t> _segment_num_rows;
+
     // for unique key table with merge-on-write
     std::vector<KeyBoundsPB> _segments_encoded_key_bounds;
+    std::optional<bool> _segments_key_bounds_truncated;
 
     // counters and statistics maintained during add_rowset
     std::atomic<int64_t> _num_rows_written;
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index db6872875a56d5..3d2a3d965f0676 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -294,6 +294,10 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
         return true;
     }
 
+    bool is_segments_key_bounds_truncated() const {
+        return _rowset_meta->is_segments_key_bounds_truncated();
+    }
+
     bool check_rowset_segment();
 
     [[nodiscard]] virtual Status add_to_binlog() { return Status::OK(); }
diff --git a/be/src/olap/rowset/rowset_meta.cpp b/be/src/olap/rowset/rowset_meta.cpp
index 6bed5e800ede4d..c9851cdc5fc64b 100644
--- a/be/src/olap/rowset/rowset_meta.cpp
+++ b/be/src/olap/rowset/rowset_meta.cpp
@@ -20,6 +20,7 @@
 #include
 #include
+#include <random>
 
 #include "common/logging.h"
 #include "google/protobuf/util/message_differencer.h"
@@ -220,6 +221,34 @@ int64_t RowsetMeta::segment_file_size(int seg_id) {
                    : -1;
 }
 
+void RowsetMeta::set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds) {
+    for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
+        KeyBoundsPB* new_key_bounds = _rowset_meta_pb.add_segments_key_bounds();
+        *new_key_bounds = key_bounds;
+    }
+
+    int32_t truncation_threshold = config::segments_key_bounds_truncation_threshold;
+    if (config::random_segments_key_bounds_truncation) {
+        static thread_local std::mt19937 generator(std::random_device {}());
+        std::uniform_int_distribution<int32_t> distribution(-10, 40);
+        truncation_threshold = distribution(generator);
+    }
+    bool really_do_truncation {false};
+    if (truncation_threshold > 0) {
+        for (auto& segment_key_bounds : *_rowset_meta_pb.mutable_segments_key_bounds()) {
+            if (segment_key_bounds.min_key().size() > truncation_threshold) {
+                really_do_truncation = true;
+                segment_key_bounds.mutable_min_key()->resize(truncation_threshold);
+            }
+            if (segment_key_bounds.max_key().size() > truncation_threshold) {
+                really_do_truncation = true;
+                segment_key_bounds.mutable_max_key()->resize(truncation_threshold);
+            }
+        }
+    }
+
+    set_segments_key_bounds_truncated(really_do_truncation || is_segments_key_bounds_truncated());
+}
+
 void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) {
     set_num_segments(num_segments() + other.num_segments());
     set_num_rows(num_rows() + other.num_rows());
@@ -227,6 +256,8 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) {
     set_total_disk_size(total_disk_size() + other.total_disk_size());
     set_index_disk_size(index_disk_size() + other.index_disk_size());
     set_total_disk_size(data_disk_size() + index_disk_size());
+    set_segments_key_bounds_truncated(is_segments_key_bounds_truncated() ||
+                                      other.is_segments_key_bounds_truncated());
     for (auto&& key_bound : other.get_segments_key_bounds()) {
         add_segment_key_bounds(key_bound);
     }
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 40e55e074528cd..4421b6dda1fb4e 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -24,6 +24,7 @@
 #include
 #include
 
+#include "common/config.h"
 #include "io/fs/file_system.h"
 #include "olap/metadata_adder.h"
 #include "olap/olap_common.h"
@@ -299,6 +300,15 @@ class RowsetMeta : public MetadataAdder<RowsetMeta> {
 
     auto& get_segments_key_bounds() const { return _rowset_meta_pb.segments_key_bounds(); }
 
+    bool is_segments_key_bounds_truncated() const {
+        return _rowset_meta_pb.has_segments_key_bounds_truncated() &&
+               _rowset_meta_pb.segments_key_bounds_truncated();
+    }
+
+    void set_segments_key_bounds_truncated(bool truncated) {
+        _rowset_meta_pb.set_segments_key_bounds_truncated(truncated);
+    }
+
     bool get_first_segment_key_bound(KeyBoundsPB* key_bounds) {
         // for compatibility, old version has not segment key bounds
         if (_rowset_meta_pb.segments_key_bounds_size() == 0) {
@@ -316,12 +326,7 @@
         return true;
     }
 
-    void set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds) {
-        for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
-            KeyBoundsPB* new_key_bounds = _rowset_meta_pb.add_segments_key_bounds();
-            *new_key_bounds = key_bounds;
-        }
-    }
+    void set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds);
 
     void add_segment_key_bounds(KeyBoundsPB segments_key_bounds) {
         *_rowset_meta_pb.add_segments_key_bounds() = std::move(segments_key_bounds);
diff --git a/be/src/olap/task/index_builder.cpp b/be/src/olap/task/index_builder.cpp
index a8851a46e824b5..9f72056af8d1c9 100644
--- a/be/src/olap/task/index_builder.cpp
+++ b/be/src/olap/task/index_builder.cpp
@@ -258,6 +258,8 @@ Status IndexBuilder::update_inverted_index_info() {
         rowset_meta->set_rowset_state(input_rowset_meta->rowset_state());
         std::vector<KeyBoundsPB> key_bounds;
         RETURN_IF_ERROR(input_rowset->get_segments_key_bounds(&key_bounds));
+        rowset_meta->set_segments_key_bounds_truncated(
+                input_rowset_meta->is_segments_key_bounds_truncated());
         rowset_meta->set_segments_key_bounds(key_bounds);
         auto output_rowset = output_rs_writer->manual_build(rowset_meta);
         if (input_rowset_meta->has_delete_predicate()) {
diff --git a/be/src/util/key_util.cpp b/be/src/util/key_util.cpp
new file mode 100644
index 00000000000000..b49639d7075a88
--- /dev/null
+++ b/be/src/util/key_util.cpp
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/key_util.h" + +namespace doris { + +bool key_is_not_in_segment(Slice key, const KeyBoundsPB& segment_key_bounds, + bool is_segments_key_bounds_truncated) { + Slice maybe_truncated_min_key {segment_key_bounds.min_key()}; + Slice maybe_truncated_max_key {segment_key_bounds.max_key()}; + bool res1 = Slice::lhs_is_strictly_less_than_rhs(key, false, maybe_truncated_min_key, + is_segments_key_bounds_truncated); + bool res2 = Slice::lhs_is_strictly_less_than_rhs(maybe_truncated_max_key, + is_segments_key_bounds_truncated, key, false); + return res1 || res2; +} +} // namespace doris \ No newline at end of file diff --git a/be/src/util/key_util.h b/be/src/util/key_util.h index fd57566fa4f8b2..01094905cf5d01 100644 --- a/be/src/util/key_util.h +++ b/be/src/util/key_util.h @@ -17,16 +17,12 @@ #pragma once +#include #include #include -#include #include -#include -#include "common/status.h" -#include "util/debug_util.h" -#include "util/faststring.h" #include "util/slice.h" namespace doris { @@ -111,4 +107,9 @@ void encode_key(std::string* buf, const RowType& row, size_t num_keys) { } } +// we can only know if a key is excluded from the segment +// based on strictly order compare result with segments key bounds +bool key_is_not_in_segment(Slice key, const KeyBoundsPB& segment_key_bounds, + bool is_segments_key_bounds_truncated); + } // namespace doris diff --git a/be/src/util/slice.cpp b/be/src/util/slice.cpp index a681b5bba7dfc6..9c15f901f25270 100644 --- a/be/src/util/slice.cpp +++ b/be/src/util/slice.cpp @@ -27,4 +27,23 @@ Slice::Slice(const faststring& s) data((char*)(s.data())), size(s.size()) {} +bool Slice::lhs_is_strictly_less_than_rhs(Slice X, bool X_is_truncated, Slice Y, + [[maybe_unused]] bool Y_is_truncated) { + // suppose X is a prefix of X', Y is a prefix of Y' + if (!X_is_truncated) { + // (X_is_truncated == false) means X' == X + // we have Y <= Y', + // so X < Y => X < Y', + // so X' = X < Y' + return X.compare(Y) < 0; + } + + // let m = min(|X|,|Y|), + // we have Y[1..m] = Y'[1..m] <= Y' + // so X'[1..m] < Y[1..m] => X' < Y' + std::size_t m {std::min(X.get_size(), Y.get_size())}; + Slice Y_to_cmp {Y.get_data(), m}; + return X.compare(Y_to_cmp) < 0; +} + } // namespace doris diff --git a/be/src/util/slice.h b/be/src/util/slice.h index fd6bcf0adfb510..e46546a03f3c20 100644 --- a/be/src/util/slice.h +++ b/be/src/util/slice.h @@ -273,6 +273,13 @@ struct Slice { } return buf; } + + // X is (maybe) a truncated prefix of string X' + // Y is (maybe) a truncated prefix of string Y' + // return true only if we can determine that X' is strictly less than Y' + // based on these maybe truncated prefixes + static bool lhs_is_strictly_less_than_rhs(Slice X, bool X_is_truncated, Slice Y, + bool Y_is_truncated); }; inline std::ostream& operator<<(std::ostream& os, const Slice& slice) { diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index 07befd47d88781..5374a2c2d73ee5 
100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -72,8 +72,9 @@ Status BlockReader::next_block_with_aggregation(Block* block, bool* eof) { return res; } -bool BlockReader::_rowsets_mono_asc_disjoint(const ReaderParams& read_params) { - std::string cur_rs_last_key; +bool BlockReader::_rowsets_not_mono_asc_disjoint(const ReaderParams& read_params) { + std::string pre_rs_last_key; + bool pre_rs_key_bounds_truncated {false}; const std::vector& rs_splits = read_params.rs_splits; for (const auto& rs_split : rs_splits) { if (rs_split.rs_reader->rowset()->num_rows() == 0) { @@ -87,13 +88,17 @@ bool BlockReader::_rowsets_mono_asc_disjoint(const ReaderParams& read_params) { if (!has_first_key) { return true; } - if (rs_first_key <= cur_rs_last_key) { + bool cur_rs_key_bounds_truncated { + rs_split.rs_reader->rowset()->is_segments_key_bounds_truncated()}; + if (!Slice::lhs_is_strictly_less_than_rhs(Slice {pre_rs_last_key}, + pre_rs_key_bounds_truncated, Slice {rs_first_key}, + cur_rs_key_bounds_truncated)) { return true; } - bool has_last_key = rs_split.rs_reader->rowset()->last_key(&cur_rs_last_key); + bool has_last_key = rs_split.rs_reader->rowset()->last_key(&pre_rs_last_key); + pre_rs_key_bounds_truncated = cur_rs_key_bounds_truncated; CHECK(has_last_key); } - return false; } @@ -110,7 +115,7 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params) { // check if rowsets are noneoverlapping { SCOPED_RAW_TIMER(&_stats.block_reader_vcollect_iter_init_timer_ns); - _is_rowsets_overlapping = _rowsets_mono_asc_disjoint(read_params); + _is_rowsets_overlapping = _rowsets_not_mono_asc_disjoint(read_params); _vcollect_iter.init(this, _is_rowsets_overlapping, read_params.read_orderby_key, read_params.read_orderby_key_reverse); } diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h index f33fe7431092f7..b665150cbcb0a1 100644 --- a/be/src/vec/olap/block_reader.h +++ b/be/src/vec/olap/block_reader.h @@ -86,8 +86,8 @@ class BlockReader final : public TabletReader { bool _get_next_row_same(); - // return true if keys of rowsets are mono ascending and disjoint - bool _rowsets_mono_asc_disjoint(const ReaderParams& read_params); + // return false if keys of rowsets are mono ascending and disjoint + bool _rowsets_not_mono_asc_disjoint(const ReaderParams& read_params); VCollectIterator _vcollect_iter; IteratorRowRef _next_row {{}, -1, false}; diff --git a/be/test/olap/ordered_data_compaction_test.cpp b/be/test/olap/ordered_data_compaction_test.cpp index 934dfbef3ea8c9..dc53b57ed9262a 100644 --- a/be/test/olap/ordered_data_compaction_test.cpp +++ b/be/test/olap/ordered_data_compaction_test.cpp @@ -107,6 +107,7 @@ class OrderedDataCompactionTest : public ::testing::Test { ExecEnv::GetInstance()->set_storage_engine(std::move(engine)); config::enable_ordered_data_compaction = true; config::ordered_data_compaction_min_segment_size = 10; + config::segments_key_bounds_truncation_threshold = -1; } void TearDown() override { EXPECT_TRUE(io::global_local_filesystem()->delete_directory(absolute_dir).ok()); diff --git a/be/test/olap/segments_key_bounds_truncation_test.cpp b/be/test/olap/segments_key_bounds_truncation_test.cpp new file mode 100644 index 00000000000000..8530d9d52ea87e --- /dev/null +++ b/be/test/olap/segments_key_bounds_truncation_test.cpp @@ -0,0 +1,788 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "common/config.h" +#include "io/fs/local_file_system.h" +#include "olap/cumulative_compaction.h" +#include "olap/rowset/rowset_factory.h" +#include "olap/rowset/segment_v2/segment.h" +#include "olap/rowset/segment_v2/segment_writer.h" +#include "olap/storage_engine.h" +#include "olap/tablet_meta.h" +#include "olap/tablet_reader.h" +#include "olap/tablet_schema.h" +#include "runtime/exec_env.h" +#include "util/key_util.h" +#include "vec/olap/block_reader.h" + +namespace doris { +static std::string kSegmentDir = "./ut_dir/segments_key_bounds_truncation_test"; + +class SegmentsKeyBoundsTruncationTest : public testing::Test { +private: + StorageEngine* engine_ref = nullptr; + string absolute_dir; + std::unique_ptr data_dir; + int cur_version {2}; + +public: + void SetUp() override { + auto st = io::global_local_filesystem()->delete_directory(kSegmentDir); + ASSERT_TRUE(st.ok()) << st; + st = io::global_local_filesystem()->create_directory(kSegmentDir); + ASSERT_TRUE(st.ok()) << st; + doris::EngineOptions options; + auto engine = std::make_unique(options); + engine_ref = engine.get(); + data_dir = std::make_unique(*engine_ref, kSegmentDir); + ASSERT_TRUE(data_dir->update_capacity().ok()); + ExecEnv::GetInstance()->set_storage_engine(std::move(engine)); + } + + void TearDown() override { + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(kSegmentDir).ok()); + engine_ref = nullptr; + ExecEnv::GetInstance()->set_storage_engine(nullptr); + } + + void disable_segments_key_bounds_truncation() { + config::segments_key_bounds_truncation_threshold = -1; + } + + TabletSchemaSPtr create_schema(int varchar_length) { + TabletSchemaSPtr tablet_schema = std::make_shared(); + TabletSchemaPB tablet_schema_pb; + tablet_schema_pb.set_keys_type(DUP_KEYS); + tablet_schema_pb.set_num_short_key_columns(1); + tablet_schema_pb.set_num_rows_per_row_block(1024); + tablet_schema_pb.set_compress_kind(COMPRESS_NONE); + tablet_schema_pb.set_next_column_unique_id(4); + + ColumnPB* column_1 = tablet_schema_pb.add_column(); + column_1->set_unique_id(1); + column_1->set_name("k1"); + column_1->set_type("VARCHAR"); + column_1->set_is_key(true); + column_1->set_length(varchar_length); + column_1->set_index_length(36); + column_1->set_is_nullable(false); + column_1->set_is_bf_column(false); + + ColumnPB* column_2 = tablet_schema_pb.add_column(); + column_2->set_unique_id(2); + column_2->set_name("c1"); + column_2->set_type("INT"); + column_2->set_length(4); + column_2->set_index_length(4); + column_2->set_is_nullable(true); + column_2->set_is_key(false); + column_2->set_is_nullable(true); + column_2->set_is_bf_column(false); + + tablet_schema->init_from_pb(tablet_schema_pb); + return tablet_schema; + } + + TabletSharedPtr 
create_tablet(const TabletSchema& tablet_schema, + bool enable_unique_key_merge_on_write) { + std::vector cols; + std::unordered_map col_ordinal_to_unique_id; + for (auto i = 0; i < tablet_schema.num_columns(); i++) { + const TabletColumn& column = tablet_schema.column(i); + TColumn col; + col.column_type.type = TPrimitiveType::INT; + col.__set_column_name(column.name()); + col.__set_is_key(column.is_key()); + cols.push_back(col); + col_ordinal_to_unique_id[i] = column.unique_id(); + } + + TTabletSchema t_tablet_schema; + t_tablet_schema.__set_short_key_column_count(tablet_schema.num_short_key_columns()); + t_tablet_schema.__set_schema_hash(3333); + if (tablet_schema.keys_type() == UNIQUE_KEYS) { + t_tablet_schema.__set_keys_type(TKeysType::UNIQUE_KEYS); + } else if (tablet_schema.keys_type() == DUP_KEYS) { + t_tablet_schema.__set_keys_type(TKeysType::DUP_KEYS); + } else if (tablet_schema.keys_type() == AGG_KEYS) { + t_tablet_schema.__set_keys_type(TKeysType::AGG_KEYS); + } + t_tablet_schema.__set_storage_type(TStorageType::COLUMN); + t_tablet_schema.__set_columns(cols); + TabletMetaSharedPtr tablet_meta {std::make_shared( + 2, 2, 2, 2, 2, 2, t_tablet_schema, 2, col_ordinal_to_unique_id, UniqueId(1, 2), + TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F, 0, + enable_unique_key_merge_on_write)}; + + TabletSharedPtr tablet {std::make_shared(*engine_ref, tablet_meta, data_dir.get())}; + EXPECT_TRUE(tablet->init().ok()); + return tablet; + } + + RowsetWriterContext create_rowset_writer_context(TabletSchemaSPtr tablet_schema, + const SegmentsOverlapPB& overlap, + uint32_t max_rows_per_segment, + Version version) { + RowsetWriterContext rowset_writer_context; + rowset_writer_context.rowset_id = engine_ref->next_rowset_id(); + rowset_writer_context.rowset_type = BETA_ROWSET; + rowset_writer_context.rowset_state = VISIBLE; + rowset_writer_context.tablet_schema = tablet_schema; + rowset_writer_context.tablet_path = kSegmentDir; + rowset_writer_context.version = version; + rowset_writer_context.segments_overlap = overlap; + rowset_writer_context.max_rows_per_segment = max_rows_per_segment; + return rowset_writer_context; + } + + void create_and_init_rowset_reader(Rowset* rowset, RowsetReaderContext& context, + RowsetReaderSharedPtr* result) { + auto s = rowset->create_reader(result); + EXPECT_TRUE(s.ok()); + EXPECT_TRUE(*result != nullptr); + + s = (*result)->init(&context); + EXPECT_TRUE(s.ok()); + } + + std::vector generate_blocks( + TabletSchemaSPtr tablet_schema, const std::vector>& data) { + std::vector ret; + int const_value = 999; + for (const auto& segment_rows : data) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (const auto& row : segment_rows) { + columns[0]->insert_data(row.data(), row.size()); + columns[1]->insert_data(reinterpret_cast(&const_value), + sizeof(const_value)); + } + ret.emplace_back(std::move(block)); + } + return ret; + } + + std::vector> get_expected_key_bounds( + const std::vector>& data) { + std::vector> ret; + for (const auto& rows : data) { + auto& cur = ret.emplace_back(); + auto min_key = rows.front(); + auto max_key = rows.front(); + for (const auto& row : rows) { + if (row < min_key) { + min_key = row; + } + if (row > max_key) { + max_key = row; + } + } + + // segments key bounds have marker + min_key = std::string {KEY_NORMAL_MARKER} + min_key; + max_key = std::string {KEY_NORMAL_MARKER} + max_key; + + cur.emplace_back(do_trunacte(min_key)); + cur.emplace_back(do_trunacte(max_key)); + } + return 
ret; + } + + RowsetSharedPtr create_rowset(TabletSchemaSPtr tablet_schema, SegmentsOverlapPB overlap, + const std::vector blocks, int64_t version, + bool is_vertical) { + auto writer_context = create_rowset_writer_context(tablet_schema, overlap, UINT32_MAX, + {version, version}); + auto res = RowsetFactory::create_rowset_writer(*engine_ref, writer_context, is_vertical); + EXPECT_TRUE(res.has_value()) << res.error(); + auto rowset_writer = std::move(res).value(); + + uint32_t num_rows = 0; + for (const auto& block : blocks) { + num_rows += block.rows(); + EXPECT_TRUE(rowset_writer->add_block(&block).ok()); + EXPECT_TRUE(rowset_writer->flush().ok()); + } + + RowsetSharedPtr rowset; + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); + EXPECT_EQ(blocks.size(), rowset->rowset_meta()->num_segments()); + EXPECT_EQ(num_rows, rowset->rowset_meta()->num_rows()); + return rowset; + } + + std::string do_trunacte(std::string key) { + if (segments_key_bounds_truncation_enabled()) { + auto threshold = config::segments_key_bounds_truncation_threshold; + if (key.size() > threshold) { + key.resize(threshold); + } + } + return key; + } + + bool segments_key_bounds_truncation_enabled() { + return (config::segments_key_bounds_truncation_threshold > 0); + } + + void check_key_bounds(const std::vector>& data, + const std::vector& segments_key_bounds) { + // 1. check size + for (const auto& segments_key_bound : segments_key_bounds) { + const auto& min_key = segments_key_bound.min_key(); + const auto& max_key = segments_key_bound.max_key(); + + if (segments_key_bounds_truncation_enabled()) { + EXPECT_LE(min_key.size(), config::segments_key_bounds_truncation_threshold); + EXPECT_LE(max_key.size(), config::segments_key_bounds_truncation_threshold); + } + } + + // 2. check content + auto expected_key_bounds = get_expected_key_bounds(data); + for (std::size_t i = 0; i < expected_key_bounds.size(); i++) { + const auto& min_key = segments_key_bounds[i].min_key(); + const auto& max_key = segments_key_bounds[i].max_key(); + + EXPECT_EQ(min_key, expected_key_bounds[i][0]); + EXPECT_EQ(max_key, expected_key_bounds[i][1]); + std::cout << fmt::format("min_key={}, size={}\nmax_key={}, size={}\n", + hexdump(min_key.data(), min_key.size()), min_key.size(), + hexdump(max_key.data(), max_key.size()), max_key.size()); + } + } + + std::vector create_rowsets(TabletSchemaSPtr tablet_schema, + const std::vector>& data, + const std::vector& truncate_lengths = {}) { + std::vector rowsets; + for (size_t i {0}; i < data.size(); i++) { + const auto rows = data[i]; + if (!truncate_lengths.empty()) { + config::segments_key_bounds_truncation_threshold = truncate_lengths[i]; + } + std::vector> rowset_data {rows}; + auto blocks = generate_blocks(tablet_schema, rowset_data); + RowsetSharedPtr rowset = + create_rowset(tablet_schema, NONOVERLAPPING, blocks, cur_version++, false); + + std::vector segments_key_bounds; + rowset->rowset_meta()->get_segments_key_bounds(&segments_key_bounds); + for (const auto& segments_key_bound : segments_key_bounds) { + const auto& min_key = segments_key_bound.min_key(); + const auto& max_key = segments_key_bound.max_key(); + + LOG(INFO) << fmt::format( + "\n==== rowset_id={}, segment_key_bounds_truncated={} ====\nmin_key={}, " + "size={}\nmax_key={}, size={}\n", + rowset->rowset_id().to_string(), rowset->is_segments_key_bounds_truncated(), + min_key, min_key.size(), max_key, max_key.size()); + } + + rowsets.push_back(rowset); + RowsetReaderSharedPtr rs_reader; + EXPECT_TRUE(rowset->create_reader(&rs_reader)); + 
} + for (std::size_t i {0}; i < truncate_lengths.size(); i++) { + EXPECT_EQ((truncate_lengths[i] > 0), rowsets[i]->is_segments_key_bounds_truncated()); + } + return rowsets; + } + + TabletReader::ReaderParams create_reader_params( + TabletSchemaSPtr tablet_schema, const std::vector>& data, + const std::vector& truncate_lengths = {}) { + TabletReader::ReaderParams reader_params; + std::vector rowsets = + create_rowsets(tablet_schema, data, truncate_lengths); + std::vector rs_splits; + for (size_t i {0}; i < rowsets.size(); i++) { + RowsetReaderSharedPtr rs_reader; + EXPECT_TRUE(rowsets[i]->create_reader(&rs_reader)); + RowSetSplits rs_split; + rs_split.rs_reader = rs_reader; + rs_splits.emplace_back(rs_split); + } + reader_params.rs_splits = std::move(rs_splits); + return reader_params; + } +}; + +TEST_F(SegmentsKeyBoundsTruncationTest, CompareFuncTest) { + // test `Slice::lhs_is_strictly_less_than_rhs` + // enumerating all possible combinations + // this test is reduntant, n = 3 is enough + constexpr int n = 8; + std::vector datas; + for (int l = 1; l <= n; l++) { + for (int x = 0; x < (1 << l); x++) { + datas.emplace_back(fmt::format("{:0{width}b}", x, fmt::arg("width", l))); + } + } + std::cout << "datas.size()=" << datas.size() << "\n"; + + int count1 {0}, count2 {0}, total {0}; + for (size_t i = 0; i < datas.size(); i++) { + for (size_t j = 0; j < datas.size(); j++) { + Slice X {datas[i]}; + Slice Y {datas[j]}; + for (int l1 = 0; l1 <= n; l1++) { + bool X_is_truncated = (l1 != 0); + Slice a {X}; + if (X_is_truncated && X.get_size() >= l1) { + a.truncate(l1); + } + for (int l2 = 0; l2 <= n; l2++) { + bool Y_is_truncated = (l2 != 0); + Slice b {Y}; + if (Y_is_truncated && Y.get_size() >= l2) { + b.truncate(l2); + } + + bool res1 = Slice::lhs_is_strictly_less_than_rhs(a, X_is_truncated, b, + Y_is_truncated); + bool res2 = (X.compare(Y) < 0); + ++total; + if (res1 && res2) { + ++count1; + } + if (res2) { + ++count2; + } + EXPECT_FALSE(res1 && !res2) << fmt::format( + "X={}, a={}, l1={}, Y={}, b={}, l2={}, res1={}, res2={}", X.to_string(), + a.to_string(), l1, Y.to_string(), b.to_string(), l2, res1, res2); + } + } + } + } + std::cout << fmt::format("count1={}, count2={}, count1/count2={}, total={}\n", count1, count2, + double(count1) / count2, total); +} + +TEST_F(SegmentsKeyBoundsTruncationTest, BasicTruncationTest) { + { + // 1. don't do segments key bounds truncation when the config is off + config::segments_key_bounds_truncation_threshold = -1; + + auto tablet_schema = create_schema(100); + std::vector> data {{std::string(2, 'x'), std::string(3, 'y')}, + {std::string(4, 'a'), std::string(15, 'b')}, + {std::string(18, 'c'), std::string(5, 'z')}, + {std::string(20, '0'), std::string(22, '1')}}; + auto blocks = generate_blocks(tablet_schema, data); + RowsetSharedPtr rowset = create_rowset(tablet_schema, NONOVERLAPPING, blocks, 2, false); + + auto rowset_meta = rowset->rowset_meta(); + EXPECT_EQ(false, rowset_meta->is_segments_key_bounds_truncated()); + std::vector segments_key_bounds; + rowset_meta->get_segments_key_bounds(&segments_key_bounds); + EXPECT_EQ(segments_key_bounds.size(), data.size()); + check_key_bounds(data, segments_key_bounds); + } + + { + // 2. 
do segments key bounds truncation when the config is on + config::segments_key_bounds_truncation_threshold = 10; + + auto tablet_schema = create_schema(100); + std::vector> data {{std::string(2, 'x'), std::string(3, 'y')}, + {std::string(4, 'a'), std::string(15, 'b')}, + {std::string(18, 'c'), std::string(5, 'z')}, + {std::string(20, '0'), std::string(22, '1')}}; + auto blocks = generate_blocks(tablet_schema, data); + RowsetSharedPtr rowset = create_rowset(tablet_schema, NONOVERLAPPING, blocks, 2, false); + + auto rowset_meta = rowset->rowset_meta(); + EXPECT_EQ(true, rowset_meta->is_segments_key_bounds_truncated()); + std::vector segments_key_bounds; + rowset_meta->get_segments_key_bounds(&segments_key_bounds); + EXPECT_EQ(segments_key_bounds.size(), data.size()); + check_key_bounds(data, segments_key_bounds); + } + + { + // 3. segments_key_bounds_truncated should be set to false if no actual truncation happend + config::segments_key_bounds_truncation_threshold = 100; + + auto tablet_schema = create_schema(100); + std::vector> data {{std::string(2, 'x'), std::string(3, 'y')}, + {std::string(4, 'a'), std::string(15, 'b')}, + {std::string(18, 'c'), std::string(5, 'z')}, + {std::string(20, '0'), std::string(22, '1')}}; + auto blocks = generate_blocks(tablet_schema, data); + RowsetSharedPtr rowset = create_rowset(tablet_schema, NONOVERLAPPING, blocks, 2, false); + + auto rowset_meta = rowset->rowset_meta(); + EXPECT_EQ(false, rowset_meta->is_segments_key_bounds_truncated()); + } +} + +TEST_F(SegmentsKeyBoundsTruncationTest, BlockReaderJudgeFuncTest) { + auto tablet_schema = create_schema(100); + + { + // all rowsets are truncated to same size + // keys are distinctable from any index + std::vector> data {{"aaaaaaaaa", "bbbbb"}, + {"cccccc", "dddddd"}, + {"eeeeeee", "fffffff"}, + {"xxxxxxx", "yyyyyyyy"}}; + { + disable_segments_key_bounds_truncation(); + TabletReader::ReaderParams read_params = create_reader_params(tablet_schema, data); + vectorized::BlockReader block_reader; + EXPECT_FALSE(block_reader._rowsets_not_mono_asc_disjoint(read_params)); + } + + { + config::segments_key_bounds_truncation_threshold = 3; + TabletReader::ReaderParams read_params = create_reader_params(tablet_schema, data); + vectorized::BlockReader block_reader; + // can still determine that segments are non ascending after truncation + EXPECT_FALSE(block_reader._rowsets_not_mono_asc_disjoint(read_params)); + } + } + + { + // all rowsets are truncated to same size + // keys are distinctable from any index before truncation + // some keys are not comparable after truncation + std::vector> data {{"aaaaaaaaa", "bbbbb"}, + {"cccccccccccc", "ccdddddddd"}, + {"cceeeeeeee", "fffffff"}, + {"xxxxxxx", "yyyyyyyy"}}; + { + disable_segments_key_bounds_truncation(); + TabletReader::ReaderParams read_params = create_reader_params(tablet_schema, data); + vectorized::BlockReader block_reader; + EXPECT_FALSE(block_reader._rowsets_not_mono_asc_disjoint(read_params)); + } + + { + config::segments_key_bounds_truncation_threshold = 6; + TabletReader::ReaderParams read_params = create_reader_params(tablet_schema, data); + vectorized::BlockReader block_reader; + EXPECT_FALSE(block_reader._rowsets_not_mono_asc_disjoint(read_params)); + } + + { + config::segments_key_bounds_truncation_threshold = 3; + TabletReader::ReaderParams read_params = create_reader_params(tablet_schema, data); + vectorized::BlockReader block_reader; + // can not determine wether rowset 2 and rowset 3 are mono ascending + 
EXPECT_TRUE(block_reader._rowsets_not_mono_asc_disjoint(read_params)); + } + } + + { + // all rowsets are truncated to same size + // keys are not mono ascending before truncation + std::vector> data {{"aaaaaaaaa", "bbbbb"}, + {"bbbbb", "cccccccc"}, + {"cccccccc", "xxxxxxx"}, + {"xxxxxxx", "yyyyyyyy"}}; + { + disable_segments_key_bounds_truncation(); + TabletReader::ReaderParams read_params = create_reader_params(tablet_schema, data); + vectorized::BlockReader block_reader; + EXPECT_TRUE(block_reader._rowsets_not_mono_asc_disjoint(read_params)); + } + + { + config::segments_key_bounds_truncation_threshold = 3; + TabletReader::ReaderParams read_params = create_reader_params(tablet_schema, data); + vectorized::BlockReader block_reader; + EXPECT_TRUE(block_reader._rowsets_not_mono_asc_disjoint(read_params)); + } + } + + { + // some rowsets are truncated, some are not + std::vector> data {{"aaaaaaaaa", "bbbbbbccccccc"}, + {"bbbbbbddddddd", "dddddd"}}; + { + TabletReader::ReaderParams read_params = + create_reader_params(tablet_schema, data, {-1, 9}); + vectorized::BlockReader block_reader; + EXPECT_FALSE(block_reader._rowsets_not_mono_asc_disjoint(read_params)); + } + + { + TabletReader::ReaderParams read_params = + create_reader_params(tablet_schema, data, {-1, 4}); + vectorized::BlockReader block_reader; + EXPECT_TRUE(block_reader._rowsets_not_mono_asc_disjoint(read_params)); + } + + { + TabletReader::ReaderParams read_params = + create_reader_params(tablet_schema, data, {9, -1}); + vectorized::BlockReader block_reader; + EXPECT_FALSE(block_reader._rowsets_not_mono_asc_disjoint(read_params)); + } + + { + TabletReader::ReaderParams read_params = + create_reader_params(tablet_schema, data, {4, -1}); + vectorized::BlockReader block_reader; + EXPECT_TRUE(block_reader._rowsets_not_mono_asc_disjoint(read_params)); + } + } + + { + // some rowsets are truncated, some are not, truncated lengths may be different + { + std::vector> data {{"aaaaaaaaa", "bbbbbbbb"}, + {"ccccccccc", "dddddd"}, + {"eeeeeee", "ffffffggggg"}, + {"ffffffhhhhhh", "hhhhhhh"}, + {"iiiiiiii", "jjjjjjjjj"}}; + TabletReader::ReaderParams read_params = + create_reader_params(tablet_schema, data, {4, 5, 4, -1, 6}); + vectorized::BlockReader block_reader; + EXPECT_TRUE(block_reader._rowsets_not_mono_asc_disjoint(read_params)); + } + { + std::vector> data {{"aaaaaaaaa", "bbbbbbbb"}, + {"ccccccccc", "dddddd"}, + {"eeeeeee", "ffffffggggg"}, + {"ffffffhhhhhh", "hhhhhhh"}, + {"iiiiiiii", "jjjjjjjjj"}}; + TabletReader::ReaderParams read_params = + create_reader_params(tablet_schema, data, {4, 5, 8, -1, 6}); + vectorized::BlockReader block_reader; + EXPECT_FALSE(block_reader._rowsets_not_mono_asc_disjoint(read_params)); + } + + { + std::vector> data {{"aaaaaaaaa", "bbbbbbbb"}, + {"ccccccccc", "dddddd"}, + {"eeeeeee", "ffffffggggg"}, + {"ffffffhhhhhh", "hhhhhhh"}, + {"iiiiiiii", "jjjjjjjjj"}}; + TabletReader::ReaderParams read_params = + create_reader_params(tablet_schema, data, {4, 5, -1, 4, 6}); + vectorized::BlockReader block_reader; + EXPECT_TRUE(block_reader._rowsets_not_mono_asc_disjoint(read_params)); + } + { + std::vector> data {{"aaaaaaaaa", "bbbbbbbb"}, + {"ccccccccc", "dddddd"}, + {"eeeeeee", "ffffffggggg"}, + {"ffffffhhhhhh", "hhhhhhh"}, + {"iiiiiiii", "jjjjjjjjj"}}; + TabletReader::ReaderParams read_params = + create_reader_params(tablet_schema, data, {4, 5, -1, 8, 6}); + vectorized::BlockReader block_reader; + EXPECT_FALSE(block_reader._rowsets_not_mono_asc_disjoint(read_params)); + } + + { + std::vector> data {{"aaaaaaaaa", 
"bbbbbbbb"}, + {"ccccccccc", "dddddd"}, + {"eeeeeee", "ffffffggggg"}, + {"ffffffhhhhhh", "hhhhhhh"}, + {"iiiiiiii", "jjjjjjjjj"}}; + TabletReader::ReaderParams read_params = + create_reader_params(tablet_schema, data, {4, 5, 8, 4, 6}); + vectorized::BlockReader block_reader; + EXPECT_TRUE(block_reader._rowsets_not_mono_asc_disjoint(read_params)); + } + { + std::vector> data {{"aaaaaaaaa", "bbbbbbbb"}, + {"ccccccccc", "dddddd"}, + {"eeeeeee", "ffffffggggg"}, + {"ffffffhhhhhh", "hhhhhhh"}, + {"iiiiiiii", "jjjjjjjjj"}}; + TabletReader::ReaderParams read_params = + create_reader_params(tablet_schema, data, {4, 5, 4, 8, 6}); + vectorized::BlockReader block_reader; + EXPECT_TRUE(block_reader._rowsets_not_mono_asc_disjoint(read_params)); + } + { + std::vector> data {{"aaaaaaaaa", "bbbbbbbb"}, + {"ccccccccc", "dddddd"}, + {"eeeeeee", "ffffffggggg"}, + {"ffffffhhhhhh", "hhhhhhh"}, + {"iiiiiiii", "jjjjjjjjj"}}; + TabletReader::ReaderParams read_params = + create_reader_params(tablet_schema, data, {4, 5, 8, 9, 6}); + vectorized::BlockReader block_reader; + EXPECT_FALSE(block_reader._rowsets_not_mono_asc_disjoint(read_params)); + } + { + std::vector> data {{"aaaaaaaaa", "bbbbbbbb"}, + {"ccccccccc", "dddddd"}, + {"eeeeeee", "ffffffggggg"}, + {"ffffffhhhhhh", "hhhhhhh"}, + {"iiiiiiii", "jjjjjjjjj"}}; + TabletReader::ReaderParams read_params = + create_reader_params(tablet_schema, data, {4, 5, 3, 4, 6}); + vectorized::BlockReader block_reader; + EXPECT_TRUE(block_reader._rowsets_not_mono_asc_disjoint(read_params)); + } + } +} + +TEST_F(SegmentsKeyBoundsTruncationTest, OrderedCompactionTest) { + auto tablet_schema = create_schema(100); + config::enable_ordered_data_compaction = true; + config::ordered_data_compaction_min_segment_size = 1; + + { + disable_segments_key_bounds_truncation(); + TabletSharedPtr tablet = create_tablet(*tablet_schema, false); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(tablet->tablet_path()).ok()); + std::vector> data {{"aaaaaaaaa", "bbbbbcccccc"}, + {"bbbbbddddddd", "dddddd"}, + {"eeeeeee", "fffffffff"}, + {"gggggggg", "hhhhhhh"}, + {"iiiiiiii", "jjjjjjjjj"}}; + auto input_rowsets = create_rowsets(tablet_schema, data); + CumulativeCompaction cu_compaction(*engine_ref, tablet); + cu_compaction._input_rowsets = std::move(input_rowsets); + EXPECT_TRUE(cu_compaction.handle_ordered_data_compaction()); + EXPECT_EQ(cu_compaction._input_rowsets.size(), data.size()); + } + + { + TabletSharedPtr tablet = create_tablet(*tablet_schema, false); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(tablet->tablet_path()).ok()); + std::vector> data {{"aaaaaaaaa", "bbbbbcccccc"}, + {"bbbbbddddddd", "dddddd"}, + {"eeeeeee", "fffffffff"}, + {"gggggggg", "hhhhhhh"}, + {"iiiiiiii", "jjjjjjjjj"}}; + auto input_rowsets = create_rowsets(tablet_schema, data, {4, 4, 4, 4, 4}); + CumulativeCompaction cu_compaction(*engine_ref, tablet); + cu_compaction._input_rowsets = std::move(input_rowsets); + EXPECT_FALSE(cu_compaction.handle_ordered_data_compaction()); + } + + { + TabletSharedPtr tablet = create_tablet(*tablet_schema, false); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(tablet->tablet_path()).ok()); + std::vector> data {{"aaaaaaaaa", "bbbbbcccccc"}, + {"bbbbbddddddd", "dddddd"}, + {"eeeeeee", "fffffffff"}, + {"gggggggg", "hhhhhhh"}, + {"iiiiiiii", "jjjjjjjjj"}}; + auto input_rowsets = create_rowsets(tablet_schema, data, {4, 8, 4, 4, 4}); + CumulativeCompaction cu_compaction(*engine_ref, tablet); + cu_compaction._input_rowsets = std::move(input_rowsets); + 
EXPECT_FALSE(cu_compaction.handle_ordered_data_compaction());
+    }
+
+    {
+        TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
+        EXPECT_TRUE(io::global_local_filesystem()->create_directory(tablet->tablet_path()).ok());
+        std::vector<std::pair<std::string, std::string>> data {{"aaaaaaaaa", "bbbbbcccccc"},
+                                                               {"bbbbbddddddd", "dddddd"},
+                                                               {"eeeeeee", "fffffffff"},
+                                                               {"gggggggg", "hhhhhhh"},
+                                                               {"iiiiiiii", "jjjjjjjjj"}};
+        auto input_rowsets = create_rowsets(tablet_schema, data, {8, 4, 4, 4, 4});
+        CumulativeCompaction cu_compaction(*engine_ref, tablet);
+        cu_compaction._input_rowsets = std::move(input_rowsets);
+        EXPECT_FALSE(cu_compaction.handle_ordered_data_compaction());
+    }
+
+    {
+        TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
+        EXPECT_TRUE(io::global_local_filesystem()->create_directory(tablet->tablet_path()).ok());
+        std::vector<std::pair<std::string, std::string>> data {{"aaaaaaaaa", "bbbbbcccccc"},
+                                                               {"bbbbbddddddd", "dddddd"},
+                                                               {"eeeeeee", "fffffffff"},
+                                                               {"gggggggg", "hhhhhhh"},
+                                                               {"iiiiiiii", "jjjjjjjjj"}};
+        auto input_rowsets = create_rowsets(tablet_schema, data, {8, 9, 4, 4, 4});
+        CumulativeCompaction cu_compaction(*engine_ref, tablet);
+        cu_compaction._input_rowsets = std::move(input_rowsets);
+        EXPECT_TRUE(cu_compaction.handle_ordered_data_compaction());
+        EXPECT_EQ(cu_compaction._input_rowsets.size(), data.size());
+    }
+
+    {
+        TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
+        EXPECT_TRUE(io::global_local_filesystem()->create_directory(tablet->tablet_path()).ok());
+        std::vector<std::pair<std::string, std::string>> data {{"aaaaaaaaa", "bbbbbcccccc"},
+                                                               {"bbbbbddddddd", "dddddd"},
+                                                               {"eeeeeee", "fffffffff"},
+                                                               {"gggggggg", "hhhhhhh"},
+                                                               {"iiiiiiii", "jjjjjjjjj"}};
+        auto input_rowsets = create_rowsets(tablet_schema, data, {8, -1, 4, 4, 4});
+        CumulativeCompaction cu_compaction(*engine_ref, tablet);
+        cu_compaction._input_rowsets = std::move(input_rowsets);
+        EXPECT_TRUE(cu_compaction.handle_ordered_data_compaction());
+        EXPECT_EQ(cu_compaction._input_rowsets.size(), data.size());
+    }
+
+    {
+        TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
+        EXPECT_TRUE(io::global_local_filesystem()->create_directory(tablet->tablet_path()).ok());
+        std::vector<std::pair<std::string, std::string>> data {{"aaaaaaaaa", "bbbbbcccccc"},
+                                                               {"bbbbbddddddd", "dddddd"},
+                                                               {"eeeeeee", "fffffffff"},
+                                                               {"gggggggg", "hhhhhhh"},
+                                                               {"iiiiiiii", "jjjjjjjjj"}};
+        auto input_rowsets = create_rowsets(tablet_schema, data, {-1, 9, 4, 4, 4});
+        CumulativeCompaction cu_compaction(*engine_ref, tablet);
+        cu_compaction._input_rowsets = std::move(input_rowsets);
+        EXPECT_TRUE(cu_compaction.handle_ordered_data_compaction());
+        EXPECT_EQ(cu_compaction._input_rowsets.size(), data.size());
+    }
+
+    {
+        TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
+        EXPECT_TRUE(io::global_local_filesystem()->create_directory(tablet->tablet_path()).ok());
+        std::vector<std::pair<std::string, std::string>> data {{"aaaaaaaaa", "bbbbbcccccc"},
+                                                               {"bbbbbddddddd", "dddddd"},
+                                                               {"eeeeeee", "fffffffff"},
+                                                               {"gggggggg", "hhhhhhh"},
+                                                               {"iiiiiiii", "jjjjjjjjj"}};
+        auto input_rowsets = create_rowsets(tablet_schema, data, {-1, 4, 4, 4, 4});
+        CumulativeCompaction cu_compaction(*engine_ref, tablet);
+        cu_compaction._input_rowsets = std::move(input_rowsets);
+        EXPECT_FALSE(cu_compaction.handle_ordered_data_compaction());
+    }
+
+    {
+        TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
+        EXPECT_TRUE(io::global_local_filesystem()->create_directory(tablet->tablet_path()).ok());
+        std::vector<std::pair<std::string, std::string>> data {{"aaaaaaaaa", "bbbbbcccccc"},
+                                                               {"bbbbbddddddd", "dddddd"},
+                                                               {"eeeeeee", "fffffffff"},
+                                                               {"gggggggg", "hhhhhhh"},
+                                                               {"iiiiiiii", "jjjjjjjjj"}};
+        auto input_rowsets = create_rowsets(tablet_schema, data, {4, -1, 4, 4, 4});
+        CumulativeCompaction cu_compaction(*engine_ref, tablet);
+        cu_compaction._input_rowsets = std::move(input_rowsets);
+        EXPECT_FALSE(cu_compaction.handle_ordered_data_compaction());
+    }
+}
+} // namespace doris
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 8904ffc74e4450..92d6a9fe3b8e02 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -109,7 +109,8 @@ message RowsetMetaPB {
     // latest write time
     optional int64 newest_write_timestamp = 26 [default = -1];
     // the encoded segment min/max key of segments in this rowset,
-    // only used in unique key data model with primary_key_index support.
+    // ATTN: segments_key_bounds may be truncated! Refer to the field `segments_key_bounds_truncated`
+    // to check whether these key bounds are truncated.
     repeated KeyBoundsPB segments_key_bounds = 27;
     // tablet meta pb, for compaction
     optional TabletSchemaPB tablet_schema = 28;
@@ -121,6 +122,8 @@ message RowsetMetaPB {
     // For backup/restore, record the tablet id and rowset id of the source cluster.
     optional int64 source_tablet_id = 53;
     optional string source_rowset_id = 54;
+    // indicates whether the segments key bounds are truncated
+    optional bool segments_key_bounds_truncated = 55;
 
     // For cloud
     // for data recycling
@@ -195,7 +198,8 @@ message RowsetMetaCloudPB {
     // latest write time
     optional int64 newest_write_timestamp = 26 [default = -1];
     // the encoded segment min/max key of segments in this rowset,
-    // only used in unique key data model with primary_key_index support.
+    // ATTN: segments_key_bounds may be truncated! Refer to the field `segments_key_bounds_truncated`
+    // to check whether these key bounds are truncated.
     repeated KeyBoundsPB segments_key_bounds = 27;
     // tablet meta pb, for compaction
     optional TabletSchemaCloudPB tablet_schema = 28;
@@ -209,6 +213,8 @@ message RowsetMetaCloudPB {
     // For backup/restore, record the tablet id and rowset id of the source cluster.
     optional int64 source_tablet_id = 53;
     optional string source_rowset_id = 54;
+    // indicates whether the segments key bounds are truncated
+    optional bool segments_key_bounds_truncated = 55;
 
     // cloud
     // the field is a vector, rename it
diff --git a/regression-test/data/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_read_scenarios.out b/regression-test/data/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_read_scenarios.out
new file mode 100644
index 00000000000000..07b0edba4b0d79
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_read_scenarios.out
@@ -0,0 +1,57 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+3757202 1 1 9
+3757202 1 2 8
+3757202 1 3 7
+3757202 2 1 9
+3757202 2 2 8
+3757202 2 3 7
+3757202 3 1 9
+3757202 3 2 8
+3757202 3 3 7
+3757202 4 1 9
+3757202 4 2 8
+3757202 4 3 7
+3757202 5 1 9
+3757202 5 2 8
+3757202 5 3 7
+3757202 6 1 9
+3757202 6 2 8
+3757202 6 3 7
+3757202 7 1 9
+3757202 7 2 8
+3757202 7 3 7
+3757202 8 1 9
+3757202 8 2 8
+3757202 8 3 7
+3757202 9 1 9
+3757202 9 2 8
+3757202 9 3 7
+
+-- !sql --
+3757202 1 1 9
+
+-- !sql --
+3757202 2 1 9
+
+-- !sql --
+3757202 3 1 9
+
+-- !sql --
+3757202 4 1 9
+
+-- !sql --
+3757202 5 1 9
+
+-- !sql --
+3757202 6 1 9
+
+-- !sql --
+3757202 7 1 9
+
+-- !sql --
+3757202 8 1 9
+
+-- !sql --
+3757202 9 1 9
+
diff --git a/regression-test/data/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_write_scenarios.out b/regression-test/data/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_write_scenarios.out
new file mode 100644
index 00000000000000..0fdc0f6bd9e48d
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_write_scenarios.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+210
+
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy
index 2a63f8763df80f..e8535cae91772d 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy
@@ -47,13 +47,17 @@ class Http {
         }
     }
 
-    static Object GET(url, isJson = false) {
+    static Object GET(url, isJson = false, printText = true) {
         def conn = new URL(url).openConnection()
         conn.setRequestMethod('GET')
         conn.setRequestProperty('Authorization', 'Basic cm9vdDo=') //token for root
         def code = conn.responseCode
         def text = conn.content.text
-        logger.info("http get url=${url}, isJson=${isJson}, response code=${code}, text=${text}")
+        if (printText) {
+            logger.info("http get url=${url}, isJson=${isJson}, response code=${code}, text=${text}")
+        } else {
+            logger.info("http get url=${url}, isJson=${isJson}, response code=${code}")
+        }
         Assert.assertEquals(200, code)
         if (isJson) {
             def json = new JsonSlurper()
diff --git a/regression-test/suites/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_basic.groovy b/regression-test/suites/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_basic.groovy
new file mode 100644
index 00000000000000..d9fad5970d0607
--- /dev/null
+++ b/regression-test/suites/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_basic.groovy
@@ -0,0 +1,122 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import com.google.common.collect.Maps
+import org.apache.doris.regression.util.Http
+
+suite("test_key_bounds_truncation_basic", "nonConcurrent") {
+
+    // see be/src/util/key_util.h:50
+    def keyNormalMarker = new String(new byte[] {2})
+
+    def tableName = "test_key_bounds_truncation_basic"
+    sql """ DROP TABLE IF EXISTS ${tableName} force;"""
+    sql """ CREATE TABLE ${tableName} (
+        `k` varchar(65533) NOT NULL,
+        `v` int)
+        UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1
+        PROPERTIES("replication_num" = "1",
+            "enable_unique_key_merge_on_write" = "true",
+            "disable_auto_compaction" = "true"); """
+
+    def getRowsetMetas = { int version ->
+        def metaUrl = sql_return_maparray("show tablets from ${tableName};").get(0).MetaUrl
+        def jsonMeta = Http.GET(metaUrl, true, false)
+        for (def meta : jsonMeta.rs_metas) {
+            int end_version = meta.end_version
+            if (end_version == version) {
+                return meta
+            }
+        }
+    }
+
+    def truncateString = { String s, int l ->
+        if (s.size() > l) {
+            return s.substring(0, l)
+        }
+        return s
+    }
+
+    def checkKeyBounds = { String k1, String k2, int version, boolean doTruncation, int length, boolean expected ->
+        def rowsetMeta = getRowsetMetas(version)
+        def keyBounds = rowsetMeta.segments_key_bounds
+        assertEquals(keyBounds.size(), 1)
+        def bounds = keyBounds.get(0)
+
+        String min_key = bounds.min_key
+        String max_key = bounds.max_key
+
+        String expected_min_key = keyNormalMarker + k1
+        String expected_max_key = keyNormalMarker + k2
+        if (doTruncation) {
+            expected_min_key = truncateString(expected_min_key, length)
+            expected_max_key = truncateString(expected_max_key, length)
+        }
+
+        logger.info("\nk1=${k1}, size=${k1.size()}, k2=${k2}, size=${k2.size()}")
+        logger.info("\nexpected_min_key=${expected_min_key}, size=${expected_min_key.size()}, expected_max_key=${expected_max_key}, size=${expected_max_key.size()}")
+        logger.info("\nmin_key=${min_key}, size=${min_key.size()}\nmax_key=${max_key}, size=${max_key.size()}")
+        logger.info("\nsegments_key_bounds_truncated=${rowsetMeta.segments_key_bounds_truncated}, expected=${expected}")
+        assertEquals(min_key, expected_min_key)
+        assertEquals(max_key, expected_max_key)
+
+        assertEquals(expected, rowsetMeta.segments_key_bounds_truncated)
+    }
+
+    int curVersion = 1
+
+    // 1. set segments_key_bounds_truncation_threshold to -1 to disable truncation; key bounds should not be truncated
+    def customBeConfig = [
+        segments_key_bounds_truncation_threshold : -1
+    ]
+
+    setBeConfigTemporary(customBeConfig) {
+        String key1 = "aaaaaaaaaaaa"
+        String key2 = "bbbbbbzzzzzzzzzzz"
+        sql """insert into ${tableName} values("$key1", 1), ("$key2", 2);"""
+        checkKeyBounds(key1, key2, ++curVersion, false, -1, false)
+    }
+
+    // 2. set a positive segments_key_bounds_truncation_threshold to enable truncation; key bounds should be truncated
+    customBeConfig = [
+        segments_key_bounds_truncation_threshold : 6
+    ]
+
+    setBeConfigTemporary(customBeConfig) {
+        String key1 = "aa"
+        String key2 = "bbbb"
+        sql """insert into ${tableName} values("$key1", 1), ("$key2", 2);"""
+        checkKeyBounds(key1, key2, ++curVersion, true, 6, false)
+
+        key1 = "000000000000000"
+        key2 = "1111111111111111111"
+        sql """insert into ${tableName} values("$key1", 1), ("$key2", 2);"""
+        checkKeyBounds(key1, key2, ++curVersion, true, 6, true)
+
+        key1 = "xxx"
+        key2 = "yyyyyyyyyyyyyyyyyyyyy"
+        sql """insert into ${tableName} values("$key1", 1), ("$key2", 2);"""
+        checkKeyBounds(key1, key2, ++curVersion, true, 6, true)
+
+        key1 = "cccccccccccccccccccc"
+        key2 = "dddd"
+        sql """insert into ${tableName} values("$key1", 1), ("$key2", 2);"""
+        checkKeyBounds(key1, key2, ++curVersion, true, 6, true)
+    }
+
+}
diff --git a/regression-test/suites/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_read_scenarios.groovy b/regression-test/suites/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_read_scenarios.groovy
new file mode 100644
index 00000000000000..2620e302faa12a
--- /dev/null
+++ b/regression-test/suites/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_read_scenarios.groovy
@@ -0,0 +1,100 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import com.google.common.collect.Maps
+import org.apache.commons.lang.RandomStringUtils
+import org.apache.doris.regression.util.Http
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_key_bounds_truncation_read_scenarios", "nonConcurrent") {
+
+    def tableName = "test_key_bounds_truncation_read_scenarios"
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql """
+        CREATE TABLE `${tableName}` (
+            `k1` int NOT NULL,
+            `k2` int NOT NULL,
+            `k3` int NOT NULL,
+            `c1` int NOT NULL )
+        ENGINE=OLAP UNIQUE KEY(k1,k2,k3)
+        DISTRIBUTED BY HASH(k1,k2,k3) BUCKETS 1
+        PROPERTIES ( "replication_allocation" = "tag.location.default: 1", "disable_auto_compaction"="true",
+            "store_row_column" = "true", "enable_mow_light_delete" = "false" );
+    """
+
+    def getRowsetMetas = { int version ->
+        def metaUrl = sql_return_maparray("show tablets from ${tableName};").get(0).MetaUrl
+        def jsonMeta = Http.GET(metaUrl, true, false)
+        for (def meta : jsonMeta.rs_metas) {
+            int end_version = meta.end_version
+            if (end_version == version) {
+                return meta
+            }
+        }
+    }
+
+    def checkKeyBounds = { int version, int length, boolean turnedOn ->
+        def rowsetMeta = getRowsetMetas(version)
+        def keyBounds = rowsetMeta.segments_key_bounds
+
+        logger.info("\nversion=${version}, segments_key_bounds_truncated=${rowsetMeta.segments_key_bounds_truncated}, turnedOn=${turnedOn}")
+        assertEquals(turnedOn, rowsetMeta.segments_key_bounds_truncated)
+
+        for (def bounds : keyBounds) {
+            String min_key = bounds.min_key
+            String max_key = bounds.max_key
+            logger.info("\nmin_key=${min_key}, size=${min_key.size()}\nmax_key=${max_key}, size=${max_key.size()}")
+            assertTrue(min_key.size() <= length)
+            assertTrue(max_key.size() <= length)
+        }
+    }
+
+
+    def customBeConfig = [
+        segments_key_bounds_truncation_threshold : 2
+    ]
+
+    setBeConfigTemporary(customBeConfig) {
+        // 1. mow load
+        int k1 = 3757202
+        for (int j=1;j<=10;j++) {
+            for (int i=1;i<=9;i++) {
+                sql """insert into ${tableName} values
+                    (${k1},${i},1,9),
+                    (${k1},${i},2,8),
+                    (${k1},${i},3,7)"""
+            }
+        }
+        (2..91).each { idx ->
+            checkKeyBounds(idx, 2, true)
+        }
+        qt_sql "select * from ${tableName} order by k1,k2,k3;"
+
+
+        // 2. point lookup on mow table
+        for (int i=1;i<=9;i++) {
+            explain {
+                sql """ select * from ${tableName} where k1=${k1} and k2=${i} and k3=1; """
+                contains "SHORT-CIRCUIT"
+            }
+            qt_sql """ select * from ${tableName} where k1=${k1} and k2=${i} and k3=1; """
+        }
+    }
+}
diff --git a/regression-test/suites/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_write_scenarios.groovy b/regression-test/suites/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_write_scenarios.groovy
new file mode 100644
index 00000000000000..bbb479a7bd3328
--- /dev/null
+++ b/regression-test/suites/unique_with_mow_p0/key_bounds/test_key_bounds_truncation_write_scenarios.groovy
@@ -0,0 +1,284 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import com.google.common.collect.Maps
+import org.apache.commons.lang.RandomStringUtils
+import org.apache.doris.regression.util.Http
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_key_bounds_truncation_write_scenarios", "nonConcurrent") {
+
+    def tableName = "test_key_bounds_truncation_write_scenarios"
+    sql """ DROP TABLE IF EXISTS ${tableName} force;"""
+    sql """ CREATE TABLE ${tableName} (
+        `k` varchar(65533) NOT NULL,
+        `v1` int,
+        v2 int,
+        v3 int not null default '99')
+        UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1
+        PROPERTIES("replication_num" = "1",
+            "enable_unique_key_merge_on_write" = "true",
+            "disable_auto_compaction" = "true"); """
+
+    def printCompactionStatus = { tblName ->
+        def tablets = sql_return_maparray("show tablets from ${tblName};")
+        for (def tabletStat : tablets) {
+            def compactionStatusUrl = tabletStat.CompactionStatus
+            def jsonMeta = Http.GET(compactionStatusUrl, true, false)
+            logger.info("${jsonMeta.rowsets}")
+        }
+    }
+
+    def checkKeyBounds = { int length, int version = -1 ->
+        def tablets = sql_return_maparray("show tablets from ${tableName};")
+        for (def tabletStat : tablets) {
+            def metaUrl = tabletStat.MetaUrl
+            def tabletId = tabletStat.TabletId
+            logger.info("begin curl ${metaUrl}")
+            def jsonMeta = Http.GET(metaUrl, true, false)
+            for (def meta : jsonMeta.rs_metas) {
+                int end_version = meta.end_version
+                if (version != -1 && version != end_version) {
+                    continue
+                }
+                logger.info("version=[${meta.start_version}-${meta.end_version}], meta.segments_key_bounds_truncated=${meta.segments_key_bounds_truncated}")
+                if (end_version >= 2 && meta.num_rows > 0) {
+                    assert meta.segments_key_bounds_truncated
+                }
+                for (def bounds : meta.segments_key_bounds) {
+                    String min_key = bounds.min_key
+                    String max_key = bounds.max_key
+                    // only check length here
+                    logger.info("tablet_id=${tabletId}, version=[${meta.start_version}-${meta.end_version}]\nmin_key=${min_key}, size=${min_key.size()}\nmax_key=${max_key}, size=${max_key.size()}")
+                    assert min_key.size() <= length
+                    assert max_key.size() <= length
+                }
+            }
+        }
+    }
+
+    def enable_publish_spin_wait = {
+        if (isCloudMode()) {
+            GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+        } else {
+            GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
+        }
+    }
+
+    def disable_publish_spin_wait = {
+        if (isCloudMode()) {
+            GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+        } else {
+            GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
+        }
+    }
+
+    def enable_block_in_publish = {
+        if (isCloudMode()) {
+            GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+        } else {
+            GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
+        }
+    }
+
+    def disable_block_in_publish = {
+        if (isCloudMode()) {
+            GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+        } else {
+            GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
+        }
+    }
+
+
+    Random random = new Random()
+    def randomString = { ->
+        int count = random.nextInt(200) + 20
+        return RandomStringUtils.randomAlphabetic(count);
+    }
+
+    def customBeConfig = [
+        segments_key_bounds_truncation_threshold : 20
+    ]
+
+    setBeConfigTemporary(customBeConfig) {
+
+        // 1. load
+        logger.info("============= load ==============")
+        int m = 10, n = 20
+        for (int i = 0; i < m; i++) {
+            String sqlStr = "insert into ${tableName} values"
+            for (int j = 1; j <= n; j++) {
+                sqlStr += """("${randomString()}", 1, 1, 1)"""
+                if (j < n) {
+                    sqlStr += ","
+                }
+            }
+            sqlStr += ";"
+            sql sqlStr
+        }
+        printCompactionStatus(tableName)
+        checkKeyBounds(20)
+
+
+        // 2. partial update with publish conflict, which generates a new segment and updates the rowset in the publish phase
+        logger.info("============= partial update ==============")
+        set_be_param("segments_key_bounds_truncation_threshold", 16)
+        Thread.sleep(2000)
+        try {
+            GetDebugPoint().clearDebugPointsForAllFEs()
+            GetDebugPoint().clearDebugPointsForAllBEs()
+
+            enable_publish_spin_wait()
+            enable_block_in_publish()
+
+            String values = ""
+            for (int i = 1; i <= m; i++) {
+                values += """("${randomString()}", 2)"""
+                if (i < m) {
+                    values += ","
+                }
+            }
+
+            Thread.sleep(200)
+
+            def t1 = Thread.start {
+                sql "set enable_insert_strict = false;"
+                sql "set enable_unique_key_partial_update = true;"
+                sql "sync;"
+                sql """ insert into ${tableName}(k,v1) values ${values};"""
+            }
+
+            def t2 = Thread.start {
+                sql "set enable_insert_strict = false;"
+                sql "set enable_unique_key_partial_update = true;"
+                sql "sync;"
+                sql """ insert into ${tableName}(k,v2) values ${values};"""
+            }
+
+            Thread.sleep(1500)
+            disable_publish_spin_wait()
+            disable_block_in_publish()
+
+            t1.join()
+            t2.join()
+
+            sql "set enable_unique_key_partial_update = false;"
+            sql "set enable_insert_strict = true;"
+            sql "sync;"
+
+            Thread.sleep(1000)
+            printCompactionStatus(tableName)
+            checkKeyBounds(16, 12)
+            checkKeyBounds(16, 13)
+
+        } finally {
+            disable_publish_spin_wait()
+            disable_block_in_publish()
+        }
+
+
+        // 3. schema change
+        def doSchemaChange = { cmd ->
+            sql cmd
+            waitForSchemaChangeDone {
+                sql """SHOW ALTER TABLE COLUMN WHERE TableName='${tableName}' ORDER BY createtime DESC LIMIT 1"""
+                time 20000
+            }
+        }
+
+        // direct schema change
+        logger.info("============= schema change 1 ==============")
+        set_be_param("segments_key_bounds_truncation_threshold", 12)
+        Thread.sleep(1000)
+        doSchemaChange " ALTER table ${tableName} modify column v2 varchar(100)"
+        printCompactionStatus(tableName)
+        checkKeyBounds(12)
+        sql "insert into ${tableName} select * from ${tableName};"
+        def res1 = sql "select k from ${tableName} group by k having count(*)>1;"
+        assert res1.size() == 0
+
+        // linked schema change
+        logger.info("============= schema change 2 ==============")
+        set_be_param("segments_key_bounds_truncation_threshold", 20)
+        Thread.sleep(1000)
+        doSchemaChange " ALTER table ${tableName} modify column v3 int null default '99'"
+        printCompactionStatus(tableName)
+        // linked schema change reuses the previous rowsets' segment key bounds,
+        // so the length is still 12
+        checkKeyBounds(12)
+        sql "insert into ${tableName} select * from ${tableName};"
+        def res2 = sql "select k from ${tableName} group by k having count(*)>1;"
+        assert res2.size() == 0
+
+        // sort schema change
+        logger.info("============= schema change 3 ==============")
+        set_be_param("segments_key_bounds_truncation_threshold", 15)
+        Thread.sleep(2000)
+        doSchemaChange " ALTER table ${tableName} add column k2 int key after k;"
+        doSchemaChange " ALTER table ${tableName} order by (k2,k,v1,v2,v3);"
+        printCompactionStatus(tableName)
+        checkKeyBounds(15)
+        sql "insert into ${tableName} select * from ${tableName};"
+        def res3 = sql "select k from ${tableName} group by k having count(*)>1;"
+        assert res3.size() == 0
+
+        // 4. compaction
+        logger.info("============= compaction ==============")
+        set_be_param("segments_key_bounds_truncation_threshold", 8)
+        Thread.sleep(2000)
+        def triggerFullCompaction = {
+            def beNodes = sql_return_maparray("show backends;")
+            def tabletStat = sql_return_maparray("show tablets from ${tableName};").get(0)
+            def tabletBackendId = tabletStat.BackendId
+            def tabletId = tabletStat.TabletId
+            def tabletBackend;
+            for (def be : beNodes) {
+                if (be.BackendId == tabletBackendId) {
+                    tabletBackend = be
+                    break;
+                }
+            }
+
+            logger.info("trigger full compaction on BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}")
+            def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def compactJson = parseJson(out.trim())
+            assertEquals("success", compactJson.status.toLowerCase())
+
+            // wait for full compaction to complete
+            Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until(
+                {
+                    (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
+                    logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                    assertEquals(code, 0)
+                    def compactionStatus = parseJson(out.trim())
+                    assertEquals("success", compactionStatus.status.toLowerCase())
+                    return !compactionStatus.run_status
+                }
+            )
+        }
+
+        // trigger full compaction on tablet
+        triggerFullCompaction()
+        checkKeyBounds(8)
+
+        qt_sql "select count(*) from ${tableName};"
+    }
+}

From 37289ce6f8a03fc412c6c6fd46f11b61f3ba2618 Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Tue, 10 Jun 2025 11:24:13 +0800
Subject: [PATCH 2/3] format

---
 be/src/olap/base_tablet.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 5b0fe1b881ee45..e6d669b6325170 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -488,7 +488,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest
         // use PrimaryKeyIndexMetaPB in primary key index?
         if (schema->cluster_key_idxes().empty()) {
             if (key_is_not_in_segment(key_without_seq, segments_key_bounds[j],
-                    rs->rowset_meta()->is_segments_key_bounds_truncated())) {
+                                      rs->rowset_meta()->is_segments_key_bounds_truncated())) {
                 continue;
             }
         }

From 5ea7ad762298ad2c865be81830e214a72a8db006 Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Tue, 10 Jun 2025 14:17:03 +0800
Subject: [PATCH 3/3] fix

---
 be/src/olap/base_tablet.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index e6d669b6325170..8c8e52be30f90e 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -492,7 +492,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest
                 continue;
             }
         }
-        picked_segments.emplace_back(i);
+        picked_segments.emplace_back(j);
     }
     if (picked_segments.empty()) {
         continue;
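For reviewers, a minimal sketch of the invariant that truncation preserves and that the `lookup_row_key` fix above relies on. This is illustrative only: `truncate_key` and `key_maybe_in_segment` are hypothetical names, not symbols from this PR (the actual logic lives in `be/src/util/key_util.cpp` and `be/src/olap/base_tablet.cpp`), and `key_maybe_in_segment` is roughly the complement of the `key_is_not_in_segment` check used in the hunks above.

#include <string>

// Hypothetical sketch (not this PR's actual symbols): truncate an encoded
// key to at most `threshold` bytes; threshold <= 0 means truncation is off.
static std::string truncate_key(const std::string& key, int threshold) {
    if (threshold <= 0 || key.size() <= static_cast<size_t>(threshold)) {
        return key;
    }
    return key.substr(0, static_cast<size_t>(threshold));
}

// Conservative segment pruning when the stored bounds may be truncated.
// Truncation only shortens a key, so a truncated bound compares <= the real
// bound. Hence:
//  - key < truncated_min  =>  key < real_min, the segment can be skipped;
//  - key > truncated_max  =>  the key may still be inside only if
//    truncated_max is a prefix of key (the real max may extend that prefix).
static bool key_maybe_in_segment(const std::string& key, const std::string& min_key,
                                 const std::string& max_key, bool bounds_truncated) {
    if (key < min_key) {
        return false;
    }
    if (key <= max_key) {
        return true;
    }
    // key > max_key: definitely out for exact bounds; for truncated bounds,
    // keep the segment iff max_key is a prefix of key.
    return bounds_truncated && key.compare(0, max_key.size(), max_key) == 0;
}

Under these assumptions, truncation can only widen the set of segments a lookup inspects (extra candidates are rejected by the primary key index), which is why the regression suites above only assert an upper bound on the stored key sizes and that query results stay unchanged.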