Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ public List<RootPathLoadStatistic> getPathStatistics() {
return pathStatistics;
}

public long getAvailPathNum() {
return pathStatistics.stream().filter(p -> p.getDiskState() == DiskState.ONLINE).count();
}

public String getBrief() {
StringBuilder sb = new StringBuilder();
sb.append(beId).append(": replica: ").append(totalReplicaNum);
Expand Down
31 changes: 20 additions & 11 deletions fe/src/main/java/org/apache/doris/clone/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,35 +80,40 @@ private List<TabletSchedCtx> selectAlternativeTabletsForCluster(
List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();

// get classification of backends
List<BackendLoadStatistic> lowBe = Lists.newArrayList();
List<BackendLoadStatistic> midBe = Lists.newArrayList();
List<BackendLoadStatistic> highBe = Lists.newArrayList();
clusterStat.getBackendStatisticByClass(lowBe, midBe, highBe);
List<BackendLoadStatistic> lowBEs = Lists.newArrayList();
List<BackendLoadStatistic> midBEs = Lists.newArrayList();
List<BackendLoadStatistic> highBEs = Lists.newArrayList();
clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs);

if (lowBe.isEmpty() && highBe.isEmpty()) {
if (lowBEs.isEmpty() && highBEs.isEmpty()) {
LOG.info("cluster is balance: {}. skip", clusterName);
return alternativeTablets;
}

// first we should check if low backends is available.
// if all low backends is not available, we should not start balance
if (lowBe.stream().allMatch(b -> !b.isAvailable())) {
if (lowBEs.stream().allMatch(b -> !b.isAvailable())) {
LOG.info("all low load backends is dead: {}. skip",
lowBe.stream().mapToLong(b -> b.getBeId()).toArray());
lowBEs.stream().mapToLong(b -> b.getBeId()).toArray());
return alternativeTablets;
}

if (lowBe.stream().allMatch(b -> !b.hasAvailDisk())) {
if (lowBEs.stream().allMatch(b -> !b.hasAvailDisk())) {
LOG.info("all low load backends have no available disk. skip",
lowBe.stream().mapToLong(b -> b.getBeId()).toArray());
lowBEs.stream().mapToLong(b -> b.getBeId()).toArray());
return alternativeTablets;
}

// get the number of low load paths. and we should at most select this number of tablets
long numOfLowPaths = lowBEs.stream().filter(b -> b.isAvailable() && b.hasAvailDisk()).mapToLong(
b -> b.getAvailPathNum()).sum();
LOG.info("get number of low load paths: {}", numOfLowPaths);

// choose tablets from high load backends.
// BackendLoadStatistic is sorted by load score in ascend order,
// so we need to traverse it from last to first
for (int i = highBe.size() - 1; i >= 0; i--) {
BackendLoadStatistic beStat = highBe.get(i);
OUTER: for (int i = highBEs.size() - 1; i >= 0; i--) {
BackendLoadStatistic beStat = highBEs.get(i);

// classify the paths.
Set<Long> pathLow = Sets.newHashSet();
Expand Down Expand Up @@ -156,6 +161,10 @@ private List<TabletSchedCtx> selectAlternativeTabletsForCluster(
tabletCtx.setOrigPriority(Priority.LOW);

alternativeTablets.add(tabletCtx);
if (--numOfLowPaths <= 0) {
// enough
break OUTER;
}

// update remaining paths
int remaining = remainingPaths.get(replicaPathHash) - 1;
Expand Down
19 changes: 17 additions & 2 deletions fe/src/main/java/org/apache/doris/clone/TabletScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,12 @@ public class TabletScheduler extends Daemon {

public static final int BALANCE_SLOT_NUM_FOR_PATH = 2;

// if the number of scheduled tablets in TabletScheduler exceed this threshold
// if the number of scheduled tablets in TabletScheduler exceed this threshold,
// skip checking.
public static final int MAX_SCHEDULING_TABLETS = 5000;
// if the number of balancing tablets in TabletScheduler exceed this threshold,
// no more balance check
public static final int MAX_BALANCING_TABLETS = 500;

/*
* Tablet is added to pendingTablets as well it's id in allTabletIds.
Expand Down Expand Up @@ -214,7 +217,7 @@ public synchronized AddResult addTablet(TabletSchedCtx tablet, boolean force) {
&& (pendingTablets.size() > MAX_SCHEDULING_TABLETS || runningTablets.size() > MAX_SCHEDULING_TABLETS)) {
return AddResult.LIMIT_EXCEED;
}

allTabletIds.add(tablet.getTabletId());
pendingTablets.offer(tablet);
return AddResult.ADDED;
Expand Down Expand Up @@ -753,6 +756,13 @@ private void selectTabletsForBalance() {
LOG.info("balance is disabled. skip selecting tablets for balance");
return;
}

long numOfBalancingTablets = getBalanceTabletsNumber();
if (numOfBalancingTablets > MAX_BALANCING_TABLETS) {
LOG.info("number of balancing tablets {} exceed limit: {}, skip selecting tablets for balance",
numOfBalancingTablets, MAX_BALANCING_TABLETS);
return;
}

LoadBalancer loadBalancer = new LoadBalancer(statisticMap);
List<TabletSchedCtx> alternativeTablets = loadBalancer.selectAlternativeTablets();
Expand Down Expand Up @@ -1055,6 +1065,11 @@ public synchronized int getTotalNum() {
return allTabletIds.size();
}

public synchronized long getBalanceTabletsNumber() {
return pendingTablets.stream().filter(t -> t.getType() == Type.BALANCE).count()
+ runningTablets.values().stream().filter(t -> t.getType() == Type.BALANCE).count();
}

/*
* PathSlot keeps track of slot num per path of a Backend.
* Each path on a Backend has several slot.
Expand Down