Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.List;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;

@InterfaceAudience.Private
class AssignRegionAction extends BalanceAction {
private final int region;
Expand All @@ -46,6 +50,12 @@ public BalanceAction undoAction() {
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
}

@Override
List<RegionPlan> toRegionPlans(BalancerClusterState cluster) {
return ImmutableList
.of(new RegionPlan(cluster.regions[getRegion()], null, cluster.servers[getServer()]));
}

@Override
public String toString() {
return getType() + ": " + region + ":" + server;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand All @@ -28,11 +31,11 @@ enum Type {
ASSIGN_REGION,
MOVE_REGION,
SWAP_REGIONS,
MOVE_BATCH,
NULL,
}

static final BalanceAction NULL_ACTION = new BalanceAction(Type.NULL) {
};
static final BalanceAction NULL_ACTION = new NullBalanceAction();

private final Type type;

Expand All @@ -43,16 +46,39 @@ enum Type {
/**
* Returns an Action which would undo this action
*/
BalanceAction undoAction() {
return this;
}
abstract BalanceAction undoAction();

/**
* Returns the Action represented as RegionPlans
*/
abstract List<RegionPlan> toRegionPlans(BalancerClusterState cluster);

Type getType() {
return type;
}

long getStepCount() {
return 1;
}

@Override
public String toString() {
return type + ":";
}

private static final class NullBalanceAction extends BalanceAction {
private NullBalanceAction() {
super(Type.NULL);
}

@Override
BalanceAction undoAction() {
return this;
}

@Override
List<RegionPlan> toRegionPlans(BalancerClusterState cluster) {
return Collections.emptyList();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.agrona.collections.Hashing;
import org.agrona.collections.Int2IntCounterMap;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
Expand All @@ -39,6 +42,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;

/**
* An efficient array based implementation similar to ClusterState for keeping the status of the
* cluster in terms of region assignment and distribution. LoadBalancers, such as
Expand Down Expand Up @@ -123,6 +128,15 @@ class BalancerClusterState {
// Maps regionName -> oldServerName -> cache ratio of the region on the old server
Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;

private final Supplier<List<Integer>> shuffledServerIndicesSupplier =
Suppliers.memoizeWithExpiration(() -> {
Collection<Integer> serverIndices = serversToIndex.values();
List<Integer> shuffledServerIndices = new ArrayList<>(serverIndices);
Collections.shuffle(shuffledServerIndices);
return shuffledServerIndices;
}, 5, TimeUnit.SECONDS);
private long stopRequestedAt = Long.MAX_VALUE;

static class DefaultRackManager extends RackManager {
@Override
public String getRack(ServerName server) {
Expand Down Expand Up @@ -728,8 +742,25 @@ public void doAction(BalanceAction action) {
regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
break;
case MOVE_BATCH:
assert action instanceof MoveBatchAction : action.getClass();
MoveBatchAction mba = (MoveBatchAction) action;
for (int serverIndex : mba.getServerToRegionsToRemove().keySet()) {
Set<Integer> regionsToRemove = mba.getServerToRegionsToRemove().get(serverIndex);
regionsPerServer[serverIndex] =
removeRegions(regionsPerServer[serverIndex], regionsToRemove);
}
for (int serverIndex : mba.getServerToRegionsToAdd().keySet()) {
Set<Integer> regionsToAdd = mba.getServerToRegionsToAdd().get(serverIndex);
regionsPerServer[serverIndex] = addRegions(regionsPerServer[serverIndex], regionsToAdd);
}
for (MoveRegionAction moveRegionAction : mba.getMoveActions()) {
regionMoved(moveRegionAction.getRegion(), moveRegionAction.getFromServer(),
moveRegionAction.getToServer());
}
break;
default:
throw new RuntimeException("Uknown action:" + action.getType());
throw new RuntimeException("Unknown action:" + action.getType());
}
}

Expand Down Expand Up @@ -891,6 +922,52 @@ int[] addRegion(int[] regions, int regionIndex) {
return newRegions;
}

int[] removeRegions(int[] regions, Set<Integer> regionIndicesToRemove) {
// Calculate the size of the new regions array
int newSize = regions.length - regionIndicesToRemove.size();
if (newSize < 0) {
throw new IllegalStateException(
"Region indices mismatch: more regions to remove than in the regions array");
}

int[] newRegions = new int[newSize];
int newIndex = 0;

// Copy only the regions not in the removal set
for (int region : regions) {
if (!regionIndicesToRemove.contains(region)) {
newRegions[newIndex++] = region;
}
}

// If the newIndex is smaller than newSize, some regions were missing from the input array
if (newIndex != newSize) {
throw new IllegalStateException("Region indices mismatch: some regions in the removal "
+ "set were not found in the regions array");
}

return newRegions;
}

int[] addRegions(int[] regions, Set<Integer> regionIndicesToAdd) {
int[] newRegions = new int[regions.length + regionIndicesToAdd.size()];

// Copy the existing regions to the new array
System.arraycopy(regions, 0, newRegions, 0, regions.length);

// Add the new regions at the end of the array
int newIndex = regions.length;
for (int regionIndex : regionIndicesToAdd) {
newRegions[newIndex++] = regionIndex;
}

return newRegions;
}

List<Integer> getShuffledServerIndices() {
return shuffledServerIndicesSupplier.get();
}

int[] addRegionSorted(int[] regions, int regionIndex) {
int[] newRegions = new int[regions.length + 1];
int i = 0;
Expand Down Expand Up @@ -990,6 +1067,18 @@ void setNumMovedRegions(int numMovedRegions) {
this.numMovedRegions = numMovedRegions;
}

public int getMaxReplicas() {
return maxReplicas;
}

void setStopRequestedAt(long stopRequestedAt) {
this.stopRequestedAt = stopRequestedAt;
}

long getStopRequestedAt() {
return stopRequestedAt;
}

@Override
public String toString() {
StringBuilder desc = new StringBuilder("Cluster={servers=[");
Expand Down
Loading