From 005de5492a814c0dc8ffc6c62aed62f5596a4ebf Mon Sep 17 00:00:00 2001
From: gengjun
Date: Fri, 20 Nov 2020 18:29:01 +0800
Subject: [PATCH] fix

---
 .../doris/clone/ColocateTableBalancer.java    | 501 ++++++------------
 .../clone/ColocateTableBalancerTest.java      | 256 +++++++--
 2 files changed, 396 insertions(+), 361 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java
index 4dcb82a741c93a..389a2c610a53ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableBalancer.java
@@ -26,7 +26,6 @@
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.Tablet.TabletStatus;
 import org.apache.doris.clone.TabletSchedCtx.Priority;
@@ -44,7 +43,6 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -74,241 +72,95 @@ public static ColocateTableBalancer getInstance() {
 
     @Override
     /*
-     * Each round, we do 3 steps:
-     * 1. Relocate group:
+     * Each round, we do 2 steps:
+     * 1. Relocate and balance group:
      *    Backend is not available, find a new backend to replace it.
-     *    Relocate at most one bucket in one group at a time.
-     *
+     *    And after all unavailable backends have been replaced, balance the group.
+     *
      * 2. Match group:
      *    If replica mismatch backends in a group, that group will be marked as unstable, and pass that
     *     tablet to TabletScheduler.
     *     Otherwise, mark the group as stable
-     *
-     * 3. Balance group:
-     *    Try balance group, and skip groups which contains unavailable backends.
     */
    protected void runAfterCatalogReady() {
-        relocateGroup();
+        relocateAndBalanceGroup();
        matchGroup();
-        balanceGroup();
    }

     /*
-     * Traverse all colocate groups, for each group:
-     * Check if there are backends dead or unavailable(decommission)
-     * If yes, for each buckets in this group, if the unavailable backend belongs to this bucket, we will find
-     * a new backend to replace the unavailable one.
-     *
-     * eg:
-     * original bucket backends sequence is:
-     * [[1, 2, 3], [4, 1, 2], [3, 4, 1], [2, 3, 4], [1, 2, 3]]
-     *
-     * and backend 3 is dead, so we will find an available backend(eg. backend 4) to replace backend 3.
-     * [[1, 2, 4], [4, 1, 2], [3, 4, 1], [2, 3, 4], [1, 2, 3]]
-     *
-     * NOTICE that in this example, we only replace the #3 backend in first bucket. That is, we only replace
-     * one bucket in one group at each round. because we need to use newly-updated cluster load statistic to
-     * find next available backend. and cluster load statistic is updated every 20 seconds.
+     * Relocate and balance group.
+     * Here we just let the replicas of colocate tables be evenly distributed across the cluster, without
+     * considering the cluster load statistic.
+     * For example, suppose there are 4 backends A B C D with the following load:
+     *
+     *                  +-+
+     *                  | |
+     * +-+  +-+  +-+    | |
+     * | |  | |  | |    | |
+     * +-+  +-+  +-+    +-+
+     *  A    B    C      D
+     *
+     * The colocate group balancer will still evenly distribute the replicas to all 4 backends, not
+     * just the 3 low-load backends.
+     *
+     *                   X
+     *                   X
+     *  X    X    X     +-+
+     *  X    X    X     | |
+     * +-+  +-+  +-+    | |
+     * | |  | |  | |    | |
+     * +-+  +-+  +-+    +-+
+     *  A    B    C      D
+     *
+     * So after colocate balance, the cluster may still be 'unbalanced' from a global perspective.
+     * The LoadBalancer will then balance the replicas of non-colocate tables to make the
+     * cluster balanced, eventually.
+     *
+     *  X    X    X      X
+     *  X    X    X      X
+     * +-+  +-+  +-+    +-+
+     * | |  | |  | |    | |
+     * | |  | |  | |    | |
+     * +-+  +-+  +-+    +-+
+     *  A    B    C      D
     */
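
(Purely for illustration, outside the patch: the structure both methods below operate on is the per-group "backends per bucket sequence", where each bucket holds replicationNum backend ids, and its flattened form is what the balance step shuffles. A minimal, self-contained sketch with hypothetical names:)

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class BucketSeqSketch {
        public static void main(String[] args) {
            // 5 buckets, replication num 3, as in the example above:
            List<List<Long>> backendsPerBucketSeq = Arrays.asList(
                    Arrays.asList(1L, 2L, 3L), Arrays.asList(4L, 1L, 2L), Arrays.asList(3L, 4L, 1L),
                    Arrays.asList(2L, 3L, 4L), Arrays.asList(1L, 2L, 3L));
            // flatten to the form the balance step works on
            List<Long> flat = backendsPerBucketSeq.stream()
                    .flatMap(List::stream).collect(Collectors.toList());
            System.out.println(flat); // [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3]
        }
    }
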
now backends set is: {}", - newBackendId, unavailableBeId, tabletOrderIdx, groupId, bucketBackendsSeq.get(tabletOrderIdx)); + Database db = catalog.getDb(groupId.dbId); + if (db == null) { + continue; } - // only handle one backend at a time - break; - } - } - - /* - * Select a substitute backend for specified bucket and colocate group. - * return -1 if backend not found. - * we need to move all replicas of this bucket to the new backend, so we have to check if the new - * backend can save all these replicas. - */ - private long selectSubstituteBackend(int tabletOrderIdx, GroupId groupId, long unavailableBeId, - Set excludeBeIds, Map statisticMap) { - ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex(); - Database db = Catalog.getCurrentCatalog().getDb(groupId.dbId); - if (db == null) { - LOG.info("db {} does not exist", groupId.dbId); - return -1; - } - ClusterLoadStatistic statistic = statisticMap.get(db.getClusterName()); - if (statistic == null) { - LOG.info("cluster {} statistic does not exist", db.getClusterName()); - return -1; - } - - // calculate the total replica size of this bucket - List tableIds = colocateIndex.getAllTableIds(groupId); - long totalReplicaNum = 0; - long totalReplicaSize = 0; - db.readLock(); - try { - for (Long tblId : tableIds) { - OlapTable tbl = (OlapTable) db.getTable(tblId); - if (tbl == null) { - continue; - } - - for (Partition partition : tbl.getPartitions()) { - for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) { - long tabletId = index.getTabletIdsInOrder().get(tabletOrderIdx); - Tablet tablet = index.getTablet(tabletId); - Replica replica = tablet.getReplicaByBackendId(unavailableBeId); - if (replica != null) { - totalReplicaNum++; - totalReplicaSize += replica.getDataSize(); - } - } - } + Map statisticMap = catalog.getTabletScheduler().getStatisticMap(); + if (statisticMap == null) { + continue; } - } finally { - db.readUnlock(); - } - LOG.debug("the number and size of replicas on backend {} of bucket {} is: {} and {}", - unavailableBeId, tabletOrderIdx, totalReplicaNum, totalReplicaSize); - - /* - * There is an unsolved problem of finding a new backend for data migration: - * Different table(partition) in this group may in different storage medium(SSD or HDD). If one backend - * is down, the best solution is to find a backend which has both SSD and HDD, and replicas can be - * relocated in corresponding storage medium. - * But in fact, backends can be heterogeneous, which may only has SSD or HDD. If we choose to strictly - * find backends with expecting storage medium, this may lead to a consequence that most of replicas - * are gathered in a small portion of backends. - * - * So for simplicity, we ignore the storage medium property, just find a low load backend which has - * capacity to save these replicas. - */ - List beStats = statistic.getSortedBeLoadStats(null /* mix medium */); - if (beStats.isEmpty()) { - LOG.warn("failed to relocate backend for colocate group: {}, no backends found", groupId); - return -1; - } - - // the selected backend should not be on same host of other backends of this bucket. - // here we generate a host set for further checking. 
-        SystemInfoService infoService = Catalog.getCurrentSystemInfo();
-        Set<String> excludeHosts = Sets.newHashSet();
-        for (Long excludeBeId : excludeBeIds) {
-            Backend be = infoService.getBackend(excludeBeId);
-            if (be == null) {
-                LOG.info("Backend {} has been dropped when finding backend for colocate group {}", excludeBeId, groupId);
+            ClusterLoadStatistic statistic = statisticMap.get(db.getClusterName());
+            if (statistic == null) {
                 continue;
             }
-            excludeHosts.add(be.getHost());
-        }
-        Preconditions.checkState(excludeBeIds.size() >= excludeHosts.size());
 
-        // beStats is ordered by load score, ascend. so finding the available from first to last
-        BackendLoadStatistic chosenBe = null;
-        for (BackendLoadStatistic beStat : beStats) {
-            if (beStat.isAvailable() && beStat.getBeId() != unavailableBeId && !excludeBeIds.contains(beStat.getBeId())) {
-                Backend be = infoService.getBackend(beStat.getBeId());
-                if (be == null) {
-                    continue;
-                }
-                if (excludeHosts.contains(be.getHost())) {
-                    continue;
-                }
-                chosenBe = beStat;
-                break;
+            Set<Long> unavailableBeIds = getUnavailableBeIdsInGroup(infoService, colocateIndex, groupId);
+            List<Long> availableBeIds = getAvailableBeIdsInGroup(db.getClusterName(), infoService, unavailableBeIds);
+            List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
+            if (relocateAndBalance(groupId, unavailableBeIds, availableBeIds, colocateIndex, infoService, statistic, balancedBackendsPerBucketSeq)) {
+                colocateIndex.addBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
+                ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
+                catalog.getEditLog().logColocateBackendsPerBucketSeq(info);
+                LOG.info("balance group {}. now backends per bucket sequence is: {}", groupId, balancedBackendsPerBucketSeq);
             }
         }
-        if (chosenBe == null) {
-            LOG.warn("failed to find an available backend to relocate for colocate group: {}", groupId);
-            return -1;
-        }
-
-        // check if there is enough capacity to save all these replicas
-        if (!chosenBe.canFitInColocate(totalReplicaSize)) {
-            LOG.warn("no backend has enough capacity to save replicas in group {} with bucket: {}", groupId, tabletOrderIdx);
-            return -1;
-        }
-
-        return chosenBe.getBeId();
-    }
 
     /*
@@ -395,93 +247,6 @@ private void matchGroup() {
         } // end for groups
     }
 
-    /*
-     * Balance colocate groups which are unstable
-     * here we just let replicas in colocate table evenly distributed in cluster, not consider the
-     * cluster load statistic.
-     * for example:
-     * currently there are 4 backends A B C D with following load:
-     *
-     *                  +-+
-     *                  | |
-     * +-+  +-+  +-+    | |
-     * | |  | |  | |    | |
-     * +-+  +-+  +-+    +-+
-     *  A    B    C      D
-     *
-     * And colocate group balancer will still evenly distribute the replicas to all 4 backends, not
-     * just 3 low load backends.
-     *
-     *                   X
-     *                   X
-     *  X    X    X     +-+
-     *  X    X    X     | |
-     * +-+  +-+  +-+    | |
-     * | |  | |  | |    | |
-     * +-+  +-+  +-+    +-+
-     *  A    B    C      D
-     *
-     * So After colocate balance, the cluster may still 'unbalanced' from a global perspective.
-     * And the LoadBalancer will balance the non-colocate table's replicas to make the
-     * cluster balance, eventually.
-     *
-     *  X    X    X      X
-     *  X    X    X      X
-     * +-+  +-+  +-+    +-+
-     * | |  | |  | |    | |
-     * | |  | |  | |    | |
-     * +-+  +-+  +-+    +-+
-     *  A    B    C      D
-     */
-    private void balanceGroup() {
-        if (Config.disable_colocate_balance) {
-            return;
-        }
-        Catalog catalog = Catalog.getCurrentCatalog();
-        SystemInfoService infoService = Catalog.getCurrentSystemInfo();
-        ColocateTableIndex colocateIndex = catalog.getColocateTableIndex();
-
-        Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
-        for (GroupId groupId : groupIds) {
-            // skip unstable groups
-            if (colocateIndex.isGroupUnstable(groupId)) {
-                continue;
-            }
-
-            // skip backend unavailable groups
-            Set<Long> backendIds = colocateIndex.getBackendsByGroup(groupId);
-            boolean isAllBackendsAvailable = true;
-            for (Long beId : backendIds) {
-                Backend be = infoService.getBackend(beId);
-                if (be == null || !be.isAvailable()) {
-                    isAllBackendsAvailable = false;
-                    break;
-                }
-            }
-            if (!isAllBackendsAvailable) {
-                continue;
-            }
-
-            // all backends are good
-            Database db = catalog.getDb(groupId.dbId);
-            if (db == null) {
-                continue;
-            }
-
-            List<Long> allBackendIds = infoService.getClusterBackendIds(db.getClusterName(), true);
-            List<Long> allBackendIdsAvailable = allBackendIds.stream()
-                    .filter(infoService::checkBackendAvailable)
-                    .collect(Collectors.toList());
-            List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
-            if (balance(groupId, allBackendIdsAvailable, colocateIndex, infoService, balancedBackendsPerBucketSeq)) {
-                colocateIndex.addBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
-                ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, balancedBackendsPerBucketSeq);
-                Catalog.getCurrentCatalog().getEditLog().logColocateBackendsPerBucketSeq(info);
-                LOG.info("balance group {}. now backends per bucket sequence is: {}", groupId, balancedBackendsPerBucketSeq);
-            }
-        }
-    }
-
     /*
      * The balance logic is as follow:
      *
@@ -508,7 +273,7 @@ private void balanceGroup() {
      * Algorithm:
      * 0. Generate the flat list of backends per bucket sequence:
      *    A B C A D E A F G A H I
-     * 1. Sort the backend in this order by replication num, descending:
+     * 1. Sort backends by replication num, breaking ties by load score, in descending order:
      *    A B C D E F G H I J
      * 2. Check the diff of the first backend(A)'s replica num and last backend(J)'s replica num.
      *    If diff is less or equal than 1, we consider this group as balance. Jump to step 5.
@@ -520,12 +285,16 @@ private void balanceGroup() {
      *    Partition this flat list by replication num:
      *    [J B C] [J D E] [A F G] [A H I]
      *    And this is our new balanced backends per bucket sequence.
-     *
+     *
+     * Relocation is similar to balance, but it chooses an unavailable BE as the src BE and moves all
+     * bucket ids on the unavailable BE to low-load BEs.
+     *
      * Return true if backends per bucket sequence change and new sequence is saved in balancedBackendsPerBucketSeq.
      * Return false if nothing changed.
     */
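
(As an illustration outside the patch, the steps above can be exercised end to end. A simplified, dependency-free sketch with hypothetical names; it moves one replica per round, uses a plain count-descending sort without the load-score tie-break, and uses a same-bucket check as a stand-in for the real same-host check:)

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    public class BalanceSketch {
        public static void main(String[] args) {
            int replicationNum = 3;
            // flat bucket sequence: backend 1 ("A") appears 4 times, as in the comment's example
            List<Long> flat = new ArrayList<>(Arrays.asList(1L, 2L, 3L, 1L, 4L, 5L, 1L, 6L, 7L, 1L, 8L, 9L));
            List<Long> available = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
            while (true) {
                // steps 0/1: count replicas per backend and sort descending
                Map<Long, Long> counts = flat.stream()
                        .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
                for (Long beId : available) {
                    counts.putIfAbsent(beId, 0L);
                }
                List<Map.Entry<Long, Long>> sorted = counts.entrySet().stream()
                        .sorted(Comparator.comparingLong((Map.Entry<Long, Long> e) -> e.getValue()).reversed())
                        .collect(Collectors.toList());
                Map.Entry<Long, Long> high = sorted.get(0);
                Map.Entry<Long, Long> low = sorted.get(sorted.size() - 1);
                // step 2: done when the spread is at most one replica
                if (high.getValue() - low.getValue() <= 1) {
                    break;
                }
                // steps 3/4: hand one slot of the high backend to the low backend,
                // skipping buckets that already contain the low backend
                boolean moved = false;
                for (int idx = 0; idx < flat.size(); idx++) {
                    int bucket = idx / replicationNum;
                    List<Long> bucketBes = flat.subList(bucket * replicationNum, (bucket + 1) * replicationNum);
                    if (flat.get(idx).equals(high.getKey()) && !bucketBes.contains(low.getKey())) {
                        flat.set(idx, low.getKey());
                        moved = true;
                        break;
                    }
                }
                if (!moved) {
                    break; // the real code would retry with the next-lowest backend (j--)
                }
            }
            System.out.println(flat); // step 5 would re-partition this list by replicationNum
        }
    }
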
-    private boolean balance(GroupId groupId, List<Long> allAvailBackendIds, ColocateTableIndex colocateIndex,
-            SystemInfoService infoService, List<List<Long>> balancedBackendsPerBucketSeq) {
+    private boolean relocateAndBalance(GroupId groupId, Set<Long> unavailableBeIds, List<Long> availableBeIds,
+            ColocateTableIndex colocateIndex, SystemInfoService infoService,
+            ClusterLoadStatistic statistic, List<List<Long>> balancedBackendsPerBucketSeq) {
         ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId);
         int replicationNum = groupSchema.getReplicationNum();
         List<List<Long>> backendsPerBucketSeq = Lists.newArrayList(colocateIndex.getBackendsPerBucketSeq(groupId));
@@ -543,29 +312,49 @@ private boolean balance(GroupId groupId, List<Long> allAvailBackendIds, Colocate
         }
         Preconditions.checkState(backendsPerBucketSeq.size() == hostsPerBucketSeq.size());
 
-        // sort backends with replica num
+        long srcBeId = -1;
+        List<Integer> seqIndexes = null;
+        boolean hasUnavailableBe = false;
+        // first choose an unavailable BE as the src BE
+        for (Long beId : unavailableBeIds) {
+            seqIndexes = getBeSeqIndexes(flatBackendsPerBucketSeq, beId);
+            if (seqIndexes.size() > 0) {
+                srcBeId = beId;
+                hasUnavailableBe = true;
+                break;
+            }
+        }
+        // sort backends by replica num in desc order
         List<Map.Entry<Long, Long>> backendWithReplicaNum =
-                getSortedBackendReplicaNumPairs(allAvailBackendIds, flatBackendsPerBucketSeq);
-        // if there is only one available backend, end the outer loop
-        if (backendWithReplicaNum.size() == 1) {
-            LOG.info("there is only one available backend, end the outer loop in colocate group {}", groupId);
-            break;
+                getSortedBackendReplicaNumPairs(availableBeIds, unavailableBeIds, statistic, flatBackendsPerBucketSeq);
+        if (seqIndexes == null || seqIndexes.size() <= 0) {
+            // if there is only one available backend and no bucket on an unavailable BE to relocate, end the outer loop
+            if (backendWithReplicaNum.size() <= 1) {
+                break;
+            }
+
+            // choose the BE with the most buckets as the src BE
+            srcBeId = backendWithReplicaNum.get(0).getKey();
+            seqIndexes = getBeSeqIndexes(flatBackendsPerBucketSeq, srcBeId);
         }
 
-        int i = 0;
+        int i;
+        if (hasUnavailableBe) {
+            i = -1;
+        } else {
+            i = 0;
+        }
         int j = backendWithReplicaNum.size() - 1;
         while (i < j) {
             boolean isThisRoundChanged = false;
-            // we try to use a low backend to replace the high backend.
+            // we try to use a low backend to replace the src backend.
             // if replace failed(eg: both backends are on some host), select next low backend and try(j--)
-            Map.Entry<Long, Long> highBackend = backendWithReplicaNum.get(i);
             Map.Entry<Long, Long> lowBackend = backendWithReplicaNum.get(j);
-            if (highBackend.getValue() - lowBackend.getValue() <= 1) {
+            if ((!hasUnavailableBe) && (seqIndexes.size() - lowBackend.getValue()) <= 1) {
                 // balanced
                 break OUT;
             }
 
-            long srcBeId = highBackend.getKey();
             long destBeId = lowBackend.getKey();
             Backend destBe = infoService.getBackend(destBeId);
             if (destBe == null) {
@@ -573,18 +362,6 @@ private boolean balance(GroupId groupId, List<Long> allAvailBackendIds, Colocate
                 return false;
             }
 
-            /*
-             * get the array indexes of elements in flatBackendsPerBucketSeq which equals to srcBeId
-             * eg:
-             * flatBackendsPerBucketSeq:
-             *      A B C A D E A F G A H I
-             * and srcBeId is A.
-             * so seqIndexes is:
-             *      0 3 6 9
-             */
-            List<Integer> seqIndexes = IntStream.range(0, flatBackendsPerBucketSeq.size()).boxed().filter(
-                    idx -> flatBackendsPerBucketSeq.get(idx).equals(srcBeId)).collect(Collectors.toList());
-
             for (int seqIndex : seqIndexes) {
                 // the bucket index.
                 // eg: 0 / 3 = 0, so that the bucket index of the 4th backend id in flatBackendsPerBucketSeq is 0.
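
(A quick standalone illustration, outside the patch, of the index arithmetic in the loop above: a seqIndex in the flat list maps to its bucket by integer division by the replication num. Hypothetical names:)

    public class BucketIndexSketch {
        public static void main(String[] args) {
            int replicationNum = 3;
            // flat sequence: A B C | A D E | A F G | A H I, with A = backend 1
            long[] flat = {1, 2, 3, 1, 4, 5, 1, 6, 7, 1, 8, 9};
            for (int seqIndex = 0; seqIndex < flat.length; seqIndex++) {
                if (flat[seqIndex] == 1L) {
                    // same mapping as above: seqIndex / replicationNum is the bucket index
                    System.out.println("backend 1 at seqIndex " + seqIndex
                            + " -> bucket " + (seqIndex / replicationNum));
                }
            }
            // prints buckets 0, 1, 2, 3 for seqIndexes 0, 3, 6, 9
        }
    }
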
@@ -614,7 +391,7 @@ private boolean balance(GroupId groupId, List<Long> allAvailBackendIds, Colocate
                             "end outer loop in colocate group {}", groupId);
                     break OUT;
                 } else {
-                    // select another load backend and try again
+                    // select another low backend and try again
                     continue;
                 }
             }
@@ -649,19 +426,85 @@ private List<Set<String>> getHostsPerBucketSeq(List<List<Long>> backendsPerBuck
         return hostsPerBucketSeq;
     }
 
-    private List<Map.Entry<Long, Long>> getSortedBackendReplicaNumPairs(List<Long> allAvailBackendIds,
-            List<Long> flatBackendsPerBucketSeq) {
+    private List<Map.Entry<Long, Long>> getSortedBackendReplicaNumPairs(List<Long> allAvailBackendIds, Set<Long> unavailBackendIds,
+            ClusterLoadStatistic statistic, List<Long> flatBackendsPerBucketSeq) {
         // backend id -> replica num, and sorted by replica num, descending.
         Map<Long, Long> backendToReplicaNum = flatBackendsPerBucketSeq.stream()
                 .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
+        // remove unavailable backends
+        for (Long backendId : unavailBackendIds) {
+            backendToReplicaNum.remove(backendId);
+        }
         // add backends which are not in flatBackendsPerBucketSeq, with replication number 0
         for (Long backendId : allAvailBackendIds) {
             if (!backendToReplicaNum.containsKey(backendId)) {
                 backendToReplicaNum.put(backendId, 0L);
             }
         }
-        List<Map.Entry<Long, Long>> backendWithReplicaNum = backendToReplicaNum.entrySet().stream().sorted(
-                Collections.reverseOrder(Map.Entry.comparingByValue())).collect(Collectors.toList());
-        return backendWithReplicaNum;
+
+        return backendToReplicaNum
+                .entrySet()
+                .stream()
+                .sorted((entry1, entry2) -> {
+                    if (!entry1.getValue().equals(entry2.getValue())) {
+                        // more replicas first; Long.compare avoids the int overflow of a plain subtraction
+                        return Long.compare(entry2.getValue(), entry1.getValue());
+                    }
+                    BackendLoadStatistic beStat1 = statistic.getBackendLoadStatistic(entry1.getKey());
+                    BackendLoadStatistic beStat2 = statistic.getBackendLoadStatistic(entry2.getKey());
+                    if (beStat1 == null || beStat2 == null) {
+                        return 0;
+                    }
+                    // for equal replica num, the backend with the higher load score goes first
+                    double loadScore1 = beStat1.getMixLoadScore();
+                    double loadScore2 = beStat2.getMixLoadScore();
+                    if (Math.abs(loadScore1 - loadScore2) < 1e-6) {
+                        return 0;
+                    } else if (loadScore2 > loadScore1) {
+                        return 1;
+                    } else {
+                        return -1;
+                    }
+                })
+                .collect(Collectors.toList());
     }
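
(Outside the patch: the hand-written comparator above can be restated with Comparator combinators. A hypothetical, dependency-free equivalent, with load scores from a plain map standing in for ClusterLoadStatistic and ignoring the null-statistic and 1e-6 tolerance handling; it reproduces the ordering asserted in testGetSortedBackendReplicaNumPairs below:)

    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class OrderingSketch {
        static final Map<Long, Double> SCORES = new HashMap<>();

        public static void main(String[] args) {
            SCORES.put(1L, 0.1); SCORES.put(2L, 0.5); SCORES.put(3L, 0.4); SCORES.put(4L, 0.2);
            SCORES.put(5L, 0.3); SCORES.put(6L, 0.6); SCORES.put(7L, 0.8); SCORES.put(8L, 0.7);
            Map<Long, Long> replicaNum = new HashMap<>();
            for (long be = 1; be <= 8; be++) {
                replicaNum.put(be, 1L); // equal counts: order falls back to load score
            }
            // higher replica num first; among equals, higher load score first
            Comparator<Map.Entry<Long, Long>> order =
                    Comparator.<Map.Entry<Long, Long>, Long>comparing(Map.Entry::getValue, Comparator.reverseOrder())
                            .thenComparing(e -> SCORES.get(e.getKey()), Comparator.reverseOrder());
            List<Long> sorted = replicaNum.entrySet().stream()
                    .sorted(order).map(Map.Entry::getKey).collect(Collectors.toList());
            System.out.println(sorted); // [7, 8, 6, 2, 3, 5, 4, 1]
        }
    }
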
+    /*
+     * get the array indexes of elements in flatBackendsPerBucketSeq which equal beId
+     * eg:
+     * flatBackendsPerBucketSeq:
+     *      A B C A D E A F G A H I
+     * and beId is A.
+     * so seqIndexes is:
+     *      0 3 6 9
+     */
+    private List<Integer> getBeSeqIndexes(List<Long> flatBackendsPerBucketSeq, long beId) {
+        return IntStream.range(0, flatBackendsPerBucketSeq.size()).boxed().filter(
+                idx -> flatBackendsPerBucketSeq.get(idx).equals(beId)).collect(Collectors.toList());
+    }
+
+    private Set<Long> getUnavailableBeIdsInGroup(SystemInfoService infoService, ColocateTableIndex colocateIndex, GroupId groupId) {
+        Set<Long> backends = colocateIndex.getBackendsByGroup(groupId);
+        Set<Long> unavailableBeIds = Sets.newHashSet();
+        long currTime = System.currentTimeMillis();
+        for (Long backendId : backends) {
+            Backend be = infoService.getBackend(backendId);
+            if (be == null) {
+                unavailableBeIds.add(backendId);
+            } else if (!be.isAvailable()) {
+                // 1. BE is dead for a long time
+                // 2. BE is under decommission
+                if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > Config.tablet_repair_delay_factor_second * 1000 * 2)
+                        || be.isDecommissioned()) {
+                    unavailableBeIds.add(backendId);
+                }
+            }
+        }
+        return unavailableBeIds;
+    }
+
+    private List<Long> getAvailableBeIdsInGroup(String cluster, SystemInfoService infoService, Set<Long> unavailableBeIds) {
+        List<Long> allBackendIds = infoService.getClusterBackendIds(cluster, true);
+        return allBackendIds.stream()
+                .filter(id -> !unavailableBeIds.contains(id))
+                .collect(Collectors.toList());
+    }
 }
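
(Before the test changes, the availability rule implemented by getUnavailableBeIdsInGroup can be restated as a standalone predicate. A minimal sketch, outside the patch, with a hypothetical stand-in for the org.apache.doris.system.Backend API: a BE is treated as unavailable if it has been dropped, has been dead past twice the repair delay, or is decommissioning.)

    import java.util.concurrent.TimeUnit;

    public class AvailabilitySketch {
        // hypothetical minimal stand-in for the Backend API used above
        interface Be {
            boolean isAvailable();
            boolean isAlive();
            boolean isDecommissioned();
            long getLastUpdateMs();
        }

        static boolean isUnavailable(Be be, long nowMs, long repairDelaySec) {
            if (be == null) {
                return true; // dropped from the cluster
            }
            if (be.isAvailable()) {
                return false;
            }
            boolean deadTooLong = !be.isAlive()
                    && (nowMs - be.getLastUpdateMs()) > TimeUnit.SECONDS.toMillis(repairDelaySec) * 2;
            return deadTooLong || be.isDecommissioned();
        }

        public static void main(String[] args) {
            // a dropped backend (null) is always unavailable
            System.out.println(isUnavailable(null, System.currentTimeMillis(), 60));
        }
    }
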
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java
index bb7612616a9836..ee59267ae97e86 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableBalancerTest.java
@@ -17,12 +17,15 @@
 
 package org.apache.doris.clone;
 
-import org.apache.doris.catalog.Catalog;
+import com.google.common.collect.Sets;
+import mockit.Delegate;
 import org.apache.doris.catalog.ColocateGroupSchema;
 import org.apache.doris.catalog.ColocateTableIndex;
 import org.apache.doris.catalog.ColocateTableIndex.GroupId;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
@@ -34,23 +37,17 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import mockit.Expectations;
 import mockit.Mocked;
 
 public class ColocateTableBalancerTest {
-
-    @Mocked
-    private Catalog catalog;
-    @Mocked
-    private SystemInfoService infoService;
-
-    private TabletScheduler tabletScheduler;
-
     private ColocateTableBalancer balancer = ColocateTableBalancer.getInstance();
-
+
     private Backend backend1;
     private Backend backend2;
     private Backend backend3;
     private Backend backend4;
     private Backend backend5;
     private Backend backend6;
     private Backend backend7;
     private Backend backend8;
     private Backend backend9;
 
+    private Map<Long, Double> mixLoadScores;
+
     @Before
     public void setUp() {
         backend1 = new Backend(1L, "192.168.1.1", 9050);
         backend2 = new Backend(2L, "192.168.1.2", 9050);
         backend3 = new Backend(3L, "192.168.1.3", 9050);
         backend4 = new Backend(4L, "192.168.1.4", 9050);
         backend5 = new Backend(5L, "192.168.1.5", 9050);
         backend6 = new Backend(6L, "192.168.1.6", 9050);
         backend7 = new Backend(7L, "192.168.1.7", 9050);
         backend8 = new Backend(8L, "192.168.1.8", 9050);
         backend9 = new Backend(9L, "192.168.1.8", 9050);
 
+        mixLoadScores = Maps.newHashMap();
+        mixLoadScores.put(1L, 0.1);
+        mixLoadScores.put(2L, 0.5);
+        mixLoadScores.put(3L, 0.4);
+        mixLoadScores.put(4L, 0.2);
+        mixLoadScores.put(5L, 0.3);
+        mixLoadScores.put(6L, 0.6);
+        mixLoadScores.put(7L, 0.8);
+        mixLoadScores.put(8L, 0.7);
+        mixLoadScores.put(9L, 0.9);
+    }
+
+    private ColocateTableIndex createColocateIndex(GroupId groupId, List<Long> flatList) {
+        ColocateTableIndex colocateTableIndex = new ColocateTableIndex();
+        int replicationNum = 3;
+        List<List<Long>> backendsPerBucketSeq = Lists.partition(flatList, replicationNum);
+        colocateTableIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
+        return colocateTableIndex;
+    }
+
+    @Test
+    public void testBalance(@Mocked SystemInfoService infoService,
+                            @Mocked ClusterLoadStatistic statistic) {
         new Expectations() {
             {
                 infoService.getBackend(1L);
                 result = backend1;
                 minTimes = 0;
                 infoService.getBackend(2L);
                 result = backend2;
                 minTimes = 0;
                 infoService.getBackend(3L);
                 result = backend3;
                 minTimes = 0;
                 infoService.getBackend(4L);
                 result = backend4;
                 minTimes = 0;
                 infoService.getBackend(5L);
                 result = backend5;
                 minTimes = 0;
                 infoService.getBackend(6L);
                 result = backend6;
                 minTimes = 0;
                 infoService.getBackend(7L);
                 result = backend7;
                 minTimes = 0;
                 infoService.getBackend(8L);
                 result = backend8;
                 minTimes = 0;
                 infoService.getBackend(9L);
                 result = backend9;
                 minTimes = 0;
+
+                statistic.getBackendLoadStatistic(anyLong);
+                result = null;
+                minTimes = 0;
             }
         };
-    }
-
-    private ColocateTableIndex createColocateIndex(GroupId groupId, List<Long> flatList) {
-        ColocateTableIndex colocateTableIndex = new ColocateTableIndex();
-        int replicationNum = 3;
-        List<List<Long>> backendsPerBucketSeq = Lists.partition(flatList, replicationNum);
-        colocateTableIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
-        return colocateTableIndex;
-    }
-
-    @Test
-    public void testBalance() {
         GroupId groupId = new GroupId(10000, 10001);
         List<Column> distributionCols = Lists.newArrayList();
         distributionCols.add(new Column("k1", PrimitiveType.INT));
@@ -132,9 +146,8 @@ public void testBalance() {
 
         List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
         List<Long> allAvailBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
-        boolean changed = (Boolean) Deencapsulation.invoke(balancer, "balance", groupId, allAvailBackendIds,
-                colocateTableIndex, infoService, balancedBackendsPerBucketSeq);
-        System.out.println(balancedBackendsPerBucketSeq);
+        boolean changed = (Boolean) Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, new HashSet<Long>(), allAvailBackendIds,
+                colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq);
         List<List<Long>> expected = Lists.partition(
                 Lists.newArrayList(9L, 5L, 3L, 4L, 6L, 8L, 7L, 6L, 1L, 2L, 9L, 4L, 1L, 2L, 3L), 3);
         Assert.assertTrue(changed);
@@ -145,15 +158,50 @@ public void testBalance() {
                 Lists.newArrayList(9L, 8L, 7L, 8L, 6L, 5L, 9L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L));
         Deencapsulation.setField(colocateTableIndex, "group2Schema", group2Schema);
         balancedBackendsPerBucketSeq.clear();
-        changed = (Boolean) Deencapsulation.invoke(balancer, "balance", groupId, allAvailBackendIds,
-                colocateTableIndex, infoService, balancedBackendsPerBucketSeq);
+        changed = (Boolean) Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, new HashSet<Long>(), allAvailBackendIds,
+                colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq);
         System.out.println(balancedBackendsPerBucketSeq);
         Assert.assertFalse(changed);
         Assert.assertTrue(balancedBackendsPerBucketSeq.isEmpty());
     }
 
     @Test
-    public void testFixBalanceEndlessLoop() {
+    public void testFixBalanceEndlessLoop(@Mocked SystemInfoService infoService,
+                                          @Mocked ClusterLoadStatistic statistic) {
+        new Expectations() {
+            {
+                infoService.getBackend(1L);
+                result = backend1;
+                minTimes = 0;
+                infoService.getBackend(2L);
+                result = backend2;
+                minTimes = 0;
+                infoService.getBackend(3L);
+                result = backend3;
+                minTimes = 0;
+                infoService.getBackend(4L);
+                result = backend4;
+                minTimes = 0;
+                infoService.getBackend(5L);
+                result = backend5;
+                minTimes = 0;
+                infoService.getBackend(6L);
+                result = backend6;
+                minTimes = 0;
+                infoService.getBackend(7L);
+                result = backend7;
+                minTimes = 0;
+                infoService.getBackend(8L);
+                result = backend8;
+                minTimes = 0;
+                infoService.getBackend(9L);
+                result = backend9;
+                minTimes = 0;
+                statistic.getBackendLoadStatistic(anyLong);
+                result = null;
+                minTimes = 0;
+            }
+        };
         GroupId groupId = new GroupId(10000, 10001);
         List<Column> distributionCols = Lists.newArrayList();
         distributionCols.add(new Column("k1", PrimitiveType.INT));
@@ -168,8 +216,8 @@ public void testFixBalanceEndlessLoop() {
 
         List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
         List<Long> allAvailBackendIds = Lists.newArrayList(7L);
-        boolean changed = Deencapsulation.invoke(balancer, "balance", groupId, allAvailBackendIds,
-                colocateTableIndex, infoService, balancedBackendsPerBucketSeq);
+        boolean changed = Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, new HashSet<Long>(), allAvailBackendIds,
+                colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq);
         Assert.assertFalse(changed);
 
         // 2. all backends are checked but this round is not changed
         colocateTableIndex = createColocateIndex(groupId,
                 Lists.newArrayList(9L, 8L, 7L, 8L, 6L, 5L, 9L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L));
@@ -180,8 +228,152 @@ public void testFixBalanceEndlessLoop() {
 
         balancedBackendsPerBucketSeq = Lists.newArrayList();
         allAvailBackendIds = Lists.newArrayList(7L, 8L, 9L);
-        changed = Deencapsulation.invoke(balancer, "balance", groupId, allAvailBackendIds,
-                colocateTableIndex, infoService, balancedBackendsPerBucketSeq);
+        changed = Deencapsulation.invoke(balancer, "relocateAndBalance", groupId, new HashSet<Long>(), allAvailBackendIds,
+                colocateTableIndex, infoService, statistic, balancedBackendsPerBucketSeq);
         Assert.assertFalse(changed);
     }
+
+    @Test
+    public void testGetSortedBackendReplicaNumPairs(@Mocked ClusterLoadStatistic statistic) {
+        new Expectations() {
+            {
+                statistic.getBackendLoadStatistic(anyLong);
+                result = new Delegate() {
+                    BackendLoadStatistic delegate(Long beId) {
+                        return new FakeBackendLoadStatistic(beId, null, null, null);
+                    }
+                };
+                minTimes = 0;
+            }
+        };
+
+        // all buckets are on different BEs
+        List<Long> allAvailBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
+        Set<Long> unavailBackendIds = Sets.newHashSet(9L);
+        List<Long> flatBackendsPerBucketSeq = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
+        List<Map.Entry<Long, Long>> backends = Deencapsulation.invoke(balancer, "getSortedBackendReplicaNumPairs",
+                allAvailBackendIds, unavailBackendIds, statistic, flatBackendsPerBucketSeq);
+        long[] backendIds = backends.stream().mapToLong(Map.Entry::getKey).toArray();
+        Assert.assertArrayEquals(new long[]{7L, 8L, 6L, 2L, 3L, 5L, 4L, 1L}, backendIds);
+
+        // positions 0, 1 are on the same BE, and so are two later positions
+        flatBackendsPerBucketSeq = Lists.newArrayList(1L, 1L, 3L, 4L, 5L, 6L, 7L, 7L, 9L);
+        backends = Deencapsulation.invoke(balancer, "getSortedBackendReplicaNumPairs", allAvailBackendIds, unavailBackendIds,
+                statistic, flatBackendsPerBucketSeq);
+        backendIds = backends.stream().mapToLong(Map.Entry::getKey).toArray();
+        Assert.assertArrayEquals(new long[]{7L, 1L, 6L, 3L, 5L, 4L, 8L, 2L}, backendIds);
+    }
+
+    public final class FakeBackendLoadStatistic extends BackendLoadStatistic {
+        public FakeBackendLoadStatistic(long beId, String clusterName, SystemInfoService infoService,
+                                        TabletInvertedIndex invertedIndex) {
+            super(beId, clusterName, infoService, invertedIndex);
+        }
+
+        @Override
+        public double getMixLoadScore() {
+            return mixLoadScores.get(getBeId());
+        }
+    }
+
+    @Test
+    public void testGetBeSeqIndexes() {
+        List<Long> flatBackendsPerBucketSeq = Lists.newArrayList(1L, 2L, 2L, 3L, 4L, 2L);
+        List<Integer> indexes = Deencapsulation.invoke(balancer, "getBeSeqIndexes", flatBackendsPerBucketSeq, 2L);
+        Assert.assertArrayEquals(new int[]{1, 2, 5}, indexes.stream().mapToInt(i -> i).toArray());
+        System.out.println("backend1 id is " + backend1.getId());
+    }
+
+    @Test
+    public void testGetUnavailableBeIdsInGroup(@Mocked ColocateTableIndex colocateTableIndex,
+                                               @Mocked SystemInfoService infoService,
+                                               @Mocked Backend myBackend2,
+                                               @Mocked Backend myBackend3,
+                                               @Mocked Backend myBackend4,
+                                               @Mocked Backend myBackend5) {
+        GroupId groupId = new GroupId(10000, 10001);
+        Set<Long> allBackendsInGroup = Sets.newHashSet(1L, 2L, 3L, 4L, 5L);
+        new Expectations() {
+            {
+                infoService.getBackend(1L);
+                result = null;
+                minTimes = 0;
+
+                // backend2 is available
+                infoService.getBackend(2L);
+                result = myBackend2;
+                minTimes = 0;
+                myBackend2.isAvailable();
+                result = true;
+                minTimes = 0;
+
+                // backend3 not available, and dead for a long time
+                infoService.getBackend(3L);
+                result = myBackend3;
+                minTimes = 0;
+                myBackend3.isAvailable();
+                result = false;
+                minTimes = 0;
+                myBackend3.isAlive();
+                result = false;
+                minTimes = 0;
+                myBackend3.getLastUpdateMs();
+                result = System.currentTimeMillis() - Config.tablet_repair_delay_factor_second * 1000 * 20;
+                minTimes = 0;
+
+                // backend4 not available, and dead for a short time
+                infoService.getBackend(4L);
+                result = myBackend4;
+                minTimes = 0;
+                myBackend4.isAvailable();
+                result = false;
+                minTimes = 0;
+                myBackend4.isAlive();
+                result = false;
+                minTimes = 0;
+                myBackend4.getLastUpdateMs();
+                result = System.currentTimeMillis();
+                minTimes = 0;
+
+                // backend5 not available, and in decommission
+                infoService.getBackend(5L);
+                result = myBackend5;
+                minTimes = 0;
+                myBackend5.isAvailable();
+                result = false;
+                minTimes = 0;
+                myBackend5.isAlive();
+                result = true;
+                minTimes = 0;
+                myBackend5.isDecommissioned();
+                result = true;
+                minTimes = 0;
+
+                colocateTableIndex.getBackendsByGroup(groupId);
+                result = allBackendsInGroup;
+                minTimes = 0;
+            }
+        };
+
+        Set<Long> unavailableBeIds = Deencapsulation.invoke(balancer, "getUnavailableBeIdsInGroup", infoService, colocateTableIndex, groupId);
+        System.out.println(unavailableBeIds);
+        Assert.assertArrayEquals(new long[]{1L, 3L, 5L}, unavailableBeIds.stream().mapToLong(i -> i).sorted().toArray());
+    }
+
+    @Test
+    public void testGetAvailableBeIdsInGroup(@Mocked SystemInfoService infoService) {
+        List<Long> clusterAliveBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L);
+        new Expectations() {
+            {
+                infoService.getClusterBackendIds("cluster1", true);
+                result = clusterAliveBackendIds;
+                minTimes = 0;
+            }
+        };
+
+        Set<Long> unavailableBeIds = Sets.newHashSet(4L, 5L, 6L);
+        List<Long> availableBeIds = Deencapsulation.invoke(balancer, "getAvailableBeIdsInGroup", "cluster1", infoService, unavailableBeIds);
+        Assert.assertArrayEquals(new long[]{1L, 2L, 3L}, availableBeIds.stream().mapToLong(i -> i).sorted().toArray());
+    }
 }
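
(Closing illustration, outside the patch: a hypothetical, dependency-free trace of the relocation path exercised above. Every flat-list slot held by an unavailable backend is handed to a low-load backend, unless that bucket already contains it; the real code also checks hosts and would then retry with the next-lowest backend.)

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RelocateSketch {
        public static void main(String[] args) {
            int replicationNum = 3;
            // the bucket sequence used by testFixBalanceEndlessLoop, flattened
            List<Long> flat = new ArrayList<>(Arrays.asList(
                    9L, 8L, 7L, 8L, 6L, 5L, 9L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L));
            long unavailableBe = 9L; // would come from getUnavailableBeIdsInGroup
            long lowLoadBe = 7L;     // would come from the tail of the sorted pair list
            for (int idx = 0; idx < flat.size(); idx++) {
                if (!flat.get(idx).equals(unavailableBe)) {
                    continue;
                }
                int bucket = idx / replicationNum;
                List<Long> bucketBes = flat.subList(bucket * replicationNum, (bucket + 1) * replicationNum);
                // a bucket must not hold two replicas on the same backend
                if (!bucketBes.contains(lowLoadBe)) {
                    flat.set(idx, lowLoadBe);
                }
            }
            // bucket 0 ([9, 8, 7]) already contains BE 7, so only the slot in bucket 2 moves
            System.out.println(flat);
        }
    }
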