Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.catalog;

import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.clone.PartitionRebalancer.TabletMove;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
Expand Down Expand Up @@ -806,7 +807,7 @@ public Map<Long, Long> getReplicaToTabletMap() {

// Only build from available bes, exclude colocate tables
public Map<TStorageMedium, TreeMultimap<Long, PartitionBalanceInfo>> buildPartitionInfoBySkew(
List<Long> availableBeIds) {
List<Long> availableBeIds, Map<Long, Pair<TabletMove, Long>> movesInProgress) {
Set<Long> dbIds = Sets.newHashSet();
Set<Long> tableIds = Sets.newHashSet();
Set<Long> partitionIds = Sets.newHashSet();
Expand All @@ -830,6 +831,26 @@ public Map<TStorageMedium, TreeMultimap<Long, PartitionBalanceInfo>> buildPartit
for (Table.Cell<Long, Long, Replica> cell : cells) {
Long tabletId = cell.getRowKey();
Long beId = cell.getColumnKey();
Pair<TabletMove, Long> movePair = movesInProgress.get(tabletId);
TabletMove move = movePair != null ? movePair.first : null;
// there exists move from fromBe to toBe
if (move != null && beId == move.fromBe
&& availableBeIds.contains(move.toBe)) {

// if movePair.second == -1, it means toBe hadn't added this tablet but it will add later;
// otherwise it means toBe had added this tablet
boolean toBeHadReplica = movePair.second != -1L;
if (toBeHadReplica) {
// toBe had add this tablet, fromBe just ignore this tablet
continue;
}

// later fromBe will delete this replica
// and toBe will add a replica
// so this replica should belong to toBe
beId = move.toBe;
}

try {
Preconditions.checkState(availableBeIds.contains(beId), "dead be " + beId);
TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
Expand Down Expand Up @@ -911,6 +932,11 @@ public PartitionBalanceInfo(PartitionBalanceInfo info) {
this.indexId = info.indexId;
this.beByReplicaCount = TreeMultimap.create(info.beByReplicaCount);
}

@Override
public String toString() {
return "[partition=" + partitionId + ", index=" + indexId + ", replicaNum2BeId=" + beByReplicaCount + "]";
}
}

// just for ut
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.clone.BackendLoadStatistic.Classification;
import org.apache.doris.clone.BackendLoadStatistic.LoadScore;
import org.apache.doris.clone.PartitionRebalancer.TabletMove;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugPointUtil;
Expand Down Expand Up @@ -50,6 +51,7 @@ public class LoadStatisticForTag {

private final SystemInfoService infoService;
private final TabletInvertedIndex invertedIndex;
private final Rebalancer rebalancer;

private final Tag tag;

Expand All @@ -68,10 +70,11 @@ public class LoadStatisticForTag {
= Maps.newHashMap();

public LoadStatisticForTag(Tag tag, SystemInfoService infoService,
TabletInvertedIndex invertedIndex) {
TabletInvertedIndex invertedIndex, Rebalancer rebalancer) {
this.tag = tag;
this.infoService = infoService;
this.invertedIndex = invertedIndex;
this.rebalancer = rebalancer;
}

public Tag getTag() {
Expand Down Expand Up @@ -166,10 +169,13 @@ public void init() {
// Multimap<skew -> PartitionBalanceInfo>
// PartitionBalanceInfo: <pid -> <partitionReplicaCount, beId>>
// Only count available bes here, aligned with the beByTotalReplicaCountMaps.
skewMaps = invertedIndex.buildPartitionInfoBySkew(beLoadStatistics.stream()
List<Long> availableBeIds = beLoadStatistics.stream()
.filter(BackendLoadStatistic::isAvailable)
.map(BackendLoadStatistic::getBeId)
.collect(Collectors.toList()));
.collect(Collectors.toList());
Map<Long, Pair<TabletMove, Long>> movesInProgress = rebalancer == null ? Maps.newHashMap()
: ((PartitionRebalancer) rebalancer).getMovesInProgress();
skewMaps = invertedIndex.buildPartitionInfoBySkew(availableBeIds, movesInProgress);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ public void updateMapping(Map<Tag, LoadStatisticForTag> statisticMap, long expir
}
}

public Map<Tag, Map<TStorageMedium, MovesCache>> getCacheMap() {
return cacheMap;
}

public MovesCache getCache(Tag tag, TStorageMedium medium) {
Map<TStorageMedium, MovesCache> mediumMoves = cacheMap.get(tag);
if (mediumMoves != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
Expand Down Expand Up @@ -304,7 +305,8 @@ protected void completeSchedCtx(TabletSchedCtx tabletCtx)
List<Long> availPath = paths.stream().filter(path -> path.getStorageMedium() == tabletCtx.getStorageMedium()
&& path.isFit(tabletCtx.getTabletSize(), false) == BalanceStatus.OK)
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toList());
long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath, tabletCtx.getStorageMedium());
long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath, tabletCtx.getTag(),
tabletCtx.getStorageMedium());
if (pathHash == -1) {
throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
"paths has no available balance slot: " + availPath);
Expand Down Expand Up @@ -368,12 +370,20 @@ public void updateLoadStatistic(Map<Tag, LoadStatisticForTag> statisticMap) {
}
}

public Map<Long, Pair<TabletMove, Long>> getMovesInProgress() {
Map<Long, Pair<TabletMove, Long>> moves = Maps.newHashMap();
movesCacheMap.getCacheMap().values().forEach(
m -> m.values().forEach(cache -> moves.putAll(cache.get().asMap())));

return moves;
}

// Represents a concrete move of a tablet from one be to another.
// Formed logically from a PartitionMove by specifying a tablet for the move.
public static class TabletMove {
Long tabletId;
Long fromBe;
Long toBe;
public Long tabletId;
public Long fromBe;
public Long toBe;

TabletMove(Long id, Long from, Long to) {
this.tabletId = id;
Expand All @@ -397,7 +407,11 @@ public static class ClusterBalanceInfo {
TreeMultimap<Long, TabletInvertedIndex.PartitionBalanceInfo> partitionInfoBySkew
= TreeMultimap.create(Ordering.natural(), Ordering.arbitrary());
TreeMultimap<Long, Long> beByTotalReplicaCount = TreeMultimap.create();
}

@Override
public String toString() {
return "[partitionSkew=" + partitionInfoBySkew + ", totalReplicaNum2Be=" + beByTotalReplicaCount + "]";
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,12 @@
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.EvictingQueue;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -375,7 +377,7 @@ private void updateLoadStatistic() {
Map<Tag, LoadStatisticForTag> newStatisticMap = Maps.newHashMap();
Set<Tag> tags = infoService.getTags();
for (Tag tag : tags) {
LoadStatisticForTag loadStatistic = new LoadStatisticForTag(tag, infoService, invertedIndex);
LoadStatisticForTag loadStatistic = new LoadStatisticForTag(tag, infoService, invertedIndex, rebalancer);
loadStatistic.init();
newStatisticMap.put(tag, loadStatistic);
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -2054,7 +2056,7 @@ public static class PathSlot {
private Map<Long, Slot> pathSlots = Maps.newConcurrentMap();
private long beId;
// only use in takeAnAvailBalanceSlotFrom, make pick RR
private Map<TStorageMedium, Long> lastPickPathHashs = Maps.newHashMap();
private Table<Tag, TStorageMedium, Long> lastPickPathHashs = HashBasedTable.create();

public PathSlot(Map<Long, TStorageMedium> paths, long beId) {
this.beId = beId;
Expand Down Expand Up @@ -2204,29 +2206,37 @@ public synchronized long takeBalanceSlot(long pathHash) {
return -1;
}

public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs, TStorageMedium medium) {
public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs, Tag tag, TStorageMedium medium) {
if (pathHashs.isEmpty()) {
return -1;
}

if (tag == null) {
tag = Tag.DEFAULT_BACKEND_TAG;
}

Collections.sort(pathHashs);
synchronized (this) {
int preferSlotIndex = pathHashs.indexOf(lastPickPathHashs.getOrDefault(medium, -1L)) + 1;
Long lastPathHash = lastPickPathHashs.get(tag, medium);
if (lastPathHash == null) {
lastPathHash = -1L;
}
int preferSlotIndex = pathHashs.indexOf(lastPathHash) + 1;
if (preferSlotIndex < 0 || preferSlotIndex >= pathHashs.size()) {
preferSlotIndex = 0;
}

for (int i = preferSlotIndex; i < pathHashs.size(); i++) {
long pathHash = pathHashs.get(i);
if (takeBalanceSlot(pathHash) != -1) {
lastPickPathHashs.put(medium, pathHash);
lastPickPathHashs.put(tag, medium, pathHash);
return pathHash;
}
}
for (int i = 0; i < preferSlotIndex; i++) {
long pathHash = pathHashs.get(i);
if (takeBalanceSlot(pathHash) != -1) {
lastPickPathHashs.put(medium, pathHash);
lastPickPathHashs.put(tag, medium, pathHash);
return pathHash;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,7 @@ public List<PartitionMove> getNextMoves(ClusterBalanceInfo info, int maxMovesNum
if (LOG.isDebugEnabled()) {
LOG.debug(keySet);
}
Preconditions.checkState(keySet.isEmpty() || keySet.last() == 0L,
"non-zero replica count on be while no partition skew information in skewMap");
// Nothing to balance: cluster is empty.

return Lists.newArrayList();
}

Expand All @@ -156,7 +154,6 @@ public List<PartitionMove> getNextMoves(ClusterBalanceInfo info, int maxMovesNum
return Lists.newArrayList();
}


List<PartitionMove> moves = Lists.newArrayList();
for (int i = 0; i < maxMovesNum; ++i) {
PartitionMove move = getNextMove(info.beByTotalReplicaCount, info.partitionInfoBySkew);
Expand All @@ -178,12 +175,8 @@ private PartitionMove getNextMove(TreeMultimap<Long, Long> beByTotalReplicaCount
return null;
}
long maxPartitionSkew = skewMap.keySet().last();
long maxBeSkew = beByTotalReplicaCount.keySet().last() - beByTotalReplicaCount.keySet().first();

// 1. Every partition is balanced(maxPartitionSkew<=1) and any move will unbalance a partition, so there
// is no potential for the greedy algorithm to balance the cluster.
// 2. Every partition is balanced(maxPartitionSkew<=1) and the cluster as a whole is balanced(maxBeSkew<=1).
if (maxPartitionSkew == 0L || (maxPartitionSkew <= 1L && maxBeSkew <= 1L)) {
// don't make a global balance because beByTotalReplicaCount may contains tablets for other medium or tag
if (maxPartitionSkew <= 1L) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void setUp() {
@Test
public void test() {
LoadStatisticForTag loadStatistic = new LoadStatisticForTag(
Tag.DEFAULT_BACKEND_TAG, systemInfoService, invertedIndex);
Tag.DEFAULT_BACKEND_TAG, systemInfoService, invertedIndex, null);
loadStatistic.init();
List<List<String>> infos = loadStatistic.getStatistic(TStorageMedium.HDD);
Assert.assertEquals(3, infos.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ long ignored() {

private void generateStatisticsAndPathSlots() {
LoadStatisticForTag loadStatistic = new LoadStatisticForTag(Tag.DEFAULT_BACKEND_TAG, systemInfoService,
invertedIndex);
invertedIndex, null);
loadStatistic.init();
statisticMap = Maps.newHashMap();
statisticMap.put(Tag.DEFAULT_BACKEND_TAG, loadStatistic);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.clone;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.utframe.TestWithFeService;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Set;
import java.util.stream.Collectors;

public class PartitionRebalancerTest extends TestWithFeService {

@Override
protected void beforeCreatingConnectContext() throws Exception {
Config.tablet_schedule_interval_ms = 100;
Config.tablet_checker_interval_ms = 100;
Config.tablet_rebalancer_type = "partition";
Config.tablet_repair_delay_factor_second = 1;
Config.schedule_slot_num_per_hdd_path = 10000;
Config.schedule_slot_num_per_ssd_path = 10000;
Config.schedule_batch_size = 10000;
Config.max_scheduling_tablets = 10000;
Config.max_balancing_tablets = 10000;
Config.partition_rebalance_max_moves_num_per_selection = 5;
}

@Override
protected int backendNum() {
return 3;
}

@Test
public void testBalance() throws Exception {
createDatabase("test");
createTable("CREATE TABLE test.tbl1 (k INT) DISTRIBUTED BY HASH(k) BUCKETS 32"
+ " PROPERTIES ('replication_num' = '1')");

Thread.sleep(2000);
Assertions.assertEquals(Sets.newHashSet(11, 11, 10), getBackendTabletNums());

checkBEHeartbeat(Lists.newArrayList(createBackend("127.0.0.4", lastFeRpcPort)));
Thread.sleep(2000);
Assertions.assertEquals(Sets.newHashSet(8, 8, 8, 8), getBackendTabletNums());

checkBEHeartbeat(Lists.newArrayList(createBackend("127.0.0.5", lastFeRpcPort)));
Thread.sleep(2000);
Assertions.assertEquals(Sets.newHashSet(7, 7, 6, 6, 6), getBackendTabletNums());
}

private Set<Integer> getBackendTabletNums() {
return Env.getCurrentSystemInfo().getAllBackendIds().stream()
.map(beId -> Env.getCurrentInvertedIndex().getTabletIdsByBackendId(beId).size())
.collect(Collectors.toSet());
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
import org.apache.doris.resource.Tag;
import org.apache.doris.thrift.TStorageMedium;

import com.google.common.collect.Lists;
Expand All @@ -33,7 +34,7 @@
class PathSlotTest {

@Test
public void test() {
public void test() throws Exception {
Config.balance_slot_num_per_path = 2;
Map<Long, TStorageMedium> paths = Maps.newHashMap();
List<Long> availPathHashs = Lists.newArrayList();
Expand All @@ -57,7 +58,8 @@ public void test() {
PathSlot ps = new PathSlot(paths, 1L);
for (int i = 0; i < expectPathHashs.size(); i++) {
Collections.shuffle(availPathHashs);
gotPathHashs.add(ps.takeAnAvailBalanceSlotFrom(availPathHashs, medium));
gotPathHashs.add(ps.takeAnAvailBalanceSlotFrom(availPathHashs,
Tag.create(Tag.TYPE_LOCATION, "zone1"), medium));
}
Assert.assertEquals(expectPathHashs, gotPathHashs);
}
Expand Down
Loading