Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -311,11 +312,16 @@ protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
regionIndex++;
}

if (LOG.isTraceEnabled()) {
for (int i = 0; i < numServers; i++) {
LOG.trace("server {} has {} regions", i, regionsPerServer[i].length);
}
}
for (int i = 0; i < serversPerHostList.size(); i++) {
serversPerHost[i] = new int[serversPerHostList.get(i).size()];
for (int j = 0; j < serversPerHost[i].length; j++) {
serversPerHost[i][j] = serversPerHostList.get(i).get(j);
LOG.debug("server {} is on host {}", serversPerHostList.get(i).get(j), i);
LOG.trace("server {} is on host {}", serversPerHostList.get(i).get(j), i);
}
if (serversPerHost[i].length > 1) {
multiServersPerHost = true;
Expand All @@ -326,7 +332,7 @@ protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
serversPerRack[i] = new int[serversPerRackList.get(i).size()];
for (int j = 0; j < serversPerRack[i].length; j++) {
serversPerRack[i][j] = serversPerRackList.get(i).get(j);
LOG.info("server {} is on rack {}", serversPerRackList.get(i).get(j), i);
LOG.trace("server {} is on rack {}", serversPerRackList.get(i).get(j), i);
}
}

Expand Down Expand Up @@ -1075,8 +1081,8 @@ void setStopRequestedAt(long stopRequestedAt) {
this.stopRequestedAt = stopRequestedAt;
}

long getStopRequestedAt() {
return stopRequestedAt;
boolean isStopRequested() {
return EnvironmentEdgeManager.currentTime() > stopRequestedAt;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ final class BalancerConditionals implements Configurable {
"hbase.master.balancer.stochastic.conditionals.distributeReplicas";
public static final boolean DISTRIBUTE_REPLICAS_DEFAULT = false;

public static final String ISOLATE_META_TABLE_KEY =
"hbase.master.balancer.stochastic.conditionals.isolateMetaTable";
public static final boolean ISOLATE_META_TABLE_DEFAULT = false;

public static final String ADDITIONAL_CONDITIONALS_KEY =
"hbase.master.balancer.stochastic.additionalConditionals";

Expand Down Expand Up @@ -91,8 +95,14 @@ boolean isReplicaDistributionEnabled() {
.anyMatch(DistributeReplicasConditional.class::isAssignableFrom);
}

boolean shouldSkipSloppyServerEvaluation() {
return isConditionalBalancingEnabled();
boolean isTableIsolationEnabled() {
return conditionalClasses.contains(MetaTableIsolationConditional.class);
}

/**
 * Asks every configured {@link TableIsolationConditional} whether the given server currently
 * hosts regions that it isolates.
 * @param cluster   cluster state passed through to each conditional
 * @param serverIdx index of the server being checked
 * @return true as soon as one table isolation conditional reports that the server hosts
 *         isolated tables; false if none do (including when none are configured)
 */
boolean isServerHostingIsolatedTables(BalancerClusterState cluster, int serverIdx) {
  return conditionals.stream()
    // Only table isolation conditionals can answer this question
    .filter(candidate -> candidate instanceof TableIsolationConditional)
    .map(candidate -> (TableIsolationConditional) candidate)
    .anyMatch(isolation -> isolation.isServerHostingIsolatedTables(cluster, serverIdx));
}

boolean isConditionalBalancingEnabled() {
Expand Down Expand Up @@ -193,6 +203,11 @@ public void setConf(Configuration conf) {
conditionalClasses.add(DistributeReplicasConditional.class);
}

boolean isolateMetaTable = conf.getBoolean(ISOLATE_META_TABLE_KEY, ISOLATE_META_TABLE_DEFAULT);
if (isolateMetaTable) {
conditionalClasses.add(MetaTableIsolationConditional.class);
}

Class<?>[] classes = conf.getClasses(ADDITIONAL_CONDITIONALS_KEY);
for (Class<?> clazz : classes) {
if (!RegionPlanConditional.class.isAssignableFrom(clazz)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKey;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -60,10 +59,7 @@ BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing
List<MoveRegionAction> moveRegionActions = new ArrayList<>();
List<Integer> shuffledServerIndices = cluster.getShuffledServerIndices();
for (int sourceIndex : shuffledServerIndices) {
if (
moveRegionActions.size() >= BATCH_SIZE
|| EnvironmentEdgeManager.currentTime() > cluster.getStopRequestedAt()
) {
if (moveRegionActions.size() >= BATCH_SIZE || cluster.isStopRequested()) {
break;
}
int[] serverRegions = cluster.regionsPerServer[sourceIndex];
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;

import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * {@link TableIsolationCandidateGenerator} whose set of regions to isolate is exactly the meta
 * regions.
 */
@InterfaceAudience.Private
public final class MetaTableIsolationCandidateGenerator extends TableIsolationCandidateGenerator {

  /**
   * @param balancerConditionals conditionals handed to the parent generator
   */
  MetaTableIsolationCandidateGenerator(BalancerConditionals balancerConditionals) {
    super(balancerConditionals);
  }

  /**
   * @param regionInfo region under consideration
   * @return true only when the region is a meta region
   */
  @Override
  boolean shouldBeIsolated(RegionInfo regionInfo) {
    return regionInfo.isMetaRegion();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;

import org.apache.hadoop.hbase.client.RegionInfo;

/**
 * Conditional that, when enabled, helps the balancer keep the meta table on a RegionServer of its
 * own. It is switched on through {@link BalancerConditionals#ISOLATE_META_TABLE_KEY}.
 */
class MetaTableIsolationConditional extends TableIsolationConditional {

  /**
   * @param balancerConditionals conditionals handed to the parent conditional
   * @param cluster              cluster state handed to the parent conditional
   */
  public MetaTableIsolationConditional(BalancerConditionals balancerConditionals,
    BalancerClusterState cluster) {
    super(balancerConditionals, cluster);
  }

  /**
   * @param regionInfo region under consideration
   * @return true only when the region is a meta region
   */
  @Override
  boolean isRegionToIsolate(RegionInfo regionInfo) {
    return regionInfo.isMetaRegion();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,11 @@ BalanceAction generate(BalancerClusterState cluster) {
return balanceAction;
}

MoveBatchAction batchMovesAndResetClusterState(BalancerClusterState cluster,
BalanceAction batchMovesAndResetClusterState(BalancerClusterState cluster,
List<MoveRegionAction> moves) {
if (moves.isEmpty()) {
return BalanceAction.NULL_ACTION;
}
MoveBatchAction batchAction = new MoveBatchAction(moves);
undoBatchAction(cluster, batchAction);
return batchAction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ final class SlopFixingCandidateGenerator extends RegionPlanConditionalCandidateG

@Override
BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing) {
boolean isTableIsolationEnabled = getBalancerConditionals().isTableIsolationEnabled();
ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
float average = cs.getLoadAverage();
int ceiling = (int) Math.ceil(average * (1 + slop));
Expand All @@ -63,6 +64,13 @@ BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing
List<MoveRegionAction> moves = new ArrayList<>();
Set<ServerAndLoad> fixedServers = new HashSet<>();
for (int sourceServer : sloppyServerIndices) {
if (
isTableIsolationEnabled
&& getBalancerConditionals().isServerHostingIsolatedTables(cluster, sourceServer)
) {
// Don't fix sloppiness of servers hosting isolated tables
continue;
}
for (int regionIdx : cluster.regionsPerServer[sourceServer]) {
boolean regionFoundMove = false;
for (ServerAndLoad serverAndLoad : cs.getServersByLoad().keySet()) {
Expand All @@ -88,8 +96,8 @@ BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing
}
fixedServers.forEach(s -> cs.getServersByLoad().remove(s));
fixedServers.clear();
if (!regionFoundMove) {
LOG.debug("Could not find a destination for region {} from server {}.", regionIdx,
if (!regionFoundMove && LOG.isTraceEnabled()) {
LOG.trace("Could not find a destination for region {} from server {}.", regionIdx,
sourceServer);
}
if (cluster.regionsPerServer[sourceServer].length <= ceiling) {
Expand All @@ -98,8 +106,6 @@ BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing
}
}

MoveBatchAction batch = new MoveBatchAction(moves);
undoBatchAction(cluster, batch);
return batch;
return batchMovesAndResetClusterState(cluster, moves);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,10 @@ boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
return true;
}

if (sloppyRegionServerExist(cs)) {
if (
// table isolation is inherently incompatible with naive "sloppy server" checks
!balancerConditionals.isTableIsolationEnabled() && sloppyRegionServerExist(cs)
) {
LOG.info("Running balancer because cluster has sloppy server(s)." + " function cost={}",
functionCost());
return true;
Expand Down Expand Up @@ -747,7 +750,7 @@ protected List<RegionPlan> balanceTable(TableName tableName,
updateCostsAndWeightsWithAction(cluster, undoAction);
}

if (EnvironmentEdgeManager.currentTime() > cluster.getStopRequestedAt()) {
if (cluster.isStopRequested()) {
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base candidate generator for isolating a chosen set of regions on servers that host nothing
 * else. Subclasses select the regions through {@link #shouldBeIsolated(RegionInfo)}.
 */
@InterfaceAudience.Private
public abstract class TableIsolationCandidateGenerator
  extends RegionPlanConditionalCandidateGenerator {

  private static final Logger LOG = LoggerFactory.getLogger(TableIsolationCandidateGenerator.class);

  /**
   * @param balancerConditionals conditionals handed to the parent generator
   */
  TableIsolationCandidateGenerator(BalancerConditionals balancerConditionals) {
    super(balancerConditionals);
  }

  /**
   * @param regionInfo region under consideration
   * @return true if the region belongs to the set that must not share a server with other
   *         regions
   */
  abstract boolean shouldBeIsolated(RegionInfo regionInfo);

  /**
   * Generates a candidate in non-weighing mode.
   * @param cluster cluster state to generate moves against
   * @return the result of {@link #generateCandidate(BalancerClusterState, boolean)} with
   *         {@code isWeighing} set to false
   */
  @Override
  BalanceAction generate(BalancerClusterState cluster) {
    return generateCandidate(cluster, false);
  }

  /**
   * Builds the moves that isolate the regions selected by {@link #shouldBeIsolated(RegionInfo)}.
   * <p>
   * The work happens in two passes:
   * <ol>
   * <li>Servers are visited in shuffled order. For each server hosting at least one isolated
   * region, every other region on it is offered to randomly picked other servers until a move that
   * the conditionals accept is found (at most {@code numServers} attempts per region).</li>
   * <li>If isolated regions are spread over more servers than the highest replica count observed
   * (replica id + 1), isolated regions on the surplus servers are moved onto the first servers that
   * were found holding isolated regions.</li>
   * </ol>
   * Accepted moves are applied to {@code cluster} as they are found so later checks see them; the
   * accumulated moves are finally handed to {@code batchMovesAndResetClusterState}.
   * @param cluster    cluster state to generate moves against
   * @param isWeighing when true, the first acceptable move is returned immediately and is not
   *                   applied to the cluster state
   * @return {@link BalanceAction#NULL_ACTION} when table isolation is disabled, a single move when
   *         weighing, otherwise the result of batching all accepted moves
   */
  @Override
  BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing) {
    if (!getBalancerConditionals().isTableIsolationEnabled()) {
      return BalanceAction.NULL_ACTION;
    }

    List<MoveRegionAction> moves = new ArrayList<>();
    List<Integer> serverIndicesHoldingIsolatedRegions = new ArrayList<>();
    int isolatedTableMaxReplicaCount = 1;
    for (int serverIdx : cluster.getShuffledServerIndices()) {
      if (cluster.isStopRequested()) {
        break;
      }
      boolean hasRegionsToIsolate = false;
      Set<Integer> regionsToMove = new HashSet<>();

      // Split the server's regions into isolated ones and ones that would have to leave,
      // and track replica counts so we know how many isolated hosts we need
      for (int regionIdx : cluster.regionsPerServer[serverIdx]) {
        RegionInfo regionInfo = cluster.regions[regionIdx];
        if (shouldBeIsolated(regionInfo)) {
          hasRegionsToIsolate = true;
          isolatedTableMaxReplicaCount =
            Math.max(isolatedTableMaxReplicaCount, regionInfo.getReplicaId() + 1);
        } else {
          regionsToMove.add(regionIdx);
        }
      }

      if (!hasRegionsToIsolate) {
        // Nothing to protect on this server, so its regions can stay where they are
        continue;
      }
      serverIndicesHoldingIsolatedRegions.add(serverIdx);

      // Move the non-isolated regions away from the isolated ones, if there are any
      for (int regionToMove : regionsToMove) {
        for (int i = 0; i < cluster.numServers; i++) {
          int targetServer = pickOtherRandomServer(cluster, serverIdx);
          if (targetServer < 0) {
            // NOTE(review): presumably a negative index means no other server exists (e.g. a
            // single-server cluster); confirm against pickOtherRandomServer. Building a move to
            // such an index would be meaningless, so give up on this region.
            break;
          }
          MoveRegionAction possibleMove =
            new MoveRegionAction(regionToMove, serverIdx, targetServer);
          if (!getBalancerConditionals().isViolating(cluster, possibleMove)) {
            if (isWeighing) {
              return possibleMove;
            }
            cluster.doAction(possibleMove); // Update cluster state to reflect move
            moves.add(possibleMove);
            break;
          }
        }
      }
    }

    // Try to consolidate regions on only n servers, where n is the number of replicas
    if (serverIndicesHoldingIsolatedRegions.size() > isolatedTableMaxReplicaCount) {
      // One target per replica
      List<Integer> targetServerIndices = new ArrayList<>();
      for (int i = 0; i < isolatedTableMaxReplicaCount; i++) {
        targetServerIndices.add(serverIndicesHoldingIsolatedRegions.get(i));
      }
      // Move all isolated regions from non-targets to targets
      for (int i = isolatedTableMaxReplicaCount; i
          < serverIndicesHoldingIsolatedRegions.size(); i++) {
        int fromServer = serverIndicesHoldingIsolatedRegions.get(i);
        int targetServer = targetServerIndices.get(i % isolatedTableMaxReplicaCount);
        for (int regionIdx : cluster.regionsPerServer[fromServer]) {
          if (!shouldBeIsolated(cluster.regions[regionIdx])) {
            continue;
          }
          MoveRegionAction possibleMove =
            new MoveRegionAction(regionIdx, fromServer, targetServer);
          if (!getBalancerConditionals().isViolating(cluster, possibleMove)) {
            if (isWeighing) {
              return possibleMove;
            }
            cluster.doAction(possibleMove); // Update cluster state to reflect move
            moves.add(possibleMove);
          }
        }
      }
    }
    return batchMovesAndResetClusterState(cluster, moves);
  }
}
Loading