Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,20 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.security.SecureRandom;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;

/*
Expand Down Expand Up @@ -115,40 +118,64 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
= algo.getNextMoves(clusterBalanceInfo, Config.partition_rebalance_max_moves_num_per_selection);

List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
List<Long> inProgressIds = movesInProgressList.stream().map(m -> m.tabletId).collect(Collectors.toList());
Set<Long> inProgressIds = movesInProgressList.stream().map(m -> m.tabletId).collect(Collectors.toSet());
Random rand = new SecureRandom();
for (TwoDimensionalGreedyRebalanceAlgo.PartitionMove move : moves) {
// Find all tablets of the specified partition that would have a replica at the source be,
// but would not have a replica at the destination be. That is to satisfy the restriction
// of having no more than one replica of the same tablet per be.
List<Long> tabletIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.fromBe, medium);
List<Long> invalidIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium);
tabletIds.removeAll(invalidIds);
// In-progress tablets can't be the candidate too.
tabletIds.removeAll(inProgressIds);
if (tabletIds.isEmpty()) {
continue;
}

Set<Long> invalidIds = Sets.newHashSet(
invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium));

BiPredicate<Long, TabletMeta> canMoveTablet = (Long tabletId, TabletMeta tabletMeta) -> {
return tabletMeta != null
&& tabletMeta.getPartitionId() == move.partitionId
&& tabletMeta.getIndexId() == move.indexId
&& !invalidIds.contains(tabletId)
&& !inProgressIds.contains(tabletId);
};

Map<Long, TabletMeta> tabletCandidates = Maps.newHashMap();
for (long tabletId : tabletIds) {
// Random pick one candidate to create tabletSchedCtx
int startIdx = rand.nextInt(tabletIds.size());
long pickedTabletId = -1L;
TabletMeta pickedTabletMeta = null;
for (int i = startIdx; i < tabletIds.size(); i++) {
long tabletId = tabletIds.get(i);
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
if (tabletMeta != null && tabletMeta.getPartitionId() == move.partitionId
&& tabletMeta.getIndexId() == move.indexId) {
tabletCandidates.put(tabletId, tabletMeta);
if (canMoveTablet.test(tabletId, tabletMeta)) {
pickedTabletId = tabletId;
pickedTabletMeta = tabletMeta;
break;
}
}
LOG.debug("Find {} candidates for move {}", tabletCandidates.size(), move);
if (tabletCandidates.isEmpty()) {
continue;

if (pickedTabletId == -1L) {
for (int i = 0; i < startIdx; i++) {
long tabletId = tabletIds.get(i);
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
if (canMoveTablet.test(tabletId, tabletMeta)) {
pickedTabletId = tabletId;
pickedTabletMeta = tabletMeta;
break;
}
}
}

// Random pick one candidate to create tabletSchedCtx
Random rand = new Random();
Object[] keys = tabletCandidates.keySet().toArray();
long pickedTabletId = (long) keys[rand.nextInt(keys.length)];
LOG.debug("Picked tablet id for move {}: {}", move, pickedTabletId);
if (pickedTabletId == -1L) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cann't picked tablet id for move {}", move);
}
continue;
}

TabletMeta tabletMeta = tabletCandidates.get(pickedTabletId);
TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE,
tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(),
tabletMeta.getIndexId(), pickedTabletId, null /* replica alloc is not used for balance*/,
pickedTabletMeta.getDbId(), pickedTabletMeta.getTableId(), pickedTabletMeta.getPartitionId(),
pickedTabletMeta.getIndexId(), pickedTabletId, null /* replica alloc is not used for balance*/,
System.currentTimeMillis());
tabletCtx.setTag(clusterStat.getTag());
// Balance task's priority is always LOW
Expand Down Expand Up @@ -268,7 +295,7 @@ protected void completeSchedCtx(TabletSchedCtx tabletCtx)
List<Long> availPath = paths.stream().filter(path -> path.getStorageMedium() == tabletCtx.getStorageMedium()
&& path.isFit(tabletCtx.getTabletSize(), false) == BalanceStatus.OK)
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toList());
long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath);
long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath, tabletCtx.getStorageMedium());
if (pathHash == -1) {
throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SchedException.SubCode.WAITING_SLOT,
"paths has no available balance slot: " + availPath);
Expand Down Expand Up @@ -315,6 +342,11 @@ public Long getToDeleteReplicaId(TabletSchedCtx tabletCtx) {
}
}

/**
 * Forwards the given tablet context to {@code movesCacheMap.invalidateTablet}.
 *
 * NOTE(review): presumably this drops the cached move for the tablet so that a
 * later lookup of its to-delete replica no longer hits stale data — confirm
 * against the moves cache implementation.
 *
 * @param tabletCtx the scheduling context of the tablet to invalidate
 */
