-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Closed
Description
Is your feature request related to a problem? Please describe.
After #4763, we want to import the kudu's rebalance strategy TwoDimensionalGreedyAlgo.
The kudu's two dims are cluster & table. Two dims of Doris should be cluster & partition.
And we only consider about the replica count, do not consider replica size.
Describe the solution you'd like(POC)
PartitionRebalancer:
@override
selectAlternativeTabletsForCluster:
// Cleanup expired moves, or we can move the cleanup into updateLoadStatistic()
movesInProgress.cleanUp()
TwoDimensionalGreedyAlgo.CheckMovesCompleted(movesInProgress)
// GetNextMoves gens vector of move{partitionId, indexId, fromBE, destBE}
moves = TwoDimensionalGreedyAlgo.getNextMoves(clusterBalanceInfo, movesInProgress, maxMoveNum)
// The TabletSchedCtx which will add to TabletScheduler must have to specify the tablet id
for move in moves:
// Pick a tablet of [partitionId-indexId] on fromBE randomly
tabletId = randomPickFrom(move)
// srcReplica & destBe will be specified later, in completeSchedCtx()
tabletSchedCtx = generateSchedCtx(tabletId)
alternativeTablets.add(tabletSchedCtx)
// Mark this move as in progress, the map is <tabletId, {Move, ToDeleteReplicaId}>, the ToDeleteReplicaId is only for func getToDeleteReplicaId()
movesInProgress.add(tabletId, {move, -1})
@override
completeSchedCtx:
try:
move = movesInProgress.get(tabletId).move
// ClusterBalanceInfo may be updated, the move may not improve the partition's or cluster's balance. So we should check the move's validation.
checkValidation(move, clusterBalanceInfo)
// Specify srcReplica & destBe, consider about PathSlots ...
// Omitted ...
movesInProgress.put(tabletId, {move, srcReplicaId})
except e:
// Can't create balance task for this tablet
movesInProgress.invalidate(tabletId)
return error
@override
getToDeleteReplicaId:
// We don't invalidate the entry here, cuz the redundant repair progress is just started. The move should invalidate by ttl or Algo.CheckMoveCompleted()
return movesInProgress.get(tabletId).srcReplicaId
// Cache cleanUp may be not necessary if we cleanup cache in selectAlternativeTabletsForCluster().
// @override
// updateLoadStatistic:
// super.updateLoadStatistic()
// if time_interval > config:
// movesInProgress.cleanUp()
@private
// The validation check cannot be accurate, cuz the production of moves do have ordering. When some moves failed, the cluster skew is different with the skew we used in production. So we just do some basic checks.
checkValidation:
check if fromBe and toBe are alive
check if fromBe.totalReplicaCount > toBe.totalReplicaCount
check if partition.fromBe.replicaCount >
partition.toBe.replicaCount
// To be improved
We need ClusterBalanceInfo for TwoDimensionalGreedyAlgo, actually two maps. Ref kudu ClusterBalanceInfo.
// Balance information for a cluster,
// excluding decommissioned bes and replicas on them.
ClusterBalanceInfo{
partitionInfoBySkew
beByTotoalReplicaCount
}
The algo persudo code:
TwoDimensionalGreedyAlgo:
getNextMoves:
// copy info for applying move
info = copy(clusterBalanceInfo)
applyMoves(info, movesInProgress)
for i in range(0, maxMoveNum):
move = getNextMove(info)
moves.add(move)
applyMove(info, move)
appleyMove:
// Update the balance state in 'ClusterBalanceInfo' with the outcome of the move
// omitted...
getNextMove:
maxPartitionSkew = partitionInfoBySkew.firstKey()
maxBeSkew = maxReplicaNumOfBe - minReplicaNumOfBe
maxSet = clusterBalanceInfo.partitionInfoBySkew.get(maxPartitionSkew)
// Among the partitions with maximum skew, attempt to pick a partition where there is
// a move that improves the partition skew and the cluster skew, if possible. If
// not, attempt to pick a move that improves the partition skew. If all partitions
// are balanced, attempt to pick a move that preserves partition balance and
// improves cluster skew.
findOneMove(maxSet)
checkMovesCompleted:
// TODO
weizuo93 and acelyc111acelyc111
Metadata
Metadata
Assignees
Labels
No labels