diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp index d91e9e416b81a1..2dc6d03ebf698d 100644 --- a/be/src/cloud/cloud_backend_service.cpp +++ b/be/src/cloud/cloud_backend_service.cpp @@ -180,6 +180,11 @@ void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncRespon const TCheckWarmUpCacheAsyncRequest& request) { std::map task_done; _engine.file_cache_block_downloader().check_download_task(request.tablets, &task_done); + DBUG_EXECUTE_IF("CloudBackendService.check_warm_up_cache_async.return_task_false", { + for (auto& it : task_done) { + it.second = false; + } + }); response.__set_task_done(task_done); Status st = Status::OK(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java index 73ddbe4c4551ae..fc580c4fc7eca5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java @@ -46,6 +46,7 @@ import org.apache.doris.thrift.TWarmUpCacheAsyncResponse; import com.google.common.base.Preconditions; +import lombok.Getter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -54,6 +55,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -94,7 +96,7 @@ public class CloudTabletRebalancer extends MasterDaemon { private LinkedBlockingQueue> tabletsMigrateTasks = new LinkedBlockingQueue>(); - private Map tabletToInfightTask = new HashMap(); + private Map tabletToInfightTask = new HashMap<>(); private long assignedErrNum = 0; @@ -115,12 +117,39 @@ public enum BalanceType { PARTITION } + @Getter + private class InfightTablet { + private final Long tabletId; + private final String clusterId; + + public InfightTablet(Long tabletId, String clusterId) { + this.tabletId = tabletId; + this.clusterId = clusterId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + InfightTablet that = (InfightTablet) o; + return tabletId.equals(that.tabletId) && clusterId.equals(that.clusterId); + } + + @Override + public int hashCode() { + return Objects.hash(tabletId, clusterId); + } + } + private class InfightTask { public Tablet pickedTablet; public long srcBe; public long destBe; public boolean isGlobal; - public String clusterId; public Map> beToTablets; public long startTimestamp; BalanceType balanceType; @@ -343,41 +372,44 @@ public void globalBalance() { } public void checkInflghtWarmUpCacheAsync() { - Map> beToTabletIds = new HashMap>(); + Map> beToInfightTasks = new HashMap>(); - for (Map.Entry entry : tabletToInfightTask.entrySet()) { - beToTabletIds.putIfAbsent(entry.getValue().destBe, new ArrayList()); - beToTabletIds.get(entry.getValue().destBe).add(entry.getValue().pickedTablet.getId()); + for (Map.Entry entry : tabletToInfightTask.entrySet()) { + beToInfightTasks.putIfAbsent(entry.getValue().destBe, new ArrayList<>()); + beToInfightTasks.get(entry.getValue().destBe).add(entry.getValue()); } List infos = new ArrayList<>(); - for (Map.Entry> entry : beToTabletIds.entrySet()) { + for (Map.Entry> entry : beToInfightTasks.entrySet()) { LOG.info("before pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size()); Backend destBackend = cloudSystemInfoService.getBackend(entry.getKey()); if (destBackend == null) { - for (long tabletId : entry.getValue()) { - tabletToInfightTask.remove(tabletId); + for (InfightTask task : entry.getValue()) { + for (InfightTablet key : tabletToInfightTask.keySet()) { + tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), key.clusterId)); + } } continue; } - - Map taskDone = sendCheckWarmUpCacheAsyncRpc(entry.getValue(), entry.getKey()); + List tablets = entry.getValue().stream() + .map(task -> task.pickedTablet.getId()).collect(Collectors.toList()); + Map taskDone = sendCheckWarmUpCacheAsyncRpc(tablets, entry.getKey()); if (taskDone == null) { LOG.warn("sendCheckWarmUpCacheAsyncRpc return null be {}, inFight tasks {}", entry.getKey(), entry.getValue()); continue; } - + String clusterId = cloudSystemInfoService.getBackend(entry.getKey()).getCloudClusterId(); for (Map.Entry result : taskDone.entrySet()) { - InfightTask task = tabletToInfightTask.get(result.getKey()); - if (result.getValue() - || System.currentTimeMillis() / 1000 - task.startTimestamp - > Config.cloud_pre_heating_time_limit_sec) { + InfightTask task = tabletToInfightTask + .getOrDefault(new InfightTablet(result.getKey(), clusterId), null); + if (task != null && (result.getValue() || System.currentTimeMillis() / 1000 - task.startTimestamp + > Config.cloud_pre_heating_time_limit_sec)) { if (!result.getValue()) { LOG.info("{} pre cache timeout, forced to change the mapping", result.getKey()); } - updateClusterToBeMap(task.pickedTablet, task.destBe, task.clusterId, infos); - tabletToInfightTask.remove(result.getKey()); + updateClusterToBeMap(task.pickedTablet, task.destBe, clusterId, infos); + tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), clusterId)); } } } @@ -393,13 +425,13 @@ public void checkInflghtWarmUpCacheAsync() { } // recalculate inflight beToTablets, just for print the log - beToTabletIds = new HashMap>(); - for (Map.Entry entry : tabletToInfightTask.entrySet()) { - beToTabletIds.putIfAbsent(entry.getValue().destBe, new ArrayList()); - beToTabletIds.get(entry.getValue().destBe).add(entry.getValue().pickedTablet.getId()); + beToInfightTasks.clear(); + for (Map.Entry entry : tabletToInfightTask.entrySet()) { + beToInfightTasks.putIfAbsent(entry.getValue().destBe, new ArrayList<>()); + beToInfightTasks.get(entry.getValue().destBe).add(entry.getValue()); } - for (Map.Entry> entry : beToTabletIds.entrySet()) { + for (Map.Entry> entry : beToInfightTasks.entrySet()) { LOG.info("after pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size()); } } @@ -449,7 +481,7 @@ public void checkDecommissionState(Map> clusterToBes) { } LOG.info("notify decommission response: {} ", response); } catch (RpcException e) { - LOG.info("failed to notify decommission {}", e); + LOG.info("failed to notify decommission", e); return; } beToDecommissionedTime.put(beId, System.currentTimeMillis() / 1000); @@ -552,8 +584,10 @@ public void statRouteInfo() { fillBeToTablets(bes.get(0), table.getId(), partition.getId(), index.getId(), tablet, tmpBeToTabletsGlobal, beToTabletsInTable, this.partitionToTablets); - if (tabletToInfightTask.containsKey(tablet.getId())) { - InfightTask task = tabletToInfightTask.get(tablet.getId()); + InfightTask task = tabletToInfightTask + .getOrDefault(new InfightTablet(tablet.getId(), cluster), null); + + if (task != null) { fillBeToTablets(task.destBe, table.getId(), partition.getId(), index.getId(), tablet, futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets); } else { @@ -808,9 +842,7 @@ private boolean isConflict(long srcBe, long destBe, CloudReplica cloudReplica, B List destBeTablets = beToTabletsInParts.get(cloudReplica.getPartitionId()) .get(cloudReplica.getIndexId()).get(destBe); long minBeSize = destBeTablets == null ? 0 : destBeTablets.size(); - if (minBeSize >= maxBeSize) { - return true; - } + return minBeSize >= maxBeSize; } return false; @@ -881,10 +913,9 @@ private void balanceImpl(List bes, String clusterId, Map + log.info("search fe log path: {}", feLogPath) + Map> circularRebalanceMap = [:] + boolean isCircularRebalanceDetected = false + + new File(feLogPath).text.tokenize('\n') + .findAll { it =~ /pre cache ([0-9]+) from ([0-9]+) to ([0-9]+), cluster ([a-zA-Z0-9_]+)/ } + .each { line -> + def (tabletId, fromBe, toBe, clusterId) = (line =~ /pre cache ([0-9]+) from ([0-9]+) to ([0-9]+), cluster ([a-zA-Z0-9_]+)/)[0][1..-1] + + String clusterPreCacheKey = "$clusterId-$tabletId" + + if (!circularRebalanceMap.containsKey(clusterPreCacheKey)) { + circularRebalanceMap[clusterPreCacheKey] = new ArrayList<>() + } + + List paths = circularRebalanceMap[clusterPreCacheKey] + + if (paths.contains(toBe)) { + isCircularRebalanceDetected = true + log.info("Circular rebalance detected for tabletId: {}, clusterId: {}", tabletId, clusterId) + assertFalse(true) + } + + paths << fromBe + circularRebalanceMap[clusterPreCacheKey] = paths + + if (!paths.contains(toBe)) { + paths << (toBe as String) + } + } + + if (!isCircularRebalanceDetected) { + log.info("No circular rebalance detected.") + } + } + + docker(options) { + def clusterName = "newcluster1" + // 添加一个新的cluster add_new_cluster + cluster.addBackend(2, clusterName) + + def ret = sql_return_maparray """show clusters""" + log.info("show clusters: {}", ret) + assertEquals(2, ret.size()) + + GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false") + sql """set global forward_to_master=false""" + + sql """ + CREATE TABLE table100 ( + class INT, + id INT, + score INT SUM + ) + AGGREGATE KEY(class, id) + DISTRIBUTED BY HASH(class) BUCKETS 48 + """ + + sql """ + INSERT INTO table100 VALUES (1, 1, 100); + """ + + dockerAwaitUntil(5) { + ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table100""" + log.info("replica distribution table100: {}", ret) + ret.size() == 5 + } + + sql """use @newcluster1""" + def result = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM table100; """ + assertEquals(5, result.size()) + int replicaNum = 0 + + for (def row : result) { + log.info("replica distribution: ${row} ".toString()) + if (row.CloudClusterName == "newcluster1") { + replicaNum = Integer.valueOf((String) row.ReplicaNum) + assertTrue(replicaNum <= 25 && replicaNum >= 23) + } + } + def fe1 = cluster.getFeByIndex(1) + String feLogPath = fe1.getLogFilePath() + // stop be id 1, 4 + cluster.stopBackends(1, 4) + // check log + sleep(10 * 1000) + check feLogPath + + // start be id 1, 4 + cluster.startBackends(1, 4) + GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false") + // check log + sleep(10 * 1000) + check feLogPath + } +}