Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -313,16 +314,16 @@ protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
regionIndex++;
}

if (LOG.isDebugEnabled()) {
if (LOG.isTraceEnabled()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These logs are really noisy in tests, and low value imo, so I've demoted them to trace

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

for (int i = 0; i < numServers; i++) {
LOG.debug("server {} has {} regions", i, regionsPerServer[i].length);
LOG.trace("server {} has {} regions", i, regionsPerServer[i].length);
}
}
for (int i = 0; i < serversPerHostList.size(); i++) {
serversPerHost[i] = new int[serversPerHostList.get(i).size()];
for (int j = 0; j < serversPerHost[i].length; j++) {
serversPerHost[i][j] = serversPerHostList.get(i).get(j);
LOG.debug("server {} is on host {}", serversPerHostList.get(i).get(j), i);
LOG.trace("server {} is on host {}", serversPerHostList.get(i).get(j), i);
}
if (serversPerHost[i].length > 1) {
multiServersPerHost = true;
Expand All @@ -333,7 +334,7 @@ protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
serversPerRack[i] = new int[serversPerRackList.get(i).size()];
for (int j = 0; j < serversPerRack[i].length; j++) {
serversPerRack[i][j] = serversPerRackList.get(i).get(j);
LOG.info("server {} is on rack {}", serversPerRackList.get(i).get(j), i);
LOG.trace("server {} is on rack {}", serversPerRackList.get(i).get(j), i);
}
}

Expand Down Expand Up @@ -1089,8 +1090,8 @@ void setStopRequestedAt(long stopRequestedAt) {
this.stopRequestedAt = stopRequestedAt;
}

long getStopRequestedAt() {
return stopRequestedAt;
boolean isStopRequested() {
return EnvironmentEdgeManager.currentTime() > stopRequestedAt;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ final class BalancerConditionals implements Configurable {
"hbase.master.balancer.stochastic.conditionals.distributeReplicas";
public static final boolean DISTRIBUTE_REPLICAS_DEFAULT = false;

public static final String ISOLATE_META_TABLE_KEY =
"hbase.master.balancer.stochastic.conditionals.isolateMetaTable";
public static final boolean ISOLATE_META_TABLE_DEFAULT = false;

public static final String ADDITIONAL_CONDITIONALS_KEY =
"hbase.master.balancer.stochastic.additionalConditionals";

Expand Down Expand Up @@ -90,8 +94,14 @@ boolean isReplicaDistributionEnabled() {
.anyMatch(DistributeReplicasConditional.class::isAssignableFrom);
}

boolean shouldSkipSloppyServerEvaluation() {
return isConditionalBalancingEnabled();
boolean isTableIsolationEnabled() {
return conditionalClasses.contains(MetaTableIsolationConditional.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This strategy of feature enablement by class presence is interesting. I wonder how we'll have to change this approach if we introduce user-provided implementations in the future. Just a thought.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I introduce SystemTableIsolation I anticipate making this an isAssignableFrom check much like isReplicaDistributionEnabled

}

/**
 * Reports whether any configured table isolation conditional considers the given server to be
 * hosting isolated tables. Conditionals that are not {@link TableIsolationConditional}
 * instances are ignored.
 * @param cluster   cluster state handed to each table isolation conditional
 * @param serverIdx index of the server being checked
 * @return true as soon as one table isolation conditional answers true, false otherwise
 */
boolean isServerHostingIsolatedTables(BalancerClusterState cluster, int serverIdx) {
  for (RegionPlanConditional candidate : conditionals) {
    if (!(candidate instanceof TableIsolationConditional)) {
      continue;
    }
    TableIsolationConditional isolation = (TableIsolationConditional) candidate;
    if (isolation.isServerHostingIsolatedTables(cluster, serverIdx)) {
      // Stop at the first match; no need to consult the remaining conditionals
      return true;
    }
  }
  return false;
}

boolean isConditionalBalancingEnabled() {
Expand Down Expand Up @@ -192,6 +202,11 @@ public void setConf(Configuration conf) {
conditionalClasses.add(DistributeReplicasConditional.class);
}

boolean isolateMetaTable = conf.getBoolean(ISOLATE_META_TABLE_KEY, ISOLATE_META_TABLE_DEFAULT);
if (isolateMetaTable) {
conditionalClasses.add(MetaTableIsolationConditional.class);
}

Class<?>[] classes = conf.getClasses(ADDITIONAL_CONDITIONALS_KEY);
for (Class<?> clazz : classes) {
if (!RegionPlanConditional.class.isAssignableFrom(clazz)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKey;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -60,10 +59,7 @@ BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing
List<MoveRegionAction> moveRegionActions = new ArrayList<>();
List<Integer> shuffledServerIndices = cluster.getShuffledServerIndices();
for (int sourceIndex : shuffledServerIndices) {
if (
moveRegionActions.size() >= BATCH_SIZE
|| EnvironmentEdgeManager.currentTime() > cluster.getStopRequestedAt()
) {
if (moveRegionActions.size() >= BATCH_SIZE || cluster.isStopRequested()) {
break;
}
int[] serverRegions = cluster.regionsPerServer[sourceIndex];
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;

import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Candidate generator for isolating the meta table. It supplies the region-selection rule to
 * {@link TableIsolationCandidateGenerator}: a region is selected for isolation exactly when
 * {@link RegionInfo#isMetaRegion()} returns true.
 */
@InterfaceAudience.Private
public final class MetaTableIsolationCandidateGenerator extends TableIsolationCandidateGenerator {

/**
 * @param balancerConditionals passed through unchanged to the superclass constructor
 */
MetaTableIsolationCandidateGenerator(BalancerConditionals balancerConditionals) {
super(balancerConditionals);
}

/**
 * @param regionInfo the region under consideration
 * @return true only for meta regions, as reported by {@link RegionInfo#isMetaRegion()}
 */
@Override
boolean shouldBeIsolated(RegionInfo regionInfo) {
return regionInfo.isMetaRegion();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;

import org.apache.hadoop.hbase.client.RegionInfo;

/**
* If enabled, this class will help the balancer ensure that the meta table lives on its own
* RegionServer. Configure this via {@link BalancerConditionals#ISOLATE_META_TABLE_KEY}.
*/
class MetaTableIsolationConditional extends TableIsolationConditional {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation, and the MetaTableIsolationCandidateGenerator, are so lightweight because I anticipate adding a SystemTableIsolationConditional/CandidateGenerator soon


public MetaTableIsolationConditional(BalancerConditionals balancerConditionals,
BalancerClusterState cluster) {
super(balancerConditionals, cluster);
}

@Override
boolean isRegionToIsolate(RegionInfo regionInfo) {
return regionInfo.isMetaRegion();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it kind of a code smell that both the CandidateGenerator and the Conditional implementation have repeated logic? As you land your additional implementations, I wonder if this will suggest a change to the interfaces.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably yes; let me think about whether the logic can be shared. That said, I might chalk this up as a relatively small imperfection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to leave this alone for now, I think it's a disappointing but small reality of the logical divide between conditionals and candidate generators. Will keep thinking on this as I introduce the system table isolation conditional though

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,11 @@ BalanceAction generate(BalancerClusterState cluster) {
return balanceAction;
}

MoveBatchAction batchMovesAndResetClusterState(BalancerClusterState cluster,
BalanceAction batchMovesAndResetClusterState(BalancerClusterState cluster,
List<MoveRegionAction> moves) {
if (moves.isEmpty()) {
return BalanceAction.NULL_ACTION;
}
Comment on lines +67 to +71
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning a null action is better here, because it signifies that there's no work to be done by the generator — whereas a batch, even an empty one, is easily misinterpreted as actionable work

MoveBatchAction batchAction = new MoveBatchAction(moves);
undoBatchAction(cluster, batchAction);
return batchAction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ final class SlopFixingCandidateGenerator extends RegionPlanConditionalCandidateG

@Override
BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing) {
boolean isTableIsolationEnabled = getBalancerConditionals().isTableIsolationEnabled();
ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
float average = cs.getLoadAverage();
int ceiling = (int) Math.ceil(average * (1 + slop));
Expand All @@ -63,6 +64,13 @@ BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing
List<MoveRegionAction> moves = new ArrayList<>();
Set<ServerAndLoad> fixedServers = new HashSet<>();
for (int sourceServer : sloppyServerIndices) {
if (
isTableIsolationEnabled
&& getBalancerConditionals().isServerHostingIsolatedTables(cluster, sourceServer)
) {
// Don't fix sloppiness of servers hosting isolated tables
continue;
}
for (int regionIdx : cluster.regionsPerServer[sourceServer]) {
boolean regionFoundMove = false;
for (ServerAndLoad serverAndLoad : cs.getServersByLoad().keySet()) {
Expand All @@ -88,8 +96,8 @@ BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing
}
fixedServers.forEach(s -> cs.getServersByLoad().remove(s));
fixedServers.clear();
if (!regionFoundMove) {
LOG.debug("Could not find a destination for region {} from server {}.", regionIdx,
if (!regionFoundMove && LOG.isTraceEnabled()) {
LOG.trace("Could not find a destination for region {} from server {}.", regionIdx,
sourceServer);
}
if (cluster.regionsPerServer[sourceServer].length <= ceiling) {
Expand All @@ -98,8 +106,6 @@ BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing
}
}

MoveBatchAction batch = new MoveBatchAction(moves);
undoBatchAction(cluster, batch);
return batch;
return batchMovesAndResetClusterState(cluster, moves);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,10 @@ boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
return true;
}

if (sloppyRegionServerExist(cs)) {
if (
// table isolation is inherently incompatible with naive "sloppy server" checks
!balancerConditionals.isTableIsolationEnabled() && sloppyRegionServerExist(cs)
) {
LOG.info("Running balancer because cluster has sloppy server(s)." + " function cost={}",
functionCost());
return true;
Expand Down Expand Up @@ -700,7 +703,7 @@ protected List<RegionPlan> balanceTable(TableName tableName,
updateCostsAndWeightsWithAction(cluster, undoAction);
}

if (EnvironmentEdgeManager.currentTime() > cluster.getStopRequestedAt()) {
if (cluster.isStopRequested()) {
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public abstract class TableIsolationCandidateGenerator
extends RegionPlanConditionalCandidateGenerator {

private static final Logger LOG = LoggerFactory.getLogger(TableIsolationCandidateGenerator.class);

TableIsolationCandidateGenerator(BalancerConditionals balancerConditionals) {
super(balancerConditionals);
}

/**
 * Decides whether the given region is one this generator should isolate. Regions for which this
 * returns false are the ones {@code generateCandidate} tries to move off servers that also hold
 * regions for which it returns true.
 * @param regionInfo the region to classify
 * @return true if the region should be isolated
 */
abstract boolean shouldBeIsolated(RegionInfo regionInfo);

/**
 * Delegates to {@code generateCandidate} with {@code isWeighing} set to false, so the call
 * does not take the early-return path that {@code generateCandidate} uses when weighing.
 * @param cluster the cluster state to generate an action for
 * @return whatever {@code generateCandidate} returns for this cluster
 */
@Override
BalanceAction generate(BalancerClusterState cluster) {
return generateCandidate(cluster, false);
}

BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is pretty straightforward in my opinion; if there's any head scratching complexity, then it's in this method. I'm happy to explore breaking up this method, which is admittedly probably too long, but doing so would probably introduce a much larger diff because we'd need more objects to represent the state that should be passed around across the goals of isolation + colocation

if (!getBalancerConditionals().isTableIsolationEnabled()) {
return BalanceAction.NULL_ACTION;
}

List<MoveRegionAction> moves = new ArrayList<>();
List<Integer> serverIndicesHoldingIsolatedRegions = new ArrayList<>();
int isolatedTableMaxReplicaCount = 1;
for (int serverIdx : cluster.getShuffledServerIndices()) {
if (cluster.isStopRequested()) {
break;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this return a NULL_ACTION instead of falling through to the rest of the method?

... this method is long enough that I cannot easily tell the nesting from cursory reading. Maybe the entirety of the remaining method body is inside this for-loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method is definitely a little long; I talk about this a little bit here, but I'm basically on the fence about whether it's worth splitting this up

Regarding whether this should return a null action, I'd say we should leave that to the logic at the end of this method which will submit any moves we have found, or return a null action if we haven't found any good moves

}
boolean hasRegionsToIsolate = false;
Set<Integer> regionsToMove = new HashSet<>();

// Move non-target regions away from target regions,
// and track replica counts so we know how many isolated hosts we need
for (int regionIdx : cluster.regionsPerServer[serverIdx]) {
RegionInfo regionInfo = cluster.regions[regionIdx];
if (shouldBeIsolated(regionInfo)) {
hasRegionsToIsolate = true;
int replicaCount = regionInfo.getReplicaId() + 1;
if (replicaCount > isolatedTableMaxReplicaCount) {
isolatedTableMaxReplicaCount = replicaCount;
}
} else {
regionsToMove.add(regionIdx);
}
}

if (hasRegionsToIsolate) {
serverIndicesHoldingIsolatedRegions.add(serverIdx);
}

// Generate non-system regions to move, if applicable
if (hasRegionsToIsolate && !regionsToMove.isEmpty()) {
for (int regionToMove : regionsToMove) {
for (int i = 0; i < cluster.numServers; i++) {
int targetServer = pickOtherRandomServer(cluster, serverIdx);
MoveRegionAction possibleMove =
new MoveRegionAction(regionToMove, serverIdx, targetServer);
if (!getBalancerConditionals().isViolating(cluster, possibleMove)) {
if (isWeighing) {
return possibleMove;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, what does "weighing" mean? Why the early return here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. All candidate generators maintain a "weight" which influences their likelihood of being selected for the next proposed action. Conditional candidate generators, in keeping with their binary decision making, are weighed based on whether they have any work to do — if they do, then we'll prioritize their moves over the more generic generators (like the RandomCandidateGenerator). So generateCandidate can get called in two distinct ways: to produce a robust plan of action, or (with isWeighing set to true) to weigh whether there is any work to do. In the latter case, we'd like to exit as early as possible to speed things up.

}
cluster.doAction(possibleMove); // Update cluster state to reflect move
moves.add(possibleMove);
break;
}
}
}
}
}

// Try to consolidate regions on only n servers, where n is the number of replicas
if (serverIndicesHoldingIsolatedRegions.size() > isolatedTableMaxReplicaCount) {
// One target per replica
List<Integer> targetServerIndices = new ArrayList<>();
for (int i = 0; i < isolatedTableMaxReplicaCount; i++) {
targetServerIndices.add(serverIndicesHoldingIsolatedRegions.get(i));
}
// Move all isolated regions from non-targets to targets
for (int i = isolatedTableMaxReplicaCount; i
< serverIndicesHoldingIsolatedRegions.size(); i++) {
int fromServer = serverIndicesHoldingIsolatedRegions.get(i);
for (int regionIdx : cluster.regionsPerServer[fromServer]) {
RegionInfo regionInfo = cluster.regions[regionIdx];
if (shouldBeIsolated(regionInfo)) {
int targetServer = targetServerIndices.get(i % isolatedTableMaxReplicaCount);
MoveRegionAction possibleMove =
new MoveRegionAction(regionIdx, fromServer, targetServer);
if (!getBalancerConditionals().isViolating(cluster, possibleMove)) {
if (isWeighing) {
return possibleMove;
}
cluster.doAction(possibleMove); // Update cluster state to reflect move
moves.add(possibleMove);
}
}
}
}
}
return batchMovesAndResetClusterState(cluster, moves);
}
}
Loading