From c12a4a8ced57adaf0fc70b7e35fe33dfcdd7fb48 Mon Sep 17 00:00:00 2001 From: deardeng Date: Mon, 13 Jan 2025 11:25:50 +0800 Subject: [PATCH] [fix](cloud) Support clean tablet file cache when tablet drop (#46390) --- be/src/agent/task_worker_pool.cpp | 43 +++++- be/src/cloud/cloud_tablet.cpp | 6 +- be/src/cloud/cloud_tablet.h | 4 +- .../doris/regression/suite/Suite.groovy | 100 +++++++++++++ .../apache/doris/regression/util/Http.groovy | 2 +- .../test_clean_stale_rs_file_cache.groovy | 129 +++++++++++++++++ ..._clean_tablet_when_drop_force_table.groovy | 132 ++++++++++++------ .../test_clean_tablet_when_rebalance.groovy | 123 +++++++++------- 8 files changed, 433 insertions(+), 106 deletions(-) create mode 100644 regression-test/suites/cloud_p0/tablets/test_clean_stale_rs_file_cache.groovy diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 1adfc3b072f3d9..da6184ad6fd24f 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1669,11 +1669,46 @@ void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& r .tag("tablet_id", drop_tablet_req.tablet_id); return; }); - // 1. 
erase lru from tablet mgr - // TODO(dx) clean tablet file cache - // get tablet's info(such as cachekey, tablet id, rsid) + MonotonicStopWatch watch; + watch.start(); + auto weak_tablets = engine.tablet_mgr().get_weak_tablets(); + std::ostringstream rowset_ids_stream; + bool found = false; + for (auto& weak_tablet : weak_tablets) { + auto tablet = weak_tablet.lock(); + if (tablet == nullptr) { + continue; + } + if (tablet->tablet_id() != drop_tablet_req.tablet_id) { + continue; + } + found = true; + auto clean_rowsets = tablet->get_snapshot_rowset(true); + // Get first 10 rowset IDs as comma-separated string, just for log + int count = 0; + for (const auto& rowset : clean_rowsets) { + if (count >= 10) break; + if (count > 0) { + rowset_ids_stream << ","; + } + rowset_ids_stream << rowset->rowset_id().to_string(); + count++; + } + + CloudTablet::recycle_cached_data(std::move(clean_rowsets)); + break; + } + + if (!found) { + LOG(WARNING) << "tablet not found when dropping tablet_id=" << drop_tablet_req.tablet_id + << ", cost " << static_cast<double>(watch.elapsed_time()) / 1e9 << "(s)"; + return; + } + engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id); - // 2. 
gen clean file cache task + LOG(INFO) << "drop cloud tablet_id=" << drop_tablet_req.tablet_id + << " and clean file cache first 10 rowsets {" << rowset_ids_stream.str() << "}, cost " + << static_cast(watch.elapsed_time()) / 1e9 << "(s)"; return; } diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index e9693097d9b4ac..e3c01365bb19cd 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -386,6 +386,8 @@ void CloudTablet::delete_rowsets(const std::vector& to_delete, uint64_t CloudTablet::delete_expired_stale_rowsets() { std::vector expired_rowsets; + // ATTN: trick, Use stale_rowsets to temporarily increase the reference count of the rowset shared pointer in _stale_rs_version_map so that in the recycle_cached_data function, it checks if the reference count is 2. + std::vector stale_rowsets; int64_t expired_stale_sweep_endtime = ::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec; std::vector version_to_delete; @@ -409,6 +411,7 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() { auto rs_it = _stale_rs_version_map.find(v_ts->version()); if (rs_it != _stale_rs_version_map.end()) { expired_rowsets.push_back(rs_it->second); + stale_rowsets.push_back(rs_it->second); LOG(INFO) << "erase stale rowset, tablet_id=" << tablet_id() << " rowset_id=" << rs_it->second->rowset_id().to_string() << " version=" << rs_it->first.to_string(); @@ -456,7 +459,8 @@ void CloudTablet::recycle_cached_data(const std::vector& rowset if (config::enable_file_cache) { for (const auto& rs : rowsets) { - if (rs.use_count() >= 1) { + // rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2. + if (rs.use_count() > 2) { LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " << rs.use_count() << " references. 
File Cache won't be recycled when query is using it."; diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index 7897786eb0395f..2f442a6221ffce 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -209,12 +209,12 @@ class CloudTablet final : public BaseTablet { void build_tablet_report_info(TTabletInfo* tablet_info); + static void recycle_cached_data(const std::vector& rowsets); + private: // FIXME(plat1ko): No need to record base size if rowsets are ordered by version void update_base_size(const Rowset& rs); - static void recycle_cached_data(const std::vector& rowsets); - Status sync_if_not_running(); CloudStorageEngine& _engine; diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index e3edbe4951252c..2fea19456abaf4 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -49,6 +49,7 @@ import org.apache.doris.regression.action.HttpCliAction import org.apache.doris.regression.util.DataUtils import org.apache.doris.regression.util.JdbcUtils import org.apache.doris.regression.util.Hdfs +import org.apache.doris.regression.util.Http import org.apache.doris.regression.util.SuiteUtils import org.apache.doris.regression.util.DebugPoint import org.apache.doris.regression.RunMode @@ -2743,4 +2744,103 @@ class Suite implements GroovyInterceptable { scpFiles("root", be_ip, udf_file_path, udf_file_path, false) } } + + def check_fold_consistency = { test_sql -> + def re_fe = order_sql "select /*+SET_VAR(enable_fold_constant_by_be=false)*/ ${test_sql}" + def re_be = order_sql "select /*+SET_VAR(enable_fold_constant_by_be=true)*/ ${test_sql}" + def re_no_fold = order_sql "select /*+SET_VAR(debug_skip_fold_constant=true)*/ ${test_sql}" + logger.info("check 
sql: ${test_sql}") + assertEquals(re_fe, re_be) + assertEquals(re_fe, re_no_fold) + } + + def backendIdToHost = { -> + def spb = sql_return_maparray """SHOW BACKENDS""" + def beIdToHost = [:] + spb.each { + beIdToHost[it.BackendId] = it.Host + } + beIdToHost + } + + def getTabletAndBeHostFromBe = { bes -> + def ret = [:] + bes.each { be -> + // {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3} + def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data + def tablets = data.tablets.collect { it.tablet_id as String } + tablets.each{ + ret[it] = data.host + } + } + ret + } + + def getTabletAndBeHostFromFe = { table -> + def result = sql_return_maparray """SHOW TABLETS FROM $table""" + def bes = backendIdToHost.call() + // tablet : [backendId, host] + def ret = [:] + result.each { + ret[it.TabletId] = [it.BackendId, bes[it.BackendId]] + } + ret + } + + // get rowset_id segment_id from ms + // curl '175.40.101.1:5000/MetaService/http/get_value?token=greedisgood9999&unicode&key_type=MetaRowsetKey&instance_id=default_instance_id&tablet_id=27700&version=2' + def getSegmentFilesFromMs = { msHttpPort, tabletId, version, check_func -> + httpTest { + endpoint msHttpPort + op "get" + uri "/MetaService/http/get_value?token=greedisgood9999&unicode&key_type=MetaRowsetKey&instance_id=default_instance_id&tablet_id=${tabletId}&version=${version}" + check check_func + } + } + + def getRowsetFileCacheDirFromBe = { beHttpPort, msHttpPort, tabletId, version -> + def hashValues = [] + def segmentFiles = [] + getSegmentFilesFromMs(msHttpPort, tabletId, version) { + respCode, body -> + def json = parseJson(body) + logger.info("get tablet {} version {} from ms, response {}", tabletId, version, json) + // 
{"rowset_id":"0","partition_id":"27695","tablet_id":"27700","txn_id":"7057526525952","tablet_schema_hash":0,"rowset_type":"BETA_ROWSET","rowset_state":"COMMITTED","start_version":"3","end_version":"3","version_hash":"0","num_rows":"1","total_disk_size":"895","data_disk_size":"895","index_disk_size":"0","empty":false,"load_id":{"hi":"-1646598626735601581","lo":"-6677682539881484579"},"delete_flag":false,"creation_time":"1736153402","num_segments":"1","rowset_id_v2":"0200000000000004694889e84c76391cfd52ec7db0a483ba","resource_id":"1","newest_write_timestamp":"1736153402","segments_key_bounds":[{"min_key":"AoAAAAAAAAAC","max_key":"AoAAAAAAAAAC"}],"txn_expiration":"1736167802","segments_overlap_pb":"NONOVERLAPPING","compaction_level":"0","segments_file_size":["895"],"index_id":"27697","schema_version":0,"enable_segments_file_size":true,"has_variant_type_in_schema":false,"enable_inverted_index_file_info":false} + def segmentNum = json.num_segments as int + def rowsetId = json.rowset_id_v2 as String + segmentFiles = (0..<segmentNum).collect { i ->
"${rowsetId}_${i}.dat" } + } + + segmentFiles.each { + // curl '175.40.51.3:8040/api/file_cache?op=hash&value=0200000000000004694889e84c76391cfd52ec7db0a483ba_0.dat' + def data = Http.GET("http://${beHttpPort}/api/file_cache?op=hash&value=${it}", true) + // {"hash":"2b79c649a1766dad371054ee168f0574"} + logger.info("get tablet {} segmentFile {}, response {}", tabletId, it, data) + hashValues << data.hash + } + hashValues + } + + // get table's tablet file cache + def getTabletFileCacheDirFromBe = { msHttpPort, table, version -> + // beHost HashFile + def beHostToHashFile = [:] + + def getTabletsAndHostFromFe = getTabletAndBeHostFromFe(table) + getTabletsAndHostFromFe.each { + def beHost = it.Value[1] + def tabletId = it.Key + def hashRet = getRowsetFileCacheDirFromBe(beHost + ":8040", msHttpPort, tabletId, version) + hashRet.each { + def hashFile = it + if (beHostToHashFile.containsKey(beHost)) { + beHostToHashFile[beHost].add(hashFile) + } else { + beHostToHashFile[beHost] = [hashFile] + } + } + } + beHostToHashFile + } + } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy index cd688a1fcfc062..2a63f8763df80f 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy @@ -53,7 +53,7 @@ class Http { conn.setRequestProperty('Authorization', 'Basic cm9vdDo=') //token for root def code = conn.responseCode def text = conn.content.text - logger.info("http post url=${url}, isJson=${isJson}, response code=${code}, text=${text}") + logger.info("http get url=${url}, isJson=${isJson}, response code=${code}, text=${text}") Assert.assertEquals(200, code) if (isJson) { def json = new JsonSlurper() diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_stale_rs_file_cache.groovy 
b/regression-test/suites/cloud_p0/tablets/test_clean_stale_rs_file_cache.groovy new file mode 100644 index 00000000000000..8d41939981a812 --- /dev/null +++ b/regression-test/suites/cloud_p0/tablets/test_clean_stale_rs_file_cache.groovy @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.Http + +suite('test_clean_stale_rs_file_cache', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1' + ] + options.beConfigs += [ + 'report_tablet_interval_seconds=1', + 'cumulative_compaction_min_deltas=5', + 'tablet_rowset_stale_sweep_by_size=false', + 'tablet_rowset_stale_sweep_time_sec=60', + 'vacuum_stale_rowsets_interval_s=10' + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + + def table = "test_clean_stale_rs_file_cache" + + docker(options) { + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + sql """CREATE TABLE $table ( + `k1` int(11) NULL, + `k2` int(11) NULL, + `v1` varchar(2048) + ) + DUPLICATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_num"="1" + ); + """ + // version 2 + sql """ + insert into $table values (1, 1, 'v1'), (2, 2, 'v2'), (3, 3, 'v3') + """ + def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + // version 3 + sql """ + insert into $table values (10, 1, 'v1'), (20, 2, 'v2'), (30, 3, 'v3') + """ + def cacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + // version 4 + sql """ + insert into $table values (100, 1, 'v1'), (200, 2, 'v2'), (300, 3, 'v3') + """ + // version 5 + sql """ + insert into $table values (1000, 1, 'v1'), (2000, 2, 'v2'), (3000, 3, 'v3') + """ + // version 6 + sql """ + insert into $table values (10000, 1, 'v1'), (20000, 2, 'v2'), (30000, 3, 'v3') + """ + + def mergedCacheDir = cacheDirVersion2 + cacheDirVersion3.collectEntries { host, hashFiles -> + [(host): cacheDirVersion2[host] ? 
(cacheDirVersion2[host] + hashFiles) : hashFiles] + } + for (int i = 0; i < 5; i++) { + sql """ + select count(*) from $table + """ + } + def beforeGetFromFe = getTabletAndBeHostFromFe(table) + logger.info("fe tablets {}, cache dir {}", beforeGetFromFe , mergedCacheDir) + // wait compaction finish, and vacuum_stale_rowsets work + sleep(80 * 1000) + + // check cache file has been deleted + beforeGetFromFe.each { + def tabletId = it.Key + def backendId = it.Value[0] + def backendHost = it.Value[1] + def be = cluster.getBeByBackendId(backendId.toLong()) + def dataPath = new File("${be.path}/storage/file_cache") + def subDirs = [] + + def collectDirs + collectDirs = { File dir -> + if (dir.exists()) { + dir.eachDir { subDir -> + subDirs << subDir.name + collectDirs(subDir) + } + } + } + + + collectDirs(dataPath) + logger.info("BE {} file_cache subdirs: {}", backendHost, subDirs) + def cacheDir = mergedCacheDir[backendHost] + + // add check + cacheDir.each { hashFile -> + assertFalse(subDirs.any { subDir -> subDir.startsWith(hashFile) }, + "Found unexpected cache file pattern ${hashFile} in BE ${backendHost}'s file_cache directory. " + + "Matching subdir found in: ${subDirs}") + } + } + + } +} diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy index 4dc847d603a324..a65f59f85a1ee4 100644 --- a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy +++ b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy @@ -16,7 +16,7 @@ // under the License. 
import org.apache.doris.regression.suite.ClusterOptions -import org.apache.doris.regression.util.Http + suite('test_clean_tablet_when_drop_force_table', 'docker') { if (!isCloudMode()) { @@ -31,51 +31,22 @@ suite('test_clean_tablet_when_drop_force_table', 'docker') { 'rehash_tablet_after_be_dead_seconds=5' ] options.beConfigs += [ - 'report_tablet_interval_seconds=1' + 'report_tablet_interval_seconds=1', + 'write_buffer_size=10240', + 'write_buffer_size_for_agg=10240' ] options.setFeNum(3) options.setBeNum(3) options.cloudMode = true options.enableDebugPoints() - def backendIdToHost = { -> - def spb = sql_return_maparray """SHOW BACKENDS""" - def beIdToHost = [:] - spb.each { - beIdToHost[it.BackendId] = it.Host - } - beIdToHost - } - - def getTabletAndBeHostFromFe = { table -> - def result = sql_return_maparray """SHOW TABLETS FROM $table""" - def bes = backendIdToHost.call() - // tablet : host - def ret = [:] - result.each { - ret[it.TabletId] = bes[it.BackendId] - } - ret - } - - def getTabletAndBeHostFromBe = { -> - def bes = cluster.getAllBackends() - def ret = [:] - bes.each { be -> - // {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3} - def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data - def tablets = data.tablets.collect { it.tablet_id as String } - tablets.each{ - ret[it] = data.host - } - } - ret - } - def testCase = { table, waitTime, useDp=false-> + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort sql """CREATE TABLE $table ( `k1` int(11) NULL, - `k2` int(11) NULL + `k2` int(11) NULL, + `v1` VARCHAR(2048) ) DUPLICATE KEY(`k1`, `k2`) COMMENT 'OLAP' @@ -84,23 +55,60 @@ suite('test_clean_tablet_when_drop_force_table', 'docker') { "replication_num"="1" ); """ + def random = new Random() + def generateRandomString = { int length -> + random.with { + def chars = ('A'..'Z').collect() + 
('a'..'z').collect() + ('0'..'9').collect() + (1..length).collect { chars[nextInt(chars.size())] }.join('') + } + } + def valuesList = (1..30000).collect { i -> + def randomStr = generateRandomString(2000) + "($i, $i, '$randomStr')" + }.join(", ") + sql """ + set global max_allowed_packet = 1010241024 + """ + + context.reconnectFe() sql """ - insert into $table values (1, 1), (2, 2), (3, 3) + insert into $table values ${valuesList} """ for (int i = 0; i < 5; i++) { sql """ - select * from $table + select count(*) from $table """ } + valuesList = (30001..60000).collect { i -> + def randomStr = generateRandomString(2000) + "($i, $i, '$randomStr')" + }.join(", ") + sql """ + set global max_allowed_packet = 1010241024 + """ + context.reconnectFe() + sql """ + insert into $table values ${valuesList} + """ + // before drop table force def beforeGetFromFe = getTabletAndBeHostFromFe(table) - def beforeGetFromBe = getTabletAndBeHostFromBe.call() - logger.info("fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe) + def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + // version 2 + def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + // version 3 + def cacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3) + + def mergedCacheDir = cacheDirVersion2 + cacheDirVersion3.collectEntries { host, hashFiles -> + [(host): cacheDirVersion2[host] ? 
(cacheDirVersion2[host] + hashFiles) : hashFiles] + } + + logger.info("fe tablets {}, be tablets {}, cache dir {}", beforeGetFromFe, beforeGetFromBe, mergedCacheDir) beforeGetFromFe.each { assertTrue(beforeGetFromBe.containsKey(it.Key)) - assertEquals(beforeGetFromBe[it.Key], it.Value) + assertEquals(beforeGetFromBe[it.Key], it.Value[1]) } if (useDp) { GetDebugPoint().enableDebugPointForAllBEs("WorkPoolCloudDropTablet.drop_tablet_callback.failed") @@ -119,16 +127,50 @@ suite('test_clean_tablet_when_drop_force_table', 'docker') { } def start = System.currentTimeMillis() / 1000 // tablet can't find in be - dockerAwaitUntil(50) { - def beTablets = getTabletAndBeHostFromBe.call().keySet() + dockerAwaitUntil(500) { + def beTablets = getTabletAndBeHostFromBe(cluster.getAllBackends()).keySet() logger.info("before drop tablets {}, after tablets {}", beforeGetFromFe, beTablets) - beforeGetFromFe.keySet().every { !getTabletAndBeHostFromBe.call().containsKey(it) } + beforeGetFromFe.keySet().every { !getTabletAndBeHostFromBe(cluster.getAllBackends()).containsKey(it) } } logger.info("table {}, cost {}s", table, System.currentTimeMillis() / 1000 - start) assertTrue(System.currentTimeMillis() / 1000 - start > waitTime) if (useDp) { futrue.get() } + + sleep(25 * 1000) + + // check cache file has been deleted + beforeGetFromFe.each { + def tabletId = it.Key + def backendId = it.Value[0] + def backendHost = it.Value[1] + def be = cluster.getBeByBackendId(backendId.toLong()) + def dataPath = new File("${be.path}/storage/file_cache") + def subDirs = [] + + def collectDirs + collectDirs = { File dir -> + if (dir.exists()) { + dir.eachDir { subDir -> + subDirs << subDir.name + collectDirs(subDir) + } + } + } + + collectDirs(dataPath) + logger.info("BE {} file_cache subdirs: {}", backendHost, subDirs) + def cacheDir = mergedCacheDir[backendHost] + + // add check + cacheDir.each { hashFile -> + assertFalse(subDirs.any { subDir -> subDir.startsWith(hashFile) }, + "Found unexpected cache 
file pattern ${hashFile} in BE ${backendHost}'s file_cache directory. " + + "Matching subdir found in: ${subDirs}") + } + } + } docker(options) { diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy index 4a44b317cc2233..151de976a8303c 100644 --- a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy +++ b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy @@ -32,7 +32,9 @@ suite('test_clean_tablet_when_rebalance', 'docker') { ] options.feConfigs.add("rehash_tablet_after_be_dead_seconds=$rehashTime") options.beConfigs += [ - 'report_tablet_interval_seconds=1' + 'report_tablet_interval_seconds=1', + 'write_buffer_size=10240', + 'write_buffer_size_for_agg=10240' ] options.setFeNum(3) options.setBeNum(3) @@ -42,56 +44,22 @@ suite('test_clean_tablet_when_rebalance', 'docker') { def choseDeadBeIndex = 1 def table = "test_clean_tablet_when_rebalance" - def backendIdToHost = { -> - def spb = sql_return_maparray """SHOW BACKENDS""" - def beIdToHost = [:] - spb.each { - beIdToHost[it.BackendId] = it.Host - } - beIdToHost - } - - def getTabletAndBeHostFromFe = { -> - def result = sql_return_maparray """SHOW TABLETS FROM $table""" - def bes = backendIdToHost.call() - // tablet : host - def ret = [:] - result.each { - ret[it.TabletId] = bes[it.BackendId] - } - ret - } - - def getTabletAndBeHostFromBe = { -> - def bes = cluster.getAllBackends() - def ret = [:] - bes.each { be -> - // {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3} - def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data - def tablets = data.tablets.collect { it.tablet_id as String } - tablets.each{ - ret[it] = data.host - } - } - ret - } - - def testCase = { deadTime -> + def testCase = { deadTime, mergedCacheDir -> 
boolean beDeadLong = deadTime > rehashTime ? true : false logger.info("begin exec beDeadLong {}", beDeadLong) for (int i = 0; i < 5; i++) { sql """ - select * from $table + select count(*) from $table """ } - def beforeGetFromFe = getTabletAndBeHostFromFe() - def beforeGetFromBe = getTabletAndBeHostFromBe.call() + def beforeGetFromFe = getTabletAndBeHostFromFe(table) + def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) logger.info("before fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe) beforeGetFromFe.each { assertTrue(beforeGetFromBe.containsKey(it.Key)) - assertEquals(beforeGetFromBe[it.Key], it.Value) + assertEquals(beforeGetFromBe[it.Key], it.Value[1]) } cluster.stopBackends(choseDeadBeIndex) @@ -120,24 +88,72 @@ suite('test_clean_tablet_when_rebalance', 'docker') { bes.size() == (beDeadLong ? 2 : 3) } for (int i = 0; i < 5; i++) { + sleep(2000) sql """ - select * from $table + select count(*) from $table """ - sleep(1000) } - beforeGetFromFe = getTabletAndBeHostFromFe() - beforeGetFromBe = getTabletAndBeHostFromBe.call() - logger.info("after fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe) - beforeGetFromFe.each { - assertTrue(beforeGetFromBe.containsKey(it.Key)) - assertEquals(beforeGetFromBe[it.Key], it.Value) + def afterGetFromFe = getTabletAndBeHostFromFe(table) + def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + logger.info("after fe tablets {}, be tablets {}", afterGetFromFe, afterGetFromBe) + afterGetFromFe.each { + assertTrue(afterGetFromBe.containsKey(it.Key)) + assertEquals(afterGetFromBe[it.Key], it.Value[1]) + } + + // TODO(freemandealer) + // Once the freemandealer implements file cache cleanup during restart, enabling lines 107 to 145 will allow testing to confirm that after the rebalance, the tablet file cache on the BE will be cleared. 
In the current implementation, after restarting the BE and triggering the rebalance, the tablets in the tablet manager will be cleared, but the file cache cannot be cleaned up. + /* + if (beDeadLong) { + // check tablet file cache has been deleted + // after fe tablets {10309=[10003, 175.41.51.3], 10311=[10002, 175.41.51.2], 10313=[10003, 175.41.51.3]}, + afterGetFromFe.each { + logger.info("tablet_id {}, before host {}, after host {}", it.Key, beforeGetFromFe[it.Key][1], it.Value[1]) + if (beforeGetFromFe[it.Key][1] == it.Value[1]) { + return + } + logger.info("tablet_id {} has been reblanced from {} to {}", it.Key, beforeGetFromFe[it.Key][1], it.Value[1]) + // check before tablet file cache dir has been droped + + def tabletId = it.Key + def backendId = beforeGetFromFe[it.Key][0] + def backendHost = beforeGetFromFe[it.Key][1] + def be = cluster.getBeByBackendId(backendId.toLong()) + def dataPath = new File("${be.path}/storage/file_cache") + def subDirs = [] + + def collectDirs + collectDirs = { File dir -> + if (dir.exists()) { + dir.eachDir { subDir -> + subDirs << subDir.name + collectDirs(subDir) + } + } + } + + collectDirs(dataPath) + logger.info("BE {} file_cache subdirs: {}", backendHost, subDirs) + def cacheDir = mergedCacheDir[backendHost] + + // add check + cacheDir.each { hashFile -> + assertFalse(subDirs.any { subDir -> subDir.startsWith(hashFile) }, + "Found unexpected cache file pattern ${hashFile} in BE ${backendHost}'s file_cache directory. 
" + + "Matching subdir found in: ${subDirs}") + } + } } + */ } docker(options) { + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort sql """CREATE TABLE $table ( `k1` int(11) NULL, - `k2` int(11) NULL + `k2` int(11) NULL, + `v1` varchar(2048) ) DUPLICATE KEY(`k1`, `k2`) COMMENT 'OLAP' @@ -147,12 +163,13 @@ suite('test_clean_tablet_when_rebalance', 'docker') { ); """ sql """ - insert into $table values (1, 1), (2, 2), (3, 3) + insert into $table values (1, 1, 'v1'), (2, 2, 'v2'), (3, 3, 'v3') """ - // 'rehash_tablet_after_be_dead_seconds=10' + def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + // 'rehash_tablet_after_be_dead_seconds=100' // be-1 dead, but not dead for a long time - testCase(5) + testCase(5, cacheDirVersion2) // be-1 dead, and dead for a long time - testCase(200) + testCase(200, cacheDirVersion2) } }