From 5371c3627c4a2ebb5fd6782afd2e071517818520 Mon Sep 17 00:00:00 2001
From: Lei Zhang
Date: Tue, 10 Jun 2025 14:18:57 +0800
Subject: [PATCH 1/2] [feat](cloud) Add unused rowset state for CloudTablet

* Add unused rowset state for CloudTablet to recycle file cache data and delete_bitmap
* This PR is an enhancement of `https://github.com/apache/doris/pull/50973`
---
 be/src/cloud/cloud_tablet.cpp                 |  65 +--
 be/src/cloud/cloud_tablet.h                   |   2 +
 ...compaction_and_read_stale_cloud_docker.out |  36 ++
 ...paction_and_read_stale_cloud_docker.groovy | 390 ++++++++++++++++++
 ...egments_and_read_stale_cloud_docker.groovy | 347 ++++++++++++++++
 5 files changed, 817 insertions(+), 23 deletions(-)
 create mode 100644 regression-test/data/compaction/test_filecache_compaction_and_read_stale_cloud_docker.out
 create mode 100644 regression-test/suites/compaction/test_filecache_compaction_and_read_stale_cloud_docker.groovy
 create mode 100644 regression-test/suites/compaction/test_filecache_compaction_multisegments_and_read_stale_cloud_docker.groovy

diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 43749157c9a536..2aca00c156f775 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -63,6 +63,8 @@ bvar::LatencyRecorder g_cu_compaction_get_delete_bitmap_lock_time_ms(
 bvar::LatencyRecorder g_base_compaction_get_delete_bitmap_lock_time_ms(
         "base_compaction_get_delete_bitmap_lock_time_ms");
 
+bvar::Adder<int64_t> g_unused_rowsets_count("unused_rowsets_count");
+
 static constexpr int LOAD_INITIATOR_ID = -1;
 
 CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
@@ -342,17 +344,20 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
         // replace existed rowset with `to_add` rowset. This may occur when:
         // 1. schema change converts rowsets which have been double written to new tablet
         // 2. cumu compaction picks single overlapping input rowset to perform compaction
-        if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
-            // add existed rowset to unused_rowsets to remove delete bitmap
-            if (auto find_it = _rs_version_map.find(rs->version());
-                find_it != _rs_version_map.end()) {
-                DCHECK(find_it->second->rowset_id() != rs->rowset_id())
-                        << "tablet_id=" << tablet_id()
-                        << ", rowset_id=" << rs->rowset_id().to_string()
-                        << ", existed rowset=" << find_it->second->rowset_id().to_string();
-                _unused_rowsets.emplace(find_it->second->rowset_id(), find_it->second);
-            }
+
+        // add the existing rowset to unused_rowsets to remove its delete bitmap and recycle its cached data
+
+        std::vector<RowsetSharedPtr> unused_rowsets;
+        if (auto find_it = _rs_version_map.find(rs->version());
+            find_it != _rs_version_map.end()) {
+            DCHECK(find_it->second->rowset_id() != rs->rowset_id())
+                    << "tablet_id=" << tablet_id()
+                    << ", rowset_id=" << rs->rowset_id().to_string()
+                    << ", existed rowset_id=" << find_it->second->rowset_id().to_string();
+            unused_rowsets.push_back(find_it->second);
         }
+        add_unused_rowsets(unused_rowsets);
+
         _tablet_meta->delete_rs_meta_by_version(rs->version(), nullptr);
         _rs_version_map[rs->version()] = rs;
         _tablet_meta->add_rowsets_unchecked({rs});
@@ -460,21 +465,16 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
         }
         _reconstruct_version_tracker_if_necessary();
     }
+
+    // if the rowset is not used by any query, we can recycle its cached data early.
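+    // Rowsets that are still referenced by a running query are not recycled at
+    // this point; they are queued via add_unused_rowsets() below, and their
+    // cached data and delete bitmap are cleaned up later by
+    // remove_unused_rowsets() once the reference count shows no more readers.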
     recycle_cached_data(expired_rowsets);
 
     if (config::enable_mow_verbose_log) {
         LOG_INFO("finish delete_expired_stale_rowset for tablet={}", tablet_id());
     }
+    add_unused_rowsets(expired_rowsets);
     if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write() &&
         !deleted_stale_rowsets.empty()) {
-        // record expired rowsets in unused rowsets
-        {
-            std::lock_guard lock(_gc_mutex);
-            for (const auto& rowset : expired_rowsets) {
-                _unused_rowsets.emplace(rowset->rowset_id(), rowset);
-            }
-        }
-
         // agg delete bitmap for pre rowsets; record unused delete bitmap key ranges
         OlapStopWatch watch;
         for (const auto& [version, unused_rowsets] : deleted_stale_rowsets) {
@@ -504,19 +504,34 @@ bool CloudTablet::need_remove_unused_rowsets() {
     return !_unused_rowsets.empty() || !_unused_delete_bitmap.empty();
 }
 
+void CloudTablet::add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets) {
+    std::lock_guard lock(_gc_mutex);
+    for (const auto& rowset : rowsets) {
+        _unused_rowsets[rowset->rowset_id()] = rowset;
+    }
+    g_unused_rowsets_count << rowsets.size();
+}
+
 void CloudTablet::remove_unused_rowsets() {
+    int64_t removed_rowsets_num = 0;
+    int64_t removed_delete_bitmap_num = 0;
+    OlapStopWatch watch;
     std::lock_guard lock(_gc_mutex);
-    // 1. remove unused rowsets and delete bitmap
+    // 1. remove unused rowsets' cached data and delete bitmaps
     for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
+        // it->second is a std::shared_ptr<Rowset>
         auto&& rs = it->second;
        if (rs.use_count() > 1) {
-            LOG(WARNING) << "Rowset " << rs->rowset_id() << " has " << rs.use_count()
-                         << " references. Can not remove delete bitmap.";
+            LOG(WARNING) << "tablet_id:" << tablet_id() << " rowset: " << rs->rowset_id() << " has "
+                         << rs.use_count() << " references, it cannot be removed";
             ++it;
             continue;
         }
         tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version());
+        rs->clear_cache();
         it = _unused_rowsets.erase(it);
+        g_unused_rowsets_count << -1;
+        removed_rowsets_num++;
     }
 
     // 2. remove delete bitmap of pre rowsets
@@ -538,12 +553,16 @@ void CloudTablet::remove_unused_rowsets() {
         auto& key_ranges = std::get<1>(*it);
         tablet_meta()->delete_bitmap().remove(key_ranges);
         it = _unused_delete_bitmap.erase(it);
+        removed_delete_bitmap_num++;
     }
 
-    if (!_unused_rowsets.empty() || !_unused_delete_bitmap.empty()) {
+    if (removed_rowsets_num > 0 || removed_delete_bitmap_num > 0) {
         LOG(INFO) << "tablet_id=" << tablet_id()
                   << ", unused_rowset size=" << _unused_rowsets.size()
-                  << ", unused_delete_bitmap size=" << _unused_delete_bitmap.size();
+                  << ", unused_delete_bitmap size=" << _unused_delete_bitmap.size()
+                  << ", removed_rowsets_num=" << removed_rowsets_num
+                  << ", removed_delete_bitmap_num=" << removed_delete_bitmap_num
+                  << ", cost(us)=" << watch.get_elapse_time_us();
     }
 }
 
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index d63506ddde0e55..3474857b36aa6d 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -276,6 +276,8 @@ class CloudTablet final : public BaseTablet {
                                        std::map& pre_rowset_to_versions);
     bool need_remove_unused_rowsets();
+
+    void add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets);
     void remove_unused_rowsets();
 
 private:
diff --git a/regression-test/data/compaction/test_filecache_compaction_and_read_stale_cloud_docker.out b/regression-test/data/compaction/test_filecache_compaction_and_read_stale_cloud_docker.out
new file mode 100644
index 00000000000000..8701d535cf3b53
--- /dev/null
+++ b/regression-test/data/compaction/test_filecache_compaction_and_read_stale_cloud_docker.out
@@ -0,0 +1,36 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql1 --
+1 99
+2 99
+3 99
+4 99
+5 99
+
+-- !sql2 --
+1 99
+2 99
+3 99
+4 99
+5 99
+
+-- !sql3 --
+1 99
+2 99
+3 99
+4 99
+5 99
+
+-- !sql4 --
+1 99
+2 99
+3 99
+4 99
+5 100
+
+-- !sql5 --
+1 99
+2 99
+3 99
+4 99
+5 100
+
diff --git a/regression-test/suites/compaction/test_filecache_compaction_and_read_stale_cloud_docker.groovy b/regression-test/suites/compaction/test_filecache_compaction_and_read_stale_cloud_docker.groovy
new file mode 100644
index 00000000000000..8673f249e1dab0
--- /dev/null
+++ b/regression-test/suites/compaction/test_filecache_compaction_and_read_stale_cloud_docker.groovy
@@ -0,0 +1,390 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
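+
+// Scenario sketched by this suite (a summary of the steps below): write five
+// rows into a merge-on-write unique-key table, compact them into a base
+// rowset, then block a query with the
+// NewOlapScanner::_init_tablet_reader_params.block debug point while a second
+// compaction turns versions [7-11] into stale rowsets. The stale rowsets may
+// be swept early (tablet_rowset_stale_sweep_time_sec=0), but their delete
+// bitmaps must survive until the blocked query finishes; afterwards the file
+// cache of all stale rowsets must be emptied and the unused_rowsets_count
+// bvar must drop back to 0.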
+
+import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_filecache_compaction_and_read_stale_cloud_docker", "docker") {
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.enableDebugPoints()
+    options.feConfigs.add("enable_workload_group=false")
+    options.beConfigs.add('compaction_promotion_version_count=5')
+    options.beConfigs.add('tablet_rowset_stale_sweep_time_sec=0')
+    options.beConfigs.add('vacuum_stale_rowsets_interval_s=10')
+    options.beConfigs.add('enable_java_support=false')
+
+    def dbName = ""
+    def testTable = "test_filecache_compaction_and_read_stale_cloud_docker"
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    def backendId_to_backendBrpcPort = [:]
+
+    def triggerCompaction = { tablet ->
+        def compact_type = "cumulative"
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        if (compact_type == "cumulative") {
+            def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id)
+            logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1)
+            assertEquals(code_1, 0)
+            return out_1
+        } else if (compact_type == "full") {
+            def (code_2, out_2, err_2) = be_run_full_compaction(be_host, be_http_port, tablet_id)
+            logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2)
+            assertEquals(code_2, 0)
+            return out_2
+        } else {
+            assertFalse(true)
+        }
+    }
+
+    def getTabletStatus = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET http://${be_host}:${be_http_port}")
+        sb.append("/api/compaction/show?tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.getText()
+        logger.info("Get tablet status: code=" + code + ", out=" + out)
+        assertEquals(code, 0)
+        def tabletStatus = parseJson(out.trim())
+        return tabletStatus
+    }
+
+    def waitForCompaction = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        def running = true
+        do {
+            Thread.sleep(1000)
+            StringBuilder sb = new StringBuilder();
+            sb.append("curl -X GET http://${be_host}:${be_http_port}")
+            sb.append("/api/compaction/run_status?tablet_id=")
+            sb.append(tablet_id)
+
+            String command = sb.toString()
+            logger.info(command)
+            def process = command.execute()
+            def code = process.waitFor()
+            def out = process.getText()
+            logger.info("Get compaction status: code=" + code + ", out=" + out)
+            assertEquals(code, 0)
+            def compactionStatus = parseJson(out.trim())
+            assertEquals("success", compactionStatus.status.toLowerCase())
+            running = compactionStatus.run_status
+        } while (running)
+    }
+
+    def getLocalDeleteBitmapStatus = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET http://${be_host}:${be_http_port}")
+        sb.append("/api/delete_bitmap/count_local?verbose=true&tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.getText()
+        logger.info("Get local delete bitmap count status: code=" + code + ", out=" + out)
+        assertEquals(code, 0)
+        def deleteBitmapStatus = parseJson(out.trim())
+        return deleteBitmapStatus
+    }
+
+    def getMsDeleteBitmapStatus = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET http://${be_host}:${be_http_port}")
+        sb.append("/api/delete_bitmap/count_ms?verbose=true&tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.getText()
+        logger.info("Get ms delete bitmap count status: code=" + code + ", out=" + out)
+        assertEquals(code, 0)
+        def deleteBitmapStatus = parseJson(out.trim())
+        return deleteBitmapStatus
+    }
+
+    docker(options) {
+        def fes = sql_return_maparray "show frontends"
+        logger.info("frontends: ${fes}")
+        def url = "jdbc:mysql://${fes[0].Host}:${fes[0].QueryPort}/"
+        logger.info("url: " + url)
+        AtomicBoolean query_result = new AtomicBoolean(true)
+        def query = {
+            connect(context.config.jdbcUser, context.config.jdbcPassword, url) {
+                logger.info("query start")
+                def results = sql_return_maparray """ select * from ${dbName}.${testTable}; """
+                logger.info("query result: " + results)
+                Set keys = new HashSet<>()
+                for (final def result in results) {
+                    if (keys.contains(result.k)) {
+                        logger.info("find duplicate key: " + result.k)
+                        query_result.set(false)
+                        break
+                    }
+                    keys.add(result.k)
+                }
+                logger.info("query finish. query_result: " + query_result.get())
+            }
+        }
+
+        def result = sql 'SELECT DATABASE()'
+        dbName = result[0][0]
+
+        sql """ DROP TABLE IF EXISTS ${testTable} """
+        sql """
+            create table ${testTable} (`k` int NOT NULL, `v` int NOT NULL)
+            UNIQUE KEY(`k`)
+            DISTRIBUTED BY HASH(`k`) BUCKETS 1
+            PROPERTIES (
+                "enable_unique_key_merge_on_write" = "true",
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true"
+            );
+        """
+        // getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+        getBackendIpHttpAndBrpcPort(backendId_to_backendIP, backendId_to_backendHttpPort, backendId_to_backendBrpcPort);
+
+        def tablets = sql_return_maparray """ show tablets from ${testTable}; """
+        logger.info("tablets: " + tablets)
+        assertEquals(1, tablets.size())
+        def tablet = tablets[0]
+        String tablet_id = tablet.TabletId
+
+        try {
+            Set all_history_stale_rowsets = new HashSet<>();
+
+            // write some data
+            sql """ INSERT INTO ${testTable} VALUES (1,99); """
+            sql """ INSERT INTO ${testTable} VALUES (2,99); """
+            sql """ INSERT INTO ${testTable} VALUES (3,99); """
+            sql """ INSERT INTO ${testTable} VALUES (4,99); """
+            sql """ INSERT INTO ${testTable} VALUES (5,99); """
+            sql "sync"
+            order_qt_sql1 """ select * from ${testTable}; """
+
+            def tablet_status = getTabletStatus(tablet)
+
+            // after compaction, [1-6] versions will become stale rowsets
+            all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+
+            // trigger compaction to generate base rowset
+            assertTrue(triggerCompaction(tablet).contains("Success"))
+            waitForCompaction(tablet)
+            tablet_status = getTabletStatus(tablet)
+            assertEquals(2, tablet_status["rowsets"].size())
+            all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+
+            def ms_dm = getMsDeleteBitmapStatus(tablet)
+            assertEquals(0, ms_dm["delete_bitmap_count"])
+            order_qt_sql2 "select * from ${testTable}"
+
+            // write some data
+            sql """ INSERT INTO ${testTable} VALUES (1,99); """
+            sql """ INSERT INTO ${testTable} VALUES (2,99); """
+            sql """ INSERT INTO ${testTable} VALUES (3,99); """
+            sql """ INSERT INTO ${testTable} VALUES (4,99); """
+            sql """ INSERT INTO ${testTable} VALUES (5,99); """
+            sql """ sync """
+            order_qt_sql3 "select * from ${testTable}"
+            tablet_status = getTabletStatus(tablet)
+            assertEquals(7, tablet_status["rowsets"].size())
+            all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+            ms_dm = getMsDeleteBitmapStatus(tablet)
+            assertEquals(5, ms_dm["delete_bitmap_count"])
+
+            // trigger and block one query
+            GetDebugPoint().enableDebugPointForAllBEs("NewOlapScanner::_init_tablet_reader_params.block")
+            GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset")
+            GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+                    [tablet_id: "${tablet_id}", start_version: 7, end_version: 11]);
+            Thread query_thread = new Thread(() -> query())
+            query_thread.start()
+            sleep(100)
+
+            // trigger compaction
+            // getTabletStatus(tablet)
+            assertTrue(triggerCompaction(tablet).contains("Success"))
+            waitForCompaction(tablet)
+            logger.info("compaction2 finished")
+            // check rowset count
+            tablet_status = getTabletStatus(tablet)
+            assertEquals(3, tablet_status["rowsets"].size())
+            all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+            // check ms delete bitmap count
+            ms_dm = getMsDeleteBitmapStatus(tablet)
+            assertEquals(1, ms_dm["delete_bitmap_count"])
+            assertEquals(5, ms_dm["cardinality"])
+            // check local delete bitmap count
+            def local_dm = getLocalDeleteBitmapStatus(tablet)
+            assertEquals(5, local_dm["delete_bitmap_count"])
+            assertEquals(9, local_dm["cardinality"])
+
+            // wait until stale rowsets are deleted
+            boolean is_stale_rowsets_deleted = false
+            for (int i = 0; i < 100; i++) {
+                tablet_status = getTabletStatus(tablet)
+                if (tablet_status["stale_rowsets"].size() == 0) {
+                    is_stale_rowsets_deleted = true
+                    break
+                }
+                sleep(500)
+            }
+            assertTrue(is_stale_rowsets_deleted, "stale rowsets are not deleted")
+            // check that the delete bitmap of stale rowsets is not deleted yet
+            sleep(1000)
+            def local_dm_status = getLocalDeleteBitmapStatus(tablet)
+            assertEquals(5, local_dm_status["delete_bitmap_count"])
+
+            // unblock the query and check that there are no duplicated keys
+            GetDebugPoint().disableDebugPointForAllBEs("NewOlapScanner::_init_tablet_reader_params.block")
+            query_thread.join()
+            assertTrue(query_result.get(), "find duplicated keys")
+
+            // check that the delete bitmap of compaction2 stale rowsets is deleted
+            // write some data
+            sql """ INSERT INTO ${testTable} VALUES (1,99); """
+            sql """ INSERT INTO ${testTable} VALUES (2,99); """
+            sql """ INSERT INTO ${testTable} VALUES (3,99); """
+            sql """ INSERT INTO ${testTable} VALUES (4,99); """
+            sql """ INSERT INTO ${testTable} VALUES (5,100); """
+            sql "sync"
+            order_qt_sql4 "select * from ${testTable}"
+            logger.info("order_qt_sql4 finished")
+            tablet_status = getTabletStatus(tablet)
+            all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+            getMsDeleteBitmapStatus(tablet)
+            // trigger compaction
+            GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+                    [tablet_id: "${tablet_id}", start_version: 12, end_version: 16]);
+            tablet_status = getTabletStatus(tablet)
+            all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+            assertTrue(triggerCompaction(tablet).contains("Success"))
+            waitForCompaction(tablet)
+            boolean is_compaction_finished = false
+            for (int i = 0; i < 100; i++) {
+                tablet_status = getTabletStatus(tablet)
+                all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+                if (tablet_status["rowsets"].size() == 4) {
+                    logger.info("final tablet status: ${tablet_status}")
+                    is_compaction_finished = true
+                    break
+                }
+                sleep(500)
+            }
+            assertTrue(is_compaction_finished, "compaction is not finished")
+            logger.info("compaction3 finished")
+            // check ms delete bitmap count
+            ms_dm = getMsDeleteBitmapStatus(tablet)
+            assertEquals(2, ms_dm["delete_bitmap_count"])
+            assertEquals(10, ms_dm["cardinality"])
+            // check delete bitmap count
+            logger.info("check local delete bitmap is deleted")
+            boolean is_local_dm_deleted = false
+            for (int i = 0; i < 100; i++) {
+                local_dm_status = getLocalDeleteBitmapStatus(tablet)
+                if (local_dm_status["delete_bitmap_count"] == 2) {
+                    assertEquals(10, local_dm_status["cardinality"])
+                    is_local_dm_deleted = true
+                    break
+                }
+                sleep(500)
+            }
+            assertTrue(is_local_dm_deleted, "delete bitmap of compaction2 stale rowsets is not deleted")
+            order_qt_sql5 "select * from ${testTable}"
+
+            tablet_status = getTabletStatus(tablet)
+            def final_rowsets = tablet_status["rowsets"]
+
+            // sleep past vacuum_stale_rowsets_interval_s=10 seconds to wait until unused rowsets are deleted
+            sleep(21000)
+
+            def be_host = backendId_to_backendIP[tablet.BackendId]
+            def be_http_port = backendId_to_backendHttpPort[tablet.BackendId]
+            for (int i = 0; i < all_history_stale_rowsets.size(); i++) {
+                def rowsetStr = all_history_stale_rowsets[i]
+                // [12-12] 1 DATA NONOVERLAPPING 02000000000000124843c92c13625daa8296c20957119893 1011.00 B
+                def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger()
+                def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger()
+                def rowset_id = rowsetStr.split(" ")[4]
+                if (start_version == 0 || start_version != end_version) {
+                    continue
+                }
+
+                logger.info("rowset ${i}, start: ${start_version}, end: ${end_version}, id: ${rowset_id}")
+                def data = Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat", true)
+                logger.info("file cache data: ${data}")
+                assertTrue(data.size() == 0)
+            }
+
+            for (int i = 0; i < final_rowsets.size(); i++) {
+                def rowsetStr = final_rowsets[i]
+                def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger()
+                def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger()
+                def rowset_id = rowsetStr.split(" ")[4]
+                if (start_version == 0) {
+                    continue
+                }
+
+                logger.info("final rowset ${i}, start: ${start_version}, end: ${end_version}, id: ${rowset_id}")
+                def data = Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat", true)
+                logger.info("file cache data: ${data}")
+                assertTrue(data.size() > 0)
+            }
+
+            def (code_0, out_0, err_0) = curl("GET", "http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/unused_rowsets_count")
+            logger.info("out_0: ${out_0}")
+            def unusedRowsetsCount = out_0.trim().split(":")[1].trim().toInteger()
+            assertEquals(0, unusedRowsetsCount)
+
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+        }
+    }
+}
diff --git a/regression-test/suites/compaction/test_filecache_compaction_multisegments_and_read_stale_cloud_docker.groovy b/regression-test/suites/compaction/test_filecache_compaction_multisegments_and_read_stale_cloud_docker.groovy
new file mode 100644
index 00000000000000..4b6613160907d4
--- /dev/null
+++ b/regression-test/suites/compaction/test_filecache_compaction_multisegments_and_read_stale_cloud_docker.groovy
@@ -0,0 +1,347 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
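+
+// Scenario sketched by this suite (a summary of the steps below): with the
+// MemTable.need_flush debug point each stream load produces a multi-segment
+// rowset, which cumulative compaction then rewrites into a single segment.
+// After vacuum_stale_rowsets_interval_s elapses, the file cache of the
+// multi-segment stale rowsets must be recycled, the still-live single-segment
+// rowsets must keep their cached data, and unused_rowsets_count must return
+// to 0.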
+
+import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_filecache_compaction_multisegments_and_read_stale_cloud_docker", "docker") {
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.enableDebugPoints()
+    options.feConfigs.add("enable_workload_group=false")
+    options.beConfigs.add('compaction_promotion_version_count=5')
+    options.beConfigs.add('tablet_rowset_stale_sweep_time_sec=0')
+    options.beConfigs.add('vacuum_stale_rowsets_interval_s=10')
+    options.beConfigs.add('enable_java_support=false')
+    options.beConfigs.add('doris_scanner_row_bytes=1')
+
+    def dbName = ""
+    def testTable = "test_filecache_multisegments_and_read_stale"
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    def backendId_to_backendBrpcPort = [:]
+
+    def triggerCompaction = { tablet ->
+        def compact_type = "cumulative"
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        if (compact_type == "cumulative") {
+            def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id)
+            logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1)
+            assertEquals(code_1, 0)
+            return out_1
+        } else if (compact_type == "full") {
+            def (code_2, out_2, err_2) = be_run_full_compaction(be_host, be_http_port, tablet_id)
+            logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2)
+            assertEquals(code_2, 0)
+            return out_2
+        } else {
+            assertFalse(true)
+        }
+    }
+
+    def getTabletStatus = { tablet, rowsetIndex, lastRowsetSegmentNum, enableAssert = false, outputRowsets = null ->
+        String compactionUrl = tablet["CompactionStatus"]
+        def (code, out, err) = curl("GET", compactionUrl)
+        logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+        assertEquals(code, 0)
+        def tabletJson = parseJson(out.trim())
+        assert tabletJson.rowsets instanceof List
+        if (outputRowsets != null) {
+            outputRowsets.addAll(tabletJson.rowsets)
+        }
+
+        assertTrue(tabletJson.rowsets.size() >= rowsetIndex)
+        def rowset = tabletJson.rowsets.get(rowsetIndex - 1)
+        logger.info("rowset: ${rowset}")
+        int start_index = rowset.indexOf("]")
+        int end_index = rowset.indexOf("DATA")
+        def segmentNumStr = rowset.substring(start_index + 1, end_index).trim()
+        logger.info("segmentNumStr: ${segmentNumStr}")
+        if (enableAssert) {
+            assertEquals(lastRowsetSegmentNum, Integer.parseInt(segmentNumStr))
+        } else {
+            return lastRowsetSegmentNum == Integer.parseInt(segmentNumStr);
+        }
+    }
+
+    def waitForCompaction = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        def running = true
+        do {
+            Thread.sleep(1000)
+            StringBuilder sb = new StringBuilder();
+            sb.append("curl -X GET http://${be_host}:${be_http_port}")
+            sb.append("/api/compaction/run_status?tablet_id=")
+            sb.append(tablet_id)
+
+            String command = sb.toString()
+            logger.info(command)
+            def process = command.execute()
+            def code = process.waitFor()
+            def out = process.getText()
+            logger.info("Get compaction status: code=" + code + ", out=" + out)
+            assertEquals(code, 0)
+            def compactionStatus = parseJson(out.trim())
+            assertEquals("success", compactionStatus.status.toLowerCase())
+            running = compactionStatus.run_status
+        } while (running)
+    }
+
+    def getLocalDeleteBitmapStatus = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET http://${be_host}:${be_http_port}")
+        sb.append("/api/delete_bitmap/count_local?verbose=true&tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.getText()
+        logger.info("Get local delete bitmap count status: code=" + code + ", out=" + out)
+        assertEquals(code, 0)
+        def deleteBitmapStatus = parseJson(out.trim())
+        return deleteBitmapStatus
+    }
+
+    def getMsDeleteBitmapStatus = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET http://${be_host}:${be_http_port}")
+        sb.append("/api/delete_bitmap/count_ms?verbose=true&tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.getText()
+        logger.info("Get ms delete bitmap count status: code=" + code + ", out=" + out)
+        assertEquals(code, 0)
+        def deleteBitmapStatus = parseJson(out.trim())
+        return deleteBitmapStatus
+    }
+
+    docker(options) {
+        def fes = sql_return_maparray "show frontends"
+        logger.info("frontends: ${fes}")
+        def url = "jdbc:mysql://${fes[0].Host}:${fes[0].QueryPort}/"
+        logger.info("url: " + url)
+        AtomicBoolean query_result = new AtomicBoolean(true)
+        def query = {
+            connect(context.config.jdbcUser, context.config.jdbcPassword, url) {
+                logger.info("query start")
+                def results = sql_return_maparray """ select * from ${dbName}.${testTable}; """
+                logger.info("query result: " + results)
+                Set keys = new HashSet<>()
+                for (final def result in results) {
+                    if (keys.contains(result.k)) {
+                        logger.info("find duplicate key: " + result.k)
+                        query_result.set(false)
+                        break
+                    }
+                    keys.add(result.k)
+                }
+                logger.info("query finish. query_result: " + query_result.get())
+            }
+        }
+
+        def result = sql 'SELECT DATABASE()'
+        dbName = result[0][0]
+
+        sql """ DROP TABLE IF EXISTS ${testTable} """
+        sql """ CREATE TABLE IF NOT EXISTS ${testTable} (
+                `k1` int(11) NULL,
+                `k2` int(11) NULL,
+                `v3` int(11) NULL,
+                `v4` int(11) NULL
+            ) unique KEY(`k1`, `k2`)
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1",
+                "disable_auto_compaction" = "true"
+            );
+        """
+
+        // getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+        getBackendIpHttpAndBrpcPort(backendId_to_backendIP, backendId_to_backendHttpPort, backendId_to_backendBrpcPort);
+
+        def tablets = sql_return_maparray """ show tablets from ${testTable}; """
+        logger.info("tablets: " + tablets)
+        assertEquals(1, tablets.size())
+        def tablet = tablets[0]
+        String tablet_id = tablet.TabletId
+        def backend_id = tablet.BackendId
+
+        GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+        GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset")
+        GetDebugPoint().enableDebugPointForAllBEs("Tablet.delete_expired_stale_rowset.start_delete_unused_rowset")
+
+        Set all_history_stale_rowsets = new HashSet<>();
+        try {
+            // load 1
+            streamLoad {
+                table "${testTable}"
+                set 'column_separator', ','
+                set 'compress_type', 'GZ'
+                file 'test_schema_change_add_key_column.csv.gz'
+                time 10000 // limit inflight 10s
+
+                check { res, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    def json = parseJson(res)
+                    assertEquals("success", json.Status.toLowerCase())
+                    assertEquals(8192, json.NumberTotalRows)
+                    assertEquals(0, json.NumberFilteredRows)
+                }
+            }
+            sql "sync"
+            def rowCount1 = sql """ select count() from ${testTable}; """
+            logger.info("rowCount1: ${rowCount1}")
+            // check that 3 segments were generated
+            getTabletStatus(tablet, 2, 3, true, all_history_stale_rowsets)
+
+            // trigger compaction
+            GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+                    [tablet_id: "${tablet.TabletId}", start_version: 2, end_version: 2])
+            def (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def compactJson = parseJson(out.trim())
+            logger.info("compact json: " + compactJson)
+            // check that the rowset was rewritten into 1 segment
+            for (int i = 0; i < 20; i++) {
+                if (getTabletStatus(tablet, 2, 1, false, all_history_stale_rowsets)) {
+                    break
+                }
+                sleep(100)
+            }
+            getTabletStatus(tablet, 2, 1, false, all_history_stale_rowsets)
+            sql """ select * from ${testTable} limit 1; """
+
+            // load 2
+            streamLoad {
+                table "${testTable}"
+                set 'column_separator', ','
+                set 'compress_type', 'GZ'
+                file 'test_schema_change_add_key_column1.csv.gz'
+                time 10000 // limit inflight 10s
+
+                check { res, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    def json = parseJson(res)
+                    assertEquals("success", json.Status.toLowerCase())
+                    assertEquals(20480, json.NumberTotalRows)
+                    assertEquals(0, json.NumberFilteredRows)
+                }
+            }
+            sql "sync"
+            def rowCount2 = sql """ select count() from ${testTable}; """
+            logger.info("rowCount2: ${rowCount2}")
+            // check that 6 segments were generated
+            getTabletStatus(tablet, 3, 6, false, all_history_stale_rowsets)
+            def local_dm = getLocalDeleteBitmapStatus(tablet)
+            logger.info("local delete bitmap 1: " + local_dm)
+
+            // trigger compaction for load 2
+            GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+                    [tablet_id: "${tablet.TabletId}", start_version: 3, end_version: 3])
+            (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            compactJson = parseJson(out.trim())
+            logger.info("compact json: " + compactJson)
+            waitForCompaction(tablet)
+            // check that the rowset was rewritten into 1 segment
+            for (int i = 0; i < 20; i++) {
+                if (getTabletStatus(tablet, 3, 1, false, all_history_stale_rowsets)) {
+                    break
+                }
+                sleep(100)
+            }
+            getTabletStatus(tablet, 3, 1, false, all_history_stale_rowsets)
+
+            GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.vacuum_stale_rowsets") // cloud
+            GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_rowset") // local
+            local_dm = getLocalDeleteBitmapStatus(tablet)
+            logger.info("local delete bitmap 2: " + local_dm)
+            assertEquals(1, local_dm["delete_bitmap_count"])
+
+            // sleep past vacuum_stale_rowsets_interval_s=10 seconds to wait until unused rowsets are deleted
+            sleep(21000)
+
+            def be_host = backendId_to_backendIP[tablet.BackendId]
+            def be_http_port = backendId_to_backendHttpPort[tablet.BackendId]
+            logger.info("be_host: ${be_host}, be_http_port: ${be_http_port}, BrpcPort: ${backendId_to_backendBrpcPort[tablet.BackendId]}")
+
+            for (int i = 0; i < all_history_stale_rowsets.size(); i++) {
+                def rowsetStr = all_history_stale_rowsets[i]
+                // [12-12] 1 DATA NONOVERLAPPING 02000000000000124843c92c13625daa8296c20957119893 1011.00 B
+                def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger()
+                def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger()
+                def rowset_id = rowsetStr.split(" ")[4]
+                if (start_version == 0) {
+                    continue
+                }
+
+                int start_index = rowsetStr.indexOf("]")
+                int end_index = rowsetStr.indexOf("DATA")
+                def segmentNum = rowsetStr.substring(start_index + 1, end_index).trim().toInteger()
+
+                logger.info("rowset ${i}, start: ${start_version}, end: ${end_version}, id: ${rowset_id}, segment: ${segmentNum}")
+                def data = Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat", true)
+                logger.info("file cache data: ${data}")
+                if (segmentNum <= 1) {
+                    assertTrue(data.size() > 0)
+                } else {
+                    assertTrue(data.size() == 0)
+                }
+            }
+
+            def (code_0, out_0, err_0) = curl("GET", "http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/unused_rowsets_count")
+            logger.info("out_0: ${out_0}")
+            def unusedRowsetsCount = out_0.trim().split(":")[1].trim().toInteger()
+            assertEquals(0, unusedRowsetsCount)
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+        }
+    }
+}

From 2ab707268b7d2d092ff99438e37166721f4675a4 Mon Sep 17 00:00:00 2001
From: Lei Zhang
Date: Fri, 13 Jun 2025 14:37:33 +0800
Subject: [PATCH 2/2] fix some comments

---
 be/src/cloud/cloud_tablet.cpp | 33 ++++++++++++++++++++-------------
 1 file changed, 20 insertions(+), 13 deletions(-)

diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 2aca00c156f775..2b86d76cf31aad 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -64,6 +64,7 @@ bvar::LatencyRecorder g_base_compaction_get_delete_bitmap_lock_time_ms(
         "base_compaction_get_delete_bitmap_lock_time_ms");
 
 bvar::Adder<int64_t> g_unused_rowsets_count("unused_rowsets_count");
+bvar::Adder<int64_t> g_unused_rowsets_bytes("unused_rowsets_bytes");
 
 static constexpr int LOAD_INITIATOR_ID = -1;
 
@@ -350,10 +351,17 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
         std::vector<RowsetSharedPtr> unused_rowsets;
         if (auto find_it = _rs_version_map.find(rs->version());
             find_it != _rs_version_map.end()) {
-            DCHECK(find_it->second->rowset_id() != rs->rowset_id())
-                    << "tablet_id=" << tablet_id()
-                    << ", rowset_id=" << rs->rowset_id().to_string()
-                    << ", existed rowset_id=" << find_it->second->rowset_id().to_string();
+            if (find_it->second->rowset_id() == rs->rowset_id()) {
+                LOG(WARNING) << "tablet_id=" << tablet_id()
+                             << ", rowset_id=" << rs->rowset_id().to_string()
+                             << ", existed rowset_id="
+                             << find_it->second->rowset_id().to_string();
+                DCHECK(find_it->second->rowset_id() != rs->rowset_id())
+                        << "tablet_id=" << tablet_id()
+                        << ", rowset_id=" << rs->rowset_id().to_string()
+                        << ", existed rowset_id="
+                        << find_it->second->rowset_id().to_string();
+            }
             unused_rowsets.push_back(find_it->second);
         }
         add_unused_rowsets(unused_rowsets);
@@ -508,6 +516,7 @@ void CloudTablet::add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowset
     std::lock_guard lock(_gc_mutex);
     for (const auto& rowset : rowsets) {
         _unused_rowsets[rowset->rowset_id()] = rowset;
+        g_unused_rowsets_bytes << rowset->total_disk_size();
     }
     g_unused_rowsets_count << rowsets.size();
 }
@@ -529,8 +538,9 @@ void CloudTablet::remove_unused_rowsets() {
         }
         tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version());
         rs->clear_cache();
-        it = _unused_rowsets.erase(it);
         g_unused_rowsets_count << -1;
+        g_unused_rowsets_bytes << -rs->total_disk_size();
+        it = _unused_rowsets.erase(it);
         removed_rowsets_num++;
     }
 
@@ -556,14 +566,11 @@ void CloudTablet::remove_unused_rowsets() {
         removed_delete_bitmap_num++;
     }
 
-    if (removed_rowsets_num > 0 || removed_delete_bitmap_num > 0) {
-        LOG(INFO) << "tablet_id=" << tablet_id()
-                  << ", unused_rowset size=" << _unused_rowsets.size()
-                  << ", unused_delete_bitmap size=" << _unused_delete_bitmap.size()
-                  << ", removed_rowsets_num=" << removed_rowsets_num
-                  << ", removed_delete_bitmap_num=" << removed_delete_bitmap_num
-                  << ", cost(us)=" << watch.get_elapse_time_us();
-    }
+    LOG(INFO) << "tablet_id=" << tablet_id() << ", unused_rowset size=" << _unused_rowsets.size()
+              << ", unused_delete_bitmap size=" << _unused_delete_bitmap.size()
+              << ", removed_rowsets_num=" << removed_rowsets_num
+              << ", removed_delete_bitmap_num=" << removed_delete_bitmap_num
+              << ", cost(us)=" << watch.get_elapse_time_us();
 }
 
 void CloudTablet::update_base_size(const Rowset& rs) {
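
For readers who want the gist of the BE change without the surrounding diff, the bookkeeping pattern these two patches introduce can be distilled into the following minimal, self-contained C++ sketch. The Rowset type, the counter, and the cleanup hooks below are simplified stand-ins rather than the real Doris classes; only the mutex-guarded map and the use_count-based deferral mirror the patch.

// Minimal sketch of the unused-rowset GC pattern from this patch.
// All types here are simplified stand-ins for the real Doris classes.
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <vector>

struct Rowset {
    int64_t id;
    void clear_cache() { /* drop the file cache blocks of this rowset */ }
    void remove_delete_bitmap() { /* erase the delete bitmap keyed by id + version */ }
};
using RowsetSharedPtr = std::shared_ptr<Rowset>;

class TabletGc {
public:
    // Called when rowsets become stale, e.g. after being replaced by a
    // compaction output or a schema-change double write.
    void add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets) {
        std::lock_guard<std::mutex> lock(_gc_mutex);
        for (const auto& rs : rowsets) {
            _unused_rowsets[rs->id] = rs; // keyed by rowset id, re-adding overwrites
        }
        _unused_count += static_cast<int64_t>(rowsets.size());
    }

    // Called periodically by the vacuum thread.
    void remove_unused_rowsets() {
        std::lock_guard<std::mutex> lock(_gc_mutex);
        for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
            auto& rs = it->second;
            // The map itself holds one reference; more than one means a
            // reader still uses this rowset, so defer its cleanup.
            if (rs.use_count() > 1) {
                ++it;
                continue;
            }
            rs->remove_delete_bitmap();
            rs->clear_cache();
            it = _unused_rowsets.erase(it);
            --_unused_count;
        }
    }

    int64_t unused_count() const { return _unused_count; }

private:
    std::mutex _gc_mutex;
    std::map<int64_t, RowsetSharedPtr> _unused_rowsets;
    int64_t _unused_count = 0; // stands in for the g_unused_rowsets_count bvar
};

The regression suites above exercise exactly this deferral: a query pinned by a debug point keeps use_count above one, so the rowset's delete bitmap and cached data survive the sweep, and the counter only returns to zero after the query releases its reference.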