diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 507bbc663725b0..64a31052ee0841 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -34,6 +34,11 @@ BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) _data_dir(data_dir) { _gen_tablet_path(); + std::stringstream ss; + ss << _tablet_meta->tablet_id() << "." << _tablet_meta->schema_hash() << "." + << _tablet_meta->tablet_uid().to_string(); + _full_name = ss.str(); + _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( strings::Substitute("Tablet.$0", tablet_id()), {{"tablet_id", std::to_string(tablet_id())}}, diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 6097d23957d6be..bce485dc8a3aec 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -77,6 +77,8 @@ class BaseTablet : public std::enable_shared_from_this { // metrics of this tablet std::shared_ptr _metric_entity = nullptr; + + std::string _full_name; public: IntCounter* query_scan_bytes; IntCounter* query_scan_rows; @@ -110,10 +112,7 @@ inline int64_t BaseTablet::table_id() const { } inline const std::string BaseTablet::full_name() const { - std::stringstream ss; - ss << _tablet_meta->tablet_id() << "." << _tablet_meta->schema_hash() << "." - << _tablet_meta->tablet_uid().to_string(); - return ss.str(); + return _full_name; } inline int64_t BaseTablet::partition_id() const { diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 67b56f6e763172..3e68a909f284fa 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -181,6 +181,33 @@ OLAPStatus Compaction::gc_unused_rowsets() { return OLAP_SUCCESS; } +// Find the longest consecutive version path in "rowset", from begining. +// Two versions before and after the missing version will be saved in missing_version, +// if missing_version is not null. +OLAPStatus Compaction::find_longest_consecutive_version( + vector* rowsets, + vector* missing_version) { + if (rowsets->empty()) { + return OLAP_SUCCESS; + } + RowsetSharedPtr prev_rowset = rowsets->front(); + size_t i = 1; + for (; i < rowsets->size(); ++i) { + RowsetSharedPtr rowset = (*rowsets)[i]; + if (rowset->start_version() != prev_rowset->end_version() + 1) { + if (missing_version != nullptr) { + missing_version->push_back(prev_rowset->version()); + missing_version->push_back(rowset->version()); + } + break; + } + prev_rowset = rowset; + } + + rowsets->resize(i); + return OLAP_SUCCESS; +} + OLAPStatus Compaction::check_version_continuity(const vector& rowsets) { RowsetSharedPtr prev_rowset = rowsets.front(); for (size_t i = 1; i < rowsets.size(); ++i) { diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 6c4b438aaeb260..dbcd61eed25da4 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -67,6 +67,7 @@ class Compaction { OLAPStatus check_version_continuity(const std::vector& rowsets); OLAPStatus check_correctness(const Merger::Statistics& stats); + OLAPStatus find_longest_consecutive_version(std::vector* rowsets, std::vector* missing_version); // semaphore used to limit the concurrency of running compaction tasks static Semaphore _concurrency_sem; diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 6611e5102be336..0ed92db384f767 100755 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -83,7 +83,18 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() { return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS; } - RETURN_NOT_OK(check_version_continuity(candidate_rowsets)); + // candidate_rowsets may not be continuous. Because some rowset may not be selected + // because the protection time has not expired(config::cumulative_compaction_skip_window_seconds). + // So we need to choose the longest continuous path from it. + std::vector missing_versions; + RETURN_NOT_OK(find_longest_consecutive_version(&candidate_rowsets, &missing_versions)); + if (!missing_versions.empty()) { + DCHECK(missing_versions.size() == 2); + LOG(WARNING) << "There are missed versions among rowsets. " + << "prev rowset verison=" << missing_versions[0] + << ", next rowset version=" << missing_versions[1] + << ", tablet=" << _tablet->full_name(); + } size_t compaction_score = 0; int transient_size = _tablet->cumulative_compaction_policy()->pick_input_rowsets( @@ -149,4 +160,3 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() { } } // namespace doris - diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp index 2e28c58af57dec..99a584420d1672 100644 --- a/be/src/olap/cumulative_compaction_policy.cpp +++ b/be/src/olap/cumulative_compaction_policy.cpp @@ -436,7 +436,6 @@ void CumulativeCompactionPolicy::pick_candicate_rowsets(int64_t skip_window_sec, } } std::sort(candidate_rowsets->begin(), candidate_rowsets->end(), Rowset::comparator); - } std::unique_ptr CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(std::string type) { @@ -467,4 +466,4 @@ void CumulativeCompactionPolicyFactory::_parse_cumulative_compaction_policy(std: *policy_type = NUM_BASED_POLICY; } } -} \ No newline at end of file +} diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp b/be/test/olap/cumulative_compaction_policy_test.cpp index 3fab6f70c0af22..e1d436e22adf6d 100644 --- a/be/test/olap/cumulative_compaction_policy_test.cpp +++ b/be/test/olap/cumulative_compaction_policy_test.cpp @@ -20,6 +20,7 @@ #include "olap/tablet_meta.h" #include "olap/rowset/rowset_meta.h" +#include "olap/cumulative_compaction.h" #include "olap/cumulative_compaction_policy.h" namespace doris { @@ -638,6 +639,24 @@ class TestSizeBasedCumulativeCompactionPolicy : public testing::Test { rs_metas->push_back(ptr5); } + void init_rs_meta_missing_version(std::vector* rs_metas) { + RowsetMetaSharedPtr ptr1(new RowsetMeta()); + init_rs_meta(ptr1, 0, 0); + rs_metas->push_back(ptr1); + + RowsetMetaSharedPtr ptr2(new RowsetMeta()); + init_rs_meta(ptr2, 1, 1); + rs_metas->push_back(ptr2); + + RowsetMetaSharedPtr ptr3(new RowsetMeta()); + init_rs_meta(ptr3, 2, 2); + rs_metas->push_back(ptr3); + + RowsetMetaSharedPtr ptr5(new RowsetMeta()); + init_rs_meta(ptr5, 4, 4); + rs_metas->push_back(ptr5); + } + protected: std::string _json_rowset_meta; TabletMetaSharedPtr _tablet_meta; @@ -1013,10 +1032,46 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, _level_size) { ASSERT_EQ(134217728, policy->_levels[2]); ASSERT_EQ(67108864, policy->_levels[3]); } + +TEST_F(TestSizeBasedCumulativeCompactionPolicy, _pick_missing_version_cumulative_compaction) { + std::vector rs_metas; + init_rs_meta_missing_version(&rs_metas); + + for (auto &rowset : rs_metas) { + _tablet_meta->add_rs_meta(rowset); + } + + TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY)); + _tablet->init(); + + // has miss version + std::vector rowsets; + rowsets.push_back(_tablet->get_rowset_by_version({0, 0})); + rowsets.push_back(_tablet->get_rowset_by_version({1, 1})); + rowsets.push_back(_tablet->get_rowset_by_version({2, 2})); + rowsets.push_back(_tablet->get_rowset_by_version({4, 4})); + std::shared_ptr mem_tracker(new MemTracker()); + CumulativeCompaction compaction(_tablet, "label", mem_tracker); + compaction.find_longest_consecutive_version(&rowsets); + ASSERT_EQ(3, rowsets.size()); + ASSERT_EQ(2, rowsets[2]->end_version()); + + // no miss version + std::vector rowsets2; + rowsets2.push_back(_tablet->get_rowset_by_version({0, 0})); + compaction.find_longest_consecutive_version(&rowsets2); + ASSERT_EQ(1, rowsets2.size()); + ASSERT_EQ(0, rowsets[0]->end_version()); + + // no version + std::vector rowsets3; + compaction.find_longest_consecutive_version(&rowsets3); + ASSERT_EQ(0, rowsets3.size()); +} } // @brief Test Stub int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); -} \ No newline at end of file +}