diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index fa2d89352be985..749f81893988bd 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -1132,8 +1132,14 @@ Status Compaction::find_longest_consecutive_version(std::vector if (rowsets->empty()) { return Status::OK(); } + RowsetSharedPtr prev_rowset = rowsets->front(); size_t i = 1; + int max_start = 0; + int max_length = 1; + + int start = 0; + int length = 1; for (; i < rowsets->size(); ++i) { RowsetSharedPtr rowset = (*rowsets)[i]; if (rowset->start_version() != prev_rowset->end_version() + 1) { @@ -1141,12 +1147,20 @@ Status Compaction::find_longest_consecutive_version(std::vector missing_version->push_back(prev_rowset->version()); missing_version->push_back(rowset->version()); } - break; + start = i; + length = 1; + } else { + length++; } + + if (length > max_length) { + max_start = start; + max_length = length; + } + prev_rowset = rowset; } - - rowsets->resize(i); + *rowsets = {rowsets->begin() + max_start, rowsets->begin() + max_start + max_length}; return Status::OK(); } diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 04504432f195fa..d7fc4da44739e9 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -111,10 +111,11 @@ Status CumulativeCompaction::pick_rowsets_to_compact() { std::vector missing_versions; RETURN_IF_ERROR(find_longest_consecutive_version(&candidate_rowsets, &missing_versions)); if (!missing_versions.empty()) { - DCHECK(missing_versions.size() == 2); + DCHECK(missing_versions.size() % 2 == 0); LOG(WARNING) << "There are missed versions among rowsets. " - << "prev rowset verison=" << missing_versions[0] - << ", next rowset version=" << missing_versions[1] + << "total missed version size: " << missing_versions.size() / 2 + << " first missed version prev rowset verison=" << missing_versions[0] + << ", first missed version next rowset version=" << missing_versions[1] << ", tablet=" << _tablet->tablet_id(); } diff --git a/be/test/olap/cumulative_compaction_test.cpp b/be/test/olap/cumulative_compaction_test.cpp new file mode 100644 index 00000000000000..a0d3d99093d3c1 --- /dev/null +++ b/be/test/olap/cumulative_compaction_test.cpp @@ -0,0 +1,274 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/cumulative_compaction.h" + +#include +#include +#include +#include +#include + +#include +#include + +#include "common/status.h" +#include "gtest/gtest_pred_impl.h" +#include "io/fs/local_file_system.h" +#include "olap/cumulative_compaction_policy.h" +#include "olap/data_dir.h" +#include "olap/rowset/rowset_factory.h" +#include "olap/storage_engine.h" +#include "olap/tablet_manager.h" +#include "util/threadpool.h" + +namespace doris { +using namespace config; + +class CumulativeCompactionTest : public testing::Test { +public: + virtual void SetUp() {} + + virtual void TearDown() {} +}; + +static RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping, + int data_size) { + auto rs_meta = std::make_shared(); + rs_meta->set_rowset_type(BETA_ROWSET); // important + rs_meta->_rowset_meta_pb.set_start_version(version.first); + rs_meta->_rowset_meta_pb.set_end_version(version.second); + rs_meta->set_num_segments(num_segments); + rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING); + rs_meta->set_total_disk_size(data_size); + RowsetSharedPtr rowset; + Status st = RowsetFactory::create_rowset(nullptr, "", std::move(rs_meta), &rowset); + if (!st.ok()) { + return nullptr; + } + return rowset; +} + +TEST_F(CumulativeCompactionTest, TestConsecutiveVersion) { + EngineOptions options; + StorageEngine storage_engine(options); + //TabletSharedPtr tablet; + + TabletMetaSharedPtr tablet_meta; + tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, + TCompressionType::LZ4F)); + TabletSharedPtr tablet( + new Tablet(storage_engine, tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY)); + + CumulativeCompaction cumu_compaction(tablet); + + { + std::vector rowsets; + for (int i = 2; i < 10; ++i) { + RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024); + rowsets.push_back(rs); + } + std::vector missing_version; + Status st = cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version); + EXPECT_TRUE(st.OK()); + EXPECT_EQ(rowsets.size(), 8); + EXPECT_EQ(rowsets.front()->start_version(), 2); + EXPECT_EQ(rowsets.front()->end_version(), 2); + + EXPECT_EQ(rowsets.back()->start_version(), 9); + EXPECT_EQ(rowsets.back()->end_version(), 9); + + EXPECT_EQ(missing_version.size(), 0); + } + + { + std::vector rowsets; + for (int i = 2; i <= 4; ++i) { + RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024); + rowsets.push_back(rs); + } + + for (int i = 6; i <= 10; ++i) { + RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024); + rowsets.push_back(rs); + } + + for (int i = 12; i <= 13; ++i) { + RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024); + rowsets.push_back(rs); + } + + std::vector missing_version; + Status st = cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version); + EXPECT_TRUE(st.OK()); + + EXPECT_EQ(rowsets.size(), 5); + EXPECT_EQ(rowsets.front()->start_version(), 6); + EXPECT_EQ(rowsets.front()->end_version(), 6); + EXPECT_EQ(rowsets.back()->start_version(), 10); + EXPECT_EQ(rowsets.back()->end_version(), 10); + + EXPECT_EQ(missing_version.size(), 4); + EXPECT_EQ(missing_version[0].first, 4); + EXPECT_EQ(missing_version[0].second, 4); + EXPECT_EQ(missing_version[1].first, 6); + EXPECT_EQ(missing_version[1].second, 6); + EXPECT_EQ(missing_version[2].first, 10); + EXPECT_EQ(missing_version[2].second, 10); + EXPECT_EQ(missing_version[3].first, 12); + EXPECT_EQ(missing_version[3].second, 12); + } + + { + std::vector rowsets; + for (int i = 2; i <= 2; ++i) { + RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024); + rowsets.push_back(rs); + } + + for (int i = 4; i <= 4; ++i) { + RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024); + rowsets.push_back(rs); + } + + std::vector missing_version; + Status st = cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version); + EXPECT_TRUE(st.OK()); + + EXPECT_EQ(rowsets.size(), 1); + EXPECT_EQ(rowsets.front()->start_version(), 2); + EXPECT_EQ(rowsets.front()->end_version(), 2); + EXPECT_EQ(rowsets.back()->start_version(), 2); + EXPECT_EQ(rowsets.back()->end_version(), 2); + + EXPECT_EQ(missing_version.size(), 2); + EXPECT_EQ(missing_version[0].first, 2); + EXPECT_EQ(missing_version[0].second, 2); + EXPECT_EQ(missing_version[1].first, 4); + EXPECT_EQ(missing_version[1].second, 4); + } + + { + std::vector rowsets; + RowsetSharedPtr rs = create_rowset({2, 3}, 1, false, 1024); + rowsets.push_back(rs); + rs = create_rowset({4, 5}, 1, false, 1024); + rowsets.push_back(rs); + + rs = create_rowset({9, 11}, 1, false, 1024); + rowsets.push_back(rs); + rs = create_rowset({12, 13}, 1, false, 1024); + rowsets.push_back(rs); + + std::vector missing_version; + Status st = cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version); + EXPECT_TRUE(st.OK()); + + EXPECT_EQ(rowsets.size(), 2); + EXPECT_EQ(rowsets.front()->start_version(), 2); + EXPECT_EQ(rowsets.front()->end_version(), 3); + EXPECT_EQ(rowsets.back()->start_version(), 4); + EXPECT_EQ(rowsets.back()->end_version(), 5); + + EXPECT_EQ(missing_version.size(), 2); + EXPECT_EQ(missing_version[0].first, 4); + EXPECT_EQ(missing_version[0].second, 5); + EXPECT_EQ(missing_version[1].first, 9); + EXPECT_EQ(missing_version[1].second, 11); + } + + { + std::vector rowsets; + for (int i = 2; i <= 2; ++i) { + RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024); + rowsets.push_back(rs); + } + + std::vector missing_version; + Status st = cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version); + EXPECT_TRUE(st.OK()); + + EXPECT_EQ(rowsets.size(), 1); + EXPECT_EQ(rowsets.front()->start_version(), 2); + EXPECT_EQ(rowsets.front()->end_version(), 2); + + EXPECT_EQ(rowsets.back()->start_version(), 2); + EXPECT_EQ(rowsets.back()->end_version(), 2); + EXPECT_EQ(missing_version.size(), 0); + } + + { + std::vector rowsets; + for (int i = 2; i <= 2; ++i) { + RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024); + rowsets.push_back(rs); + } + + std::vector missing_version; + Status st = cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version); + EXPECT_TRUE(st.OK()); + + EXPECT_EQ(rowsets.size(), 1); + EXPECT_EQ(rowsets.front()->start_version(), 2); + EXPECT_EQ(rowsets.front()->end_version(), 2); + + EXPECT_EQ(rowsets.back()->start_version(), 2); + EXPECT_EQ(rowsets.back()->end_version(), 2); + EXPECT_EQ(missing_version.size(), 0); + } + + { + std::vector rowsets; + for (int i = 2; i <= 4; ++i) { + RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024); + rowsets.push_back(rs); + } + + for (int i = 6; i <= 10; ++i) { + RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024); + rowsets.push_back(rs); + } + + for (int i = 12; i <= 20; ++i) { + RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024); + rowsets.push_back(rs); + } + + std::vector missing_version; + Status st = cumu_compaction.find_longest_consecutive_version(&rowsets, &missing_version); + EXPECT_TRUE(st.OK()); + + EXPECT_EQ(rowsets.size(), 9); + EXPECT_EQ(rowsets.front()->start_version(), 12); + EXPECT_EQ(rowsets.front()->end_version(), 12); + EXPECT_EQ(rowsets.back()->start_version(), 20); + EXPECT_EQ(rowsets.back()->end_version(), 20); + + EXPECT_EQ(missing_version.size(), 4); + EXPECT_EQ(missing_version[0].first, 4); + EXPECT_EQ(missing_version[0].second, 4); + EXPECT_EQ(missing_version[1].first, 6); + EXPECT_EQ(missing_version[1].second, 6); + EXPECT_EQ(missing_version[2].first, 10); + EXPECT_EQ(missing_version[2].second, 10); + EXPECT_EQ(missing_version[3].first, 12); + EXPECT_EQ(missing_version[3].second, 12); + } +} + +} // namespace doris