diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java index b93d4fe2cff464..1b014caea82779 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java @@ -35,6 +35,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.rpc.RpcException; import org.apache.doris.system.Backend; @@ -55,6 +56,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -418,11 +420,22 @@ public void checkInflghtWarmUpCacheAsync() { for (Map.Entry> entry : beToInfightTasks.entrySet()) { LOG.info("before pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size()); Backend destBackend = cloudSystemInfoService.getBackend(entry.getKey()); + if (DebugPointUtil.isEnable("CloudTabletRebalancer.checkInflghtWarmUpCacheAsync.beNull")) { + LOG.info("debug point CloudTabletRebalancer.checkInflghtWarmUpCacheAsync.beNull, be {}", destBackend); + destBackend = null; + } if (destBackend == null || (!destBackend.isAlive() && destBackend.getLastUpdateMs() < needRehashDeadTime)) { + List toRemove = new LinkedList<>(); for (InfightTask task : entry.getValue()) { for (InfightTablet key : tabletToInfightTask.keySet()) { - tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), key.clusterId)); + toRemove.add(new InfightTablet(task.pickedTablet.getId(), key.clusterId)); + } + } + for (InfightTablet key : toRemove) { + if (LOG.isDebugEnabled()) { + LOG.debug("remove tablet {}-{}", key.getClusterId(), key.getTabletId()); } + tabletToInfightTask.remove(key); } continue; } @@ -447,6 +460,9 @@ public void checkInflghtWarmUpCacheAsync() { LOG.info("{} pre cache timeout, forced to change the mapping", result.getKey()); } updateClusterToBeMap(task.pickedTablet, task.destBe, clusterId, infos); + if (LOG.isDebugEnabled()) { + LOG.debug("remove tablet {}-{}", clusterId, task.pickedTablet.getId()); + } tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), clusterId)); } } diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy index 7a76533b0a47d2..c15157308c4903 100644 --- a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy +++ b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy @@ -21,142 +21,198 @@ suite('test_rebalance_in_cloud', 'multi_cluster,docker') { if (!isCloudMode()) { return; } - def options = new ClusterOptions() - options.feConfigs += [ - 'cloud_cluster_check_interval_second=1', - 'enable_cloud_warm_up_for_rebalance=false', - 'cloud_tablet_rebalancer_interval_second=1', - 'cloud_balance_tablet_percent_per_run=0.5', - 'cloud_pre_heating_time_limit_sec=1', - 'sys_log_verbose_modules=org', + + def clusterOptions = [ + new ClusterOptions(), + new ClusterOptions(), ] - options.setFeNum(3) - options.setBeNum(1) - options.cloudMode = true - options.connectToFollower = true - options.enableDebugPoints() - - docker(options) { - sql """ - CREATE TABLE table100 ( - class INT, - id INT, - score INT SUM - ) - AGGREGATE KEY(class, id) - DISTRIBUTED BY HASH(class) BUCKETS 48 - """ - - sql """ - CREATE TABLE table_p2 ( k1 int(11) NOT NULL, k2 varchar(20) NOT NULL, k3 int sum NOT NULL ) - AGGREGATE KEY(k1, k2) - PARTITION BY RANGE(k1) ( - PARTITION p1992 VALUES [("-2147483648"), ("19930101")), - PARTITION p1993 VALUES [("19930101"), ("19940101")), - PARTITION p1994 VALUES [("19940101"), ("19950101")), - PARTITION p1995 VALUES [("19950101"), ("19960101")), - PARTITION p1996 VALUES [("19960101"), ("19970101")), - PARTITION p1997 VALUES [("19970101"), ("19980101")), - PARTITION p1998 VALUES [("19980101"), ("19990101"))) - DISTRIBUTED BY HASH(k1) BUCKETS 3 - """ - GetDebugPoint().enableDebugPointForAllFEs("CloudReplica.getBackendIdImpl.primaryClusterToBackends"); - sql """set global forward_to_master=false""" - - // add a be - cluster.addBackend(1, null) - - dockerAwaitUntil(30) { - def bes = sql """show backends""" - log.info("bes: {}", bes) - bes.size() == 2 - } + for (options in clusterOptions) { + options.setFeNum(3) + options.setBeNum(1) + options.cloudMode = true + options.connectToFollower = true + options.enableDebugPoints() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'cloud_balance_tablet_percent_per_run=0.5', - dockerAwaitUntil(5) { - def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table100""" - log.info("replica distribution table100: {}", ret) - ret.size() == 2 - } + 'sys_log_verbose_modules=org', + ] + } + clusterOptions[0].feConfigs += ['enable_cloud_warm_up_for_rebalance=true', 'cloud_pre_heating_time_limit_sec=300'] + clusterOptions[1].feConfigs += ['enable_cloud_warm_up_for_rebalance=false'] - def result = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM table100; """ - assertEquals(2, result.size()) - int replicaNum = 0 - - for (def row : result) { - log.info("replica distribution: ${row} ".toString()) - replicaNum = Integer.valueOf((String) row.ReplicaNum) - if (replicaNum == 0) { - // due to debug point, observer not hash replica - } else { - assertTrue(replicaNum <= 25 && replicaNum >= 23) + + for (int i = 0; i < clusterOptions.size(); i++) { + log.info("begin warm up {}", i == 0 ? "ON" : "OFF") + docker(clusterOptions[i]) { + sql """ + CREATE TABLE table100 ( + class INT, + id INT, + score INT SUM + ) + AGGREGATE KEY(class, id) + DISTRIBUTED BY HASH(class) BUCKETS 48 + """ + + sql """ + CREATE TABLE table_p2 ( k1 int(11) NOT NULL, k2 varchar(20) NOT NULL, k3 int sum NOT NULL ) + AGGREGATE KEY(k1, k2) + PARTITION BY RANGE(k1) ( + PARTITION p1992 VALUES [("-2147483648"), ("19930101")), + PARTITION p1993 VALUES [("19930101"), ("19940101")), + PARTITION p1994 VALUES [("19940101"), ("19950101")), + PARTITION p1995 VALUES [("19950101"), ("19960101")), + PARTITION p1996 VALUES [("19960101"), ("19970101")), + PARTITION p1997 VALUES [("19970101"), ("19980101")), + PARTITION p1998 VALUES [("19980101"), ("19990101"))) + DISTRIBUTED BY HASH(k1) BUCKETS 3 + """ + GetDebugPoint().enableDebugPointForAllFEs("CloudReplica.getBackendIdImpl.primaryClusterToBackends"); + sql """set global forward_to_master=false""" + + // add a be + cluster.addBackend(1, null) + + dockerAwaitUntil(30) { + def bes = sql """show backends""" + log.info("bes: {}", bes) + bes.size() == 2 } - } - dockerAwaitUntil(5) { - def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1992)""" - log.info("replica distribution table_p2: {}", ret) - ret.size() == 2 - } + dockerAwaitUntil(5) { + def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table100""" + log.info("replica distribution table100: {}", ret) + ret.size() == 2 + } + def result = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM table100; """ + assertEquals(2, result.size()) + int replicaNum = 0 - result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1992) """ - assertEquals(2, result.size()) - for (def row : result) { - replicaNum = Integer.valueOf((String) row.ReplicaNum) - log.info("replica distribution: ${row} ".toString()) - if (replicaNum != 0) { - assertTrue(replicaNum <= 2 && replicaNum >= 1) + for (def row : result) { + log.info("replica distribution: ${row} ".toString()) + replicaNum = Integer.valueOf((String) row.ReplicaNum) + if (replicaNum == 0) { + // due to debug point, observer not hash replica + } else { + assertTrue(replicaNum <= 25 && replicaNum >= 23) + } } - } - result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1993) """ - assertEquals(2, result.size()) - for (def row : result) { - replicaNum = Integer.valueOf((String) row.ReplicaNum) - log.info("replica distribution: ${row} ".toString()) - if (replicaNum != 0) { - assertTrue(replicaNum <= 2 && replicaNum >= 1) + dockerAwaitUntil(5) { + def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1992)""" + log.info("replica distribution table_p2: {}", ret) + ret.size() == 2 } - } - result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1994) """ - assertEquals(2, result.size()) - for (def row : result) { - replicaNum = Integer.valueOf((String) row.ReplicaNum) - log.info("replica distribution: ${row} ".toString()) - if (replicaNum != 0) { - assertTrue(replicaNum <= 2 && replicaNum >= 1) + + result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1992) """ + assertEquals(2, result.size()) + for (def row : result) { + replicaNum = Integer.valueOf((String) row.ReplicaNum) + log.info("replica distribution: ${row} ".toString()) + if (replicaNum != 0) { + assertTrue(replicaNum <= 2 && replicaNum >= 1) + } } - } - result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1995) """ - assertEquals(2, result.size()) - for (def row : result) { - replicaNum = Integer.valueOf((String) row.ReplicaNum) - log.info("replica distribution: ${row} ".toString()) - if (replicaNum != 0) { - assertTrue(replicaNum <= 2 && replicaNum >= 1) + result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1993) """ + assertEquals(2, result.size()) + for (def row : result) { + replicaNum = Integer.valueOf((String) row.ReplicaNum) + log.info("replica distribution: ${row} ".toString()) + if (replicaNum != 0) { + assertTrue(replicaNum <= 2 && replicaNum >= 1) + } } - } - result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1996) """ - assertEquals(2, result.size()) - for (def row : result) { - replicaNum = Integer.valueOf((String) row.ReplicaNum) - log.info("replica distribution: ${row} ".toString()) - if (replicaNum != 0) { - assertTrue(replicaNum <= 2 && replicaNum >= 1) + result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1994) """ + assertEquals(2, result.size()) + for (def row : result) { + replicaNum = Integer.valueOf((String) row.ReplicaNum) + log.info("replica distribution: ${row} ".toString()) + if (replicaNum != 0) { + assertTrue(replicaNum <= 2 && replicaNum >= 1) + } } - } - result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1997) """ - assertEquals(2, result.size()) - for (def row : result) { - replicaNum = Integer.valueOf((String) row.ReplicaNum) - log.info("replica distribution: ${row} ".toString()) - if (replicaNum != 0) { - assertTrue(replicaNum <= 2 && replicaNum >= 1) + result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1995) """ + assertEquals(2, result.size()) + for (def row : result) { + replicaNum = Integer.valueOf((String) row.ReplicaNum) + log.info("replica distribution: ${row} ".toString()) + if (replicaNum != 0) { + assertTrue(replicaNum <= 2 && replicaNum >= 1) + } + } + + result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1996) """ + assertEquals(2, result.size()) + for (def row : result) { + replicaNum = Integer.valueOf((String) row.ReplicaNum) + log.info("replica distribution: ${row} ".toString()) + if (replicaNum != 0) { + assertTrue(replicaNum <= 2 && replicaNum >= 1) + } + } + + result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 PARTITION(p1997) """ + assertEquals(2, result.size()) + for (def row : result) { + replicaNum = Integer.valueOf((String) row.ReplicaNum) + log.info("replica distribution: ${row} ".toString()) + if (replicaNum != 0) { + assertTrue(replicaNum <= 2 && replicaNum >= 1) + } + } + + if (i == 1) { + // just test warm up + return + } + + GetDebugPoint().enableDebugPointForAllFEs("CloudTabletRebalancer.checkInflghtWarmUpCacheAsync.beNull"); + // add a be + cluster.addBackend(1, null) + // warm up + sql """admin set frontend config("enable_cloud_warm_up_for_rebalance"="true")""" + + // test rebalance thread still work + dockerAwaitUntil(30) { + def bes = sql """show backends""" + log.info("bes: {}", bes) + bes.size() == 3 + } + + dockerAwaitUntil(5) { + def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table100""" + log.info("replica distribution table100: {}", ret) + ret.size() == 3 + } + + result = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM table100; """ + assertEquals(3, result.size()) + log.info("replica distribution: ${result} ".toString()) + + // test 10s not balance, due to debug point + for (int j = 0; j < 10; j++) { + assertTrue(result.any { row -> + Integer.valueOf((String) row.ReplicaNum) == 0 + }) + sleep(1 * 1000) + } + GetDebugPoint().disableDebugPointForAllFEs("CloudTabletRebalancer.checkInflghtWarmUpCacheAsync.beNull"); + dockerAwaitUntil(10) { + def ret = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM table100""" + log.info("replica distribution table100: {}", ret) + ret.any { row -> + Integer.valueOf((String) row.ReplicaNum) == 16 + } } } + logger.info("Successfully run {} times", i + 1) } }