Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ public interface LoadBalancer extends Stoppable, ConfigurationObserver {
*/
@Deprecated
String HBASE_RSGROUP_LOADBALANCER_CLASS = "hbase.rsgroup.grouploadbalancer.class";

/**
* Set the current cluster status. This allows a LoadBalancer to map host name to a server
*/
void setClusterMetrics(ClusterMetrics st);
void updateClusterMetrics(ClusterMetrics metrics);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We change the method name because the method is now being used differently? Before we called 'set' once? But now we 'update' continuously during operation? Or was it just always incorrectly named?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a background chore called ClusterStatusChore to call this method periodically...



/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
Expand Down Expand Up @@ -53,6 +52,10 @@
* The base class for load balancers. It provides the functions used by
* {@code AssignmentManager} to assign regions in the edge cases. It doesn't provide an
* implementation of the actual balancing algorithm.
* <p/>
* Since 3.0.0, all the balancers will be wrapped inside a {@code RSGroupBasedLoadBalancer}, which
* will be in charge of the synchronization of balancing and configuration changing, so we do not
* need to synchronize by ourselves.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. Wondering what the rationale for this is (perhaps it comes later in this patch).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because of previous work, in which we moved the rsgroup feature into core HBase instead of implementing it as a coprocessor.

See HBASE-22514.

*/
@InterfaceAudience.Private
public abstract class BaseLoadBalancer implements LoadBalancer {
Expand All @@ -76,8 +79,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {

// slop for regions
protected float slop;
// overallSlop to control simpleLoadBalancer's cluster level threshold
protected float overallSlop;
protected RackManager rackManager;
protected MetricsBalancer metricsBalancer = null;
protected ClusterMetrics clusterStatus = null;
Expand All @@ -103,38 +104,8 @@ protected final Configuration getConf() {
return provider.getConfiguration();
}

/**
 * Loads the balancer settings from the given configuration.
 * <p/>
 * The slop values are first read through {@link #setSlop(Configuration)} and then clamped into
 * the range [0, 1]. After that the rack manager is rebuilt, the locality-based region finder is
 * created when {@code hbase.master.balancer.uselocality} is enabled (the default), and the
 * by-table flag is refreshed, keeping its current value when the key is absent.
 * @param conf the configuration to read the settings from
 */
protected void setConf(Configuration conf) {
  setSlop(conf);
  // Clamp both thresholds into [0, 1]; values already inside the range are left as they are.
  slop = slop < 0 ? 0 : (slop > 1 ? 1 : slop);
  overallSlop = overallSlop < 0 ? 0 : (overallSlop > 1 ? 1 : overallSlop);

  rackManager = new RackManager(conf);
  useRegionFinder = conf.getBoolean("hbase.master.balancer.uselocality", true);
  if (useRegionFinder) {
    regionFinder = new RegionHDFSBlockLocationFinder();
    regionFinder.setConf(conf);
  }
  isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
  // Only the base setting is logged; overallSlop is used exclusively by the simple balancer.
  LOG.info("slop={}", slop);
}

/**
 * Reads the raw (not yet clamped) slop settings from the configuration.
 * <p/>
 * {@code hbase.regions.slop} defaults to 0.2, and {@code hbase.regions.overallSlop} defaults to
 * whatever value was just read for {@code hbase.regions.slop}.
 * @param conf the configuration to read the slop values from
 */
protected void setSlop(Configuration conf) {
  float regionSlop = conf.getFloat("hbase.regions.slop", 0.2f);
  this.slop = regionSlop;
  this.overallSlop = conf.getFloat("hbase.regions.overallSlop", regionSlop);
}

@Override
public synchronized void setClusterMetrics(ClusterMetrics st) {
public void updateClusterMetrics(ClusterMetrics st) {
this.clusterStatus = st;
if (useRegionFinder) {
regionFinder.setClusterMetrics(st);
Expand All @@ -145,10 +116,6 @@ public synchronized void setClusterMetrics(ClusterMetrics st) {
/**
 * Installs the cluster info provider, loads the settings from the provider's configuration and,
 * when the region finder is in use, hands the provider over to it as well.
 * @param provider the provider giving access to the configuration and cluster information
 */
@Override
public void setClusterInfoProvider(ClusterInfoProvider provider) {
  this.provider = provider;
  setConf(provider.getConfiguration());
  if (!useRegionFinder) {
    // Locality is disabled, so there is no region finder to wire up.
    return;
  }
  regionFinder.setClusterInfoProvider(provider);
}

@Override
Expand All @@ -158,57 +125,6 @@ public void postMasterStartupInitialize() {
}
}

/**
* Replaces the rack manager used by this balancer.
* @param rackManager the rack manager to use from now on
*/
public void setRackManager(RackManager rackManager) {
this.rackManager = rackManager;
}

/**
 * Decides whether the given cluster state is worth running the balancer on.
 * <p/>
 * Balancing is skipped when fewer than {@code MIN_SERVER_BALANCE} servers are present. It is
 * requested right away when region replicas are co-located or an idle region server exists.
 * Otherwise it is requested only if the most or least loaded server falls outside the slop
 * window around the average load.
 * @param tableName the table being balanced; not used by this implementation
 * @param c the cluster state to inspect
 * @return true if a balancing run should be performed
 */
protected boolean needsBalance(TableName tableName, BalancerClusterState c) {
  ClusterLoadState loadState = new ClusterLoadState(c.clusterState);
  if (loadState.getNumServers() < MIN_SERVER_BALANCE) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Not running balancer because only " + loadState.getNumServers()
        + " active regionserver(s)");
    }
    return false;
  }
  // Either condition alone is enough to force a run, regardless of the slop window.
  if (areSomeRegionReplicasColocated(c) || idleRegionServerExist(c)) {
    return true;
  }

  // HBASE-3681: check sloppiness before doing any real balancing work.
  float avgLoad = loadState.getLoadAverage(); // also used for logging
  int lowerBound = (int) Math.floor(avgLoad * (1 - slop));
  int upperBound = (int) Math.ceil(avgLoad * (1 + slop));
  if (loadState.getMaxLoad() > upperBound || loadState.getMinLoad() < lowerBound) {
    return true;
  }

  NavigableMap<ServerAndLoad, List<RegionInfo>> byLoad = loadState.getServersByLoad();
  if (LOG.isTraceEnabled()) {
    // Nothing to balance, so stay silent unless trace-level logging is on.
    LOG.trace("Skipping load balancing because balanced cluster; " +
      "servers=" + loadState.getNumServers() +
      " regions=" + loadState.getNumRegions() + " average=" + avgLoad +
      " mostloaded=" + byLoad.lastKey().getLoad() +
      " leastloaded=" + byLoad.firstKey().getLoad());
  }
  return false;
}

/**
* Reports whether some replicas of the same region are currently co-located. Subclasses should
* override this to return true if the cluster has nodes that host multiple replicas of the same
* region, or if there are multiple racks and the same rack hosts replicas of the same region.
* This base implementation always returns false.
* @param c Cluster information
* @return whether region replicas are currently co-located
*/
protected boolean areSomeRegionReplicasColocated(BalancerClusterState c) {
return false;
}

protected final boolean idleRegionServerExist(BalancerClusterState c){
boolean isServerExistsWithMoreRegions = false;
boolean isServerExistsWithZeroRegions = false;
Expand Down Expand Up @@ -456,8 +372,37 @@ public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, Server
return Collections.unmodifiableMap(assignments);
}

/**
 * Clamps a slop value into the closed range [0, 1].
 * @param slop the raw slop value
 * @return 0 if {@code slop} is negative, 1 if it is greater than 1, otherwise {@code slop}
 *         unchanged
 */
protected final float normalizeSlop(float slop) {
  return slop < 0 ? 0 : (slop > 1 ? 1 : slop);
}

/**
* Returns the slop value used when {@code hbase.regions.slop} is not set in the configuration.
* The method is protected and non-final, so a subclass can override it to supply a different
* default.
* @return the default slop, 0.2 in this base implementation
*/
protected float getDefaultSlop() {
return 0.2f;
}

/**
 * Loads the settings shared by all balancers from the given configuration.
 * <p/>
 * The slop is read from {@code hbase.regions.slop} (falling back to {@link #getDefaultSlop()})
 * and clamped with {@link #normalizeSlop(float)}. The rack manager is rebuilt, the region finder
 * is created when {@code hbase.master.balancer.uselocality} is enabled (the default), and the
 * by-table flag is refreshed, keeping its current value when the key is absent.
 * @param conf the configuration to read the settings from
 */
protected void loadConf(Configuration conf) {
  float configuredSlop = conf.getFloat("hbase.regions.slop", getDefaultSlop());
  this.slop = normalizeSlop(configuredSlop);
  this.rackManager = new RackManager(conf);
  this.useRegionFinder = conf.getBoolean("hbase.master.balancer.uselocality", true);
  if (this.useRegionFinder) {
    // NOTE(review): a brand-new finder is built on every call, so one created during a
    // configuration reload presumably has no cluster metrics until the next
    // updateClusterMetrics call - confirm this is acceptable.
    this.regionFinder = new RegionHDFSBlockLocationFinder();
    this.regionFinder.setConf(conf);
    this.regionFinder.setClusterInfoProvider(provider);
  }
  this.isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
  // Print out the base configs. overallSlop is not printed since it is for the simple balancer
  // exclusively.
  LOG.info("slop={}", this.slop);
}

@Override
public void initialize() throws HBaseIOException{
public void initialize() {
loadConf(getConf());
}

@Override
Expand All @@ -475,7 +420,7 @@ public boolean isStopped() {

/**
 * Marks this balancer as stopped and logs the reason once.
 * @param why the reason the stop was requested; only used for logging
 */
@Override
public void stop(String why) {
  // A single parameterized log call; the string-concatenated duplicate has been removed so the
  // stop request is no longer reported twice.
  LOG.info("Load Balancer stop requested: {}", why);
  stopped = true;
}

Expand Down Expand Up @@ -642,7 +587,7 @@ protected abstract List<RegionPlan> balanceTable(TableName tableName,
* @see #balanceTable(TableName, Map)
*/
@Override
public final synchronized List<RegionPlan>
public final List<RegionPlan>
balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
preBalanceCluster(loadOfAllTable);
if (isByTable) {
Expand All @@ -663,5 +608,6 @@ protected abstract List<RegionPlan> balanceTable(TableName tableName,

/**
* Reloads the balancer settings from the new configuration by delegating to
* {@link #loadConf(Configuration)}.
* @param conf the updated configuration
*/
@Override
public void onConfigurationChange(Configuration conf) {
loadConf(conf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import com.google.errorprone.annotations.RestrictedApi;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -58,7 +59,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
private float avgLoadOverall;
private List<ServerAndLoad> serverLoadList = new ArrayList<>();

// overallSlop to control simpleLoadBalancer's cluster level threshold
private float overallSlop;
/**
* Stores additional per-server information about the regions added/removed
* during the run of the balancing algorithm.
Expand Down Expand Up @@ -104,6 +106,8 @@ void setNextRegionForUnload(int nextRegionForUnload) {
/**
* Pass RegionStates and allow balancer to set the current cluster load.
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|SimpleLoadBalancer).java")
void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
serverLoadList.clear();
Map<ServerName, Integer> server2LoadMap = new HashMap<>();
Expand All @@ -129,11 +133,17 @@ void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoa
setClusterLoad(loadOfAllTable);
}

/**
 * Loads the base balancer settings and then the cluster-level threshold that is specific to the
 * simple balancer.
 * <p/>
 * {@code hbase.regions.overallSlop} falls back to the already normalized {@code slop} when it is
 * not set. The value is clamped into [0, 1] in the same way as {@code slop}, so an out-of-range
 * setting cannot produce a threshold window that is inverted or wider than intended.
 * @param conf the configuration to read the settings from
 */
@Override
protected void loadConf(Configuration conf) {
  super.loadConf(conf);
  this.overallSlop = normalizeSlop(conf.getFloat("hbase.regions.overallSlop", slop));
}

@Override
public void onConfigurationChange(Configuration conf) {
float originSlop = slop;
float originOverallSlop = overallSlop;
super.setConf(conf);
loadConf(conf);
LOG.info("Update configuration of SimpleLoadBalancer, previous slop is {},"
+ " current slop is {}, previous overallSlop is {}, current overallSlop is {}",
originSlop, slop, originOverallSlop, overallSlop);
Expand Down Expand Up @@ -168,6 +178,38 @@ private boolean overallNeedsBalance() {
return true;
}

/**
 * Decides whether the given cluster state is worth running the simple balancer on.
 * <p/>
 * Balancing is skipped when fewer than {@code MIN_SERVER_BALANCE} servers are present. It is
 * requested right away when an idle region server exists. Otherwise it is requested only if the
 * most or least loaded server falls outside the slop window around the average load.
 * @param c the cluster state to inspect
 * @return true if a balancing run should be performed
 */
private boolean needsBalance(BalancerClusterState c) {
  ClusterLoadState loadState = new ClusterLoadState(c.clusterState);
  if (loadState.getNumServers() < MIN_SERVER_BALANCE) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Not running balancer because only " + loadState.getNumServers()
        + " active regionserver(s)");
    }
    return false;
  }
  if (idleRegionServerExist(c)) {
    return true;
  }

  // HBASE-3681: check sloppiness before doing any real balancing work.
  float avgLoad = loadState.getLoadAverage(); // also used for logging
  int lowerBound = (int) Math.floor(avgLoad * (1 - slop));
  int upperBound = (int) Math.ceil(avgLoad * (1 + slop));
  if (loadState.getMaxLoad() > upperBound || loadState.getMinLoad() < lowerBound) {
    return true;
  }

  NavigableMap<ServerAndLoad, List<RegionInfo>> byLoad = loadState.getServersByLoad();
  if (LOG.isTraceEnabled()) {
    // Nothing to balance, so stay silent unless trace-level logging is on.
    LOG.trace("Skipping load balancing because balanced cluster; " + "servers="
      + loadState.getNumServers() + " regions=" + loadState.getNumRegions() + " average="
      + avgLoad + " mostloaded=" + byLoad.lastKey().getLoad() + " leastloaded="
      + byLoad.firstKey().getLoad());
  }
  return false;
}

/**
* Generate a global load balancing plan according to the specified map of
* server information to the most loaded regions of each server.
Expand Down Expand Up @@ -262,7 +304,7 @@ protected List<RegionPlan> balanceTable(TableName tableName,
// argument as defaults
BalancerClusterState c =
new BalancerClusterState(loadOfOneTable, null, this.regionFinder, this.rackManager);
if (!this.needsBalance(tableName, c) && !this.overallNeedsBalance()) {
if (!needsBalance(c) && !this.overallNeedsBalance()) {
return null;
}
ClusterLoadState cs = new ClusterLoadState(loadOfOneTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public static void beforeAllTests() throws Exception {
conf.setClass("hbase.util.ip.to.rack.determiner", MockMapping.class, DNSToSwitchMapping.class);
conf.set("hbase.regions.slop", "0");
loadBalancer = new SimpleLoadBalancer();
loadBalancer.setConf(conf);
loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
loadBalancer.initialize();
}

int[] mockUniformCluster = new int[] { 5, 5, 5, 5, 5, 0 };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
Expand Down Expand Up @@ -70,18 +69,16 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
private static final Logger LOG = LoggerFactory.getLogger(FavoredNodeLoadBalancer.class);

private MasterServices services;
private RackManager rackManager;
private FavoredNodesManager fnm;

/**
* Stores the master services handle; {@code initialize()} later reads the
* {@code FavoredNodesManager} from it.
* @param services the master services to use
*/
public void setMasterServices(MasterServices services) {
this.services = services;
}

@Override
public synchronized void initialize() throws HBaseIOException {
public void initialize() {
super.initialize();
this.fnm = services.getFavoredNodesManager();
this.rackManager = new RackManager(getConf());
}

@Override
Expand Down Expand Up @@ -324,7 +321,7 @@ private void addRegionToMap(Map<ServerName, List<RegionInfo>> assignmentMapForFa
}

@Override
public synchronized List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
public List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
return this.fnm.getFavoredNodes(regionInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -928,8 +928,8 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc

// initialize load balancer
this.balancer.setMasterServices(this);
this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
this.balancer.initialize();
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());

// start up all service threads.
status.setStatus("Initializing master service threads");
Expand Down Expand Up @@ -1011,7 +1011,7 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc
}

// set cluster status again after user regions are assigned
this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());

// Start balancer and meta catalog janitor after meta and regions have been assigned.
status.setStatus("Starting balancer and catalog janitor");
Expand Down Expand Up @@ -1724,7 +1724,7 @@ public boolean balance(boolean force) throws IOException {
}

//Give the balancer the current cluster state.
this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());

List<RegionPlan> plans = this.balancer.balanceCluster(assignments);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public ClusterStatusChore(HMaster master, LoadBalancer balancer) {
@Override
protected void chore() {
try {
balancer.setClusterMetrics(master.getClusterMetricsWithoutCoprocessor());
balancer.updateClusterMetrics(master.getClusterMetricsWithoutCoprocessor());
} catch (InterruptedIOException e) {
LOG.warn("Ignoring interruption", e);
}
Expand Down
Loading