From 1f02d1b4c840074389e0766f7af231c662fd2d16 Mon Sep 17 00:00:00 2001 From: morningman Date: Sat, 12 Sep 2020 22:17:01 +0800 Subject: [PATCH 1/4] [Bug] Fix bug of cumulative compaction and deletion of stale version When selecting candicate rowsets to do the cumulative compaction, some rowsets may not be selected because the protection time has not expired. Therefore, we need to find the current longest continuous version path in the candicate rowsets. --- be/src/olap/compaction.cpp | 21 ++++++++++++++++++++ be/src/olap/compaction.h | 1 + be/src/olap/cumulative_compaction.cpp | 5 ++++- be/src/olap/cumulative_compaction_policy.cpp | 3 +-- 4 files changed, 27 insertions(+), 3 deletions(-) diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 67b56f6e763172..1cd39cb4c6d064 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -181,6 +181,27 @@ OLAPStatus Compaction::gc_unused_rowsets() { return OLAP_SUCCESS; } +// find the longest consecutive version path in "rowset", from begining. +OLAPStatus Compaction::find_longest_consecutive_version(vector* rowsets) { + RowsetSharedPtr prev_rowset = rowsets->front(); + size_t i = 1; + for (; i < rowsets->size(); ++i) { + RowsetSharedPtr rowset = (*rowsets)[i]; + if (rowset->start_version() != prev_rowset->end_version() + 1) { + LOG(WARNING) << "There are missed versions among rowsets. " + << "prev_rowset verison=" << prev_rowset->start_version() + << "-" << prev_rowset->end_version() + << ", rowset version=" << rowset->start_version() + << "-" << rowset->end_version(); + break; + } + prev_rowset = rowset; + } + + rowsets->resize(i); + return OLAP_SUCCESS; +} + OLAPStatus Compaction::check_version_continuity(const vector& rowsets) { RowsetSharedPtr prev_rowset = rowsets.front(); for (size_t i = 1; i < rowsets.size(); ++i) { diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 6c4b438aaeb260..90468eb06a715a 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -67,6 +67,7 @@ class Compaction { OLAPStatus check_version_continuity(const std::vector& rowsets); OLAPStatus check_correctness(const Merger::Statistics& stats); + OLAPStatus find_longest_consecutive_version(std::vector* rowsets); // semaphore used to limit the concurrency of running compaction tasks static Semaphore _concurrency_sem; diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 6611e5102be336..044435d5525f1b 100755 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -83,7 +83,10 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() { return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS; } - RETURN_NOT_OK(check_version_continuity(candidate_rowsets)); + // candidate_rowsets may not be continuous. Because some rowset may not be selected + // because the protection time has not expired(config::cumulative_compaction_skip_window_seconds). + // So we need to choose the longest continuous path from it. + RETURN_NOT_OK(find_longest_consecutive_version(&candidate_rowsets)); size_t compaction_score = 0; int transient_size = _tablet->cumulative_compaction_policy()->pick_input_rowsets( diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp index 2e28c58af57dec..99a584420d1672 100644 --- a/be/src/olap/cumulative_compaction_policy.cpp +++ b/be/src/olap/cumulative_compaction_policy.cpp @@ -436,7 +436,6 @@ void CumulativeCompactionPolicy::pick_candicate_rowsets(int64_t skip_window_sec, } } std::sort(candidate_rowsets->begin(), candidate_rowsets->end(), Rowset::comparator); - } std::unique_ptr CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(std::string type) { @@ -467,4 +466,4 @@ void CumulativeCompactionPolicyFactory::_parse_cumulative_compaction_policy(std: *policy_type = NUM_BASED_POLICY; } } -} \ No newline at end of file +} From 9b66841b330a03a7053e8e90f08d96487773bb8a Mon Sep 17 00:00:00 2001 From: morningman Date: Sat, 12 Sep 2020 23:00:40 +0800 Subject: [PATCH 2/4] add ut --- be/src/olap/compaction.cpp | 3 + .../cumulative_compaction_policy_test.cpp | 57 ++++++++++++++++++- 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 1cd39cb4c6d064..61accf374e6c5a 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -183,6 +183,9 @@ OLAPStatus Compaction::gc_unused_rowsets() { // find the longest consecutive version path in "rowset", from begining. OLAPStatus Compaction::find_longest_consecutive_version(vector* rowsets) { + if (rowsets->empty()) { + return OLAP_SUCCESS; + } RowsetSharedPtr prev_rowset = rowsets->front(); size_t i = 1; for (; i < rowsets->size(); ++i) { diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp b/be/test/olap/cumulative_compaction_policy_test.cpp index 3fab6f70c0af22..e1d436e22adf6d 100644 --- a/be/test/olap/cumulative_compaction_policy_test.cpp +++ b/be/test/olap/cumulative_compaction_policy_test.cpp @@ -20,6 +20,7 @@ #include "olap/tablet_meta.h" #include "olap/rowset/rowset_meta.h" +#include "olap/cumulative_compaction.h" #include "olap/cumulative_compaction_policy.h" namespace doris { @@ -638,6 +639,24 @@ class TestSizeBasedCumulativeCompactionPolicy : public testing::Test { rs_metas->push_back(ptr5); } + void init_rs_meta_missing_version(std::vector* rs_metas) { + RowsetMetaSharedPtr ptr1(new RowsetMeta()); + init_rs_meta(ptr1, 0, 0); + rs_metas->push_back(ptr1); + + RowsetMetaSharedPtr ptr2(new RowsetMeta()); + init_rs_meta(ptr2, 1, 1); + rs_metas->push_back(ptr2); + + RowsetMetaSharedPtr ptr3(new RowsetMeta()); + init_rs_meta(ptr3, 2, 2); + rs_metas->push_back(ptr3); + + RowsetMetaSharedPtr ptr5(new RowsetMeta()); + init_rs_meta(ptr5, 4, 4); + rs_metas->push_back(ptr5); + } + protected: std::string _json_rowset_meta; TabletMetaSharedPtr _tablet_meta; @@ -1013,10 +1032,46 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, _level_size) { ASSERT_EQ(134217728, policy->_levels[2]); ASSERT_EQ(67108864, policy->_levels[3]); } + +TEST_F(TestSizeBasedCumulativeCompactionPolicy, _pick_missing_version_cumulative_compaction) { + std::vector rs_metas; + init_rs_meta_missing_version(&rs_metas); + + for (auto &rowset : rs_metas) { + _tablet_meta->add_rs_meta(rowset); + } + + TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY)); + _tablet->init(); + + // has miss version + std::vector rowsets; + rowsets.push_back(_tablet->get_rowset_by_version({0, 0})); + rowsets.push_back(_tablet->get_rowset_by_version({1, 1})); + rowsets.push_back(_tablet->get_rowset_by_version({2, 2})); + rowsets.push_back(_tablet->get_rowset_by_version({4, 4})); + std::shared_ptr mem_tracker(new MemTracker()); + CumulativeCompaction compaction(_tablet, "label", mem_tracker); + compaction.find_longest_consecutive_version(&rowsets); + ASSERT_EQ(3, rowsets.size()); + ASSERT_EQ(2, rowsets[2]->end_version()); + + // no miss version + std::vector rowsets2; + rowsets2.push_back(_tablet->get_rowset_by_version({0, 0})); + compaction.find_longest_consecutive_version(&rowsets2); + ASSERT_EQ(1, rowsets2.size()); + ASSERT_EQ(0, rowsets[0]->end_version()); + + // no version + std::vector rowsets3; + compaction.find_longest_consecutive_version(&rowsets3); + ASSERT_EQ(0, rowsets3.size()); +} } // @brief Test Stub int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); -} \ No newline at end of file +} From 5470d315a0b73f92b14eaa4942cd931f604e8b8a Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 16 Oct 2020 22:27:44 +0800 Subject: [PATCH 3/4] modify log --- be/src/olap/base_tablet.cpp | 5 +++++ be/src/olap/base_tablet.h | 7 +++---- be/src/olap/compaction.cpp | 17 ++++++++++------- be/src/olap/compaction.h | 2 +- be/src/olap/cumulative_compaction.cpp | 10 +++++++++- 5 files changed, 28 insertions(+), 13 deletions(-) diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 507bbc663725b0..64a31052ee0841 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -34,6 +34,11 @@ BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) _data_dir(data_dir) { _gen_tablet_path(); + std::stringstream ss; + ss << _tablet_meta->tablet_id() << "." << _tablet_meta->schema_hash() << "." + << _tablet_meta->tablet_uid().to_string(); + _full_name = ss.str(); + _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( strings::Substitute("Tablet.$0", tablet_id()), {{"tablet_id", std::to_string(tablet_id())}}, diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 6097d23957d6be..bce485dc8a3aec 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -77,6 +77,8 @@ class BaseTablet : public std::enable_shared_from_this { // metrics of this tablet std::shared_ptr _metric_entity = nullptr; + + std::string _full_name; public: IntCounter* query_scan_bytes; IntCounter* query_scan_rows; @@ -110,10 +112,7 @@ inline int64_t BaseTablet::table_id() const { } inline const std::string BaseTablet::full_name() const { - std::stringstream ss; - ss << _tablet_meta->tablet_id() << "." << _tablet_meta->schema_hash() << "." - << _tablet_meta->tablet_uid().to_string(); - return ss.str(); + return _full_name; } inline int64_t BaseTablet::partition_id() const { diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 61accf374e6c5a..3e68a909f284fa 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -181,8 +181,12 @@ OLAPStatus Compaction::gc_unused_rowsets() { return OLAP_SUCCESS; } -// find the longest consecutive version path in "rowset", from begining. -OLAPStatus Compaction::find_longest_consecutive_version(vector* rowsets) { +// Find the longest consecutive version path in "rowset", from begining. +// Two versions before and after the missing version will be saved in missing_version, +// if missing_version is not null. +OLAPStatus Compaction::find_longest_consecutive_version( + vector* rowsets, + vector* missing_version) { if (rowsets->empty()) { return OLAP_SUCCESS; } @@ -191,11 +195,10 @@ OLAPStatus Compaction::find_longest_consecutive_version(vector* for (; i < rowsets->size(); ++i) { RowsetSharedPtr rowset = (*rowsets)[i]; if (rowset->start_version() != prev_rowset->end_version() + 1) { - LOG(WARNING) << "There are missed versions among rowsets. " - << "prev_rowset verison=" << prev_rowset->start_version() - << "-" << prev_rowset->end_version() - << ", rowset version=" << rowset->start_version() - << "-" << rowset->end_version(); + if (missing_version != nullptr) { + missing_version->push_back(prev_rowset->version()); + missing_version->push_back(rowset->version()); + } break; } prev_rowset = rowset; diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 90468eb06a715a..dbcd61eed25da4 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -67,7 +67,7 @@ class Compaction { OLAPStatus check_version_continuity(const std::vector& rowsets); OLAPStatus check_correctness(const Merger::Statistics& stats); - OLAPStatus find_longest_consecutive_version(std::vector* rowsets); + OLAPStatus find_longest_consecutive_version(std::vector* rowsets, std::vector* missing_version); // semaphore used to limit the concurrency of running compaction tasks static Semaphore _concurrency_sem; diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 044435d5525f1b..d0e2efbdcb7d02 100755 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -86,7 +86,15 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() { // candidate_rowsets may not be continuous. Because some rowset may not be selected // because the protection time has not expired(config::cumulative_compaction_skip_window_seconds). // So we need to choose the longest continuous path from it. - RETURN_NOT_OK(find_longest_consecutive_version(&candidate_rowsets)); + std::vector missing_versions; + RETURN_NOT_OK(find_longest_consecutive_version(&candidate_rowsets, &missing_versions)); + if (missing_versions.empty()) { + DCHECK(missing_versions.size() == 2); + LOG(WARNING) << "There are missed versions among rowsets. " + << "prev rowset verison=" << missing_versions[0] + << ", next rowset version=" << missing_versions[1] + << ", tablet=" << _tablet->full_name(); + } size_t compaction_score = 0; int transient_size = _tablet->cumulative_compaction_policy()->pick_input_rowsets( From 019f047709f8cf8f584ef2bea2df08179d16bbc1 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Tue, 20 Oct 2020 09:42:51 +0800 Subject: [PATCH 4/4] Update be/src/olap/cumulative_compaction.cpp Co-authored-by: Zhao Chun --- be/src/olap/cumulative_compaction.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index d0e2efbdcb7d02..0ed92db384f767 100755 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -88,7 +88,7 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() { // So we need to choose the longest continuous path from it. std::vector missing_versions; RETURN_NOT_OK(find_longest_consecutive_version(&candidate_rowsets, &missing_versions)); - if (missing_versions.empty()) { + if (!missing_versions.empty()) { DCHECK(missing_versions.size() == 2); LOG(WARNING) << "There are missed versions among rowsets. " << "prev rowset verison=" << missing_versions[0] @@ -160,4 +160,3 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() { } } // namespace doris -