From 8aca7f072edbc04a5529d2aaba91fe0330c56302 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Tue, 21 Jan 2025 18:13:07 +0800 Subject: [PATCH 1/2] 1 --- be/src/cloud/cloud_cumulative_compaction.cpp | 1 + .../cloud_cumulative_compaction_policy.cpp | 7 +- .../test_first_delete_compaction.out | 4 + .../test_first_delete_compaction.groovy | 111 ++++++++++++++++++ 4 files changed, 119 insertions(+), 4 deletions(-) create mode 100644 regression-test/data/fault_injection_p0/test_first_delete_compaction.out create mode 100644 regression-test/suites/fault_injection_p0/test_first_delete_compaction.groovy diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index d2a3af3ec1ba52..8376b905b1825b 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -46,6 +46,7 @@ CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& engine, CloudCumulativeCompaction::~CloudCumulativeCompaction() = default; Status CloudCumulativeCompaction::prepare_compact() { + DBUG_EXECUTE_IF("CloudCumulativeCompaction.prepare_compact.sleep", { sleep(5); }) if (_tablet->tablet_state() != TABLET_RUNNING && (!config::enable_new_tablet_do_compaction || static_cast(_tablet.get())->alter_version() == -1)) { diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp b/be/src/cloud/cloud_cumulative_compaction_policy.cpp index 874799f4007311..d666311d343316 100644 --- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp +++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp @@ -230,10 +230,9 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point( config::compaction_promotion_version_count; // if rowsets have delete version, move to the last directly. // if rowsets have no delete version, check output_rowset total disk size satisfies promotion size. - return output_rowset->start_version() == last_cumulative_point && - (last_delete_version.first != -1 || - output_rowset->total_disk_size() >= cloud_promotion_size(tablet) || - satisfy_promotion_version) + return (last_delete_version.first != -1 || + output_rowset->total_disk_size() >= cloud_promotion_size(tablet) || + satisfy_promotion_version) ? output_rowset->end_version() + 1 : last_cumulative_point; } diff --git a/regression-test/data/fault_injection_p0/test_first_delete_compaction.out b/regression-test/data/fault_injection_p0/test_first_delete_compaction.out new file mode 100644 index 00000000000000..6a278bd6d15a3e --- /dev/null +++ b/regression-test/data/fault_injection_p0/test_first_delete_compaction.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select1 -- +1 1 + diff --git a/regression-test/suites/fault_injection_p0/test_first_delete_compaction.groovy b/regression-test/suites/fault_injection_p0/test_first_delete_compaction.groovy new file mode 100644 index 00000000000000..f7bcba95bde62b --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_first_delete_compaction.groovy @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_first_delete_compaction", "nonConcurrent") { + if (!isCloudMode()) { + return + } + def tableName = "test_first_delete_compaction" + def check_cumu_point = { cumu_point -> + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + int cumuPoint = 0 + def tablet = tablets[0] + String tablet_id = tablet.TabletId + def (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + cumuPoint = tabletJson["cumulative point"] + return cumuPoint > cumu_point + } + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def set_be_config = { key, value -> + for (String backend_id: backendId_to_backendIP.keySet()) { + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + } + set_be_config.call("disable_auto_compaction", "true") + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().enableDebugPointForAllBEs("CloudCumulativeCompaction.prepare_compact.sleep") + + try { + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} + ( + `k1` INT NULL, + `v1` INT NULL + ) + UNIQUE KEY(k1) + PARTITION BY RANGE(k1) + ( + PARTITION p1 VALUES LESS THAN (10), + PARTITION p2 VALUES LESS THAN (20) + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "enable_mow_light_delete" = "true")""" + + sql """ INSERT INTO ${tableName} VALUES (11,11)""" + sql """ set delete_without_partition = true; """ + sql """ delete from ${tableName} where v1 > 0""" + sql """ INSERT INTO ${tableName} VALUES (1,1)""" + sql """ INSERT INTO ${tableName} VALUES (1,1)""" + sql """ INSERT INTO ${tableName} VALUES (1,1)""" + sql """ INSERT INTO ${tableName} VALUES (1,1)""" + sql """ INSERT INTO ${tableName} VALUES (1,1)""" + set_be_config.call("disable_auto_compaction", "false") + + def now = System.currentTimeMillis() + + while(true){ + sql """ INSERT INTO ${tableName} VALUES (1,1)""" + sql """ INSERT INTO ${tableName} VALUES (1,1)""" + sql """ INSERT INTO ${tableName} VALUES (1,1)""" + sql """ INSERT INTO ${tableName} VALUES (1,1)""" + sql """ INSERT INTO ${tableName} VALUES (1,1)""" + if(check_cumu_point(3)){ + break; + } + Thread.sleep(3000) + def diff = System.currentTimeMillis() - now + if(diff > 300*1000){ + break + } + } + def time_diff = System.currentTimeMillis() - now + logger.info("time_diff:" + time_diff) + assertTrue(time_diff<60*1000) + + qt_select1 """select * from ${tableName} order by k1, v1""" + } catch (Exception e){ + logger.info(e.getMessage()) + assertFalse(true) + } finally { + set_be_config.call("disable_auto_compaction", "false") + GetDebugPoint().disableDebugPointForAllBEs("CloudCumulativeCompaction.prepare_compact.sleep") + try_sql("DROP TABLE IF EXISTS ${tableName} FORCE") + } + +} \ No newline at end of file From aea7baafb8a9b9be93101094c93de6628e7428c5 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Thu, 27 Mar 2025 21:38:28 +0800 Subject: [PATCH 2/2] 2 --- ...loud_cumulative_compaction_policy_test.cpp | 148 ++++++++++++++++++ 1 file changed, 148 insertions(+) create mode 100644 be/test/cloud/cloud_cumulative_compaction_policy_test.cpp diff --git a/be/test/cloud/cloud_cumulative_compaction_policy_test.cpp b/be/test/cloud/cloud_cumulative_compaction_policy_test.cpp new file mode 100644 index 00000000000000..45dd3f1d093520 --- /dev/null +++ b/be/test/cloud/cloud_cumulative_compaction_policy_test.cpp @@ -0,0 +1,148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include + +#include "cloud/cloud_storage_engine.h" +#include "gtest/gtest_pred_impl.h" +#include "json2pb/json_to_pb.h" +#include "olap/olap_common.h" +#include "olap/rowset/rowset_factory.h" +#include "olap/rowset/rowset_meta.h" +#include "olap/tablet_meta.h" +#include "util/uid_util.h" + +namespace doris { + +class TestCloudSizeBasedCumulativeCompactionPolicy : public testing::Test { +public: + TestCloudSizeBasedCumulativeCompactionPolicy() : _engine(CloudStorageEngine({})) {} + + void SetUp() { + config::compaction_promotion_size_mbytes = 1024; + config::compaction_promotion_ratio = 0.05; + config::compaction_promotion_min_size_mbytes = 64; + config::compaction_min_size_mbytes = 64; + + _tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, + TCompressionType::LZ4F)); + + _json_rowset_meta = R"({ + "rowset_id": 540081, + "tablet_id": 15673, + "txn_id": 4042, + "tablet_schema_hash": 567997577, + "rowset_type": "BETA_ROWSET", + "rowset_state": "VISIBLE", + "start_version": 2, + "end_version": 2, + "num_rows": 3929, + "total_disk_size": 41, + "data_disk_size": 41, + "index_disk_size": 235, + "empty": false, + "load_id": { + "hi": -5350970832824939812, + "lo": -6717994719194512122 + }, + "creation_time": 1553765670, + "num_segments": 3 + })"; + } + void TearDown() {} + + void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end) { + RowsetMetaPB rowset_meta_pb; + json2pb::JsonToProtoMessage(_json_rowset_meta, &rowset_meta_pb); + rowset_meta_pb.set_start_version(start); + rowset_meta_pb.set_end_version(end); + rowset_meta_pb.set_creation_time(10000); + + pb1->init_from_pb(rowset_meta_pb); + pb1->set_total_disk_size(41); + pb1->set_tablet_schema(_tablet_meta->tablet_schema()); + } + + void init_rs_meta_small_base(std::vector* rs_metas) { + RowsetMetaSharedPtr ptr1(new RowsetMeta()); + init_rs_meta(ptr1, 0, 0); + rs_metas->push_back(ptr1); + + RowsetMetaSharedPtr ptr2(new RowsetMeta()); + init_rs_meta(ptr2, 1, 1); + rs_metas->push_back(ptr2); + + RowsetMetaSharedPtr ptr3(new RowsetMeta()); + init_rs_meta(ptr3, 2, 2); + rs_metas->push_back(ptr3); + + RowsetMetaSharedPtr ptr4(new RowsetMeta()); + init_rs_meta(ptr4, 3, 3); + rs_metas->push_back(ptr4); + + RowsetMetaSharedPtr ptr5(new RowsetMeta()); + init_rs_meta(ptr5, 4, 4); + rs_metas->push_back(ptr5); + } + +protected: + std::string _json_rowset_meta; + TabletMetaSharedPtr _tablet_meta; + +private: + CloudStorageEngine _engine; +}; + +static RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping, + int data_size) { + auto rs_meta = std::make_shared(); + rs_meta->set_rowset_type(BETA_ROWSET); // important + rs_meta->_rowset_meta_pb.set_start_version(version.first); + rs_meta->_rowset_meta_pb.set_end_version(version.second); + rs_meta->set_num_segments(num_segments); + rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING); + rs_meta->set_total_disk_size(data_size); + RowsetSharedPtr rowset; + Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset); + if (!st.ok()) { + return nullptr; + } + return rowset; +} + +TEST_F(TestCloudSizeBasedCumulativeCompactionPolicy, new_cumulative_point) { + std::vector rs_metas; + init_rs_meta_small_base(&rs_metas); + + CloudTablet _tablet(_engine, _tablet_meta); + for (auto& rs_meta : rs_metas) { + static_cast(_tablet_meta->add_rs_meta(rs_meta)); + } + _tablet._tablet_meta->_enable_unique_key_merge_on_write = true; + _tablet._base_size = 100; + + CloudSizeBasedCumulativeCompactionPolicy policy; + RowsetSharedPtr output_rowset = create_rowset(Version(3, 5), 5, false, 100 * 1024 * 1024); + Version version(1, 1); + EXPECT_EQ(policy.new_cumulative_point(&_tablet, output_rowset, version, 2), 6); +} +} // namespace doris