From 333ac0aa49b3fabd203c2c746d6debd3bb7ca942 Mon Sep 17 00:00:00 2001 From: deardeng Date: Fri, 3 Jan 2025 19:43:11 +0800 Subject: [PATCH 1/5] [Feature](cloud) Support clean tablet file cache when tablet drop --- be/src/agent/task_worker_pool.cpp | 19 +++++++++++++++---- be/src/cloud/cloud_tablet.h | 4 ++-- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index a8ab93de455c3b..b7619ef454f4ae 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1657,11 +1657,22 @@ void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& r .tag("tablet_id", drop_tablet_req.tablet_id); return; }); - // 1. erase lru from tablet mgr - // TODO(dx) clean tablet file cache - // get tablet's info(such as cachekey, tablet id, rsid) + auto weak_tablets = engine.tablet_mgr().get_weak_tablets(); + auto it = std::find_if( + weak_tablets.begin(), weak_tablets.end(), + [tablet_id = drop_tablet_req.tablet_id](const std::weak_ptr& weak_tablet) { + if (auto tablet = weak_tablet.lock()) { + return tablet->tablet_id() == tablet_id; + } + return false; + }); + + if (it != weak_tablets.end()) { + if (auto tablet = it->lock()) { + CloudTablet::recycle_cached_data(tablet->get_snapshot_rowset(true)); + } + } engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id); - // 2. gen clean file cache task return; } diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index c876518d868a49..2aa031ad503f43 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -210,12 +210,12 @@ class CloudTablet final : public BaseTablet { void build_tablet_report_info(TTabletInfo* tablet_info); + static void recycle_cached_data(const std::vector& rowsets); + private: // FIXME(plat1ko): No need to record base size if rowsets are ordered by version void update_base_size(const Rowset& rs); - static void recycle_cached_data(const std::vector& rowsets); - Status sync_if_not_running(); CloudStorageEngine& _engine; From bcbf8511af84b0dc02fdf717af1a5c6e9244148b Mon Sep 17 00:00:00 2001 From: deardeng Date: Tue, 7 Jan 2025 17:08:33 +0800 Subject: [PATCH 2/5] fix --- be/src/agent/task_worker_pool.cpp | 48 ++++-- be/src/cloud/cloud_tablet.cpp | 2 +- .../apache/doris/regression/util/Http.groovy | 2 +- ..._clean_tablet_when_drop_force_table.groovy | 150 ++++++++++++++++-- 4 files changed, 179 insertions(+), 23 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index b7619ef454f4ae..d1ee097ed833bc 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1657,22 +1657,46 @@ void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& r .tag("tablet_id", drop_tablet_req.tablet_id); return; }); + MonotonicStopWatch watch; + watch.start(); auto weak_tablets = engine.tablet_mgr().get_weak_tablets(); - auto it = std::find_if( - weak_tablets.begin(), weak_tablets.end(), - [tablet_id = drop_tablet_req.tablet_id](const std::weak_ptr& weak_tablet) { - if (auto tablet = weak_tablet.lock()) { - return tablet->tablet_id() == tablet_id; - } - return false; - }); - - if (it != weak_tablets.end()) { - if (auto tablet = it->lock()) { - CloudTablet::recycle_cached_data(tablet->get_snapshot_rowset(true)); + std::ostringstream rowset_ids_stream; + bool found = false; + for (auto& weak_tablet : weak_tablets) { + auto tablet = weak_tablet.lock(); + if (tablet == nullptr) { + continue; + } + if (tablet->tablet_id() != drop_tablet_req.tablet_id) { + continue; + } + found = true; + auto clean_rowsets = tablet->get_snapshot_rowset(true); + // Get first 10 rowset IDs as comma-separated string, just for log + int count = 0; + for (const auto& rowset : clean_rowsets) { + if (count >= 10) break; + if (count > 0) { + rowset_ids_stream << ","; + } + rowset_ids_stream << rowset->rowset_id().to_string(); + count++; } + + CloudTablet::recycle_cached_data(std::move(clean_rowsets)); + break; } + + if (!found) { + LOG(WARNING) << "tablet not found when dropping tablet_id=" << drop_tablet_req.tablet_id + << ", cost " << static_cast(watch.elapsed_time()) / 1e9 << "(s)"; + return; + } + engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id); + LOG(INFO) << "drop cloud tablet_id=" << drop_tablet_req.tablet_id + << " and clean file cache first 10 rowsets {" << rowset_ids_stream.str() << "}, cost " + << static_cast(watch.elapsed_time()) / 1e9 << "(s)"; return; } diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index f2f21162bf05b5..fd373fe49af730 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -456,7 +456,7 @@ void CloudTablet::recycle_cached_data(const std::vector& rowset if (config::enable_file_cache) { for (const auto& rs : rowsets) { - if (rs.use_count() >= 1) { + if (rs.use_count() > 1) { LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " << rs.use_count() << " references. File Cache won't be recycled when query is using it."; diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy index cd688a1fcfc062..2a63f8763df80f 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy @@ -53,7 +53,7 @@ class Http { conn.setRequestProperty('Authorization', 'Basic cm9vdDo=') //token for root def code = conn.responseCode def text = conn.content.text - logger.info("http post url=${url}, isJson=${isJson}, response code=${code}, text=${text}") + logger.info("http get url=${url}, isJson=${isJson}, response code=${code}, text=${text}") Assert.assertEquals(200, code) if (isJson) { def json = new JsonSlurper() diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy index 4dc847d603a324..72ef7d68e9f24b 100644 --- a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy +++ b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy @@ -31,7 +31,9 @@ suite('test_clean_tablet_when_drop_force_table', 'docker') { 'rehash_tablet_after_be_dead_seconds=5' ] options.beConfigs += [ - 'report_tablet_interval_seconds=1' + 'report_tablet_interval_seconds=1', + 'write_buffer_size=10240', + 'write_buffer_size_for_agg=10240' ] options.setFeNum(3) options.setBeNum(3) @@ -50,10 +52,10 @@ suite('test_clean_tablet_when_drop_force_table', 'docker') { def getTabletAndBeHostFromFe = { table -> def result = sql_return_maparray """SHOW TABLETS FROM $table""" def bes = backendIdToHost.call() - // tablet : host + // tablet : [backendId, host] def ret = [:] result.each { - ret[it.TabletId] = bes[it.BackendId] + ret[it.TabletId] = [it.BackendId, bes[it.BackendId]] } ret } @@ -72,10 +74,69 @@ suite('test_clean_tablet_when_drop_force_table', 'docker') { ret } + // get rowset_id segment_id from ms + // curl '175.40.101.1:5000/MetaService/http/get_value?token=greedisgood9999&unicode&key_type=MetaRowsetKey&instance_id=default_instance_id&tablet_id=27700&version=2' + def getSegmentFilesFromMs = { msHttpPort, tabletId, version, check_func -> + httpTest { + endpoint msHttpPort + op "get" + uri "/MetaService/http/get_value?token=greedisgood9999&unicode&key_type=MetaRowsetKey&instance_id=default_instance_id&tablet_id=${tabletId}&version=${version}" + check check_func + } + } + + def getRowsetFileCacheDirFromBe = { beHttpPort, msHttpPort, tabletId, version -> + def hashValues = [] + def segmentFiles = [] + getSegmentFilesFromMs(msHttpPort, tabletId, version) { + respCode, body -> + def json = parseJson(body) + logger.info("get tablet {} version {} from ms, response {}", tabletId, version, json) + // {"rowset_id":"0","partition_id":"27695","tablet_id":"27700","txn_id":"7057526525952","tablet_schema_hash":0,"rowset_type":"BETA_ROWSET","rowset_state":"COMMITTED","start_version":"3","end_version":"3","version_hash":"0","num_rows":"1","total_disk_size":"895","data_disk_size":"895","index_disk_size":"0","empty":false,"load_id":{"hi":"-1646598626735601581","lo":"-6677682539881484579"},"delete_flag":false,"creation_time":"1736153402","num_segments":"1","rowset_id_v2":"0200000000000004694889e84c76391cfd52ec7db0a483ba","resource_id":"1","newest_write_timestamp":"1736153402","segments_key_bounds":[{"min_key":"AoAAAAAAAAAC","max_key":"AoAAAAAAAAAC"}],"txn_expiration":"1736167802","segments_overlap_pb":"NONOVERLAPPING","compaction_level":"0","segments_file_size":["895"],"index_id":"27697","schema_version":0,"enable_segments_file_size":true,"has_variant_type_in_schema":false,"enable_inverted_index_file_info":false} + def segmentNum = json.num_segments as int + def rowsetId = json.rowset_id_v2 as String + segmentFiles = (0.. "${rowsetId}_${i}.dat" } + } + + segmentFiles.each { + // curl '175.40.51.3:8040/api/file_cache?op=hash&value=0200000000000004694889e84c76391cfd52ec7db0a483ba_0.dat' + def data = Http.GET("http://${beHttpPort}/api/file_cache?op=hash&value=${it}", true) + // {"hash":"2b79c649a1766dad371054ee168f0574"} + logger.info("get tablet {} segmentFile {}, response {}", tabletId, it, data) + hashValues << data.hash + } + hashValues + } + + // get table's tablet file cache + def getTabletFileCacheDirFromBe = { msHttpPort, table, version -> + // beHost HashFile + def beHostToHashFile = [:] + + def getTabletsAndHostFromFe = getTabletAndBeHostFromFe(table) + getTabletsAndHostFromFe.each { + def beHost = it.Value[1] + def tabletId = it.Key + def hashRet = getRowsetFileCacheDirFromBe(beHost + ":8040", msHttpPort, tabletId, version) + hashRet.each { + def hashFile = it + if (beHostToHashFile.containsKey(beHost)) { + beHostToHashFile[beHost].add(hashFile) + } else { + beHostToHashFile[beHost] = [hashFile] + } + } + } + beHostToHashFile + } + def testCase = { table, waitTime, useDp=false-> + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort sql """CREATE TABLE $table ( `k1` int(11) NULL, - `k2` int(11) NULL + `k2` int(11) NULL, + `v1` VARCHAR(2048) ) DUPLICATE KEY(`k1`, `k2`) COMMENT 'OLAP' @@ -84,23 +145,60 @@ suite('test_clean_tablet_when_drop_force_table', 'docker') { "replication_num"="1" ); """ + def random = new Random() + def generateRandomString = { int length -> + random.with { + def chars = ('A'..'Z').collect() + ('a'..'z').collect() + ('0'..'9').collect() + (1..length).collect { chars[nextInt(chars.size())] }.join('') + } + } + def valuesList = (1..30000).collect { i -> + def randomStr = generateRandomString(2000) + "($i, $i, '$randomStr')" + }.join(", ") sql """ - insert into $table values (1, 1), (2, 2), (3, 3) + set global max_allowed_packet = 1010241024 + """ + + context.reconnectFe() + sql """ + insert into $table values ${valuesList} """ for (int i = 0; i < 5; i++) { sql """ - select * from $table + select count(*) from $table """ } + valuesList = (30001..60000).collect { i -> + def randomStr = generateRandomString(2000) + "($i, $i, '$randomStr')" + }.join(", ") + sql """ + set global max_allowed_packet = 1010241024 + """ + context.reconnectFe() + sql """ + insert into $table values ${valuesList} + """ + // before drop table force def beforeGetFromFe = getTabletAndBeHostFromFe(table) def beforeGetFromBe = getTabletAndBeHostFromBe.call() - logger.info("fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe) + // version 2 + def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + // version 3 + def cacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3) + + def mergedCacheDir = cacheDirVersion2 + cacheDirVersion3.collectEntries { host, hashFiles -> + [(host): cacheDirVersion2[host] ? (cacheDirVersion2[host] + hashFiles) : hashFiles] + } + + logger.info("fe tablets {}, be tablets {}, cache dir {}", beforeGetFromFe, beforeGetFromBe, mergedCacheDir) beforeGetFromFe.each { assertTrue(beforeGetFromBe.containsKey(it.Key)) - assertEquals(beforeGetFromBe[it.Key], it.Value) + assertEquals(beforeGetFromBe[it.Key], it.Value[1]) } if (useDp) { GetDebugPoint().enableDebugPointForAllBEs("WorkPoolCloudDropTablet.drop_tablet_callback.failed") @@ -119,7 +217,7 @@ suite('test_clean_tablet_when_drop_force_table', 'docker') { } def start = System.currentTimeMillis() / 1000 // tablet can't find in be - dockerAwaitUntil(50) { + dockerAwaitUntil(500) { def beTablets = getTabletAndBeHostFromBe.call().keySet() logger.info("before drop tablets {}, after tablets {}", beforeGetFromFe, beTablets) beforeGetFromFe.keySet().every { !getTabletAndBeHostFromBe.call().containsKey(it) } @@ -129,6 +227,40 @@ suite('test_clean_tablet_when_drop_force_table', 'docker') { if (useDp) { futrue.get() } + + sleep(25 * 1000) + + // check cache file has been deleted + beforeGetFromFe.each { + def tabletId = it.Key + def backendId = it.Value[0] + def backendHost = it.Value[1] + def be = cluster.getBeByBackendId(backendId.toLong()) + def dataPath = new File("${be.path}/storage/file_cache") + def subDirs = [] + + def collectDirs + collectDirs = { File dir -> + if (dir.exists()) { + dir.eachDir { subDir -> + subDirs << subDir.name + collectDirs(subDir) + } + } + } + + collectDirs(dataPath) + logger.info("BE {} file_cache subdirs: {}", backendHost, subDirs) + def cacheDir = mergedCacheDir[backendHost] + + // add check + cacheDir.each { hashFile -> + assertFalse(subDirs.any { subDir -> subDir.startsWith(hashFile) }, + "Found unexpected cache file pattern ${hashFile} in BE ${backendHost}'s file_cache directory. " + + "Matching subdir found in: ${subDirs}") + } + } + } docker(options) { From e8cd41fc477a9890733561ba3f8ba3424196788c Mon Sep 17 00:00:00 2001 From: deardeng Date: Tue, 7 Jan 2025 21:10:20 +0800 Subject: [PATCH 3/5] add case --- .../doris/regression/suite/Suite.groovy | 91 +++++++++++++++++ ..._clean_tablet_when_drop_force_table.groovy | 98 +------------------ .../test_clean_tablet_when_rebalance.groovy | 46 ++------- 3 files changed, 101 insertions(+), 134 deletions(-) diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 20868cbb4ccb96..e32ed700e0efd2 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -49,6 +49,7 @@ import org.apache.doris.regression.action.HttpCliAction import org.apache.doris.regression.util.DataUtils import org.apache.doris.regression.util.JdbcUtils import org.apache.doris.regression.util.Hdfs +import org.apache.doris.regression.util.Http import org.apache.doris.regression.util.SuiteUtils import org.apache.doris.regression.util.DebugPoint import org.apache.doris.regression.RunMode @@ -2766,4 +2767,94 @@ class Suite implements GroovyInterceptable { assertEquals(re_fe, re_be) assertEquals(re_fe, re_no_fold) } + + def backendIdToHost = { -> + def spb = sql_return_maparray """SHOW BACKENDS""" + def beIdToHost = [:] + spb.each { + beIdToHost[it.BackendId] = it.Host + } + beIdToHost + } + + def getTabletAndBeHostFromBe = { bes -> + def ret = [:] + bes.each { be -> + // {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3} + def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data + def tablets = data.tablets.collect { it.tablet_id as String } + tablets.each{ + ret[it] = data.host + } + } + ret + } + + def getTabletAndBeHostFromFe = { table -> + def result = sql_return_maparray """SHOW TABLETS FROM $table""" + def bes = backendIdToHost.call() + // tablet : [backendId, host] + def ret = [:] + result.each { + ret[it.TabletId] = [it.BackendId, bes[it.BackendId]] + } + ret + } + + // get rowset_id segment_id from ms + // curl '175.40.101.1:5000/MetaService/http/get_value?token=greedisgood9999&unicode&key_type=MetaRowsetKey&instance_id=default_instance_id&tablet_id=27700&version=2' + def getSegmentFilesFromMs = { msHttpPort, tabletId, version, check_func -> + httpTest { + endpoint msHttpPort + op "get" + uri "/MetaService/http/get_value?token=greedisgood9999&unicode&key_type=MetaRowsetKey&instance_id=default_instance_id&tablet_id=${tabletId}&version=${version}" + check check_func + } + } + + def getRowsetFileCacheDirFromBe = { beHttpPort, msHttpPort, tabletId, version -> + def hashValues = [] + def segmentFiles = [] + getSegmentFilesFromMs(msHttpPort, tabletId, version) { + respCode, body -> + def json = parseJson(body) + logger.info("get tablet {} version {} from ms, response {}", tabletId, version, json) + // {"rowset_id":"0","partition_id":"27695","tablet_id":"27700","txn_id":"7057526525952","tablet_schema_hash":0,"rowset_type":"BETA_ROWSET","rowset_state":"COMMITTED","start_version":"3","end_version":"3","version_hash":"0","num_rows":"1","total_disk_size":"895","data_disk_size":"895","index_disk_size":"0","empty":false,"load_id":{"hi":"-1646598626735601581","lo":"-6677682539881484579"},"delete_flag":false,"creation_time":"1736153402","num_segments":"1","rowset_id_v2":"0200000000000004694889e84c76391cfd52ec7db0a483ba","resource_id":"1","newest_write_timestamp":"1736153402","segments_key_bounds":[{"min_key":"AoAAAAAAAAAC","max_key":"AoAAAAAAAAAC"}],"txn_expiration":"1736167802","segments_overlap_pb":"NONOVERLAPPING","compaction_level":"0","segments_file_size":["895"],"index_id":"27697","schema_version":0,"enable_segments_file_size":true,"has_variant_type_in_schema":false,"enable_inverted_index_file_info":false} + def segmentNum = json.num_segments as int + def rowsetId = json.rowset_id_v2 as String + segmentFiles = (0.. "${rowsetId}_${i}.dat" } + } + + segmentFiles.each { + // curl '175.40.51.3:8040/api/file_cache?op=hash&value=0200000000000004694889e84c76391cfd52ec7db0a483ba_0.dat' + def data = Http.GET("http://${beHttpPort}/api/file_cache?op=hash&value=${it}", true) + // {"hash":"2b79c649a1766dad371054ee168f0574"} + logger.info("get tablet {} segmentFile {}, response {}", tabletId, it, data) + hashValues << data.hash + } + hashValues + } + + // get table's tablet file cache + def getTabletFileCacheDirFromBe = { msHttpPort, table, version -> + // beHost HashFile + def beHostToHashFile = [:] + + def getTabletsAndHostFromFe = getTabletAndBeHostFromFe(table) + getTabletsAndHostFromFe.each { + def beHost = it.Value[1] + def tabletId = it.Key + def hashRet = getRowsetFileCacheDirFromBe(beHost + ":8040", msHttpPort, tabletId, version) + hashRet.each { + def hashFile = it + if (beHostToHashFile.containsKey(beHost)) { + beHostToHashFile[beHost].add(hashFile) + } else { + beHostToHashFile[beHost] = [hashFile] + } + } + } + beHostToHashFile + } + } diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy index 72ef7d68e9f24b..a65f59f85a1ee4 100644 --- a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy +++ b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy @@ -16,7 +16,7 @@ // under the License. import org.apache.doris.regression.suite.ClusterOptions -import org.apache.doris.regression.util.Http + suite('test_clean_tablet_when_drop_force_table', 'docker') { if (!isCloudMode()) { @@ -40,96 +40,6 @@ suite('test_clean_tablet_when_drop_force_table', 'docker') { options.cloudMode = true options.enableDebugPoints() - def backendIdToHost = { -> - def spb = sql_return_maparray """SHOW BACKENDS""" - def beIdToHost = [:] - spb.each { - beIdToHost[it.BackendId] = it.Host - } - beIdToHost - } - - def getTabletAndBeHostFromFe = { table -> - def result = sql_return_maparray """SHOW TABLETS FROM $table""" - def bes = backendIdToHost.call() - // tablet : [backendId, host] - def ret = [:] - result.each { - ret[it.TabletId] = [it.BackendId, bes[it.BackendId]] - } - ret - } - - def getTabletAndBeHostFromBe = { -> - def bes = cluster.getAllBackends() - def ret = [:] - bes.each { be -> - // {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3} - def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data - def tablets = data.tablets.collect { it.tablet_id as String } - tablets.each{ - ret[it] = data.host - } - } - ret - } - - // get rowset_id segment_id from ms - // curl '175.40.101.1:5000/MetaService/http/get_value?token=greedisgood9999&unicode&key_type=MetaRowsetKey&instance_id=default_instance_id&tablet_id=27700&version=2' - def getSegmentFilesFromMs = { msHttpPort, tabletId, version, check_func -> - httpTest { - endpoint msHttpPort - op "get" - uri "/MetaService/http/get_value?token=greedisgood9999&unicode&key_type=MetaRowsetKey&instance_id=default_instance_id&tablet_id=${tabletId}&version=${version}" - check check_func - } - } - - def getRowsetFileCacheDirFromBe = { beHttpPort, msHttpPort, tabletId, version -> - def hashValues = [] - def segmentFiles = [] - getSegmentFilesFromMs(msHttpPort, tabletId, version) { - respCode, body -> - def json = parseJson(body) - logger.info("get tablet {} version {} from ms, response {}", tabletId, version, json) - // {"rowset_id":"0","partition_id":"27695","tablet_id":"27700","txn_id":"7057526525952","tablet_schema_hash":0,"rowset_type":"BETA_ROWSET","rowset_state":"COMMITTED","start_version":"3","end_version":"3","version_hash":"0","num_rows":"1","total_disk_size":"895","data_disk_size":"895","index_disk_size":"0","empty":false,"load_id":{"hi":"-1646598626735601581","lo":"-6677682539881484579"},"delete_flag":false,"creation_time":"1736153402","num_segments":"1","rowset_id_v2":"0200000000000004694889e84c76391cfd52ec7db0a483ba","resource_id":"1","newest_write_timestamp":"1736153402","segments_key_bounds":[{"min_key":"AoAAAAAAAAAC","max_key":"AoAAAAAAAAAC"}],"txn_expiration":"1736167802","segments_overlap_pb":"NONOVERLAPPING","compaction_level":"0","segments_file_size":["895"],"index_id":"27697","schema_version":0,"enable_segments_file_size":true,"has_variant_type_in_schema":false,"enable_inverted_index_file_info":false} - def segmentNum = json.num_segments as int - def rowsetId = json.rowset_id_v2 as String - segmentFiles = (0.. "${rowsetId}_${i}.dat" } - } - - segmentFiles.each { - // curl '175.40.51.3:8040/api/file_cache?op=hash&value=0200000000000004694889e84c76391cfd52ec7db0a483ba_0.dat' - def data = Http.GET("http://${beHttpPort}/api/file_cache?op=hash&value=${it}", true) - // {"hash":"2b79c649a1766dad371054ee168f0574"} - logger.info("get tablet {} segmentFile {}, response {}", tabletId, it, data) - hashValues << data.hash - } - hashValues - } - - // get table's tablet file cache - def getTabletFileCacheDirFromBe = { msHttpPort, table, version -> - // beHost HashFile - def beHostToHashFile = [:] - - def getTabletsAndHostFromFe = getTabletAndBeHostFromFe(table) - getTabletsAndHostFromFe.each { - def beHost = it.Value[1] - def tabletId = it.Key - def hashRet = getRowsetFileCacheDirFromBe(beHost + ":8040", msHttpPort, tabletId, version) - hashRet.each { - def hashFile = it - if (beHostToHashFile.containsKey(beHost)) { - beHostToHashFile[beHost].add(hashFile) - } else { - beHostToHashFile[beHost] = [hashFile] - } - } - } - beHostToHashFile - } - def testCase = { table, waitTime, useDp=false-> def ms = cluster.getAllMetaservices().get(0) def msHttpPort = ms.host + ":" + ms.httpPort @@ -185,7 +95,7 @@ suite('test_clean_tablet_when_drop_force_table', 'docker') { // before drop table force def beforeGetFromFe = getTabletAndBeHostFromFe(table) - def beforeGetFromBe = getTabletAndBeHostFromBe.call() + def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) // version 2 def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) // version 3 @@ -218,9 +128,9 @@ suite('test_clean_tablet_when_drop_force_table', 'docker') { def start = System.currentTimeMillis() / 1000 // tablet can't find in be dockerAwaitUntil(500) { - def beTablets = getTabletAndBeHostFromBe.call().keySet() + def beTablets = getTabletAndBeHostFromBe(cluster.getAllBackends()).keySet() logger.info("before drop tablets {}, after tablets {}", beforeGetFromFe, beTablets) - beforeGetFromFe.keySet().every { !getTabletAndBeHostFromBe.call().containsKey(it) } + beforeGetFromFe.keySet().every { !getTabletAndBeHostFromBe(cluster.getAllBackends()).containsKey(it) } } logger.info("table {}, cost {}s", table, System.currentTimeMillis() / 1000 - start) assertTrue(System.currentTimeMillis() / 1000 - start > waitTime) diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy index 4a44b317cc2233..4f63c61950114b 100644 --- a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy +++ b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy @@ -42,40 +42,6 @@ suite('test_clean_tablet_when_rebalance', 'docker') { def choseDeadBeIndex = 1 def table = "test_clean_tablet_when_rebalance" - def backendIdToHost = { -> - def spb = sql_return_maparray """SHOW BACKENDS""" - def beIdToHost = [:] - spb.each { - beIdToHost[it.BackendId] = it.Host - } - beIdToHost - } - - def getTabletAndBeHostFromFe = { -> - def result = sql_return_maparray """SHOW TABLETS FROM $table""" - def bes = backendIdToHost.call() - // tablet : host - def ret = [:] - result.each { - ret[it.TabletId] = bes[it.BackendId] - } - ret - } - - def getTabletAndBeHostFromBe = { -> - def bes = cluster.getAllBackends() - def ret = [:] - bes.each { be -> - // {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3} - def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data - def tablets = data.tablets.collect { it.tablet_id as String } - tablets.each{ - ret[it] = data.host - } - } - ret - } - def testCase = { deadTime -> boolean beDeadLong = deadTime > rehashTime ? true : false logger.info("begin exec beDeadLong {}", beDeadLong) @@ -86,12 +52,12 @@ suite('test_clean_tablet_when_rebalance', 'docker') { """ } - def beforeGetFromFe = getTabletAndBeHostFromFe() - def beforeGetFromBe = getTabletAndBeHostFromBe.call() + def beforeGetFromFe = getTabletAndBeHostFromFe(table) + def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) logger.info("before fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe) beforeGetFromFe.each { assertTrue(beforeGetFromBe.containsKey(it.Key)) - assertEquals(beforeGetFromBe[it.Key], it.Value) + assertEquals(beforeGetFromBe[it.Key], it.Value[1]) } cluster.stopBackends(choseDeadBeIndex) @@ -125,12 +91,12 @@ suite('test_clean_tablet_when_rebalance', 'docker') { """ sleep(1000) } - beforeGetFromFe = getTabletAndBeHostFromFe() - beforeGetFromBe = getTabletAndBeHostFromBe.call() + beforeGetFromFe = getTabletAndBeHostFromFe(table) + beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) logger.info("after fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe) beforeGetFromFe.each { assertTrue(beforeGetFromBe.containsKey(it.Key)) - assertEquals(beforeGetFromBe[it.Key], it.Value) + assertEquals(beforeGetFromBe[it.Key], it.Value[1]) } } From ecfaf53340a114da632778a02eff56b6a04ca005 Mon Sep 17 00:00:00 2001 From: deardeng Date: Wed, 8 Jan 2025 19:44:17 +0800 Subject: [PATCH 4/5] fix --- be/src/cloud/cloud_tablet.cpp | 6 +- .../test_clean_tablet_when_rebalance.groovy | 83 +++++++++++++++---- 2 files changed, 71 insertions(+), 18 deletions(-) diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index fd373fe49af730..354a32328cf165 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -386,6 +386,7 @@ void CloudTablet::delete_rowsets(const std::vector& to_delete, uint64_t CloudTablet::delete_expired_stale_rowsets() { std::vector expired_rowsets; + std::vector stale_rowsets; int64_t expired_stale_sweep_endtime = ::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec; std::vector version_to_delete; @@ -409,10 +410,10 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() { auto rs_it = _stale_rs_version_map.find(v_ts->version()); if (rs_it != _stale_rs_version_map.end()) { expired_rowsets.push_back(rs_it->second); + stale_rowsets.push_back(rs_it->second); LOG(INFO) << "erase stale rowset, tablet_id=" << tablet_id() << " rowset_id=" << rs_it->second->rowset_id().to_string() << " version=" << rs_it->first.to_string(); - _stale_rs_version_map.erase(rs_it); } else { LOG(WARNING) << "cannot find stale rowset " << v_ts->version() << " in tablet " << tablet_id(); @@ -456,7 +457,8 @@ void CloudTablet::recycle_cached_data(const std::vector& rowset if (config::enable_file_cache) { for (const auto& rs : rowsets) { - if (rs.use_count() > 1) { + // rowsets vector save ptr and tablet._rs_version_map save ptr, so at least 2 + if (rs.use_count() > 2) { LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " << rs.use_count() << " references. File Cache won't be recycled when query is using it."; diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy index 4f63c61950114b..30a20a48ada546 100644 --- a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy +++ b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy @@ -32,7 +32,9 @@ suite('test_clean_tablet_when_rebalance', 'docker') { ] options.feConfigs.add("rehash_tablet_after_be_dead_seconds=$rehashTime") options.beConfigs += [ - 'report_tablet_interval_seconds=1' + 'report_tablet_interval_seconds=1', + 'write_buffer_size=10240', + 'write_buffer_size_for_agg=10240' ] options.setFeNum(3) options.setBeNum(3) @@ -42,13 +44,13 @@ suite('test_clean_tablet_when_rebalance', 'docker') { def choseDeadBeIndex = 1 def table = "test_clean_tablet_when_rebalance" - def testCase = { deadTime -> + def testCase = { deadTime, mergedCacheDir -> boolean beDeadLong = deadTime > rehashTime ? true : false logger.info("begin exec beDeadLong {}", beDeadLong) for (int i = 0; i < 5; i++) { sql """ - select * from $table + select count(*) from $table """ } @@ -86,24 +88,72 @@ suite('test_clean_tablet_when_rebalance', 'docker') { bes.size() == (beDeadLong ? 2 : 3) } for (int i = 0; i < 5; i++) { + sleep(2000) sql """ - select * from $table + select count(*) from $table """ - sleep(1000) } - beforeGetFromFe = getTabletAndBeHostFromFe(table) - beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) - logger.info("after fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe) - beforeGetFromFe.each { - assertTrue(beforeGetFromBe.containsKey(it.Key)) - assertEquals(beforeGetFromBe[it.Key], it.Value[1]) + def afterGetFromFe = getTabletAndBeHostFromFe(table) + def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + logger.info("after fe tablets {}, be tablets {}", afterGetFromFe, afterGetFromBe) + afterGetFromFe.each { + assertTrue(afterGetFromBe.containsKey(it.Key)) + assertEquals(afterGetFromBe[it.Key], it.Value[1]) + } + + // TODO(freemandealer) + // Once the freemandealer implements file cache cleanup during restart, and after enabling the current rebalance, the tablets in the tablet manager will be cleared, but the file cache will not be cleaned up. + /* + if (beDeadLong) { + // check tablet file cache has been deleted + // after fe tablets {10309=[10003, 175.41.51.3], 10311=[10002, 175.41.51.2], 10313=[10003, 175.41.51.3]}, + afterGetFromFe.each { + logger.info("tablet_id {}, before host {}, after host {}", it.Key, beforeGetFromFe[it.Key][1], it.Value[1]) + if (beforeGetFromFe[it.Key][1] == it.Value[1]) { + return + } + logger.info("tablet_id {} has been reblanced from {} to {}", it.Key, beforeGetFromFe[it.Key][1], it.Value[1]) + // check before tablet file cache dir has been droped + + def tabletId = it.Key + def backendId = beforeGetFromFe[it.Key][0] + def backendHost = beforeGetFromFe[it.Key][1] + def be = cluster.getBeByBackendId(backendId.toLong()) + def dataPath = new File("${be.path}/storage/file_cache") + def subDirs = [] + + def collectDirs + collectDirs = { File dir -> + if (dir.exists()) { + dir.eachDir { subDir -> + subDirs << subDir.name + collectDirs(subDir) + } + } + } + + collectDirs(dataPath) + logger.info("BE {} file_cache subdirs: {}", backendHost, subDirs) + def cacheDir = mergedCacheDir[backendHost] + + // add check + cacheDir.each { hashFile -> + assertFalse(subDirs.any { subDir -> subDir.startsWith(hashFile) }, + "Found unexpected cache file pattern ${hashFile} in BE ${backendHost}'s file_cache directory. " + + "Matching subdir found in: ${subDirs}") + } + } } + */ } docker(options) { + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort sql """CREATE TABLE $table ( `k1` int(11) NULL, - `k2` int(11) NULL + `k2` int(11) NULL, + `v1` varchar(2048) ) DUPLICATE KEY(`k1`, `k2`) COMMENT 'OLAP' @@ -113,12 +163,13 @@ suite('test_clean_tablet_when_rebalance', 'docker') { ); """ sql """ - insert into $table values (1, 1), (2, 2), (3, 3) + insert into $table values (1, 1, 'v1'), (2, 2, 'v2'), (3, 3, 'v3') """ - // 'rehash_tablet_after_be_dead_seconds=10' + def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + // 'rehash_tablet_after_be_dead_seconds=100' // be-1 dead, but not dead for a long time - testCase(5) + testCase(5, cacheDirVersion2) // be-1 dead, and dead for a long time - testCase(200) + testCase(200, cacheDirVersion2) } } From 79d549207889eeb13dc09fcac949c6883368df30 Mon Sep 17 00:00:00 2001 From: deardeng Date: Thu, 9 Jan 2025 14:45:51 +0800 Subject: [PATCH 5/5] fix review --- be/src/cloud/cloud_tablet.cpp | 4 +- .../test_clean_stale_rs_file_cache.groovy | 129 ++++++++++++++++++ .../test_clean_tablet_when_rebalance.groovy | 2 +- 3 files changed, 133 insertions(+), 2 deletions(-) create mode 100644 regression-test/suites/cloud_p0/tablets/test_clean_stale_rs_file_cache.groovy diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 354a32328cf165..b8dd0eae4b736e 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -386,6 +386,7 @@ void CloudTablet::delete_rowsets(const std::vector& to_delete, uint64_t CloudTablet::delete_expired_stale_rowsets() { std::vector expired_rowsets; + // ATTN: trick, Use stale_rowsets to temporarily increase the reference count of the rowset shared pointer in _stale_rs_version_map so that in the recycle_cached_data function, it checks if the reference count is 2. std::vector stale_rowsets; int64_t expired_stale_sweep_endtime = ::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec; @@ -414,6 +415,7 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() { LOG(INFO) << "erase stale rowset, tablet_id=" << tablet_id() << " rowset_id=" << rs_it->second->rowset_id().to_string() << " version=" << rs_it->first.to_string(); + _stale_rs_version_map.erase(rs_it); } else { LOG(WARNING) << "cannot find stale rowset " << v_ts->version() << " in tablet " << tablet_id(); @@ -457,7 +459,7 @@ void CloudTablet::recycle_cached_data(const std::vector& rowset if (config::enable_file_cache) { for (const auto& rs : rowsets) { - // rowsets vector save ptr and tablet._rs_version_map save ptr, so at least 2 + // rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2. if (rs.use_count() > 2) { LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " << rs.use_count() diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_stale_rs_file_cache.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_stale_rs_file_cache.groovy new file mode 100644 index 00000000000000..8d41939981a812 --- /dev/null +++ b/regression-test/suites/cloud_p0/tablets/test_clean_stale_rs_file_cache.groovy @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.Http + +suite('test_clean_stale_rs_file_cache', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1' + ] + options.beConfigs += [ + 'report_tablet_interval_seconds=1', + 'cumulative_compaction_min_deltas=5', + 'tablet_rowset_stale_sweep_by_size=false', + 'tablet_rowset_stale_sweep_time_sec=60', + 'vacuum_stale_rowsets_interval_s=10' + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + + def table = "test_clean_stale_rs_file_cache" + + docker(options) { + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + sql """CREATE TABLE $table ( + `k1` int(11) NULL, + `k2` int(11) NULL, + `v1` varchar(2048) + ) + DUPLICATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_num"="1" + ); + """ + // version 2 + sql """ + insert into $table values (1, 1, 'v1'), (2, 2, 'v2'), (3, 3, 'v3') + """ + def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + // version 3 + sql """ + insert into $table values (10, 1, 'v1'), (20, 2, 'v2'), (30, 3, 'v3') + """ + def cacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + // version 4 + sql """ + insert into $table values (100, 1, 'v1'), (200, 2, 'v2'), (300, 3, 'v3') + """ + // version 5 + sql """ + insert into $table values (1000, 1, 'v1'), (2000, 2, 'v2'), (3000, 3, 'v3') + """ + // version 6 + sql """ + insert into $table values (10000, 1, 'v1'), (20000, 2, 'v2'), (30000, 3, 'v3') + """ + + def mergedCacheDir = cacheDirVersion2 + cacheDirVersion3.collectEntries { host, hashFiles -> + [(host): cacheDirVersion2[host] ? (cacheDirVersion2[host] + hashFiles) : hashFiles] + } + for (int i = 0; i < 5; i++) { + sql """ + select count(*) from $table + """ + } + def beforeGetFromFe = getTabletAndBeHostFromFe(table) + logger.info("fe tablets {}, cache dir {}", beforeGetFromFe , mergedCacheDir) + // wait compaction finish, and vacuum_stale_rowsets work + sleep(80 * 1000) + + // check cache file has been deleted + beforeGetFromFe.each { + def tabletId = it.Key + def backendId = it.Value[0] + def backendHost = it.Value[1] + def be = cluster.getBeByBackendId(backendId.toLong()) + def dataPath = new File("${be.path}/storage/file_cache") + def subDirs = [] + + def collectDirs + collectDirs = { File dir -> + if (dir.exists()) { + dir.eachDir { subDir -> + subDirs << subDir.name + collectDirs(subDir) + } + } + } + + + collectDirs(dataPath) + logger.info("BE {} file_cache subdirs: {}", backendHost, subDirs) + def cacheDir = mergedCacheDir[backendHost] + + // add check + cacheDir.each { hashFile -> + assertFalse(subDirs.any { subDir -> subDir.startsWith(hashFile) }, + "Found unexpected cache file pattern ${hashFile} in BE ${backendHost}'s file_cache directory. " + + "Matching subdir found in: ${subDirs}") + } + } + + } +} diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy index 30a20a48ada546..151de976a8303c 100644 --- a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy +++ b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy @@ -102,7 +102,7 @@ suite('test_clean_tablet_when_rebalance', 'docker') { } // TODO(freemandealer) - // Once the freemandealer implements file cache cleanup during restart, and after enabling the current rebalance, the tablets in the tablet manager will be cleared, but the file cache will not be cleaned up. + // Once the freemandealer implements file cache cleanup during restart, enabling lines 107 to 145 will allow testing to confirm that after the rebalance, the tablet file cache on the BE will be cleared. In the current implementation, after restarting the BE and triggering the rebalance, the tablets in the tablet manager will be cleared, but the file cache cannot be cleaned up. /* if (beDeadLong) { // check tablet file cache has been deleted