Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2256,6 +2256,17 @@ void cloneTableSchema(TableName tableName, TableName newTableName, boolean prese
*/
boolean isCompactionOffloadEnabled() throws IOException;

/**
* update compaction server total throughput bound
* @param upperBound the total throughput upper bound of all compaction servers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we talk of compaction servers here, are we talking about the new compaction server facility or are we talking about regionservers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can control compaction throughput on compaction servers; we cannot control region servers

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most deploys will not use a compaction server suite. Is it correct then to talk of compaction servers in our Admin API? Should there be another channel for interacting with compaction servers? A CompactionServersAdmin ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method talk to master only, compaction servers get throughput control message through periodic heartbeat report

* @param lowerBound the total throughput lower bound of all compaction servers
* @param offPeak the total throughput offPeak bound of all compaction servers
* @return the current total throughput bounds of all compaction servers
* @throws IOException if a remote or network exception occurs
*/
Map<String, Long> updateCompactionServerTotalThroughput(long upperBound, long lowerBound,
long offPeak) throws IOException;

/**
* Switch the exceed throttle quota. If enabled, user/table/namespace throttle quota
* can be exceeded if region server has available quota.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,12 @@ public boolean switchCompactionOffload(boolean enable) throws IOException {
return get(admin.switchCompactionOffload(enable));
}

/**
 * Blocking wrapper: forwards the throughput-bound update to the underlying async admin and
 * waits for the master's reply.
 */
@Override
public Map<String, Long> updateCompactionServerTotalThroughput(long upperBound, long lowerBound,
    long offPeak) throws IOException {
  return get(
    admin.updateCompactionServerTotalThroughput(upperBound, lowerBound, offPeak));
}

@Override
public boolean isCompactionOffloadEnabled() throws IOException {
return get(admin.isCompactionOffloadEnabled());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1459,6 +1459,16 @@ CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState
*/
CompletableFuture<Boolean> switchCompactionOffload(boolean enable);

/**
 * Update the total compaction throughput bounds shared by all compaction servers. The request
 * goes to the master only; compaction servers pick the new bounds up afterwards (per the PR
 * discussion, via their periodic heartbeat report — confirm against the server-side code).
 * @param upperBound the total throughput upper bound of all compaction servers
 * @param lowerBound the total throughput lower bound of all compaction servers
 * @param offPeak the total throughput offPeak bound of all compaction servers
 * @return the effective total throughput bounds after the update, keyed by {@code "UpperBound"},
 *         {@code "LowerBound"} and {@code "OffPeak"}, wrapped by a CompletableFuture
 */
CompletableFuture<Map<String, Long>> updateCompactionServerTotalThroughput(long upperBound,
    long lowerBound, long offPeak);

/**
* Get if the compaction offload is enabled.
* @return True if compaction offload is enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,12 @@ public CompletableFuture<Boolean> switchCompactionOffload(boolean enable) {
return wrap(rawAdmin.switchCompactionOffload(enable));
}

/**
 * Delegates the throughput-bound update to the raw async admin, wrapping the returned future.
 */
@Override
public CompletableFuture<Map<String, Long>> updateCompactionServerTotalThroughput(long upperBound,
    long lowerBound, long offPeak) {
  return wrap(
    rawAdmin.updateCompactionServerTotalThroughput(upperBound, lowerBound, offPeak));
}

@Override
public CompletableFuture<Boolean> isCompactionOffloadEnabled() {
return wrap(rawAdmin.isCompactionOffloadEnabled());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UpdateCompactionServerTotalThroughputRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UpdateCompactionServerTotalThroughputResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
Expand Down Expand Up @@ -3822,6 +3824,27 @@ public CompletableFuture<Boolean> switchCompactionOffload(boolean enable) {
return future;
}

/**
 * Sends the new total compaction throughput bounds to the master in a single RPC. A
 * non-positive bound is interpreted by the master as "leave unchanged" (see the master-side
 * handler). The response echoes the effective bounds, exposed here under the fixed map keys
 * {@code "UpperBound"}, {@code "LowerBound"} and {@code "OffPeak"}.
 */
@Override
public CompletableFuture<Map<String, Long>> updateCompactionServerTotalThroughput(long upperBound,
    long lowerBound, long offPeak) {
  // Return the caller's future directly; the original bound it to a pointless local first.
  return this.<Map<String, Long>> newMasterCaller().action(
    (controller, stub) -> this
      .<UpdateCompactionServerTotalThroughputRequest, UpdateCompactionServerTotalThroughputResponse, Map<String, Long>> call(
        controller, stub,
        UpdateCompactionServerTotalThroughputRequest.newBuilder()
          .setMaxThroughputUpperBound(upperBound).setMaxThroughputLowerBound(lowerBound)
          .setMaxThroughputOffPeak(offPeak).build(),
        (s, c, req, done) -> s.updateCompactionServerTotalThroughput(c, req, done), resp -> {
          // Convert the proto response into a plain map for the public Admin API.
          Map<String, Long> result = new HashMap<>();
          result.put("UpperBound", resp.getMaxThroughputUpperBound());
          result.put("LowerBound", resp.getMaxThroughputLowerBound());
          result.put("OffPeak", resp.getMaxThroughputOffPeak());
          return result;
        }))
    .call();
}

@Override
public CompletableFuture<Boolean> isCompactionOffloadEnabled() {
CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ message CompactionServerReportRequest {
}

message CompactionServerReportResponse {
  // Total compaction throughput bounds pushed down to the compaction server on each heartbeat.
  // These fields were added to an already-released (previously empty) message: marking them
  // `required` would make responses from older masters fail to parse on upgraded compaction
  // servers, so they must be `optional`. The consumer already guards with `> 0`, so the
  // proto default of 0 is safe.
  optional int64 max_throughput_upper_bound = 1;
  optional int64 max_throughput_lower_bound = 2;
  optional int64 max_throughput_off_peak = 3;
}

service CompactionServerStatusService {
Expand Down
15 changes: 15 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,18 @@ message SwitchCompactionOffloadResponse {
required bool previous_compaction_offload_enabled = 1;
}

// A bound <= 0 in the request means "leave that bound unchanged" (see the master-side handler).
// proto2 guidance: do not use `required` — it freezes the wire format forever and turns any
// future relaxation into an incompatible change. The Java code guards all reads with `> 0`,
// so `optional` (default 0) is behavior-compatible.
message UpdateCompactionServerTotalThroughputRequest {
  optional int64 max_throughput_upper_bound = 1;
  optional int64 max_throughput_lower_bound = 2;
  optional int64 max_throughput_off_peak = 3;
}

// Echoes the effective bounds after the update was applied.
message UpdateCompactionServerTotalThroughputResponse {
  optional int64 max_throughput_upper_bound = 1;
  optional int64 max_throughput_lower_bound = 2;
  optional int64 max_throughput_off_peak = 3;
}

service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
Expand Down Expand Up @@ -1112,6 +1124,9 @@ service MasterService {
rpc SwitchCompactionOffload (SwitchCompactionOffloadRequest)
returns (SwitchCompactionOffloadResponse);

/** Update the cluster-wide total compaction throughput bounds for compaction servers */
rpc UpdateCompactionServerTotalThroughput(UpdateCompactionServerTotalThroughputRequest)
returns (UpdateCompactionServerTotalThroughputResponse);

/** Get if is compaction offload enabled */
rpc IsCompactionOffloadEnabled (IsCompactionOffloadEnabledRequest)
returns (IsCompactionOffloadEnabledResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class LocalHBaseCluster {
private final Configuration conf;
private final Class<? extends HMaster> masterClass;
private final Class<? extends HRegionServer> regionServerClass;

private final Class<? extends HCompactionServer> compactionServerClass;
/**
* Constructor.
* @param conf
Expand Down Expand Up @@ -125,12 +125,26 @@ private static Class<? extends HMaster> getMasterImplementation(final Configurat
HMaster.class);
}

/**
 * Resolves the compaction server implementation class: the class named by the
 * {@code HConstants.COMPACTION_SERVER_IMPL} configuration key, falling back to
 * {@code HCompactionServer} when unset.
 */
@SuppressWarnings("unchecked")
private static Class<? extends HCompactionServer>
    getCompactionServerImplementation(final Configuration conf) {
  final Class<?> configured =
      conf.getClass(HConstants.COMPACTION_SERVER_IMPL, HCompactionServer.class);
  return (Class<? extends HCompactionServer>) configured;
}

/**
 * Convenience constructor: starts no always-standby masters and no compaction servers.
 * @param conf configuration to use
 * @param noMasters number of masters to start
 * @param noRegionServers number of region servers to start
 * @param masterClass master implementation to instantiate
 * @param regionServerClass region server implementation to instantiate
 * @throws IOException if the cluster cannot be set up
 */
public LocalHBaseCluster(final Configuration conf, final int noMasters, final int noRegionServers,
    final Class<? extends HMaster> masterClass,
    final Class<? extends HRegionServer> regionServerClass) throws IOException {
  this(conf, noMasters, 0, noRegionServers, masterClass, regionServerClass);
}

/**
 * Convenience constructor: starts no compaction servers. The compaction server implementation
 * class is still resolved from the configuration so later additions use the configured class.
 * @param conf configuration to use
 * @param noMasters number of masters to start
 * @param noAlwaysStandByMasters number of always-standby masters to start
 * @param noRegionServers number of region servers to start
 * @param masterClass master implementation to instantiate
 * @param regionServerClass region server implementation to instantiate
 * @throws IOException if the cluster cannot be set up
 */
public LocalHBaseCluster(final Configuration conf, final int noMasters,
    final int noAlwaysStandByMasters, final int noRegionServers,
    final Class<? extends HMaster> masterClass,
    final Class<? extends HRegionServer> regionServerClass) throws IOException {
  this(conf, noMasters, noAlwaysStandByMasters, noRegionServers, 0, masterClass,
    regionServerClass, getCompactionServerImplementation(conf));
}
/**
* Constructor.
* @param conf Configuration to use. Post construction has the master's
Expand All @@ -143,9 +157,10 @@ public LocalHBaseCluster(final Configuration conf, final int noMasters, final in
*/
@SuppressWarnings("unchecked")
public LocalHBaseCluster(final Configuration conf, final int noMasters,
final int noAlwaysStandByMasters, final int noRegionServers,
final int noAlwaysStandByMasters, final int noRegionServers, final int noCompactionServers,
final Class<? extends HMaster> masterClass,
final Class<? extends HRegionServer> regionServerClass) throws IOException {
final Class<? extends HRegionServer> regionServerClass,
final Class<? extends HCompactionServer> compactionServerClass) throws IOException {
this.conf = conf;

// When active, if a port selection is default then we switch to random
Expand Down Expand Up @@ -209,6 +224,14 @@ public LocalHBaseCluster(final Configuration conf, final int noMasters,
for (int j = 0; j < noRegionServers; j++) {
addRegionServer(new Configuration(conf), j);
}
// Start the CompactionServers.
this.compactionServerClass=
(Class<? extends HCompactionServer>)conf.getClass(HConstants.COMPACTION_SERVER_IMPL,
compactionServerClass);

for (int j = 0; j < noCompactionServers; j++) {
addCompactionServer(new Configuration(conf), j);
}
}

public JVMClusterUtil.RegionServerThread addRegionServer()
Expand Down Expand Up @@ -485,13 +508,14 @@ public void join() {
}

@SuppressWarnings("unchecked")
public JVMClusterUtil.CompactionServerThread addCompactionServer(
Configuration config, final int index) throws IOException {
private JVMClusterUtil.CompactionServerThread addCompactionServer(Configuration config,
final int index) throws IOException {
// Create each compaction server with its own Configuration instance so each has
// its Connection instance rather than share (see HBASE_INSTANCES down in
// the guts of ConnectionManager).
JVMClusterUtil.CompactionServerThread cst =
JVMClusterUtil.createCompactionServerThread(config, index);
JVMClusterUtil.CompactionServerThread cst = JVMClusterUtil.createCompactionServerThread(config,
(Class<? extends HCompactionServer>) conf.getClass(HConstants.COMPACTION_SERVER_IMPL,
this.compactionServerClass), index);
this.compactionServerThreads.add(cst);
return cst;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
Expand Down Expand Up @@ -94,6 +95,10 @@ public class CompactionThreadManager implements ThroughputControllerService {
new ConcurrentHashMap<>();
private static CompactionFilesCache compactionFilesCache = new CompactionFilesCache();

/**
 * Returns the throughput controller that throttles compactions on this server; exposed so the
 * heartbeat path can apply bound updates received from the master.
 */
public ThroughputController getCompactionThroughputController() {
  return compactThreadControl.getCompactionThroughputController();
}

CompactionThreadManager(final Configuration conf, HCompactionServer server) {
this.conf = conf;
this.server = server;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.UserProvider;
Expand Down Expand Up @@ -144,7 +146,7 @@ protected void login(UserProvider user, String host) throws IOException {
SecurityConstants.COMPACTION_SERVER_KRB_PRINCIPAL, host);
}

private ClusterStatusProtos.CompactionServerLoad buildServerLoad(long reportStartTime,
protected ClusterStatusProtos.CompactionServerLoad buildServerLoad(long reportStartTime,
long reportEndTime) {
ClusterStatusProtos.CompactionServerLoad.Builder serverLoad =
ClusterStatusProtos.CompactionServerLoad.newBuilder();
Expand Down Expand Up @@ -179,7 +181,21 @@ private boolean tryCompactionServerReport(long reportStartTime, long reportEndTi
CompactionServerStatusProtos.CompactionServerReportRequest.newBuilder();
request.setServer(ProtobufUtil.toServerName(getServerName()));
request.setLoad(sl);
this.cssStub.compactionServerReport(null, request.build());
CompactionServerStatusProtos.CompactionServerReportResponse compactionServerReportResponse =
this.cssStub.compactionServerReport(null, request.build());
ThroughputController throughputController =
compactionThreadManager.getCompactionThroughputController();
if (throughputController instanceof PressureAwareCompactionThroughputController
&& compactionServerReportResponse.getMaxThroughputUpperBound() > 0) {
((PressureAwareCompactionThroughputController) throughputController)
.setMaxThroughputUpperBound(
compactionServerReportResponse.getMaxThroughputUpperBound());
((PressureAwareCompactionThroughputController) throughputController)
.setMaxThroughputLowerBound(
compactionServerReportResponse.getMaxThroughputLowerBound());
((PressureAwareCompactionThroughputController) throughputController)
.setMaxThroughputOffPeak(compactionServerReportResponse.getMaxThroughputOffPeak());
}
} catch (ServiceException se) {
IOException ioe = ProtobufUtil.getRemoteException(se);
if (ioe instanceof YouAreDeadException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Triple;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -343,6 +344,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UpdateCompactionServerTotalThroughputRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UpdateCompactionServerTotalThroughputResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse.NamespaceQuotaSnapshot;
Expand Down Expand Up @@ -682,14 +685,17 @@ public CompactionServerReportResponse compactionServerReport(RpcController contr
versionNumber = VersionInfoUtil.getVersionNumber(versionInfo);
}
ServerName serverName = ProtobufUtil.toServerName(request.getServer());
CompactionServerMetrics newLoad =
CompactionServerMetricsBuilder.toCompactionServerMetrics(serverName, versionNumber,
version, request.getLoad());
master.getCompactionOffloadManager().compactionServerReport(serverName, newLoad);
CompactionServerMetrics newLoad = CompactionServerMetricsBuilder
.toCompactionServerMetrics(serverName, versionNumber, version, request.getLoad());
Triple<Double, Double, Double> throughput =
master.getCompactionOffloadManager().compactionServerReport(serverName, newLoad);
return CompactionServerReportResponse.newBuilder()
.setMaxThroughputUpperBound(throughput.getFirst().longValue())
.setMaxThroughputLowerBound(throughput.getSecond().longValue())
.setMaxThroughputOffPeak(throughput.getThird().longValue()).build();
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
return CompactionServerReportResponse.newBuilder().build();
}

@Override
Expand Down Expand Up @@ -3496,4 +3502,29 @@ public CompleteCompactionResponse completeCompaction(RpcController controller,
throw new UnsupportedOperationException("master not receive completeCompaction");
}

/**
 * Applies the requested total compaction throughput bounds on the master and echoes back the
 * effective values. A non-positive bound in the request means "leave that bound unchanged".
 * NOTE(review): the three updates are applied independently, not atomically — a concurrent
 * heartbeat may observe a partially-applied set of bounds; confirm this is acceptable.
 */
@Override
public UpdateCompactionServerTotalThroughputResponse updateCompactionServerTotalThroughput(
    RpcController controller, UpdateCompactionServerTotalThroughputRequest request)
    throws ServiceException {
  final long upper = request.getMaxThroughputUpperBound();
  final long lower = request.getMaxThroughputLowerBound();
  final long offPeak = request.getMaxThroughputOffPeak();
  if (upper > 0) {
    master.getCompactionOffloadManager().setMaxThroughputUpperBound(upper);
  }
  if (lower > 0) {
    master.getCompactionOffloadManager().setMaxThroughputLowerBound(lower);
  }
  if (offPeak > 0) {
    master.getCompactionOffloadManager().setMaxThroughputOffPeak(offPeak);
  }
  // Re-read the manager so the response reflects the effective, post-update bounds.
  return UpdateCompactionServerTotalThroughputResponse.newBuilder()
      .setMaxThroughputUpperBound(
        master.getCompactionOffloadManager().getMaxThroughputUpperBound())
      .setMaxThroughputLowerBound(
        master.getCompactionOffloadManager().getMaxThroughputLowerBound())
      .setMaxThroughputOffPeak(master.getCompactionOffloadManager().getMaxThroughputOffPeak())
      .build();
}

}
Loading