From f863a2c81395784b50674eea8288703031fc4c83 Mon Sep 17 00:00:00 2001 From: yujun777 Date: Tue, 13 Aug 2024 22:59:10 +0800 Subject: [PATCH] fix partition rebalancer unstable --- .../doris/catalog/TabletInvertedIndex.java | 28 ++++++- .../doris/clone/LoadStatisticForTag.java | 12 ++- .../org/apache/doris/clone/MovesCacheMap.java | 4 + .../doris/clone/PartitionRebalancer.java | 24 ++++-- .../apache/doris/clone/TabletScheduler.java | 22 ++++-- .../TwoDimensionalGreedyRebalanceAlgo.java | 13 +--- .../clone/ClusterLoadStatisticsTest.java | 2 +- .../apache/doris/clone/DiskRebalanceTest.java | 2 +- .../doris/clone/PartitionRebalancerTest.java | 78 +++++++++++++++++++ .../org/apache/doris/clone/PathSlotTest.java | 6 +- .../org/apache/doris/clone/RebalanceTest.java | 2 +- ...TwoDimensionalGreedyRebalanceAlgoTest.java | 4 +- 12 files changed, 165 insertions(+), 32 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/clone/PartitionRebalancerTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 08d947677aec91..b1c2a30a1137e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -18,6 +18,7 @@ package org.apache.doris.catalog; import org.apache.doris.catalog.Replica.ReplicaState; +import org.apache.doris.clone.PartitionRebalancer.TabletMove; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; @@ -806,7 +807,7 @@ public Map getReplicaToTabletMap() { // Only build from available bes, exclude colocate tables public Map> buildPartitionInfoBySkew( - List availableBeIds) { + List availableBeIds, Map> movesInProgress) { Set dbIds = Sets.newHashSet(); Set tableIds = Sets.newHashSet(); Set partitionIds = Sets.newHashSet(); @@ -830,6 +831,26 @@ public Map> 
buildPartit for (Table.Cell cell : cells) { Long tabletId = cell.getRowKey(); Long beId = cell.getColumnKey(); + Pair movePair = movesInProgress.get(tabletId); + TabletMove move = movePair != null ? movePair.first : null; + // there exists move from fromBe to toBe + if (move != null && beId == move.fromBe + && availableBeIds.contains(move.toBe)) { + + // if movePair.second == -1, it means toBe hadn't added this tablet but it will add later; + // otherwise it means toBe had added this tablet + boolean toBeHadReplica = movePair.second != -1L; + if (toBeHadReplica) { + // toBe had added this tablet, fromBe just ignores this tablet + continue; + } + + // later fromBe will delete this replica + // and toBe will add a replica + // so this replica should belong to toBe + beId = move.toBe; + } + try { Preconditions.checkState(availableBeIds.contains(beId), "dead be " + beId); TabletMeta tabletMeta = tabletMetaMap.get(tabletId); @@ -911,6 +932,11 @@ public PartitionBalanceInfo(PartitionBalanceInfo info) { this.indexId = info.indexId; this.beByReplicaCount = TreeMultimap.create(info.beByReplicaCount); } + + @Override + public String toString() { + return "[partition=" + partitionId + ", index=" + indexId + ", replicaNum2BeId=" + beByReplicaCount + "]"; + } } // just for ut diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java b/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java index 0b8aac65d2cca1..60a0d147917f6f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java @@ -20,6 +20,7 @@ import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.clone.BackendLoadStatistic.Classification; import org.apache.doris.clone.BackendLoadStatistic.LoadScore; +import org.apache.doris.clone.PartitionRebalancer.TabletMove; import org.apache.doris.common.Config; import org.apache.doris.common.Pair; import 
org.apache.doris.common.util.DebugPointUtil; @@ -50,6 +51,7 @@ public class LoadStatisticForTag { private final SystemInfoService infoService; private final TabletInvertedIndex invertedIndex; + private final Rebalancer rebalancer; private final Tag tag; @@ -68,10 +70,11 @@ public class LoadStatisticForTag { = Maps.newHashMap(); public LoadStatisticForTag(Tag tag, SystemInfoService infoService, - TabletInvertedIndex invertedIndex) { + TabletInvertedIndex invertedIndex, Rebalancer rebalancer) { this.tag = tag; this.infoService = infoService; this.invertedIndex = invertedIndex; + this.rebalancer = rebalancer; } public Tag getTag() { @@ -166,10 +169,13 @@ public void init() { // Multimap PartitionBalanceInfo> // PartitionBalanceInfo: > // Only count available bes here, aligned with the beByTotalReplicaCountMaps. - skewMaps = invertedIndex.buildPartitionInfoBySkew(beLoadStatistics.stream() + List availableBeIds = beLoadStatistics.stream() .filter(BackendLoadStatistic::isAvailable) .map(BackendLoadStatistic::getBeId) - .collect(Collectors.toList())); + .collect(Collectors.toList()); + Map> movesInProgress = rebalancer == null ? 
Maps.newHashMap() + : ((PartitionRebalancer) rebalancer).getMovesInProgress(); + skewMaps = invertedIndex.buildPartitionInfoBySkew(availableBeIds, movesInProgress); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/MovesCacheMap.java b/fe/fe-core/src/main/java/org/apache/doris/clone/MovesCacheMap.java index 0bd5c6d803d1a5..06484ecd54ae74 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/MovesCacheMap.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/MovesCacheMap.java @@ -86,6 +86,10 @@ public void updateMapping(Map statisticMap, long expir } } + public Map> getCacheMap() { + return cacheMap; + } + public MovesCache getCache(Tag tag, TStorageMedium medium) { Map mediumMoves = cacheMap.get(tag); if (mediumMoves != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java index 5af920c74fdde2..f6618ccd3cc98f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java @@ -30,6 +30,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.collect.TreeMultimap; @@ -304,7 +305,8 @@ protected void completeSchedCtx(TabletSchedCtx tabletCtx) List availPath = paths.stream().filter(path -> path.getStorageMedium() == tabletCtx.getStorageMedium() && path.isFit(tabletCtx.getTabletSize(), false) == BalanceStatus.OK) .map(RootPathLoadStatistic::getPathHash).collect(Collectors.toList()); - long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath, tabletCtx.getStorageMedium()); + long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath, tabletCtx.getTag(), + tabletCtx.getStorageMedium()); if (pathHash == -1) { throw new 
SchedException(SchedException.Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT, "paths has no available balance slot: " + availPath); @@ -368,12 +370,20 @@ public void updateLoadStatistic(Map statisticMap) { } } + public Map> getMovesInProgress() { + Map> moves = Maps.newHashMap(); + movesCacheMap.getCacheMap().values().forEach( + m -> m.values().forEach(cache -> moves.putAll(cache.get().asMap()))); + + return moves; + } + // Represents a concrete move of a tablet from one be to another. // Formed logically from a PartitionMove by specifying a tablet for the move. public static class TabletMove { - Long tabletId; - Long fromBe; - Long toBe; + public Long tabletId; + public Long fromBe; + public Long toBe; TabletMove(Long id, Long from, Long to) { this.tabletId = id; @@ -397,7 +407,11 @@ public static class ClusterBalanceInfo { TreeMultimap partitionInfoBySkew = TreeMultimap.create(Ordering.natural(), Ordering.arbitrary()); TreeMultimap beByTotalReplicaCount = TreeMultimap.create(); - } + @Override + public String toString() { + return "[partitionSkew=" + partitionInfoBySkew + ", totalReplicaNum2Be=" + beByTotalReplicaCount + "]"; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index a83308a650bad7..886fd709927eae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -67,10 +67,12 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.EvictingQueue; +import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.collect.Table; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ 
-375,7 +377,7 @@ private void updateLoadStatistic() { Map newStatisticMap = Maps.newHashMap(); Set tags = infoService.getTags(); for (Tag tag : tags) { - LoadStatisticForTag loadStatistic = new LoadStatisticForTag(tag, infoService, invertedIndex); + LoadStatisticForTag loadStatistic = new LoadStatisticForTag(tag, infoService, invertedIndex, rebalancer); loadStatistic.init(); newStatisticMap.put(tag, loadStatistic); if (LOG.isDebugEnabled()) { @@ -2054,7 +2056,7 @@ public static class PathSlot { private Map pathSlots = Maps.newConcurrentMap(); private long beId; // only use in takeAnAvailBalanceSlotFrom, make pick RR - private Map lastPickPathHashs = Maps.newHashMap(); + private Table lastPickPathHashs = HashBasedTable.create(); public PathSlot(Map paths, long beId) { this.beId = beId; @@ -2204,14 +2206,22 @@ public synchronized long takeBalanceSlot(long pathHash) { return -1; } - public long takeAnAvailBalanceSlotFrom(List pathHashs, TStorageMedium medium) { + public long takeAnAvailBalanceSlotFrom(List pathHashs, Tag tag, TStorageMedium medium) { if (pathHashs.isEmpty()) { return -1; } + if (tag == null) { + tag = Tag.DEFAULT_BACKEND_TAG; + } + Collections.sort(pathHashs); synchronized (this) { - int preferSlotIndex = pathHashs.indexOf(lastPickPathHashs.getOrDefault(medium, -1L)) + 1; + Long lastPathHash = lastPickPathHashs.get(tag, medium); + if (lastPathHash == null) { + lastPathHash = -1L; + } + int preferSlotIndex = pathHashs.indexOf(lastPathHash) + 1; if (preferSlotIndex < 0 || preferSlotIndex >= pathHashs.size()) { preferSlotIndex = 0; } @@ -2219,14 +2229,14 @@ public long takeAnAvailBalanceSlotFrom(List pathHashs, TStorageMedium medi for (int i = preferSlotIndex; i < pathHashs.size(); i++) { long pathHash = pathHashs.get(i); if (takeBalanceSlot(pathHash) != -1) { - lastPickPathHashs.put(medium, pathHash); + lastPickPathHashs.put(tag, medium, pathHash); return pathHash; } } for (int i = 0; i < preferSlotIndex; i++) { long pathHash = pathHashs.get(i); if 
(takeBalanceSlot(pathHash) != -1) { - lastPickPathHashs.put(medium, pathHash); + lastPickPathHashs.put(tag, medium, pathHash); return pathHash; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TwoDimensionalGreedyRebalanceAlgo.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TwoDimensionalGreedyRebalanceAlgo.java index 840c7e0a4b53a2..2d5977d0b9ebbb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TwoDimensionalGreedyRebalanceAlgo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TwoDimensionalGreedyRebalanceAlgo.java @@ -140,9 +140,7 @@ public List getNextMoves(ClusterBalanceInfo info, int maxMovesNum if (LOG.isDebugEnabled()) { LOG.debug(keySet); } - Preconditions.checkState(keySet.isEmpty() || keySet.last() == 0L, - "non-zero replica count on be while no partition skew information in skewMap"); - // Nothing to balance: cluster is empty. + return Lists.newArrayList(); } @@ -156,7 +154,6 @@ public List getNextMoves(ClusterBalanceInfo info, int maxMovesNum return Lists.newArrayList(); } - List moves = Lists.newArrayList(); for (int i = 0; i < maxMovesNum; ++i) { PartitionMove move = getNextMove(info.beByTotalReplicaCount, info.partitionInfoBySkew); @@ -178,12 +175,8 @@ private PartitionMove getNextMove(TreeMultimap beByTotalReplicaCount return null; } long maxPartitionSkew = skewMap.keySet().last(); - long maxBeSkew = beByTotalReplicaCount.keySet().last() - beByTotalReplicaCount.keySet().first(); - - // 1. Every partition is balanced(maxPartitionSkew<=1) and any move will unbalance a partition, so there - // is no potential for the greedy algorithm to balance the cluster. - // 2. Every partition is balanced(maxPartitionSkew<=1) and the cluster as a whole is balanced(maxBeSkew<=1). 
- if (maxPartitionSkew == 0L || (maxPartitionSkew <= 1L && maxBeSkew <= 1L)) { + // don't make a global balance because beByTotalReplicaCount may contain tablets for other medium or tag + if (maxPartitionSkew <= 1L) { return null; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java index 05abfacdce0c2d..f1c3a5c8b38680 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java @@ -167,7 +167,7 @@ public void setUp() { @Test public void test() { LoadStatisticForTag loadStatistic = new LoadStatisticForTag( - Tag.DEFAULT_BACKEND_TAG, systemInfoService, invertedIndex); + Tag.DEFAULT_BACKEND_TAG, systemInfoService, invertedIndex, null); loadStatistic.init(); List> infos = loadStatistic.getStatistic(TStorageMedium.HDD); Assert.assertEquals(3, infos.size()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java index 62ba34cf4e3d47..9ccd607b915496 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java @@ -138,7 +138,7 @@ long ignored() { private void generateStatisticsAndPathSlots() { LoadStatisticForTag loadStatistic = new LoadStatisticForTag(Tag.DEFAULT_BACKEND_TAG, systemInfoService, - invertedIndex); + invertedIndex, null); loadStatistic.init(); statisticMap = Maps.newHashMap(); statisticMap.put(Tag.DEFAULT_BACKEND_TAG, loadStatistic); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/PartitionRebalancerTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/PartitionRebalancerTest.java new file mode 100644 index 00000000000000..efc87a10076657 --- /dev/null +++ 
b/fe/fe-core/src/test/java/org/apache/doris/clone/PartitionRebalancerTest.java @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.clone; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Set; +import java.util.stream.Collectors; + +public class PartitionRebalancerTest extends TestWithFeService { + + @Override + protected void beforeCreatingConnectContext() throws Exception { + Config.tablet_schedule_interval_ms = 100; + Config.tablet_checker_interval_ms = 100; + Config.tablet_rebalancer_type = "partition"; + Config.tablet_repair_delay_factor_second = 1; + Config.schedule_slot_num_per_hdd_path = 10000; + Config.schedule_slot_num_per_ssd_path = 10000; + Config.schedule_batch_size = 10000; + Config.max_scheduling_tablets = 10000; + Config.max_balancing_tablets = 10000; + Config.partition_rebalance_max_moves_num_per_selection = 5; + } + + @Override + protected int backendNum() { + return 3; + } + + @Test + public void 
testBalance() throws Exception { + createDatabase("test"); + createTable("CREATE TABLE test.tbl1 (k INT) DISTRIBUTED BY HASH(k) BUCKETS 32" + + " PROPERTIES ('replication_num' = '1')"); + + Thread.sleep(2000); + Assertions.assertEquals(Sets.newHashSet(11, 11, 10), getBackendTabletNums()); + + checkBEHeartbeat(Lists.newArrayList(createBackend("127.0.0.4", lastFeRpcPort))); + Thread.sleep(2000); + Assertions.assertEquals(Sets.newHashSet(8, 8, 8, 8), getBackendTabletNums()); + + checkBEHeartbeat(Lists.newArrayList(createBackend("127.0.0.5", lastFeRpcPort))); + Thread.sleep(2000); + Assertions.assertEquals(Sets.newHashSet(7, 7, 6, 6, 6), getBackendTabletNums()); + } + + private Set getBackendTabletNums() { + return Env.getCurrentSystemInfo().getAllBackendIds().stream() + .map(beId -> Env.getCurrentInvertedIndex().getTabletIdsByBackendId(beId).size()) + .collect(Collectors.toSet()); + } + +} + diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java index 99d49ceb30cd6c..61e0e27f890023 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java @@ -19,6 +19,7 @@ import org.apache.doris.clone.TabletScheduler.PathSlot; import org.apache.doris.common.Config; +import org.apache.doris.resource.Tag; import org.apache.doris.thrift.TStorageMedium; import com.google.common.collect.Lists; @@ -33,7 +34,7 @@ class PathSlotTest { @Test - public void test() { + public void test() throws Exception { Config.balance_slot_num_per_path = 2; Map paths = Maps.newHashMap(); List availPathHashs = Lists.newArrayList(); @@ -57,7 +58,8 @@ public void test() { PathSlot ps = new PathSlot(paths, 1L); for (int i = 0; i < expectPathHashs.size(); i++) { Collections.shuffle(availPathHashs); - gotPathHashs.add(ps.takeAnAvailBalanceSlotFrom(availPathHashs, medium)); + 
gotPathHashs.add(ps.takeAnAvailBalanceSlotFrom(availPathHashs, + Tag.create(Tag.TYPE_LOCATION, "zone1"), medium)); } Assert.assertEquals(expectPathHashs, gotPathHashs); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java index 52ccb90a12c778..fc3bbb28485c54 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java @@ -177,7 +177,7 @@ long ignored() { private void generateStatisticMap() { LoadStatisticForTag loadStatistic = new LoadStatisticForTag( - Tag.DEFAULT_BACKEND_TAG, systemInfoService, invertedIndex); + Tag.DEFAULT_BACKEND_TAG, systemInfoService, invertedIndex, null); loadStatistic.init(); statisticMap = Maps.newHashMap(); statisticMap.put(Tag.DEFAULT_BACKEND_TAG, loadStatistic); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TwoDimensionalGreedyRebalanceAlgoTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TwoDimensionalGreedyRebalanceAlgoTest.java index 090c15ae5d3104..c150e6dc4f3f16 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TwoDimensionalGreedyRebalanceAlgoTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TwoDimensionalGreedyRebalanceAlgoTest.java @@ -193,10 +193,10 @@ public void testInvalidClusterBalanceInfo() { beByTotalReplicaCount.put(0L, 10001L); beByTotalReplicaCount.put(1L, 10002L); } - }, 0); + }, -1); Assert.fail("Exception will be thrown in GetNextMoves"); } catch (Exception e) { - Assert.assertSame(e.getClass(), IllegalStateException.class); + Assert.assertSame(e.getClass(), IllegalArgumentException.class); LOG.info(e.getMessage()); } }