@Override
public void invalidateToDeleteReplicaId(TabletSchedCtx tabletCtx) {
movesCacheMap.invalidateTablet(tabletCtx);
}

@Override
public void updateLoadStatistic(Map<Tag, LoadStatisticForTag> statisticMap) {
super.updateLoadStatistic(statisticMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ public Long getToDeleteReplicaId(TabletSchedCtx tabletCtx) {
return -1L;
}

/**
 * Default implementation: intentionally does nothing.
 *
 * Subclasses that keep per-tablet state about the replica to delete can
 * override this method to discard that state.
 *
 * @param tabletCtx the scheduling context of the tablet to invalidate
 */
public void invalidateToDeleteReplicaId(TabletSchedCtx tabletCtx) {
}

/**
 * Default implementation: intentionally does nothing.
 *
 * NOTE(review): presumably a hook that lets subclasses react when scheduling
 * of the given tablet fails — confirm against the callers.
 *
 * @param tabletCtx the scheduling context of the tablet
 */
public void onTabletFailed(TabletSchedCtx tabletCtx) {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,9 @@ private boolean deleteReplicaChosenByRebalancer(TabletSchedCtx tabletCtx, boolea
if (chosenReplica == null) {
return false;
}

deleteReplicaInternal(tabletCtx, chosenReplica, "src replica of rebalance", force);
rebalancer.invalidateToDeleteReplicaId(tabletCtx);

return true;
}
Expand Down Expand Up @@ -1962,11 +1964,10 @@ public static class PathSlot {
private Map<Long, Slot> pathSlots = Maps.newConcurrentMap();
private long beId;
// only used in takeAnAvailBalanceSlotFrom, so that paths are picked in round-robin (RR) order
private long lastPickPathHash;
private Map<TStorageMedium, Long> lastPickPathHashs = Maps.newHashMap();

public PathSlot(Map<Long, TStorageMedium> paths, long beId) {
this.beId = beId;
this.lastPickPathHash = -1;
for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) {
pathSlots.put(entry.getKey(), new Slot(entry.getValue()));
}
Expand Down Expand Up @@ -2109,29 +2110,29 @@ public synchronized long takeBalanceSlot(long pathHash) {
return -1;
}

public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs) {
public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs, TStorageMedium medium) {
if (pathHashs.isEmpty()) {
return -1;
}

Collections.sort(pathHashs);
synchronized (this) {
int preferSlotIndex = pathHashs.indexOf(lastPickPathHash) + 1;
int preferSlotIndex = pathHashs.indexOf(lastPickPathHashs.getOrDefault(medium, -1L)) + 1;
if (preferSlotIndex < 0 || preferSlotIndex >= pathHashs.size()) {
preferSlotIndex = 0;
}

for (int i = preferSlotIndex; i < pathHashs.size(); i++) {
long pathHash = pathHashs.get(i);
if (takeBalanceSlot(pathHash) != -1) {
lastPickPathHash = pathHash;
lastPickPathHashs.put(medium, pathHash);
return pathHash;
}
}
for (int i = 0; i < preferSlotIndex; i++) {
long pathHash = pathHashs.get(i);
if (takeBalanceSlot(pathHash) != -1) {
lastPickPathHash = pathHash;
lastPickPathHashs.put(medium, pathHash);
return pathHash;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ public void test() {
List<Long> availPathHashs = Lists.newArrayList();
List<Long> expectPathHashs = Lists.newArrayList();
List<Long> gotPathHashs = Lists.newArrayList();
TStorageMedium medium = TStorageMedium.HDD;
long startPath = 10001L;
long endPath = 10006L;
for (long pathHash = startPath; pathHash < endPath; pathHash++) {
paths.put(pathHash, TStorageMedium.HDD);
paths.put(pathHash, medium);
availPathHashs.add(pathHash);
expectPathHashs.add(pathHash);
}
Expand All @@ -56,7 +57,7 @@ public void test() {
PathSlot ps = new PathSlot(paths, 1L);
for (int i = 0; i < expectPathHashs.size(); i++) {
Collections.shuffle(availPathHashs);
gotPathHashs.add(ps.takeAnAvailBalanceSlotFrom(availPathHashs));
gotPathHashs.add(ps.takeAnAvailBalanceSlotFrom(availPathHashs, medium));
}
Assert.assertEquals(expectPathHashs, gotPathHashs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ protected int backendNum() {
/**
 * Test-cluster setup step: sets {@code FeConstants.runningUnitTest} to true
 * and {@code needCleanDir} to false.
 *
 * NOTE(review): this text comes from a diff view whose hunk header shrinks the
 * region from 7 to 6 lines, so one statement here (presumably the
 * {@code needCleanDir} assignment) may be a removed line — confirm against the
 * actual file.
 */
@Override
protected void beforeCluster() {
FeConstants.runningUnitTest = true;
needCleanDir = false;
}

@BeforeAll
Expand Down