From acd4dd30b53f30a3a6b90bac769cad21942f02a9 Mon Sep 17 00:00:00 2001 From: yujun Date: Wed, 25 Sep 2024 11:12:25 +0800 Subject: [PATCH] [improvement](tablet scheduler) fix higher priority tablet add failed due to pending queue full (#41076) When tablet checker search for unhealth tablets, it will search table one by one. If the pending queue is full, then the later search tablets couldn't add into the pending queue, even if they are higher priorty. So we use a minmax queue to holding the tablets. If a tablet is higher priority and the pending queue is full, it will envit the lowest priority tablet in the queue, then add this tablet into queue; other: 1. increase mow version incomplete task sched priority; --- .../ColocateTableCheckerAndBalancer.java | 11 +++--- .../org/apache/doris/clone/TabletChecker.java | 12 +++++-- .../apache/doris/clone/TabletSchedCtx.java | 4 +-- .../apache/doris/clone/TabletScheduler.java | 33 ++++++++++++----- .../doris/clone/TabletSchedCtxTest.java | 36 +++++++++++++++++-- 5 files changed, 77 insertions(+), 19 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java index 4febec9e922706..7727bc77e18667 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java @@ -561,15 +561,17 @@ private void matchGroups() { tabletCtx.setIsUniqKeyMergeOnWrite(isUniqKeyMergeOnWrite); AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */); - if (res == AddResult.LIMIT_EXCEED || res == AddResult.DISABLED) { + if (res == AddResult.DISABLED) { // tablet in scheduler exceed limit, or scheduler is disabled, // skip this group and check next one. LOG.info("tablet scheduler return: {}. stop colocate table check", res.name()); break OUT; } else if (res == AddResult.ADDED) { counter.addToSchedulerTabletNum++; - } else { + } else if (res == AddResult.ALREADY_IN) { counter.tabletInScheduler++; + } else if (res == AddResult.REPLACE_ADDED || res == AddResult.LIMIT_EXCEED) { + counter.tabletExceedLimit++; } } } @@ -589,9 +591,10 @@ private void matchGroups() { } // end for groups long cost = System.currentTimeMillis() - start; - LOG.info("finished to check tablets. unhealth/total/added/in_sched/not_ready: {}/{}/{}/{}/{}, cost: {} ms", + LOG.info("finished to check tablets. unhealth/total/added/in_sched/not_ready/exceed_limit: {}/{}/{}/{}/{}/{}, " + + "cost: {} ms", counter.unhealthyTabletNum, counter.totalTabletNum, counter.addToSchedulerTabletNum, - counter.tabletInScheduler, counter.tabletNotReady, cost); + counter.tabletInScheduler, counter.tabletNotReady, counter.tabletExceedLimit, cost); } private GlobalColocateStatistic buildGlobalColocateStatistic() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java index f35282d37b64b4..78795e54f95c72 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java @@ -78,6 +78,7 @@ public class TabletChecker extends MasterDaemon { put("added", new AtomicLong(0L)); put("in_sched", new AtomicLong(0L)); put("not_ready", new AtomicLong(0L)); + put("exceed_limit", new AtomicLong(0L)); } }; @@ -224,6 +225,7 @@ public static class CheckerCounter { public long addToSchedulerTabletNum = 0; public long tabletInScheduler = 0; public long tabletNotReady = 0; + public long tabletExceedLimit = 0; } private enum LoopControlStatus { @@ -344,10 +346,12 @@ private void checkTablets() { tabletCountByStatus.get("added").set(counter.addToSchedulerTabletNum); tabletCountByStatus.get("in_sched").set(counter.tabletInScheduler); tabletCountByStatus.get("not_ready").set(counter.tabletNotReady); + tabletCountByStatus.get("exceed_limit").set(counter.tabletExceedLimit); - LOG.info("finished to check tablets. unhealth/total/added/in_sched/not_ready: {}/{}/{}/{}/{}, cost: {} ms", + LOG.info("finished to check tablets. unhealth/total/added/in_sched/not_ready/exceed_limit: {}/{}/{}/{}/{}/{}," + + "cost: {} ms", counter.unhealthyTabletNum, counter.totalTabletNum, counter.addToSchedulerTabletNum, - counter.tabletInScheduler, counter.tabletNotReady, cost); + counter.tabletInScheduler, counter.tabletNotReady, counter.tabletExceedLimit, cost); } private LoopControlStatus handlePartitionTablet(Database db, OlapTable tbl, Partition partition, boolean isInPrios, @@ -404,11 +408,13 @@ private LoopControlStatus handlePartitionTablet(Database db, OlapTable tbl, Part tabletCtx.setIsUniqKeyMergeOnWrite(isUniqKeyMergeOnWrite); AddResult res = tabletScheduler.addTablet(tabletCtx, false /* not force */); - if (res == AddResult.LIMIT_EXCEED || res == AddResult.DISABLED) { + if (res == AddResult.DISABLED) { LOG.info("tablet scheduler return: {}. stop tablet checker", res.name()); return LoopControlStatus.BREAK_OUT; } else if (res == AddResult.ADDED) { counter.addToSchedulerTabletNum++; + } else if (res == AddResult.REPLACE_ADDED || res == AddResult.LIMIT_EXCEED) { + counter.tabletExceedLimit++; } } } // indices diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index d004d21f79c82a..5e29adbb6da817 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -1334,13 +1334,13 @@ private long getCompareValue() { if (tabletHealth.aliveAndVersionCompleteNum < (replicaNum / 2 + 1)) { value -= 3 * baseTime; if (tabletHealth.hasRecentLoadFailed) { - value -= 3 * baseTime; + value -= 4 * baseTime; } } if (tabletHealth.hasAliveAndVersionIncomplete) { value -= 1 * baseTime; if (isUniqKeyMergeOnWrite) { - value -= 1 * baseTime; + value -= 2 * baseTime; } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 9768cc764ef4e2..97c9be0e887de1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -71,6 +71,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Sets; import com.google.common.collect.Table; import org.apache.logging.log4j.LogManager; @@ -81,7 +82,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; @@ -120,7 +120,7 @@ public class TabletScheduler extends MasterDaemon { * * pendingTablets, allTabletTypes, runningTablets and schedHistory are protected by 'synchronized' */ - private PriorityQueue pendingTablets = new PriorityQueue<>(); + private MinMaxPriorityQueue pendingTablets = MinMaxPriorityQueue.create(); private Map allTabletTypes = Maps.newHashMap(); // contains all tabletCtxs which state are RUNNING private Map runningTablets = Maps.newHashMap(); @@ -149,6 +149,7 @@ public enum AddResult { ADDED, // success to add ALREADY_IN, // already added, skip LIMIT_EXCEED, // number of pending tablets exceed the limit + REPLACE_ADDED, // succ to add, and envit a lowest task DISABLED // scheduler has been disabled. } @@ -268,12 +269,22 @@ public synchronized AddResult addTablet(TabletSchedCtx tablet, boolean force) { return AddResult.ALREADY_IN; } + AddResult addResult = AddResult.ADDED; // if this is not a force add, // and number of scheduling tablets exceed the limit, // refuse to add. - if (!force && (pendingTablets.size() > Config.max_scheduling_tablets - || runningTablets.size() > Config.max_scheduling_tablets)) { - return AddResult.LIMIT_EXCEED; + if (!force && (pendingTablets.size() >= Config.max_scheduling_tablets + || runningTablets.size() >= Config.max_scheduling_tablets)) { + // For a sched tablet, if its compare value is bigger, it will be more close to queue's tail position, + // and its priority is lower. + TabletSchedCtx lowestPriorityTablet = pendingTablets.peekLast(); + if (lowestPriorityTablet == null || lowestPriorityTablet.compareTo(tablet) <= 0) { + return AddResult.LIMIT_EXCEED; + } + addResult = AddResult.REPLACE_ADDED; + pendingTablets.pollLast(); + finalizeTabletCtx(lowestPriorityTablet, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE, + "envit lower priority sched tablet because pending queue is full"); } if (!contains || tablet.getType() == TabletSchedCtx.Type.REPAIR) { @@ -285,7 +296,7 @@ public synchronized AddResult addTablet(TabletSchedCtx tablet, boolean force) { LOG.info("Add tablet to pending queue, {}", tablet); } - return AddResult.ADDED; + return addResult; } @@ -306,11 +317,12 @@ public synchronized void cancelRebalanceDisk(AdminCancelRebalanceDiskStmt stmt) * Iterate current tablets, change their priority to VERY_HIGH if necessary. */ public synchronized void changeTabletsPriorityToVeryHigh(long dbId, long tblId, List partitionIds) { - PriorityQueue newPendingTablets = new PriorityQueue<>(); + MinMaxPriorityQueue newPendingTablets = MinMaxPriorityQueue.create(); for (TabletSchedCtx tabletCtx : pendingTablets) { if (tabletCtx.getDbId() == dbId && tabletCtx.getTblId() == tblId && partitionIds.contains(tabletCtx.getPartitionId())) { tabletCtx.setPriority(Priority.VERY_HIGH); + tabletCtx.setLastVisitedTime(1L); } newPendingTablets.add(tabletCtx); } @@ -1745,7 +1757,7 @@ private synchronized List getNextTabletCtxBatch() { slotNum = 1; } while (list.size() < Config.schedule_batch_size && slotNum > 0) { - TabletSchedCtx tablet = pendingTablets.poll(); + TabletSchedCtx tablet = pendingTablets.pollFirst(); if (tablet == null) { // no more tablets break; @@ -1947,6 +1959,11 @@ public void handleRunningTablets() { }); } + // only use for fe ut + public MinMaxPriorityQueue getPendingTabletQueue() { + return pendingTablets; + } + public List> getPendingTabletsInfo(int limit) { return collectTabletCtx(getPendingTablets(limit)); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java index 852f072eca1c35..a8f48949239d46 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java @@ -17,25 +17,57 @@ package org.apache.doris.clone; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.clone.TabletSchedCtx.Priority; import org.apache.doris.clone.TabletSchedCtx.Type; +import org.apache.doris.common.Config; import com.google.common.collect.Lists; +import com.google.common.collect.MinMaxPriorityQueue; import org.junit.Assert; import org.junit.Test; import java.util.Collections; import java.util.List; -import java.util.PriorityQueue; public class TabletSchedCtxTest { + @Test + public void testAddTablet() { + List tablets = Lists.newArrayList(); + ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION; + for (long i = 0; i < 20; i++) { + tablets.add(new TabletSchedCtx(Type.REPAIR, 1, 2, 3, 4, + i, replicaAlloc, i)); + tablets.add(new TabletSchedCtx(Type.BALANCE, 1, 2, 3, 4, + 1000 + i, replicaAlloc, i)); + } + Collections.shuffle(tablets); + Config.max_scheduling_tablets = 5; + TabletScheduler scheduler = Env.getCurrentEnv().getTabletScheduler(); + for (TabletSchedCtx tablet : tablets) { + scheduler.addTablet(tablet, false); + } + + MinMaxPriorityQueue queue = scheduler.getPendingTabletQueue(); + List gotTablets = Lists.newArrayList(); + while (!queue.isEmpty()) { + gotTablets.add(queue.pollFirst()); + } + Assert.assertEquals(Config.max_scheduling_tablets, gotTablets.size()); + for (int i = 0; i < gotTablets.size(); i++) { + TabletSchedCtx tablet = gotTablets.get(i); + Assert.assertEquals(Type.REPAIR, tablet.getType()); + Assert.assertEquals((long) i, tablet.getCreateTime()); + } + } + @Test public void testPriorityCompare() { // equal priority, but info3's last visit time is earlier than info2 and info1, so info1 should ranks ahead - PriorityQueue pendingTablets = new PriorityQueue<>(); + MinMaxPriorityQueue pendingTablets = MinMaxPriorityQueue.create(); ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION; TabletSchedCtx ctx1 = new TabletSchedCtx(Type.REPAIR, 1, 2, 3, 4, 1000, replicaAlloc, System.currentTimeMillis());