diff --git a/fe/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java b/fe/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java index cb02c7415cf9a3..8740396f7fc964 100644 --- a/fe/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java +++ b/fe/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java @@ -270,6 +270,10 @@ public List getPathStatistics() { return pathStatistics; } + public long getAvailPathNum() { + return pathStatistics.stream().filter(p -> p.getDiskState() == DiskState.ONLINE).count(); + } + public String getBrief() { StringBuilder sb = new StringBuilder(); sb.append(beId).append(": replica: ").append(totalReplicaNum); diff --git a/fe/src/main/java/org/apache/doris/clone/LoadBalancer.java b/fe/src/main/java/org/apache/doris/clone/LoadBalancer.java index ce14f412235227..abf81181bf6043 100644 --- a/fe/src/main/java/org/apache/doris/clone/LoadBalancer.java +++ b/fe/src/main/java/org/apache/doris/clone/LoadBalancer.java @@ -80,35 +80,40 @@ private List selectAlternativeTabletsForCluster( List alternativeTablets = Lists.newArrayList(); // get classification of backends - List lowBe = Lists.newArrayList(); - List midBe = Lists.newArrayList(); - List highBe = Lists.newArrayList(); - clusterStat.getBackendStatisticByClass(lowBe, midBe, highBe); + List lowBEs = Lists.newArrayList(); + List midBEs = Lists.newArrayList(); + List highBEs = Lists.newArrayList(); + clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs); - if (lowBe.isEmpty() && highBe.isEmpty()) { + if (lowBEs.isEmpty() && highBEs.isEmpty()) { LOG.info("cluster is balance: {}. skip", clusterName); return alternativeTablets; } // first we should check if low backends is available. // if all low backends is not available, we should not start balance - if (lowBe.stream().allMatch(b -> !b.isAvailable())) { + if (lowBEs.stream().allMatch(b -> !b.isAvailable())) { LOG.info("all low load backends is dead: {}. skip", - lowBe.stream().mapToLong(b -> b.getBeId()).toArray()); + lowBEs.stream().mapToLong(b -> b.getBeId()).toArray()); return alternativeTablets; } - if (lowBe.stream().allMatch(b -> !b.hasAvailDisk())) { + if (lowBEs.stream().allMatch(b -> !b.hasAvailDisk())) { LOG.info("all low load backends have no available disk. skip", - lowBe.stream().mapToLong(b -> b.getBeId()).toArray()); + lowBEs.stream().mapToLong(b -> b.getBeId()).toArray()); return alternativeTablets; } + // get the number of low load paths. and we should at most select this number of tablets + long numOfLowPaths = lowBEs.stream().filter(b -> b.isAvailable() && b.hasAvailDisk()).mapToLong( + b -> b.getAvailPathNum()).sum(); + LOG.info("get number of low load paths: {}", numOfLowPaths); + // choose tablets from high load backends. // BackendLoadStatistic is sorted by load score in ascend order, // so we need to traverse it from last to first - for (int i = highBe.size() - 1; i >= 0; i--) { - BackendLoadStatistic beStat = highBe.get(i); + OUTER: for (int i = highBEs.size() - 1; i >= 0; i--) { + BackendLoadStatistic beStat = highBEs.get(i); // classify the paths. Set pathLow = Sets.newHashSet(); @@ -156,6 +161,10 @@ private List selectAlternativeTabletsForCluster( tabletCtx.setOrigPriority(Priority.LOW); alternativeTablets.add(tabletCtx); + if (--numOfLowPaths <= 0) { + // enough + break OUTER; + } // update remaining paths int remaining = remainingPaths.get(replicaPathHash) - 1; diff --git a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java index ec2425552b68f0..08b50d3e032031 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -92,9 +92,12 @@ public class TabletScheduler extends Daemon { public static final int BALANCE_SLOT_NUM_FOR_PATH = 2; - // if the number of scheduled tablets in TabletScheduler exceed this threshold + // if the number of scheduled tablets in TabletScheduler exceed this threshold, // skip checking. public static final int MAX_SCHEDULING_TABLETS = 5000; + // if the number of balancing tablets in TabletScheduler exceed this threshold, + // no more balance check + public static final int MAX_BALANCING_TABLETS = 500; /* * Tablet is added to pendingTablets as well it's id in allTabletIds. @@ -214,7 +217,7 @@ public synchronized AddResult addTablet(TabletSchedCtx tablet, boolean force) { && (pendingTablets.size() > MAX_SCHEDULING_TABLETS || runningTablets.size() > MAX_SCHEDULING_TABLETS)) { return AddResult.LIMIT_EXCEED; } - + allTabletIds.add(tablet.getTabletId()); pendingTablets.offer(tablet); return AddResult.ADDED; @@ -753,6 +756,13 @@ private void selectTabletsForBalance() { LOG.info("balance is disabled. skip selecting tablets for balance"); return; } + + long numOfBalancingTablets = getBalanceTabletsNumber(); + if (numOfBalancingTablets > MAX_BALANCING_TABLETS) { + LOG.info("number of balancing tablets {} exceed limit: {}, skip selecting tablets for balance", + numOfBalancingTablets, MAX_BALANCING_TABLETS); + return; + } LoadBalancer loadBalancer = new LoadBalancer(statisticMap); List alternativeTablets = loadBalancer.selectAlternativeTablets(); @@ -1055,6 +1065,11 @@ public synchronized int getTotalNum() { return allTabletIds.size(); } + public synchronized long getBalanceTabletsNumber() { + return pendingTablets.stream().filter(t -> t.getType() == Type.BALANCE).count() + + runningTablets.values().stream().filter(t -> t.getType() == Type.BALANCE).count(); + } + /* * PathSlot keeps track of slot num per path of a Backend. * Each path on a Backend has several slot.