From bd975a1722f15951503fdedec2325c8a7f33af3c Mon Sep 17 00:00:00 2001 From: deardeng Date: Fri, 4 Jul 2025 22:18:05 +0800 Subject: [PATCH] [opt](cloud) Optimize balance speed by reducing the complexity of the rebalance algorithm (#51733) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Arrary List remove,time complexity is too high image --- .../cloud/catalog/CloudTabletRebalancer.java | 510 ++++++++++-------- 1 file changed, 288 insertions(+), 222 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java index 1b014caea82779..17d72dd7446a44 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java @@ -54,6 +54,7 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -69,31 +70,31 @@ public class CloudTabletRebalancer extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(CloudTabletRebalancer.class); - private volatile ConcurrentHashMap> beToTabletsGlobal = - new ConcurrentHashMap>(); + private volatile ConcurrentHashMap> beToTabletsGlobal = + new ConcurrentHashMap>(); - private volatile ConcurrentHashMap> beToColocateTabletsGlobal = - new ConcurrentHashMap>(); + private volatile ConcurrentHashMap> beToColocateTabletsGlobal = + new ConcurrentHashMap>(); // used for cloud tablet report - private volatile ConcurrentHashMap> beToTabletsGlobalInSecondary = - new ConcurrentHashMap>(); + private volatile ConcurrentHashMap> beToTabletsGlobalInSecondary = + new ConcurrentHashMap>(); - private Map> futureBeToTabletsGlobal; + private Map> futureBeToTabletsGlobal; private Map> clusterToBes; private Set allBes; // partitionId -> indexId -> be -> tablet - private Map>>> partitionToTablets; + private Map>>> partitionToTablets; - private Map>>> futurePartitionToTablets; + private Map>>> futurePartitionToTablets; // tableId -> be -> tablet - private Map>> beToTabletsInTable; + private Map>> beToTabletsInTable; - private Map>> futureBeToTabletsInTable; + private Map>> futureBeToTabletsInTable; private Map beToDecommissionedTime = new HashMap(); @@ -157,7 +158,7 @@ private class InfightTask { public long srcBe; public long destBe; public boolean isGlobal; - public Map> beToTablets; + public Map> beToTablets; public long startTimestamp; BalanceType balanceType; } @@ -172,7 +173,7 @@ private class TransferPairInfo { public Set getSnapshotTabletsInPrimaryByBeId(Long beId) { Set tabletIds = Sets.newHashSet(); - List tablets = beToTabletsGlobal.get(beId); + Set tablets = beToTabletsGlobal.get(beId); if (tablets != null) { for (Tablet tablet : tablets) { tabletIds.add(tablet.getId()); @@ -191,7 +192,7 @@ public Set getSnapshotTabletsInPrimaryByBeId(Long beId) { public Set getSnapshotTabletsInSecondaryByBeId(Long beId) { Set tabletIds = Sets.newHashSet(); - List tablets = beToTabletsGlobalInSecondary.get(beId); + Set tablets = beToTabletsGlobalInSecondary.get(beId); if (tablets != null) { for (Tablet tablet : tablets) { tabletIds.add(tablet.getId()); @@ -208,8 +209,8 @@ public Set getSnapshotTabletsInPrimaryAndSecondaryByBeId(Long beId) { } public int getTabletNumByBackendId(long beId) { - List tablets = beToTabletsGlobal.get(beId); - List colocateTablets = beToColocateTabletsGlobal.get(beId); + Set tablets = beToTabletsGlobal.get(beId); + Set colocateTablets = beToColocateTabletsGlobal.get(beId); return (tablets == null ? 0 : tablets.size()) + (colocateTablets == null ? 0 : colocateTablets.size()); @@ -232,80 +233,80 @@ protected void runAfterCatalogReady() { } LOG.info("cloud tablet rebalance begin"); - - clusterToBes = new HashMap>(); - allBes = new HashSet(); long start = System.currentTimeMillis(); - // 1 build cluster to backend info + buildClusterToBackendMap(); + if (!completeRouteInfo()) { + return; + } + + checkInflightWarmUpCacheAsync(); + statRouteInfo(); + migrateTabletsForSmoothUpgrade(); + statRouteInfo(); + + indexBalanced = true; + tableBalanced = true; + + performBalancing(); + + checkDecommissionState(clusterToBes); + LOG.info("finished to rebalancer. cost: {} ms", (System.currentTimeMillis() - start)); + } + + private void buildClusterToBackendMap() { + clusterToBes = new HashMap<>(); + allBes = new HashSet<>(); for (Long beId : cloudSystemInfoService.getAllBackendIds()) { Backend be = cloudSystemInfoService.getBackend(beId); if (be == null) { LOG.info("backend {} not found", beId); continue; } - clusterToBes.putIfAbsent(be.getCloudClusterId(), new ArrayList()); + clusterToBes.putIfAbsent(be.getCloudClusterId(), new ArrayList<>()); clusterToBes.get(be.getCloudClusterId()).add(beId); allBes.add(beId); } LOG.info("cluster to backends {}", clusterToBes); + } - // 2 complete route info - if (!completeRouteInfo()) { - return; - } - - // 3 check whether the inflight preheating task has been completed - checkInflghtWarmUpCacheAsync(); - - // 4 migrate tablet for smooth upgrade + private void migrateTabletsForSmoothUpgrade() { Pair pair; - statRouteInfo(); while (!tabletsMigrateTasks.isEmpty()) { try { pair = tabletsMigrateTasks.take(); + LOG.debug("begin tablets migration from be {} to be {}", pair.first, pair.second); + migrateTablets(pair.first, pair.second); } catch (InterruptedException e) { + LOG.warn("migrate tablets failed", e); throw new RuntimeException(e); } - if (LOG.isDebugEnabled()) { - LOG.debug("begin tablets migration from be {} to be {}", pair.first, pair.second); - } - migrateTablets(pair.first, pair.second); } + } - // 5 statistics be to tablets mapping information - statRouteInfo(); - - indexBalanced = true; - tableBalanced = true; - - // 6 partition-level balance + private void performBalancing() { + // ATTN: In general, the order of `balance` should follow `partition`, `table`, and `global`. + // This is because performing `global` scheduling first and then `partition` scheduling may + // lead to ineffective scheduling. Specifically, `global` scheduling might place multiple tablets belonging + // to the same table or partition onto the same BE, while `partition` scheduling later requires these tablets + // to be dispersed across different BEs, resulting in unnecessary scheduling. if (Config.enable_cloud_partition_balance) { balanceAllPartitions(); } - - // 7 if tablets in partition-level already balanced, perform table balance if (Config.enable_cloud_table_balance && indexBalanced) { balanceAllTables(); } - - // 8 if tablets in partition-level and table-level already balanced, perform global balance if (Config.enable_cloud_global_balance && indexBalanced && tableBalanced) { globalBalance(); } - - // 9 check whether all tablets of decomission have been migrated - checkDecommissionState(clusterToBes); - - LOG.info("finished to rebalancer. cost: {} ms", (System.currentTimeMillis() - start)); } public void balanceAllPartitions() { - for (Map.Entry> entry : beToTabletsGlobal.entrySet()) { + for (Map.Entry> entry : beToTabletsGlobal.entrySet()) { LOG.info("before partition balance be {} tablet num {}", entry.getKey(), entry.getValue().size()); } - for (Map.Entry> entry : futureBeToTabletsGlobal.entrySet()) { + for (Map.Entry> entry : futureBeToTabletsGlobal.entrySet()) { LOG.info("before partition balance be {} tablet num(current + pre heating inflight) {}", entry.getKey(), entry.getValue().size()); } @@ -326,22 +327,22 @@ public void balanceAllPartitions() { return; } - for (Map.Entry> entry : beToTabletsGlobal.entrySet()) { + for (Map.Entry> entry : beToTabletsGlobal.entrySet()) { LOG.info("after partition balance be {} tablet num {}", entry.getKey(), entry.getValue().size()); } - for (Map.Entry> entry : futureBeToTabletsGlobal.entrySet()) { + for (Map.Entry> entry : futureBeToTabletsGlobal.entrySet()) { LOG.info("after partition balance be {} tablet num(current + pre heating inflight) {}", entry.getKey(), entry.getValue().size()); } } public void balanceAllTables() { - for (Map.Entry> entry : beToTabletsGlobal.entrySet()) { + for (Map.Entry> entry : beToTabletsGlobal.entrySet()) { LOG.info("before table balance be {} tablet num {}", entry.getKey(), entry.getValue().size()); } - for (Map.Entry> entry : futureBeToTabletsGlobal.entrySet()) { + for (Map.Entry> entry : futureBeToTabletsGlobal.entrySet()) { LOG.info("before table balance be {} tablet num(current + pre heating inflight) {}", entry.getKey(), entry.getValue().size()); } @@ -362,22 +363,22 @@ public void balanceAllTables() { return; } - for (Map.Entry> entry : beToTabletsGlobal.entrySet()) { + for (Map.Entry> entry : beToTabletsGlobal.entrySet()) { LOG.info("after table balance be {} tablet num {}", entry.getKey(), entry.getValue().size()); } - for (Map.Entry> entry : futureBeToTabletsGlobal.entrySet()) { + for (Map.Entry> entry : futureBeToTabletsGlobal.entrySet()) { LOG.info("after table balance be {} tablet num(current + pre heating inflight) {}", entry.getKey(), entry.getValue().size()); } } public void globalBalance() { - for (Map.Entry> entry : beToTabletsGlobal.entrySet()) { + for (Map.Entry> entry : beToTabletsGlobal.entrySet()) { LOG.info("before global balance be {} tablet num {}", entry.getKey(), entry.getValue().size()); } - for (Map.Entry> entry : futureBeToTabletsGlobal.entrySet()) { + for (Map.Entry> entry : futureBeToTabletsGlobal.entrySet()) { LOG.info("before global balance be {} tablet num(current + pre heating inflight) {}", entry.getKey(), entry.getValue().size()); } @@ -397,17 +398,17 @@ public void globalBalance() { return; } - for (Map.Entry> entry : beToTabletsGlobal.entrySet()) { + for (Map.Entry> entry : beToTabletsGlobal.entrySet()) { LOG.info("after global balance be {} tablet num {}", entry.getKey(), entry.getValue().size()); } - for (Map.Entry> entry : futureBeToTabletsGlobal.entrySet()) { + for (Map.Entry> entry : futureBeToTabletsGlobal.entrySet()) { LOG.info("after global balance be {} tablet num(current + pre heating inflight) {}", entry.getKey(), entry.getValue().size()); } } - public void checkInflghtWarmUpCacheAsync() { + public void checkInflightWarmUpCacheAsync() { Map> beToInfightTasks = new HashMap>(); for (Map.Entry entry : tabletToInfightTask.entrySet()) { @@ -494,7 +495,7 @@ public void checkDecommissionState(Map> clusterToBes) { for (Map.Entry> entry : clusterToBes.entrySet()) { List beList = entry.getValue(); for (long beId : beList) { - List tablets = beToTabletsGlobal.get(beId); + Set tablets = beToTabletsGlobal.get(beId); int tabletNum = tablets == null ? 0 : tablets.size(); Backend backend = cloudSystemInfoService.getBackend(beId); if (backend == null) { @@ -644,42 +645,42 @@ private boolean completeRouteInfo() { } public void fillBeToTablets(long be, long tableId, long partId, long indexId, Tablet tablet, - Map> globalBeToTablets, - Map>> beToTabletsInTable, - Map>>> partToTablets) { + Map> globalBeToTablets, + Map>> beToTabletsInTable, + Map>>> partToTablets) { // global - globalBeToTablets.putIfAbsent(be, new ArrayList()); + globalBeToTablets.putIfAbsent(be, new HashSet()); globalBeToTablets.get(be).add(tablet); // table - beToTabletsInTable.putIfAbsent(tableId, new HashMap>()); - Map> beToTabletsOfTable = beToTabletsInTable.get(tableId); - beToTabletsOfTable.putIfAbsent(be, new ArrayList()); + beToTabletsInTable.putIfAbsent(tableId, new HashMap>()); + Map> beToTabletsOfTable = beToTabletsInTable.get(tableId); + beToTabletsOfTable.putIfAbsent(be, new HashSet()); beToTabletsOfTable.get(be).add(tablet); // partition - partToTablets.putIfAbsent(partId, new HashMap>>()); - Map>> indexToTablets = partToTablets.get(partId); - indexToTablets.putIfAbsent(indexId, new HashMap>()); - Map> beToTabletsOfIndex = indexToTablets.get(indexId); - beToTabletsOfIndex.putIfAbsent(be, new ArrayList()); + partToTablets.putIfAbsent(partId, new HashMap>>()); + Map>> indexToTablets = partToTablets.get(partId); + indexToTablets.putIfAbsent(indexId, new HashMap>()); + Map> beToTabletsOfIndex = indexToTablets.get(indexId); + beToTabletsOfIndex.putIfAbsent(be, new HashSet()); beToTabletsOfIndex.get(be).add(tablet); } public void statRouteInfo() { - ConcurrentHashMap> tmpBeToTabletsGlobal = new ConcurrentHashMap>(); - ConcurrentHashMap> tmpBeToTabletsGlobalInSecondary - = new ConcurrentHashMap>(); - ConcurrentHashMap> tmpBeToColocateTabletsGlobal - = new ConcurrentHashMap>(); + ConcurrentHashMap> tmpBeToTabletsGlobal = new ConcurrentHashMap>(); + ConcurrentHashMap> tmpBeToTabletsGlobalInSecondary + = new ConcurrentHashMap>(); + ConcurrentHashMap> tmpBeToColocateTabletsGlobal + = new ConcurrentHashMap>(); - futureBeToTabletsGlobal = new HashMap>(); + futureBeToTabletsGlobal = new HashMap>(); - partitionToTablets = new HashMap>>>(); - futurePartitionToTablets = new HashMap>>>(); + partitionToTablets = new HashMap>>>(); + futurePartitionToTablets = new HashMap>>>(); - beToTabletsInTable = new HashMap>>(); - futureBeToTabletsInTable = new HashMap>>(); + beToTabletsInTable = new HashMap>>(); + futureBeToTabletsInTable = new HashMap>>(); loopCloudReplica((Database db, Table table, Partition partition, MaterializedIndex index, String cluster) -> { boolean isColocated = Env.getCurrentColocateIndex().isColocateTable(table.getId()); @@ -694,8 +695,8 @@ public void statRouteInfo() { continue; } if (allBes.contains(beId)) { - List colocateTablets = - tmpBeToColocateTabletsGlobal.computeIfAbsent(beId, k -> new ArrayList<>()); + Set colocateTablets = + tmpBeToColocateTabletsGlobal.computeIfAbsent(beId, k -> new HashSet<>()); colocateTablets.add(tablet); } continue; @@ -710,8 +711,8 @@ public void statRouteInfo() { Backend secondaryBe = replica.getSecondaryBackend(cluster); long secondaryBeId = secondaryBe == null ? -1L : secondaryBe.getId(); if (allBes.contains(secondaryBeId)) { - List tablets = tmpBeToTabletsGlobalInSecondary - .computeIfAbsent(secondaryBeId, k -> new ArrayList<>()); + Set tablets = tmpBeToTabletsGlobalInSecondary + .computeIfAbsent(secondaryBeId, k -> new HashSet<>()); tablets.add(tablet); } @@ -764,10 +765,10 @@ public void loopCloudReplica(Operator operator) { public void balanceInPartition(List bes, String clusterId, List infos) { // balance all partition - for (Map.Entry>>> partitionEntry : futurePartitionToTablets.entrySet()) { - Map>> indexToTablets = partitionEntry.getValue(); + for (Map.Entry>>> partitionEntry : futurePartitionToTablets.entrySet()) { + Map>> indexToTablets = partitionEntry.getValue(); // balance all index of a partition - for (Map.Entry>> entry : indexToTablets.entrySet()) { + for (Map.Entry>> entry : indexToTablets.entrySet()) { // balance a index balanceImpl(bes, clusterId, entry.getValue(), BalanceType.PARTITION, infos); } @@ -776,7 +777,7 @@ public void balanceInPartition(List bes, String clusterId, List bes, String clusterId, List infos) { // balance all tables - for (Map.Entry>> entry : futureBeToTabletsInTable.entrySet()) { + for (Map.Entry>> entry : futureBeToTabletsInTable.entrySet()) { balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE, infos); } } @@ -846,10 +847,10 @@ private Map sendCheckWarmUpCacheAsyncRpc(List tabletIds, lo return null; } - private void updateBeToTablets(Tablet pickedTablet, long srcBe, long destBe, BalanceType balanceType, - Map> globalBeToTablets, - Map>> beToTabletsInTable, - Map>>> partToTablets) { + private void updateBeToTablets(Tablet pickedTablet, long srcBe, long destBe, + Map> globalBeToTablets, + Map>> beToTabletsInTable, + Map>>> partToTablets) { CloudReplica replica = (CloudReplica) pickedTablet.getReplicas().get(0); long tableId = replica.getTableId(); long partId = replica.getPartitionId(); @@ -892,47 +893,73 @@ private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String clust } } - private boolean getTransferPair(List bes, Map> beToTablets, long avgNum, + private boolean getTransferPair(List bes, Map> beToTablets, long avgNum, TransferPairInfo pairInfo) { - long destBe = bes.get(0); - long srcBe = bes.get(0); + long srcBe = findSourceBackend(bes, beToTablets); + long destBe = findDestinationBackend(bes, beToTablets, srcBe); - long minTabletsNum = Long.MAX_VALUE; + if (srcBe == -1 || destBe == -1) { + return false; // No valid backend found + } + + long minTabletsNum = beToTablets.get(destBe) == null ? 0 : beToTablets.get(destBe).size(); + long maxTabletsNum = beToTablets.get(srcBe) == null ? 0 : beToTablets.get(srcBe).size(); + + if (!isTransferValid(srcBe, minTabletsNum, maxTabletsNum, avgNum)) { + return false; // Transfer conditions not met + } + + pairInfo.srcBe = srcBe; + pairInfo.destBe = destBe; + pairInfo.minTabletsNum = minTabletsNum; + pairInfo.maxTabletsNum = maxTabletsNum; + return true; + } + + private long findSourceBackend(List bes, Map> beToTablets) { + long srcBe = -1; long maxTabletsNum = 0; - boolean srcDecommissioned = false; for (Long be : bes) { - long tabletNum = beToTablets.get(be) == null ? 0 : beToTablets.get(be).size(); - if (tabletNum > maxTabletsNum) { - srcBe = be; - maxTabletsNum = tabletNum; - } - + long tabletNum = beToTablets.getOrDefault(be, Collections.emptySet()).size(); Backend backend = cloudSystemInfoService.getBackend(be); - if (backend == null) { + + // Check if the backend is decommissioned + if (backend != null) { + if (backend.isDecommissioning() && tabletNum > 0) { + srcBe = be; // Mark as source if decommissioned and has tablets + break; // Exit early if we found a decommissioned backend + } + if (!backend.isDecommissioning() && tabletNum > maxTabletsNum) { + srcBe = be; + maxTabletsNum = tabletNum; + } + } else { LOG.info("backend {} not found", be); - continue; - } - if (tabletNum < minTabletsNum && backend.isAlive() && !backend.isDecommissioning() - && !backend.isSmoothUpgradeSrc()) { - destBe = be; - minTabletsNum = tabletNum; } } + return srcBe; + } + + private long findDestinationBackend(List bes, Map> beToTablets, long srcBe) { + long destBe = -1; + long minTabletsNum = Long.MAX_VALUE; for (Long be : bes) { - long tabletNum = beToTablets.get(be) == null ? 0 : beToTablets.get(be).size(); + long tabletNum = beToTablets.getOrDefault(be, Collections.emptySet()).size(); Backend backend = cloudSystemInfoService.getBackend(be); - if (backend == null) { - LOG.info("backend {} not found", be); - continue; - } - if (backend.isDecommissioning() && tabletNum > 0) { - srcBe = be; - srcDecommissioned = true; - break; + if (backend != null && backend.isAlive() && !backend.isDecommissioning() && !backend.isSmoothUpgradeSrc()) { + if (tabletNum < minTabletsNum) { + destBe = be; + minTabletsNum = tabletNum; + } } } + return destBe; + } + + private boolean isTransferValid(long srcBe, long minTabletsNum, long maxTabletsNum, long avgNum) { + boolean srcDecommissioned = cloudSystemInfoService.getBackend(srcBe).isDecommissioning(); if (!srcDecommissioned) { if ((maxTabletsNum < avgNum * (1 + Config.cloud_rebalance_percent_threshold) @@ -941,142 +968,183 @@ private boolean getTransferPair(List bes, Map> beToTabl return false; } } - - pairInfo.srcBe = srcBe; - pairInfo.destBe = destBe; - pairInfo.minTabletsNum = minTabletsNum; - pairInfo.maxTabletsNum = maxTabletsNum; return true; } private boolean isConflict(long srcBe, long destBe, CloudReplica cloudReplica, BalanceType balanceType, - Map>>> beToTabletsInParts, - Map>> beToTabletsInTables) { - if (balanceType == balanceType.GLOBAL) { - // check is conflict with partition balance - long maxBeSize = beToTabletsInParts.get(cloudReplica.getPartitionId()) - .get(cloudReplica.getIndexId()).get(srcBe).size(); - List destBeTablets = beToTabletsInParts.get(cloudReplica.getPartitionId()) - .get(cloudReplica.getIndexId()).get(destBe); - long minBeSize = destBeTablets == null ? 0 : destBeTablets.size(); - if (minBeSize >= maxBeSize) { - return true; - } - - // check is conflict with table balance - maxBeSize = beToTabletsInTables.get(cloudReplica.getTableId()).get(srcBe).size(); - destBeTablets = beToTabletsInTables.get(cloudReplica.getTableId()).get(destBe); - minBeSize = destBeTablets == null ? 0 : destBeTablets.size(); - if (minBeSize >= maxBeSize) { - return true; - } + Map>>> beToTabletsInParts, + Map>> beToTabletsInTables) { + if (cloudSystemInfoService.getBackend(srcBe).isDecommissioning()) { + return false; // If source BE is decommissioned, no conflict } - if (balanceType == balanceType.TABLE) { - // check is conflict with partition balance - long maxBeSize = beToTabletsInParts.get(cloudReplica.getPartitionId()) - .get(cloudReplica.getIndexId()).get(srcBe).size(); - List destBeTablets = beToTabletsInParts.get(cloudReplica.getPartitionId()) - .get(cloudReplica.getIndexId()).get(destBe); - long minBeSize = destBeTablets == null ? 0 : destBeTablets.size(); - return minBeSize >= maxBeSize; + if (balanceType == BalanceType.GLOBAL) { + return checkGlobalBalanceConflict(srcBe, destBe, cloudReplica, beToTabletsInParts, beToTabletsInTables); + } else if (balanceType == BalanceType.TABLE) { + return checkTableBalanceConflict(srcBe, destBe, cloudReplica, beToTabletsInParts); } return false; } - private void balanceImpl(List bes, String clusterId, Map> beToTablets, + private boolean checkGlobalBalanceConflict(long srcBe, long destBe, CloudReplica cloudReplica, + Map>>> beToTabletsInParts, + Map>> beToTabletsInTables) { + long maxBeSize = getTabletSizeInParts(srcBe, cloudReplica, beToTabletsInParts); + long minBeSize = getTabletSizeInParts(destBe, cloudReplica, beToTabletsInParts); + + if (minBeSize >= maxBeSize) { + return true; // Conflict detected + } + + maxBeSize = getTabletSizeInBes(srcBe, cloudReplica, beToTabletsInTables); + minBeSize = getTabletSizeInBes(destBe, cloudReplica, beToTabletsInTables); + + return minBeSize >= maxBeSize; // Conflict detected + } + + private boolean checkTableBalanceConflict(long srcBe, long destBe, CloudReplica cloudReplica, + Map>>> beToTabletsInParts) { + long maxBeSize = getTabletSizeInParts(srcBe, cloudReplica, beToTabletsInParts); + long minBeSize = getTabletSizeInParts(destBe, cloudReplica, beToTabletsInParts); + + return minBeSize >= maxBeSize; // Conflict detected + } + + private long getTabletSizeInParts(long beId, CloudReplica cloudReplica, + Map>>> beToTabletsInParts) { + Set tablets = beToTabletsInParts.get(cloudReplica.getPartitionId()) + .get(cloudReplica.getIndexId()).get(beId); + return tablets == null ? 0 : tablets.size(); + } + + private long getTabletSizeInBes(long beId, CloudReplica cloudReplica, + Map>> beToTabletsInTables) { + Set tablets = beToTabletsInTables.get(cloudReplica.getTableId()).get(beId); + return tablets == null ? 0 : tablets.size(); + } + + + private void balanceImpl(List bes, String clusterId, Map> beToTablets, BalanceType balanceType, List infos) { if (bes == null || bes.isEmpty() || beToTablets == null || beToTablets.isEmpty()) { return; } - long totalTabletsNum = 0; - long beNum = 0; - for (Long be : bes) { - long tabletNum = beToTablets.get(be) == null ? 0 : beToTablets.get(be).size(); - Backend backend = cloudSystemInfoService.getBackend(be); - if (backend != null && !backend.isDecommissioning()) { - beNum++; - } - totalTabletsNum += tabletNum; - } + long totalTabletsNum = calculateTotalTablets(bes, beToTablets); + long beNum = countActiveBackends(bes); + if (beNum == 0) { LOG.warn("zero be, but want balance, skip"); return; } + long avgNum = totalTabletsNum / beNum; - long transferNum = Math.max(Math.round(avgNum * Config.cloud_balance_tablet_percent_per_run), - Config.cloud_min_balance_tablet_num_per_run); + long transferNum = calculateTransferNum(avgNum); for (int i = 0; i < transferNum; i++) { TransferPairInfo pairInfo = new TransferPairInfo(); if (!getTransferPair(bes, beToTablets, avgNum, pairInfo)) { - // no need balance; - break; - } - - if (balanceType == balanceType.PARTITION) { - indexBalanced = false; + break; // no need balance } - if (balanceType == balanceType.TABLE) { - tableBalanced = false; - } + updateBalanceStatus(balanceType); long srcBe = pairInfo.srcBe; long destBe = pairInfo.destBe; - long minTabletsNum = pairInfo.minTabletsNum; - long maxTabletsNum = pairInfo.maxTabletsNum; - int randomIndex = rand.nextInt(beToTablets.get(srcBe).size()); - Tablet pickedTablet = beToTablets.get(srcBe).get(randomIndex); - CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0); + Tablet pickedTablet = pickRandomTablet(beToTablets.get(srcBe)); + if (pickedTablet == null) { + continue; // No tablet to pick + } + CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0); Backend srcBackend = Env.getCurrentSystemInfo().getBackend(srcBe); - // if srcBe is dead, destBe cann't download cache from it, preheating will failed + if (Config.enable_cloud_warm_up_for_rebalance && srcBackend != null && srcBackend.isAlive()) { - if (isConflict(srcBe, destBe, cloudReplica, balanceType, futurePartitionToTablets, - futureBeToTabletsInTable)) { + if (isConflict(srcBe, destBe, cloudReplica, balanceType, + futurePartitionToTablets, futureBeToTabletsInTable)) { continue; } - - try { - sendPreHeatingRpc(pickedTablet, srcBe, destBe); - } catch (Exception e) { - break; - } - - InfightTask task = new InfightTask(); - task.pickedTablet = pickedTablet; - task.srcBe = srcBe; - task.destBe = destBe; - task.balanceType = balanceType; - task.beToTablets = beToTablets; - task.startTimestamp = System.currentTimeMillis() / 1000; - tabletToInfightTask.put(new InfightTablet(pickedTablet.getId(), clusterId), task); - - LOG.info("pre cache {} from {} to {}, cluster {} minNum {} maxNum {} beNum {} tabletsNum {}, part {}", - pickedTablet.getId(), srcBe, destBe, clusterId, - minTabletsNum, maxTabletsNum, beNum, totalTabletsNum, cloudReplica.getPartitionId()); - updateBeToTablets(pickedTablet, srcBe, destBe, balanceType, - futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets); + preheatAndUpdateTablet(pickedTablet, srcBe, destBe, clusterId, balanceType, beToTablets); } else { if (isConflict(srcBe, destBe, cloudReplica, balanceType, partitionToTablets, beToTabletsInTable)) { continue; } + transferTablet(pickedTablet, srcBe, destBe, clusterId, balanceType, infos); + } + } + } - LOG.info("transfer {} from {} to {}, cluster {} minNum {} maxNum {} beNum {} tabletsNum {}, part {}", - pickedTablet.getId(), srcBe, destBe, clusterId, - minTabletsNum, maxTabletsNum, beNum, totalTabletsNum, cloudReplica.getPartitionId()); + private long calculateTotalTablets(List bes, Map> beToTablets) { + return bes.stream() + .mapToLong(be -> beToTablets.getOrDefault(be, Collections.emptySet()).size()) + .sum(); + } - updateBeToTablets(pickedTablet, srcBe, destBe, balanceType, beToTabletsGlobal, - beToTabletsInTable, partitionToTablets); - updateBeToTablets(pickedTablet, srcBe, destBe, balanceType, - futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets); - updateClusterToBeMap(pickedTablet, destBe, clusterId, infos); - } + private long countActiveBackends(List bes) { + return bes.stream() + .filter(be -> { + Backend backend = cloudSystemInfoService.getBackend(be); + return backend != null && !backend.isDecommissioning(); + }) + .count(); + } + + private long calculateTransferNum(long avgNum) { + return Math.max(Math.round(avgNum * Config.cloud_balance_tablet_percent_per_run), + Config.cloud_min_balance_tablet_num_per_run); + } + + private void updateBalanceStatus(BalanceType balanceType) { + if (balanceType == BalanceType.PARTITION) { + indexBalanced = false; + } else if (balanceType == BalanceType.TABLE) { + tableBalanced = false; + } + } + + private Tablet pickRandomTablet(Set tablets) { + if (tablets.isEmpty()) { + return null; + } + int randomIndex = rand.nextInt(tablets.size()); + return tablets.stream().skip(randomIndex).findFirst().orElse(null); + } + + private void preheatAndUpdateTablet(Tablet pickedTablet, long srcBe, long destBe, String clusterId, + BalanceType balanceType, Map> beToTablets) { + try { + sendPreHeatingRpc(pickedTablet, srcBe, destBe); + } catch (Exception e) { + LOG.warn("Failed to preheat tablet {} from {} to {}, " + + "help msg turn off fe config enable_cloud_warm_up_for_rebalance", + pickedTablet.getId(), srcBe, destBe, e); + return; } + + InfightTask task = new InfightTask(); + task.pickedTablet = pickedTablet; + task.srcBe = srcBe; + task.destBe = destBe; + task.balanceType = balanceType; + task.beToTablets = beToTablets; + task.startTimestamp = System.currentTimeMillis() / 1000; + tabletToInfightTask.put(new InfightTablet(pickedTablet.getId(), clusterId), task); + + LOG.info("pre cache {} from {} to {}, cluster {}", pickedTablet.getId(), srcBe, destBe, clusterId); + updateBeToTablets(pickedTablet, srcBe, destBe, + futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets); + } + + private void transferTablet(Tablet pickedTablet, long srcBe, long destBe, String clusterId, + BalanceType balanceType, List infos) { + LOG.info("transfer {} from {} to {}, cluster {}", pickedTablet.getId(), srcBe, destBe, clusterId); + updateBeToTablets(pickedTablet, srcBe, destBe, + beToTabletsGlobal, beToTabletsInTable, partitionToTablets); + updateBeToTablets(pickedTablet, srcBe, destBe, + futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets); + updateClusterToBeMap(pickedTablet, destBe, clusterId, infos); } public void addTabletMigrationTask(Long srcBe, Long dstBe) { @@ -1088,7 +1156,7 @@ public void addTabletMigrationTask(Long srcBe, Long dstBe) { */ private void migrateTablets(Long srcBe, Long dstBe) { // get tablets - List tablets = beToTabletsGlobal.get(srcBe); + Set tablets = beToTabletsGlobal.get(srcBe); if (tablets == null || tablets.isEmpty()) { LOG.info("smooth upgrade srcBe={} does not have any tablets, set inactive", srcBe); ((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe); @@ -1224,6 +1292,4 @@ private List batchUpdateCloudReplicaInfoEditlogs(List