From cd432b85138cb231fdadad88dfda0800349765aa Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Thu, 4 Sep 2025 21:37:39 +0800 Subject: [PATCH] [Fix](cloud) `calc_sync_versions` should consider full compaction (#55630) Currently, `MetaServiceImpl::get_rowset` uses `calc_sync_versions` to eliminate unnecessary version ranges when a BE syncs rowset metas. One of the optimizations is the following: ```cpp std::vector<std::pair<int64_t, int64_t>> calc_sync_versions(int64_t req_bc_cnt, int64_t bc_cnt, int64_t req_cc_cnt, int64_t cc_cnt, int64_t req_cp, int64_t cp, int64_t req_start, int64_t req_end) { // ... if (req_cc_cnt < cc_cnt) { Version cc_version; if (req_cp < cp && req_cc_cnt + 1 == cc_cnt) { // * only one CC happened and CP changed // BE [=][=][=][=][=====][=][=] // ^~~~~ req_cp // MS [=][=][=][=][xxxxxxxxxxxxxx][=======][=][=] // ^~~~~~~ ms_cp // ^____________^ related_versions: [req_cp, ms_cp - 1] // cc_version = {req_cp, cp - 1}; } else { // ... } ``` This optimization relies on the assumption that only cumulative compaction will change the cumulative point. However, full compaction can also change the cumulative point, which breaks the above assumption. This will cause a data correctness problem in a multi-cluster environment because it will make the tablet fail to sync some rowset metas forever. A data correctness problem has been observed in the following situation: 1. For a certain tablet, base_compaction_cnt=14, cumulative_compaction_cnt=804, cumu_point=7458. On node A of the write cluster (cluster 0), a full compaction of [2-7464] and a cumulative compaction of [7465-7486] were performed. The stats then became base_compaction_cnt=15, cumulative_compaction_cnt=805, cumu_point=7465. 2. On node B of the read cluster (cluster 1), during sync_rowset, we have: req_base_compaction_cnt=14, base_compaction_cnt=15, req_cumulative_compaction_cnt=804, cumulative_compaction_cnt=805, req_cp=7458, cp=7465, req_start=7487, req_end=int_max. 3. 
`calc_sync_versions` computes that the rowsets to be pulled are [0-7464] and [7487-int_max], but it misses the rowset [7465-7486] produced by cumulative compaction. 4. Moreover, since the max_version of the tablet on cluster 1 node B has been updated, subsequent sync_rowset operations will also not pull the rowset [7465-7486]. 5. This causes a duplicate-key problem on the MOW table because new rowsets will generate delete bitmap marks on [7465-7486]. --- This PR forbids the above optimization when the full compaction count has changed. None - Test - [x] Regression test - [x] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason - Behavior changed: - [ ] No. - [ ] Yes. - Does this need documentation? - [ ] No. - [ ] Yes. - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label --- .../cloud_cumulative_compaction_policy.cpp | 11 ++ be/src/cloud/cloud_full_compaction.cpp | 11 ++ be/src/cloud/cloud_meta_mgr.cpp | 2 + be/src/cloud/cloud_tablet.h | 3 + cloud/src/meta-service/meta_service.cpp | 22 ++- cloud/src/meta-service/meta_service_job.cpp | 1 + cloud/test/meta_service_test.cpp | 55 ++++-- gensrc/proto/cloud.proto | 3 +- .../cloud/test_cloud_calc_sync_version.out | 55 ++++++ .../cloud/test_cloud_calc_sync_version.groovy | 179 ++++++++++++++++++ 10 files changed, 324 insertions(+), 18 deletions(-) create mode 100644 regression-test/data/fault_injection_p0/cloud/test_cloud_calc_sync_version.out create mode 100644 regression-test/suites/fault_injection_p0/cloud/test_cloud_calc_sync_version.groovy diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp b/be/src/cloud/cloud_cumulative_compaction_policy.cpp index 24bd61db8fafe6..f4178320044ea6 100644 --- 
a/be/src/cloud/cloud_cumulative_compaction_policy.cpp +++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp @@ -219,6 +219,17 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point( int64_t last_cumulative_point) { TEST_INJECTION_POINT_RETURN_WITH_VALUE("new_cumulative_point", int64_t(0), output_rowset.get(), last_cumulative_point); + DBUG_EXECUTE_IF("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point", { + auto target_tablet_id = dp->param("tablet_id", -1); + auto cumu_point = dp->param("cumu_point", -1); + if (target_tablet_id == tablet->tablet_id() && cumu_point != -1) { + LOG_INFO( + "[CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point] " + "tablet_id={}, cumu_point={}", + target_tablet_id, cumu_point); + return cumu_point; + } + }); // for MoW table, if there's too many versions, the delete bitmap will grow to // a very big size, which may cause the tablet meta too big and the `save_meta` // operation too slow. diff --git a/be/src/cloud/cloud_full_compaction.cpp b/be/src/cloud/cloud_full_compaction.cpp index 7358f6d19156a1..08e43ab2142ae9 100644 --- a/be/src/cloud/cloud_full_compaction.cpp +++ b/be/src/cloud/cloud_full_compaction.cpp @@ -149,6 +149,16 @@ Status CloudFullCompaction::pick_rowsets_to_compact() { } Status CloudFullCompaction::execute_compact() { + DBUG_EXECUTE_IF("CloudFullCompaction::execute_compact.block", { + auto target_tablet_id = dp->param("tablet_id", -1); + LOG_INFO( + "[verbose] CloudFullCompaction::execute_compact.block, target_tablet_id={}, " + "tablet_id={}", + target_tablet_id, cloud_tablet()->tablet_id()); + if (target_tablet_id == cloud_tablet()->tablet_id()) { + DBUG_BLOCK; + } + }); TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudFullCompaction::execute_compact_impl", Status::OK(), this); #ifndef __APPLE__ @@ -272,6 +282,7 @@ Status CloudFullCompaction::modify_rowsets() { cloud_tablet()->delete_rowsets(_input_rowsets, wrlock); cloud_tablet()->add_rowsets({_output_rowset}, false, wrlock); 
cloud_tablet()->set_base_compaction_cnt(stats.base_compaction_cnt()); + cloud_tablet()->set_full_compaction_cnt(stats.full_compaction_cnt()); cloud_tablet()->set_cumulative_layer_point(stats.cumulative_point()); if (output_rowset_delete_bitmap) { _tablet->tablet_meta()->delete_bitmap()->merge(*output_rowset_delete_bitmap); diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 904e8e2e099405..a7a560f67193c6 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -521,6 +521,7 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet, } req.set_base_compaction_cnt(tablet->base_compaction_cnt()); req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt()); + req.set_full_compaction_cnt(tablet->full_compaction_cnt()); req.set_cumulative_point(tablet->cumulative_layer_point()); } req.set_end_version(-1); @@ -772,6 +773,7 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet, tablet->last_cumu_compaction_success_time_ms = stats.last_cumu_compaction_time_ms(); tablet->set_base_compaction_cnt(stats.base_compaction_cnt()); tablet->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt()); + tablet->set_full_compaction_cnt(stats.full_compaction_cnt()); tablet->set_cumulative_layer_point(stats.cumulative_point()); tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(), stats.num_rows(), stats.data_size()); diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index d1ea7dec379684..1f3c202d06a6ef 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -123,12 +123,14 @@ class CloudTablet final : public BaseTablet { int64_t max_version_unlocked() const override { return _max_version; } int64_t base_compaction_cnt() const { return _base_compaction_cnt; } int64_t cumulative_compaction_cnt() const { return _cumulative_compaction_cnt; } + int64_t full_compaction_cnt() const { return _full_compaction_cnt; } int64_t 
cumulative_layer_point() const { return _cumulative_point.load(std::memory_order_relaxed); } void set_base_compaction_cnt(int64_t cnt) { _base_compaction_cnt = cnt; } void set_cumulative_compaction_cnt(int64_t cnt) { _cumulative_compaction_cnt = cnt; } + void set_full_compaction_cnt(int64_t cnt) { _full_compaction_cnt = cnt; } void set_cumulative_layer_point(int64_t new_point); int64_t last_cumu_compaction_failure_time() { return _last_cumu_compaction_failure_millis; } @@ -322,6 +324,7 @@ class CloudTablet final : public BaseTablet { int64_t _base_compaction_cnt = 0; int64_t _cumulative_compaction_cnt = 0; + int64_t _full_compaction_cnt = 0; int64_t _max_version = -1; int64_t _base_size = 0; int64_t _alter_version = -1; diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 2fa746e5f69f18..68ee07cd2680c5 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -1603,6 +1603,7 @@ void internal_get_rowset(Transaction* txn, int64_t start, int64_t end, std::vector> calc_sync_versions(int64_t req_bc_cnt, int64_t bc_cnt, int64_t req_cc_cnt, int64_t cc_cnt, int64_t req_cp, int64_t cp, + int64_t req_fc_cnt, int64_t fc_cnt, int64_t req_start, int64_t req_end) { using Version = std::pair; // combine `v1` `v2` to `v1`, return true if success @@ -1628,8 +1629,8 @@ std::vector> calc_sync_versions(int64_t req_bc_cnt, if (req_cc_cnt < cc_cnt) { Version cc_version; - if (req_cp < cp && req_cc_cnt + 1 == cc_cnt) { - // * only one CC happened and CP changed + if (req_cp < cp && req_cc_cnt + 1 == cc_cnt && req_fc_cnt == fc_cnt) { + // * only one CC happened and CP changed, and no full compaction happened // BE [=][=][=][=][=====][=][=] // ^~~~~ req_cp // MS [=][=][=][=][xxxxxxxxxxxxxx][=======][=][=] @@ -1653,6 +1654,13 @@ std::vector> calc_sync_versions(int64_t req_bc_cnt, // ^_____________________^ related_versions: [req_cp, max] // there may be holes if we don't return all version // after 
ms_cp, however it can be optimized. + // * one CC happened and CP changed, and full compaction happened + // BE [=][=][=][=][=][=][=][=][=][=] + // ^~~~~ req_cp + // MS [xxxxxxxxxxxxxx][xxxxxxxxxxxxxx][=======][=][=] + // ^~~~~~~ ms_cp + // ^___________________________^ related_versions: [req_cp, max] + // cc_version = {req_cp, std::numeric_limits::max() - 1}; } if (versions.empty() || !combine_if_overlapping(versions.front(), cc_version)) { @@ -1723,6 +1731,7 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller, } int64_t req_bc_cnt = request->base_compaction_cnt(); int64_t req_cc_cnt = request->cumulative_compaction_cnt(); + int64_t req_fc_cnt = request->has_full_compaction_cnt() ? request->full_compaction_cnt() : 0; int64_t req_cp = request->cumulative_point(); do { @@ -1807,6 +1816,8 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller, int64_t bc_cnt = tablet_stat.base_compaction_cnt(); int64_t cc_cnt = tablet_stat.cumulative_compaction_cnt(); + int64_t fc_cnt = + tablet_stat.has_full_compaction_cnt() ? tablet_stat.full_compaction_cnt() : 0; int64_t cp = tablet_stat.cumulative_point(); response->mutable_stats()->CopyFrom(tablet_stat); @@ -1818,17 +1829,18 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller, //========================================================================== // Find version ranges to be synchronized due to compaction //========================================================================== - if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp) { + if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp || req_fc_cnt > fc_cnt) { code = MetaServiceCode::INVALID_ARGUMENT; ss << "no valid compaction_cnt or cumulative_point given. 
req_bc_cnt=" << req_bc_cnt << ", bc_cnt=" << bc_cnt << ", req_cc_cnt=" << req_cc_cnt << ", cc_cnt=" << cc_cnt - << ", req_cp=" << req_cp << ", cp=" << cp << " tablet_id=" << tablet_id; + << " req_fc_cnt=" << req_fc_cnt << ", fc_cnt=" << fc_cnt << ", req_cp=" << req_cp + << ", cp=" << cp << " tablet_id=" << tablet_id; msg = ss.str(); LOG(WARNING) << msg; return; } auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, - req_start, req_end); + req_fc_cnt, fc_cnt, req_start, req_end); for (auto [start, end] : versions) { internal_get_rowset(txn.get(), start, end, instance_id, tablet_id, code, msg, response); if (code != MetaServiceCode::OK) { diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp index 7b1f13462037f1..76b6a8d0c432fc 100644 --- a/cloud/src/meta-service/meta_service_job.cpp +++ b/cloud/src/meta-service/meta_service_job.cpp @@ -755,6 +755,7 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string } else if (compaction.type() == TabletCompactionJobPB::FULL) { // clang-format off stats->set_base_compaction_cnt(stats->base_compaction_cnt() + 1); + stats->set_full_compaction_cnt(stats->has_full_compaction_cnt() ? 
stats->full_compaction_cnt() + 1 : 1); if (compaction.output_cumulative_point() > stats->cumulative_point()) { // After supporting parallel cumu compaction, compaction with older cumu point may be committed after // new cumu point has been set, MUST NOT set cumu point back to old value diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index 8150dced5bd9d1..6f94ec3b46ae75 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -3781,7 +3781,7 @@ TEST(MetaServiceTest, FilterCopyFilesTest) { extern std::vector> calc_sync_versions( int64_t req_bc_cnt, int64_t bc_cnt, int64_t req_cc_cnt, int64_t cc_cnt, int64_t req_cp, - int64_t cp, int64_t req_start, int64_t req_end); + int64_t cp, int64_t req_fc_cnt, int64_t fc_cnt, int64_t req_start, int64_t req_end); TEST(MetaServiceTest, CalcSyncVersionsTest) { using Versions = std::vector>; @@ -3797,7 +3797,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1}; auto [req_cp, cp] = std::tuple {5, 5}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{8, 12}})); } @@ -3813,7 +3813,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2}; auto [req_cp, cp] = std::tuple {5, 10}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{5, 12}})); // [5, 9] v [8, 12] } @@ -3822,7 +3822,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2}; auto [req_cp, cp] = std::tuple {5, 15}; auto [req_start, 
req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{5, 14}})); // [5, 14] v [8, 12] } @@ -3839,7 +3839,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2}; auto [req_cp, cp] = std::tuple {5, 5}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v [8, 12] } @@ -3855,7 +3855,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 3}; auto [req_cp, cp] = std::tuple {5, 5}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v [8, 12] } @@ -3870,7 +3870,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 3}; auto [req_cp, cp] = std::tuple {5, 15}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v [8, 12] } @@ -3886,7 +3886,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1}; auto [req_cp, cp] = std::tuple {5, 5}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = 
calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{0, 4}, {8, 12}})); } @@ -3895,7 +3895,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1}; auto [req_cp, cp] = std::tuple {8, 8}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{0, 12}})); // [0, 7] v [8, 12] } @@ -3904,7 +3904,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2}; auto [req_cp, cp] = std::tuple {5, 10}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{0, 12}})); // [0, 4] v [5, 9] v [8, 12] } @@ -3913,7 +3913,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2}; auto [req_cp, cp] = std::tuple {5, 15}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); ASSERT_EQ(versions, (Versions {{0, 14}})); // [0, 4] v [5, 14] v [8, 12] } @@ -3922,11 +3922,42 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) { auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2}; auto [req_cp, cp] = std::tuple {5, 5}; auto [req_start, req_end] = std::tuple {8, 12}; - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + auto versions = 
calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, req_start, req_end); // [0, 4] v [5, max] v [8, 12] ASSERT_EQ(versions, (Versions {{0, INT64_MAX - 1}})); } + + { + // when there exists full compaction, we can't optimize by "* only one CC happened and CP changed" + + // * one CC happened and CP changed, and full compaction happened + // BE [=][=][=][=][=][=][=][=][=][=] + // ^~~~~ req_cp + // MS [xxxxxxxxxxxxxx][xxxxxxxxxxxxxx][=======][=][=] + // ^~~~~~~ ms_cp + // ^___________________________^ related_versions: [req_cp, max] + // + auto [req_bc_cnt, bc_cnt] = std::tuple {0, 1}; + auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2}; + auto [req_cp, cp] = std::tuple {4, 7}; + auto [req_start, req_end] = std::tuple {9, 12}; + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 1, + req_start, req_end); + ASSERT_EQ(versions, (Versions {{0, INT64_MAX - 1}})); + } + + { + // abnormal case: + auto [req_bc_cnt, bc_cnt] = std::tuple {0, 1}; + auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2}; + auto [req_cp, cp] = std::tuple {4, 7}; + auto [req_start, req_end] = std::tuple {9, 12}; + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0, + req_start, req_end); + // when not considering full compaction, the returned versions is wrong becasue rowsets in [7-8] are missed + ASSERT_EQ(versions, (Versions {{0, 6}, {9, 12}})); + } } TEST(MetaServiceTest, StageTest) { diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index bc4a219d02691f..345e03e8574264 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -624,7 +624,7 @@ message TabletStatsPB { optional int64 cumulative_point = 9; optional int64 last_base_compaction_time_ms = 10; optional int64 last_cumu_compaction_time_ms = 11; - optional int64 full_compaction_cnt = 12; + optional int64 full_compaction_cnt = 12; // used by calc_sync_versions() only optional int64 last_full_compaction_time_ms = 13; 
optional int64 index_size = 14; optional int64 segment_size = 15; @@ -1043,6 +1043,7 @@ message GetRowsetRequest { // for compability reason we use FILL_WITH_DICT as default optional SchemaOp schema_op = 8 [default = FILL_WITH_DICT]; optional string request_ip = 9; + optional int64 full_compaction_cnt = 10; } message GetRowsetResponse { diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_calc_sync_version.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_calc_sync_version.out new file mode 100644 index 00000000000000..ed930452a5d142 --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_calc_sync_version.out @@ -0,0 +1,55 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 10 +2 20 +3 30 + +-- !sql -- +1 10 +2 20 +3 30 + +-- !write_cluster_new_write -- +1 60 +2 70 +3 30 +4 40 +5 50 + +-- !read_cluster_query -- +1 60 +2 70 +3 30 +4 40 +5 50 + +-- !write_cluster_full_compaction -- +1 60 +2 70 +3 30 +4 40 +5 50 + +-- !write_cluster_cumu_compaction -- +1 60 +2 70 +3 30 +4 40 +5 50 + +-- !write_cluster_new_write -- +1 80 +2 70 +3 30 +4 40 +5 50 + +-- !read_cluster_check_dup_key -- + +-- !read_cluster_res -- +1 80 +2 70 +3 30 +4 40 +5 50 + diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_calc_sync_version.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_calc_sync_version.groovy new file mode 100644 index 00000000000000..12bdd2dd007bf9 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_calc_sync_version.groovy @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions + +suite("test_cloud_calc_sync_version","docker") { + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + ] + options.enableDebugPoints() + options.cloudMode = true + + docker(options) { + def write_cluster = "write_cluster" + def read_cluster = "read_cluster" + + // Add two clusters + cluster.addBackend(1, write_cluster) + cluster.addBackend(1, read_cluster) + + sql "use @${write_cluster}" + logger.info("==== switch to write cluster") + def tableName = "test_cloud_calc_sync_version" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + UNIQUE KEY(k) + DISTRIBUTED BY HASH(`k`) BUCKETS 1 + properties( + "replication_num" = "1", + "disable_auto_compaction" = "true") + """ + + sql """ INSERT INTO ${tableName} VALUES (1,10)""" + sql """ INSERT INTO ${tableName} VALUES (2,20)""" + sql """ INSERT INTO ${tableName} VALUES (3,30)""" + qt_sql "select * from ${tableName} order by k;" + + def check_rs_metas = { tbl, check_func -> + def compactionUrl = sql_return_maparray("show tablets from ${tbl};").get(0).CompactionStatus + def (code, out, err) = curl("GET", compactionUrl) + assert code == 0 + def jsonMeta = parseJson(out.trim()) + logger.info("==== 
rowsets: ${jsonMeta.rowsets}, cumu point: ${jsonMeta["cumulative point"]}") + check_func(jsonMeta.rowsets, jsonMeta["cumulative point"]) + } + + def tabletStats = sql_return_maparray("show tablets from ${tableName};") + def tabletId = tabletStats[0].TabletId + def tabletBackendId = tabletStats[0].BackendId + def tabletBackend + def backends = sql_return_maparray('show backends') + for (def be : backends) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("==== tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + + def do_cumu_compaction = { def tbl, def tablet_id, int start, int end, int cp -> + GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point", [tablet_id: "${tablet_id}", cumu_point: "${cp}"]) + GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", [tablet_id: "${tablet_id}", start_version: "${start}", end_version: "${end}"]) + + trigger_and_wait_compaction(tbl, "cumulative") + + GetDebugPoint().disableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point") + GetDebugPoint().disableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets") + } + + try { + // [2-2],[3-3],[4-4] -> [2,4] + do_cumu_compaction(tableName, tabletId, 2, 4, 5) + qt_sql "select * from ${tableName} order by k;" + check_rs_metas(tableName, {def rowsets, def cumu_point -> + assert rowsets.size() == 2 + assert cumu_point as int == 5 + assert rowsets[1].contains("[2-4]") + }) + + sql """ INSERT INTO ${tableName} VALUES (4,40)""" // ver=5 + sql """ INSERT INTO ${tableName} VALUES (5,50)""" // ver=6 + sql "sync;" + + GetDebugPoint().enableDebugPointForAllBEs("CloudFullCompaction::execute_compact.block", [tablet_id: 
"${tabletId}"]) + def t1 = thread("full compaction") { + // [2,4],[5-5],[6-6] -> [2,6] + sql "use @${write_cluster}" + trigger_and_wait_compaction(tableName, "full") + } + + sleep(1500) + sql """ INSERT INTO ${tableName} VALUES (1,60)""" // ver=7 + sql """ INSERT INTO ${tableName} VALUES (2,70)""" // ver=8 + sql "sync;" + qt_write_cluster_new_write "select * from ${tableName} order by k;" + + + // read cluster sync rowsets [2-4],[5-5],[6-6],[7-7],[8-8], bc_cnt=0, cc_cnt=1, cp=4 + sql "use @${read_cluster}" + logger.info("==== switch to read cluster") + qt_read_cluster_query "select * from ${tableName} order by k;" + check_rs_metas(tableName, {def rowsets, def cumu_point -> + assert rowsets.size() == 6 + assert cumu_point as int == 5 + assert rowsets[1].contains("[2-4]") + assert rowsets[2].contains("[5-5]") + assert rowsets[3].contains("[6-6]") + assert rowsets[4].contains("[7-7]") + assert rowsets[5].contains("[8-8]") + }) + + + sql "use @${write_cluster}" + logger.info("==== switch to write cluster") + GetDebugPoint().disableDebugPointForAllBEs("CloudFullCompaction::execute_compact.block") + t1.get() + qt_write_cluster_full_compaction "select * from ${tableName} order by k;" + check_rs_metas(tableName, {def rowsets, def cumu_point -> + assert rowsets.size() == 4 + assert cumu_point as int == 7 // updated by full compaction + assert rowsets[1].contains("[2-6]") + assert rowsets[2].contains("[7-7]") + assert rowsets[3].contains("[8-8]") + }) + + + do_cumu_compaction(tableName, tabletId, 7, 8, 7) + qt_write_cluster_cumu_compaction "select * from ${tableName} order by k;" + check_rs_metas(tableName, {def rowsets, def cumu_point -> + assert rowsets.size() == 3 + assert cumu_point as int == 7 + assert rowsets[1].contains("[2-6]") + assert rowsets[2].contains("[7-8]") + }) + + sql """ INSERT INTO ${tableName} VALUES (1,80)""" // ver=9 + sql "sync;" + qt_write_cluster_new_write "select * from ${tableName} order by k;" + + + // read cluster will read dup keys of ver=9 to 
ver=7 because it will not sync rowset [7-8] + sql "use @${read_cluster}" + logger.info("==== switch to read cluster") + sql "set disable_nereids_rules=ELIMINATE_GROUP_BY;" + qt_read_cluster_check_dup_key "select k,count() from ${tableName} group by k having count()>1;" + qt_read_cluster_res "select * from ${tableName} order by k;" + + } catch (Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + } + } +}