diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index 1f23d87461da..36064f57a7c6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -2256,6 +2256,17 @@ void cloneTableSchema(TableName tableName, TableName newTableName, boolean prese
    */
   boolean isCompactionOffloadEnabled() throws IOException;
 
+  /**
+   * Update the total throughput bounds shared by all compaction servers.
+   * @param upperBound the total throughput upper bound of all compaction servers
+   * @param lowerBound the total throughput lower bound of all compaction servers
+   * @param offPeak the total throughput off-peak bound of all compaction servers
+   * @return the current total throughput bounds of all compaction servers
+   * @throws IOException if a remote or network exception occurs
+   */
+  Map<String, Long> updateCompactionServerTotalThroughput(long upperBound, long lowerBound,
+    long offPeak) throws IOException;
+
   /**
    * Switch the exceed throttle quota. If enabled, user/table/namespace throttle quota
    * can be exceeded if region server has availble quota.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java
index 35f2a74b6950..44a2bb1c5072 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java
@@ -918,6 +918,12 @@ public boolean switchCompactionOffload(boolean enable) throws IOException {
     return get(admin.switchCompactionOffload(enable));
   }
 
+  @Override
+  public Map<String, Long> updateCompactionServerTotalThroughput(long upperBound, long lowerBound,
+    long offPeak) throws IOException {
+    return get(admin.updateCompactionServerTotalThroughput(upperBound, lowerBound, offPeak));
+  }
+
   @Override
   public boolean isCompactionOffloadEnabled() throws IOException {
     return get(admin.isCompactionOffloadEnabled());
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index a9f66ade8871..fc6379c2004b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -1459,6 +1459,16 @@ CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState
    */
   CompletableFuture<Boolean> switchCompactionOffload(boolean enable);
 
+  /**
+   * Update the total throughput bounds shared by all compaction servers.
+   * @param upperBound the total throughput upper bound of all compaction servers
+   * @param lowerBound the total throughput lower bound of all compaction servers
+   * @param offPeak the total throughput off-peak bound of all compaction servers
+   * @return the current total throughput bounds of all compaction servers
+   */
+  CompletableFuture<Map<String, Long>> updateCompactionServerTotalThroughput(long upperBound,
+    long lowerBound, long offPeak);
+
   /**
    * Get if the compaction offload is enabled.
   * @return True if compaction offload is enabled
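Reviewer note: a minimal usage sketch of the new blocking `Admin` call (cluster and connection setup assumed, values illustrative). The `"UpperBound"` / `"LowerBound"` / `"OffPeak"` keys come from the response conversion in `RawAsyncHBaseAdmin` below; per the master-side handler later in this patch, a bound of zero or less is left unchanged.

```java
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class UpdateThroughputExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
      // Cap the aggregate compaction bandwidth of ALL compaction servers:
      // 100 MB/s upper bound, 50 MB/s lower bound; 0 leaves the off-peak bound as-is.
      Map<String, Long> effective = admin.updateCompactionServerTotalThroughput(
        100L * 1024 * 1024, 50L * 1024 * 1024, 0L);
      effective.forEach((bound, value) -> System.out.println(bound + " = " + value));
    }
  }
}
```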
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 9f874b4b6c7b..a1955e4faf46 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -797,6 +797,12 @@ public CompletableFuture<Boolean> switchCompactionOffload(boolean enable) {
     return wrap(rawAdmin.switchCompactionOffload(enable));
   }
 
+  @Override
+  public CompletableFuture<Map<String, Long>> updateCompactionServerTotalThroughput(
+    long upperBound, long lowerBound, long offPeak) {
+    return wrap(rawAdmin.updateCompactionServerTotalThroughput(upperBound, lowerBound, offPeak));
+  }
+
   @Override
   public CompletableFuture<Boolean> isCompactionOffloadEnabled() {
     return wrap(rawAdmin.isCompactionOffloadEnabled());
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index d37e4cdc7228..e33b569700ea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -283,6 +283,8 @@
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UpdateCompactionServerTotalThroughputRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UpdateCompactionServerTotalThroughputResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
@@ -3822,6 +3824,27 @@ public CompletableFuture<Boolean> switchCompactionOffload(boolean enable) {
     return future;
   }
 
+  @Override
+  public CompletableFuture<Map<String, Long>> updateCompactionServerTotalThroughput(
+    long upperBound, long lowerBound, long offPeak) {
+    CompletableFuture<Map<String, Long>> future = this.<Map<String, Long>> newMasterCaller().action(
+      (controller, stub) -> this
+        .<UpdateCompactionServerTotalThroughputRequest, UpdateCompactionServerTotalThroughputResponse, Map<String, Long>> call(
+          controller, stub,
+          UpdateCompactionServerTotalThroughputRequest.newBuilder()
+            .setMaxThroughputUpperBound(upperBound).setMaxThroughputLowerBound(lowerBound)
+            .setMaxThroughputOffPeak(offPeak).build(),
+          (s, c, req, done) -> s.updateCompactionServerTotalThroughput(c, req, done), resp -> {
+            Map<String, Long> result = new HashMap<>();
+            result.put("UpperBound", resp.getMaxThroughputUpperBound());
+            result.put("LowerBound", resp.getMaxThroughputLowerBound());
+            result.put("OffPeak", resp.getMaxThroughputOffPeak());
+            return result;
+          }))
+      .call();
+    return future;
+  }
+
   @Override
   public CompletableFuture<Boolean> isCompactionOffloadEnabled() {
     CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
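The same call through the non-blocking client; a sketch assuming an `AsyncConnection` is already open:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.hbase.client.AsyncConnection;

public class AsyncUpdateThroughputExample {
  // Returns immediately; the future completes with the effective totals.
  static CompletableFuture<Map<String, Long>> update(AsyncConnection conn) {
    return conn.getAdmin().updateCompactionServerTotalThroughput(
      100L * 1024 * 1024, 50L * 1024 * 1024, 200L * 1024 * 1024);
  }
}
```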
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/CompactionServerStatus.proto b/hbase-protocol-shaded/src/main/protobuf/server/CompactionServerStatus.proto
index 990f36c4052a..24d6d8096fbe 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/CompactionServerStatus.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/CompactionServerStatus.proto
@@ -37,6 +37,9 @@ message CompactionServerReportRequest {
 }
 
 message CompactionServerReportResponse {
+  required int64 max_throughput_upper_bound = 1;
+  required int64 max_throughput_lower_bound = 2;
+  required int64 max_throughput_off_peak = 3;
 }
 
 service CompactionServerStatusService {
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
index d8a91aad582c..66b46f7184c5 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
@@ -732,6 +732,18 @@ message SwitchCompactionOffloadResponse {
   required bool previous_compaction_offload_enabled = 1;
 }
 
+message UpdateCompactionServerTotalThroughputRequest {
+  required int64 max_throughput_upper_bound = 1;
+  required int64 max_throughput_lower_bound = 2;
+  required int64 max_throughput_off_peak = 3;
+}
+
+message UpdateCompactionServerTotalThroughputResponse {
+  required int64 max_throughput_upper_bound = 1;
+  required int64 max_throughput_lower_bound = 2;
+  required int64 max_throughput_off_peak = 3;
+}
+
 service MasterService {
   /** Used by the client to get the number of regions that have received the updated schema */
   rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
     returns(GetSchemaAlterStatusResponse);
@@ -1112,6 +1124,9 @@ service MasterService {
   rpc SwitchCompactionOffload (SwitchCompactionOffloadRequest)
     returns (SwitchCompactionOffloadResponse);
 
+  rpc UpdateCompactionServerTotalThroughput(UpdateCompactionServerTotalThroughputRequest)
+    returns (UpdateCompactionServerTotalThroughputResponse);
+
   /** Get if is compaction offload enabled */
   rpc IsCompactionOffloadEnabled (IsCompactionOffloadEnabledRequest)
     returns (IsCompactionOffloadEnabledResponse);
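The bounds travel in both directions: the admin RPC uses the request/response pair added to Master.proto, while the master pushes each server's share back in every CompactionServerReportResponse. A sketch of the generated builder API for the request (field names follow the proto definitions above; all three fields are `required`, with non-positive values treated as "leave unchanged" by the master):

```java
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UpdateCompactionServerTotalThroughputRequest;

public class RequestBuilderSketch {
  static UpdateCompactionServerTotalThroughputRequest build() {
    return UpdateCompactionServerTotalThroughputRequest.newBuilder()
      .setMaxThroughputUpperBound(100L * 1024 * 1024) // bytes/sec
      .setMaxThroughputLowerBound(50L * 1024 * 1024)
      .setMaxThroughputOffPeak(200L * 1024 * 1024)
      .build();
  }
}
```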
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
index b856c7a71977..db052b336ea3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
@@ -74,7 +74,7 @@ public class LocalHBaseCluster {
   private final Configuration conf;
   private final Class<? extends HMaster> masterClass;
   private final Class<? extends HRegionServer> regionServerClass;
-
+  private final Class<? extends HCompactionServer> compactionServerClass;
   /**
    * Constructor.
    * @param conf
@@ -125,12 +125,26 @@ private static Class<? extends HMaster> getMasterImplementation(final Configurat
       HMaster.class);
   }
 
+  @SuppressWarnings("unchecked")
+  private static Class<? extends HCompactionServer>
+    getCompactionServerImplementation(final Configuration conf) {
+    return (Class<? extends HCompactionServer>) conf.getClass(HConstants.COMPACTION_SERVER_IMPL,
+      HCompactionServer.class);
+  }
+
   public LocalHBaseCluster(final Configuration conf, final int noMasters,
     final int noRegionServers, final Class<? extends HMaster> masterClass,
     final Class<? extends HRegionServer> regionServerClass) throws IOException {
     this(conf, noMasters, 0, noRegionServers, masterClass, regionServerClass);
   }
 
+  public LocalHBaseCluster(final Configuration conf, final int noMasters,
+    final int noAlwaysStandByMasters, final int noRegionServers,
+    final Class<? extends HMaster> masterClass,
+    final Class<? extends HRegionServer> regionServerClass) throws IOException {
+    this(conf, noMasters, noAlwaysStandByMasters, noRegionServers, 0, masterClass,
+      regionServerClass, getCompactionServerImplementation(conf));
+  }
+
   /**
    * Constructor.
    * @param conf Configuration to use.  Post construction has the master's
@@ -143,9 +157,10 @@ public LocalHBaseCluster(final Configuration conf, final int noMasters, final in
    */
   @SuppressWarnings("unchecked")
   public LocalHBaseCluster(final Configuration conf, final int noMasters,
-    final int noAlwaysStandByMasters, final int noRegionServers,
+    final int noAlwaysStandByMasters, final int noRegionServers, final int noCompactionServers,
     final Class<? extends HMaster> masterClass,
-    final Class<? extends HRegionServer> regionServerClass) throws IOException {
+    final Class<? extends HRegionServer> regionServerClass,
+    final Class<? extends HCompactionServer> compactionServerClass) throws IOException {
     this.conf = conf;
 
     // When active, if a port selection is default then we switch to random
@@ -209,6 +224,14 @@ public LocalHBaseCluster(final Configuration conf, final int noMasters,
     for (int j = 0; j < noRegionServers; j++) {
       addRegionServer(new Configuration(conf), j);
     }
+    // Start the CompactionServers.
+    this.compactionServerClass =
+      (Class<? extends HCompactionServer>) conf.getClass(HConstants.COMPACTION_SERVER_IMPL,
+        compactionServerClass);
+
+    for (int j = 0; j < noCompactionServers; j++) {
+      addCompactionServer(new Configuration(conf), j);
+    }
   }
 
   public JVMClusterUtil.RegionServerThread addRegionServer()
@@ -485,13 +508,14 @@ public void join() {
   }
 
   @SuppressWarnings("unchecked")
-  public JVMClusterUtil.CompactionServerThread addCompactionServer(
-    Configuration config, final int index) throws IOException {
+  private JVMClusterUtil.CompactionServerThread addCompactionServer(Configuration config,
+    final int index) throws IOException {
     // Create each compaction server with its own Configuration instance so each has
     // its Connection instance rather than share (see HBASE_INSTANCES down in
    // the guts of ConnectionManager).
-    JVMClusterUtil.CompactionServerThread cst =
-      JVMClusterUtil.createCompactionServerThread(config, index);
+    JVMClusterUtil.CompactionServerThread cst = JVMClusterUtil.createCompactionServerThread(config,
+      (Class<? extends HCompactionServer>) conf.getClass(HConstants.COMPACTION_SERVER_IMPL,
+        this.compactionServerClass), index);
     this.compactionServerThreads.add(cst);
     return cst;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
index 2eb697d4526e..03b54099d7f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
@@ -52,6 +52,7 @@
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControllerService;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -94,6 +95,10 @@ public class CompactionThreadManager implements ThroughputControllerService {
     new ConcurrentHashMap<>();
   private static CompactionFilesCache compactionFilesCache = new CompactionFilesCache();
 
+  public ThroughputController getCompactionThroughputController() {
+    return compactThreadControl.getCompactionThroughputController();
+  }
+
   CompactionThreadManager(final Configuration conf, HCompactionServer server) {
     this.conf = conf;
     this.server = server;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
index f78e58abef0d..aa999e8f512d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/HCompactionServer.java
@@ -32,6 +32,8 @@
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.SecurityConstants;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -144,7 +146,7 @@ protected void login(UserProvider user, String host) throws IOException {
       SecurityConstants.COMPACTION_SERVER_KRB_PRINCIPAL, host);
   }
 
-  private ClusterStatusProtos.CompactionServerLoad buildServerLoad(long reportStartTime,
+  protected ClusterStatusProtos.CompactionServerLoad buildServerLoad(long reportStartTime,
     long reportEndTime) {
     ClusterStatusProtos.CompactionServerLoad.Builder serverLoad =
       ClusterStatusProtos.CompactionServerLoad.newBuilder();
@@ -179,7 +181,21 @@ private boolean tryCompactionServerReport(long reportStartTime, long reportEndTi
       CompactionServerStatusProtos.CompactionServerReportRequest.newBuilder();
     request.setServer(ProtobufUtil.toServerName(getServerName()));
     request.setLoad(sl);
-    this.cssStub.compactionServerReport(null, request.build());
+
+      CompactionServerStatusProtos.CompactionServerReportResponse compactionServerReportResponse =
+        this.cssStub.compactionServerReport(null, request.build());
+      ThroughputController throughputController =
+        compactionThreadManager.getCompactionThroughputController();
+      if (throughputController instanceof PressureAwareCompactionThroughputController
+        && compactionServerReportResponse.getMaxThroughputUpperBound() > 0) {
+        ((PressureAwareCompactionThroughputController) throughputController)
+          .setMaxThroughputUpperBound(
+            compactionServerReportResponse.getMaxThroughputUpperBound());
+        ((PressureAwareCompactionThroughputController) throughputController)
+          .setMaxThroughputLowerBound(
+            compactionServerReportResponse.getMaxThroughputLowerBound());
+        ((PressureAwareCompactionThroughputController) throughputController)
+          .setMaxThroughputOffPeak(compactionServerReportResponse.getMaxThroughputOffPeak());
+      }
     } catch (ServiceException se) {
       IOException ioe = ProtobufUtil.getRemoteException(se);
       if (ioe instanceof YouAreDeadException) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index b8c8891a0bc3..0a24b12555f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -124,6 +124,7 @@
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Triple;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -343,6 +344,8 @@
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UpdateCompactionServerTotalThroughputRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UpdateCompactionServerTotalThroughputResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse.NamespaceQuotaSnapshot;
@@ -682,14 +685,17 @@ public CompactionServerReportResponse compactionServerReport(RpcController contr
         versionNumber = VersionInfoUtil.getVersionNumber(versionInfo);
       }
       ServerName serverName = ProtobufUtil.toServerName(request.getServer());
-      CompactionServerMetrics newLoad =
-        CompactionServerMetricsBuilder.toCompactionServerMetrics(serverName, versionNumber,
-          version, request.getLoad());
-      master.getCompactionOffloadManager().compactionServerReport(serverName, newLoad);
+      CompactionServerMetrics newLoad = CompactionServerMetricsBuilder
+        .toCompactionServerMetrics(serverName, versionNumber, version, request.getLoad());
+      Triple<Double, Double, Double> throughput =
+        master.getCompactionOffloadManager().compactionServerReport(serverName, newLoad);
+      return CompactionServerReportResponse.newBuilder()
+        .setMaxThroughputUpperBound(throughput.getFirst().longValue())
+        .setMaxThroughputLowerBound(throughput.getSecond().longValue())
+        .setMaxThroughputOffPeak(throughput.getThird().longValue()).build();
     } catch (IOException ioe) {
       throw new ServiceException(ioe);
     }
-    return CompactionServerReportResponse.newBuilder().build();
   }
 
   @Override
@@ -3496,4 +3502,29 @@ public CompleteCompactionResponse completeCompaction(RpcController controller,
     throw new UnsupportedOperationException("master not receive completeCompaction");
   }
 
+  @Override
+  public UpdateCompactionServerTotalThroughputResponse updateCompactionServerTotalThroughput(
+    RpcController controller, UpdateCompactionServerTotalThroughputRequest request)
+    throws ServiceException {
+    if (request.getMaxThroughputUpperBound() > 0) {
+      master.getCompactionOffloadManager()
+        .setMaxThroughputUpperBound(request.getMaxThroughputUpperBound());
+    }
+    if (request.getMaxThroughputLowerBound() > 0) {
+      master.getCompactionOffloadManager()
+        .setMaxThroughputLowerBound(request.getMaxThroughputLowerBound());
+    }
+    if (request.getMaxThroughputOffPeak() > 0) {
+      master.getCompactionOffloadManager()
+        .setMaxThroughputOffPeak(request.getMaxThroughputOffPeak());
+    }
+    return UpdateCompactionServerTotalThroughputResponse.newBuilder()
+      .setMaxThroughputUpperBound(
+        master.getCompactionOffloadManager().getMaxThroughputUpperBound())
+      .setMaxThroughputLowerBound(
+        master.getCompactionOffloadManager().getMaxThroughputLowerBound())
+      .setMaxThroughputOffPeak(master.getCompactionOffloadManager().getMaxThroughputOffPeak())
+      .build();
+  }
+
 }
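In shorthand, the handler above ignores non-positive values and always echoes the effective totals, which TestUpdateCompactionServerTotalThroughput below relies on. A hedged sketch:

```java
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hbase.client.Admin;

public class NoOpSemanticsSketch {
  static void demo(Admin admin) throws IOException {
    Map<String, Long> r1 = admin.updateCompactionServerTotalThroughput(200L, 100L, 400L);
    // r1 = {UpperBound=200, LowerBound=100, OffPeak=400}
    Map<String, Long> r2 = admin.updateCompactionServerTotalThroughput(0L, 0L, -1L);
    // Non-positive values are no-ops, so the totals are unchanged:
    // r2 = {UpperBound=200, LowerBound=100, OffPeak=400}
  }
}
```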
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java
index a88fc7304024..ff0d96a95703 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionOffloadManager.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hbase.master.procedure.SwitchCompactionOffloadProcedure;
 import org.apache.hadoop.hbase.regionserver.CompactionOffloadSwitchStorage;
 import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Triple;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,9 +56,46 @@ public class CompactionOffloadManager {
   /** Map of registered servers to their current load */
   private final Cache<ServerName, CompactionServerMetrics> onlineServers;
   private CompactionOffloadSwitchStorage compactionOffloadSwitchStorage;
+  private volatile long maxThroughputUpperBound;
+  private volatile long maxThroughputLowerBound;
+  private volatile long maxThroughputOffPeak;
+  // throughput config
+  public static final String COMPACTION_SERVER_MAX_THROUGHPUT_HIGHER_BOUND =
+    "hbase.compaction.server.compaction.throughput.higher.bound";
+  public static final long DEFAULT_COMPACTION_SERVER_MAX_THROUGHPUT_HIGHER_BOUND = 1000L * 1024 * 1024;
+  public static final String COMPACTION_SERVER_MAX_THROUGHPUT_LOWER_BOUND =
+    "hbase.compaction.server.compaction.throughput.lower.bound";
+  public static final long DEFAULT_COMPACTION_SERVER_MAX_THROUGHPUT_LOWER_BOUND = 500L * 1024 * 1024;
+  public static final String COMPACTION_SERVER_MAX_THROUGHPUT_OFFPEAK =
+    "hbase.compaction.server.compaction.throughput.offpeak";
+  public static final long DEFAULT_COMPACTION_SERVER_MAX_THROUGHPUT_OFFPEAK = Long.MAX_VALUE;
   private static final Logger LOG =
     LoggerFactory.getLogger(CompactionOffloadManager.class.getName());
 
+  public long getMaxThroughputUpperBound() {
+    return maxThroughputUpperBound;
+  }
+
+  public void setMaxThroughputUpperBound(long maxThroughputUpperBound) {
+    this.maxThroughputUpperBound = maxThroughputUpperBound;
+  }
+
+  public long getMaxThroughputLowerBound() {
+    return maxThroughputLowerBound;
+  }
+
+  public void setMaxThroughputLowerBound(long maxThroughputLowerBound) {
+    this.maxThroughputLowerBound = maxThroughputLowerBound;
+  }
+
+  public long getMaxThroughputOffPeak() {
+    return maxThroughputOffPeak;
+  }
+
+  public void setMaxThroughputOffPeak(long maxThroughputOffPeak) {
+    this.maxThroughputOffPeak = maxThroughputOffPeak;
+  }
+
   public CompactionOffloadManager(final MasterServices master) {
     this.masterServices = master;
     int compactionServerMsgInterval =
@@ -68,10 +106,24 @@ public CompactionOffloadManager(final MasterServices master) {
       compactionServerMsgInterval * compactionServerExpiredFactor, TimeUnit.MILLISECONDS).build();
     this.compactionOffloadSwitchStorage = new CompactionOffloadSwitchStorage(
       masterServices.getZooKeeper(), masterServices.getConfiguration());
+    this.maxThroughputUpperBound =
+      master.getConfiguration().getLong(COMPACTION_SERVER_MAX_THROUGHPUT_HIGHER_BOUND,
+        DEFAULT_COMPACTION_SERVER_MAX_THROUGHPUT_HIGHER_BOUND);
+    this.maxThroughputLowerBound =
+      master.getConfiguration().getLong(COMPACTION_SERVER_MAX_THROUGHPUT_LOWER_BOUND,
+        DEFAULT_COMPACTION_SERVER_MAX_THROUGHPUT_LOWER_BOUND);
+    this.maxThroughputOffPeak = master.getConfiguration().getLong(
+      COMPACTION_SERVER_MAX_THROUGHPUT_OFFPEAK, DEFAULT_COMPACTION_SERVER_MAX_THROUGHPUT_OFFPEAK);
   }
 
-  public void compactionServerReport(ServerName sn, CompactionServerMetrics sl) {
+  public Triple<Double, Double, Double> compactionServerReport(ServerName sn,
+    CompactionServerMetrics sl) {
     this.onlineServers.put(sn, sl);
+    int totalTask = Math.max(onlineServers.asMap().values().stream()
+      .mapToInt(metric -> metric.getCompactionTasks().size()).sum(), 1);
+    double factor = 1.0 * onlineServers.asMap().get(sn).getCompactionTasks().size() / totalTask;
+    return new Triple<>(maxThroughputUpperBound * factor, maxThroughputLowerBound * factor,
+      maxThroughputOffPeak * factor);
   }
 
   /**
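compactionServerReport divides the cluster-wide bounds among servers in proportion to the compaction tasks each one reported. A worked example of that arithmetic (server and task counts are made up):

```java
public class ShareComputationSketch {
  public static void main(String[] args) {
    long maxThroughputUpperBound = 100L * 1024 * 1024; // 100 MB/s cluster-wide
    int tasksA = 3, tasksB = 1;                        // tasks reported by two servers
    int totalTask = Math.max(tasksA + tasksB, 1);      // Math.max guards an idle cluster
    double shareA = maxThroughputUpperBound * (1.0 * tasksA / totalTask); // 75 MB/s
    double shareB = maxThroughputUpperBound * (1.0 * tasksB / totalTask); // 25 MB/s
    // A server reporting zero tasks gets a share of 0, which it then ignores
    // thanks to the `> 0` guard in HCompactionServer's report handling above.
    System.out.printf("A=%.0f bytes/sec, B=%.0f bytes/sec%n", shareA, shareB);
  }
}
```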
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java
index d37d7c35b6cc..6692985807f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareCompactionThroughputController.java
@@ -70,7 +70,7 @@ public class PressureAwareCompactionThroughputController extends PressureAwareTh
   private static final String HBASE_HSTORE_COMPACTION_THROUGHPUT_CONTROL_CHECK_INTERVAL =
     "hbase.hstore.compaction.throughput.control.check.interval";
 
-  private long maxThroughputOffpeak;
+  private long maxThroughputOffPeak;
 
   @Override
   public void setup(final ThroughputControllerService server) {
@@ -90,7 +90,7 @@ private void tune(double compactionPressure) {
       // set to unlimited if some stores already reach the blocking store file count
       maxThroughputToSet = Double.MAX_VALUE;
     } else if (offPeakHours.isOffPeakHour()) {
-      maxThroughputToSet = maxThroughputOffpeak;
+      maxThroughputToSet = maxThroughputOffPeak;
     } else {
       // compactionPressure is between 0.0 and 1.0, we use a simple linear formula to
       // calculate the throughput limitation.
@@ -122,7 +122,7 @@ public void setConf(Configuration conf) {
     this.maxThroughputLowerBound =
       conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
        DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND);
-    this.maxThroughputOffpeak =
+    this.maxThroughputOffPeak =
       conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK,
         DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK);
     this.offPeakHours = OffPeakHours.getInstance(conf);
@@ -136,7 +136,7 @@ public void setConf(Configuration conf) {
     LOG.info("Compaction throughput configurations, higher bound: "
       + throughputDesc(maxThroughputUpperBound) + ", lower bound "
       + throughputDesc(maxThroughputLowerBound) + ", off peak: "
-      + throughputDesc(maxThroughputOffpeak) + ", tuning period: " + tuningPeriod + " ms");
+      + throughputDesc(maxThroughputOffPeak) + ", tuning period: " + tuningPeriod + " ms");
   }
 
   @Override
@@ -145,4 +145,12 @@ public String toString() {
       + throughputDesc(getMaxThroughput()) + ", activeCompactions=" + activeOperations.size()
       + "]";
   }
+
+  public void setMaxThroughputOffPeak(long maxThroughputOffPeak) {
+    this.maxThroughputOffPeak = maxThroughputOffPeak;
+  }
+
+  public long getMaxThroughputOffPeak() {
+    return maxThroughputOffPeak;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
index a6904335b398..96537991c7c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/PressureAwareThroughputController.java
@@ -64,9 +64,9 @@ private static final class ActiveOperation {
     }
   }
 
-  protected long maxThroughputUpperBound;
+  protected volatile long maxThroughputUpperBound;
 
-  protected long maxThroughputLowerBound;
+  protected volatile long maxThroughputLowerBound;
 
   protected OffPeakHours offPeakHours;
 
@@ -170,4 +170,20 @@ public void setMaxThroughput(double maxThroughput) {
     this.maxThroughput = maxThroughput;
     maxThroughputPerOperation = getMaxThroughput() / activeOperations.size();
   }
+
+  public void setMaxThroughputUpperBound(long maxThroughputUpperBound) {
+    this.maxThroughputUpperBound = maxThroughputUpperBound;
+  }
+
+  public void setMaxThroughputLowerBound(long maxThroughputLowerBound) {
+    this.maxThroughputLowerBound = maxThroughputLowerBound;
+  }
+
+  public long getMaxThroughputUpperBound() {
+    return maxThroughputUpperBound;
+  }
+
+  public long getMaxThroughputLowerBound() {
+    return maxThroughputLowerBound;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
index 679190026944..534e17c1e797 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
@@ -132,15 +132,23 @@ public void waitForServerOnline() {
    * Creates a {@link CompactionServerThread}.
    * Call 'start' on the returned thread to make it run.
    * @param c Configuration to use.
+   * @param hcsc Class to create.
    * @param index Used distinguishing the object returned.
    * @throws IOException
    * @return Compaction server added.
   */
   public static JVMClusterUtil.CompactionServerThread createCompactionServerThread(
-    final Configuration c, final int index) throws IOException {
+    final Configuration c, final Class<? extends HCompactionServer> hcsc, final int index)
+    throws IOException {
     HCompactionServer server;
     try {
-      server = new HCompactionServer(c);
+      Constructor<? extends HCompactionServer> ctor = hcsc.getConstructor(Configuration.class);
+      ctor.setAccessible(true);
+      server = ctor.newInstance(c);
+    } catch (InvocationTargetException ite) {
+      Throwable target = ite.getTargetException();
+      throw new RuntimeException("Failed construction of CompactionServer: " + hcsc.toString()
+        + ((target.getCause() != null) ? target.getCause().getMessage() : ""), target);
     } catch (Exception e) {
       throw new IOException(e);
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index dcdcb199e727..21c18f3eff1e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1115,7 +1115,7 @@ public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option)
     TraceUtil.initTracer(c);
     this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(),
       option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
-      option.getNumCompactionServers(), option.getMasterClass(), option.getRsClass());
+      option.getNumCompactionServers(), option.getMasterClass(), option.getRsClass(), option.getCsClass());
     // Populate the master address configuration from mini cluster configuration.
     conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
     // Don't leave here till we've done a successful scan of the hbase:meta
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
index 05fad48a5814..73a037e53896 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
@@ -107,6 +107,15 @@ public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandBy
       regionserverClass);
   }
 
+  public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters,
+    int numRegionServers, List<Integer> rsPorts, int numCompactionServers,
+    Class<? extends HMaster> masterClass,
+    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
+    throws IOException, InterruptedException {
+    this(conf, numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, numCompactionServers,
+      masterClass, regionserverClass, null);
+  }
+
   /**
    * @param numCompactionServers initial number of compaction servers to start.
   * @throws IOException
   */
@@ -115,7 +124,8 @@ public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandBy
   public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters,
     int numRegionServers, List<Integer> rsPorts, int numCompactionServers,
     Class<? extends HMaster> masterClass,
-    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
+    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass,
+    Class<? extends HCompactionServer> compactionserverClass)
     throws IOException, InterruptedException {
     super(conf);
 
@@ -123,7 +133,7 @@ public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandBy
     CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
 
     init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, numCompactionServers,
-      masterClass, regionserverClass);
+      masterClass, regionserverClass, compactionserverClass);
 
     this.initialClusterStatus = getClusterMetrics();
   }
@@ -242,7 +252,8 @@ public void run() {
   private void init(final int nMasterNodes, final int numAlwaysStandByMasters,
     final int nRegionNodes, List<Integer> rsPorts, int numCompactionServers,
     Class<? extends HMaster> masterClass,
-    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
+    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass,
+    Class<? extends HCompactionServer> compactionserverClass)
     throws IOException, InterruptedException {
     try {
       if (masterClass == null) {
@@ -251,10 +262,13 @@ private void init(final int nMasterNodes, final int numAlwaysStandByMasters,
       if (regionserverClass == null) {
         regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class;
       }
+      if (compactionserverClass == null) {
+        compactionserverClass = HCompactionServer.class;
+      }
 
       // start up a LocalHBaseCluster
-      hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, numAlwaysStandByMasters, 0,
-        masterClass, regionserverClass);
+      hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, numAlwaysStandByMasters, 0, 0,
+        masterClass, regionserverClass, compactionserverClass);
 
       // manually add the regionservers as other users
       for (int i = 0; i < nRegionNodes; i++) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java
index d7b8951c3d01..4368cec2a105 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/StartMiniClusterOption.java
@@ -22,6 +22,7 @@
 import java.util.List;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.compactionserver.HCompactionServer;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -76,6 +77,11 @@ public final class StartMiniClusterOption {
    */
   private Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass;
 
+  /**
+   * The class to use as CompactionServer, or null for default.
+   */
+  private Class<? extends HCompactionServer> csClass;
+
   /**
    * Number of compaction servers to start up.
   */
@@ -115,8 +121,8 @@ private StartMiniClusterOption(int numMasters, int numAlwaysStandByMasters,
     Class<? extends HMaster> masterClass, int numRegionServers, List<Integer> rsPorts,
     Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
-    int numCompactionServers, int numDataNodes, String[] dataNodeHosts, int numZkServers,
-    boolean createRootDir, boolean createWALDir) {
+    int numCompactionServers, Class<? extends HCompactionServer> csClass, int numDataNodes,
+    String[] dataNodeHosts, int numZkServers, boolean createRootDir, boolean createWALDir) {
     this.numMasters = numMasters;
     this.numAlwaysStandByMasters = numAlwaysStandByMasters;
     this.masterClass = masterClass;
@@ -124,6 +130,7 @@ private StartMiniClusterOption(int numMasters, int numAlwaysStandByMasters,
     this.rsPorts = rsPorts;
     this.rsClass = rsClass;
     this.numCompactionServers = numCompactionServers;
+    this.csClass = csClass;
     this.numDataNodes = numDataNodes;
     this.dataNodeHosts = dataNodeHosts;
     this.numZkServers = numZkServers;
@@ -155,6 +162,10 @@ public Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> getRsClass
     return rsClass;
   }
 
+  public Class<? extends HCompactionServer> getCsClass() {
+    return csClass;
+  }
+
   public int getNumCompactionServers() {
     return numCompactionServers;
   }
@@ -210,6 +221,7 @@ public static final class Builder {
     private List<Integer> rsPorts = null;
     private int numCompactionServers;
     private Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass = null;
+    private Class<? extends HCompactionServer> csClass = null;
     private int numDataNodes = 1;
     private String[] dataNodeHosts = null;
     private int numZkServers = 1;
@@ -224,8 +236,8 @@ public StartMiniClusterOption build() {
       numDataNodes = dataNodeHosts.length;
     }
     return new StartMiniClusterOption(numMasters, numAlwaysStandByMasters, masterClass,
-      numRegionServers, rsPorts, rsClass, numCompactionServers, numDataNodes, dataNodeHosts,
-      numZkServers, createRootDir, createWALDir);
+      numRegionServers, rsPorts, rsClass, numCompactionServers, csClass, numDataNodes,
+      dataNodeHosts, numZkServers, createRootDir, createWALDir);
     }
 
     public Builder numMasters(int numMasters) {
@@ -263,6 +275,11 @@ public Builder rsClass(Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> rsClass
+    public Builder csClass(Class<? extends HCompactionServer> csClass) {
+      this.csClass = csClass;
+      return this;
+    }
+
     public Builder numDataNodes(int numDataNodes) {
       this.numDataNodes = numDataNodes;
       return this;
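With the builder extended, a test can plug a custom compaction server implementation into the mini cluster. A sketch (`MyCompactionServer` stands in for any `HCompactionServer` subclass with the standard `(Configuration)` constructor, as in the new test below):

```java
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.StartMiniClusterOption;

public class MiniClusterWiringSketch {
  static void start(HBaseTestingUtility util) throws Exception {
    util.startMiniCluster(StartMiniClusterOption.builder()
      .numRegionServers(1)
      .numCompactionServers(2)
      .csClass(MyCompactionServer.class) // hypothetical HCompactionServer subclass
      .build());
  }
}
```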
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServerWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServerWithThroughputController.java
new file mode 100644
index 000000000000..352a9f29d7cc
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServerWithThroughputController.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.compactionserver;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.compaction.CompactionOffloadManager;
+import org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.TestCompactionWithThroughputControllerBase;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ RegionServerTests.class, LargeTests.class })
+public class TestCompactionServerWithThroughputController
+  extends TestCompactionWithThroughputControllerBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestCompactionServerWithThroughputController.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestCompactionServerWithThroughputController.class);
+
+  protected void setThroughputLimitConf(long throughputLimit) {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(CompactionOffloadManager.COMPACTION_SERVER_MAX_THROUGHPUT_HIGHER_BOUND,
+      throughputLimit);
+    conf.setLong(CompactionOffloadManager.COMPACTION_SERVER_MAX_THROUGHPUT_LOWER_BOUND,
+      throughputLimit);
+    conf.setLong(
+      PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
+      throughputLimit);
+    conf.setLong(
+      PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
+      throughputLimit);
+  }
+
+  protected void startMiniCluster() throws Exception {
+    TEST_UTIL.startMiniCluster(StartMiniClusterOption.builder().numCompactionServers(1).build());
+    TEST_UTIL.getAdmin().switchCompactionOffload(true);
+  }
+
+  protected void shutdownMiniCluster() throws Exception {
+    HCompactionServer compactionServer =
+      TEST_UTIL.getMiniHBaseCluster().getCompactionServerThreads().get(0).getCompactionServer();
+    Assert.assertTrue(compactionServer.requestCount.sum() > 0);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  protected Table createTable() throws IOException {
+    TableDescriptor tableDescriptor =
+      TableDescriptorBuilder.newBuilder(tableName).setCompactionOffloadEnabled(true).build();
+    return TEST_UTIL.createTable(tableDescriptor, Bytes.toByteArrays(family),
+      TEST_UTIL.getConfiguration());
+  }
+
+  @Test
+  public void testCompaction() throws Exception {
+    long limitTime = testCompactionWithThroughputLimit();
+    long noLimitTime = testCompactionWithoutThroughputLimit();
+    LOG.info("With 1M/s limit, compaction took " + limitTime + "ms; without limit, compaction took "
+      + noLimitTime + "ms");
+    // usually the throughput of a compaction without limitation is about 40MB/sec at least, so this
+    // is a very weak assumption.
+    assertTrue(limitTime > noLimitTime * 2);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestUpdateCompactionServerTotalThroughput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestUpdateCompactionServerTotalThroughput.java
new file mode 100644
index 000000000000..dc3fceb26327
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestUpdateCompactionServerTotalThroughput.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.compactionserver;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController;
+import org.apache.hadoop.hbase.testclassification.CompactionServerTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
+
+@Category({ MediumTests.class, CompactionServerTests.class })
+public class TestUpdateCompactionServerTotalThroughput {
+  public static class MyCompactionServer extends HCompactionServer {
+    public MyCompactionServer(final Configuration conf) throws IOException {
+      super(conf);
+    }
+
+    protected ClusterStatusProtos.CompactionServerLoad buildServerLoad(long reportStartTime,
+      long reportEndTime) {
+      ClusterStatusProtos.CompactionServerLoad.Builder serverLoad =
+        ClusterStatusProtos.CompactionServerLoad.newBuilder();
+      serverLoad.addCompactionTasks("compactionTask");
+      serverLoad.setCompactedCells(0);
+      serverLoad.setCompactingCells(0);
+      serverLoad.setTotalNumberOfRequests(0);
+      serverLoad.setReportStartTime(reportStartTime);
+      serverLoad.setReportEndTime(reportEndTime);
+      return serverLoad.build();
+    }
+  }
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestUpdateCompactionServerTotalThroughput.class);
+  private static HBaseTestingUtility TEST_UTIL;
+  private static final Configuration conf = HBaseConfiguration.create();
+  private static Connection connection;
+  private static Admin admin;
+  private static final int COMPACTION_SERVER_NUM = 2;
+
+  @Before
+  public void setUp() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility(conf);
+    TEST_UTIL.startMiniCluster(StartMiniClusterOption.builder()
+      .numCompactionServers(COMPACTION_SERVER_NUM)
+      .csClass(MyCompactionServer.class)
+      .build());
+    admin = TEST_UTIL.getAdmin();
+    connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (admin != null) {
+      admin.close();
+    }
+    if (connection != null) {
+      connection.close();
+    }
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private void checkThroughPut(long upperBound, long lowBound, long offPeak) {
+    TEST_UTIL.getHBaseCluster().getCompactionServerThreads().forEach(cs -> {
+      PressureAwareCompactionThroughputController throughputController =
+        (PressureAwareCompactionThroughputController) cs
+          .getCompactionServer().compactionThreadManager.getCompactionThroughputController();
+      TEST_UTIL.waitFor(60000,
+        () -> throughputController.getMaxThroughputUpperBound() == upperBound
+          && throughputController.getMaxThroughputLowerBound() == lowBound
+          && throughputController.getMaxThroughputOffPeak() == offPeak);
+    });
+  }
+
+  private void checkUpdateThroughPutResult(Map<String, Long> result, long upperBound,
+    long lowBound, long offPeak) {
+    Assert.assertEquals(upperBound, result.get("UpperBound").longValue());
+    Assert.assertEquals(lowBound, result.get("LowerBound").longValue());
+    Assert.assertEquals(offPeak, result.get("OffPeak").longValue());
+  }
+
+  @Test
+  public void testUpdateCompactionServerTotalThroughput() throws IOException {
+    Admin admin = TEST_UTIL.getAdmin();
+    // Each compaction server reports exactly one compaction task, so the bound each server
+    // receives from the master is totalLimit / COMPACTION_SERVER_NUM.
+    Map<String, Long> result = admin.updateCompactionServerTotalThroughput(200L, 100L, 400L);
+    checkThroughPut(200 / COMPACTION_SERVER_NUM, 100 / COMPACTION_SERVER_NUM,
+      400 / COMPACTION_SERVER_NUM);
+    checkUpdateThroughPutResult(result, 200, 100, 400);
+
+    result = admin.updateCompactionServerTotalThroughput(0L, 0L, 0L);
+    // setting the total throughput to 0 does not take effect
+    checkUpdateThroughPutResult(result, 200, 100, 400);
+
+    result = admin.updateCompactionServerTotalThroughput(-100L, -100L, -100L);
+    // setting the total throughput to a negative value does not take effect
+    checkUpdateThroughPutResult(result, 200, 100, 400);
+
+    result = admin.updateCompactionServerTotalThroughput(2000L, 500L, 10000L);
+    checkThroughPut(2000 / COMPACTION_SERVER_NUM, 500 / COMPACTION_SERVER_NUM,
+      10000 / COMPACTION_SERVER_NUM);
+    checkUpdateThroughPutResult(result, 2000, 500, 10000);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java
index 2345dc9482bc..995abaca8a18 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java
@@ -20,15 +20,8 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -36,10 +29,8 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; -import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HStore; -import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.StoreEngine; import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; import org.apache.hadoop.hbase.regionserver.StripeStoreEngine; @@ -47,7 +38,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -55,7 +45,8 @@ import org.slf4j.LoggerFactory; @Category({ RegionServerTests.class, LargeTests.class }) -public class TestCompactionWithThroughputController { +public class TestCompactionWithThroughputController + extends TestCompactionWithThroughputControllerBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -64,107 +55,6 @@ public class TestCompactionWithThroughputController { private static final Logger LOG = LoggerFactory.getLogger(TestCompactionWithThroughputController.class); - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - private static final double EPSILON = 1E-6; - - private final TableName tableName = TableName.valueOf(getClass().getSimpleName()); - - private final byte[] family = Bytes.toBytes("f"); - - private final byte[] qualifier = Bytes.toBytes("q"); - - private HStore getStoreWithName(TableName tableName) { - MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); - List rsts = cluster.getRegionServerThreads(); - for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { - HRegionServer hrs = rsts.get(i).getRegionServer(); - for (Region region : hrs.getRegions(tableName)) { - return ((HRegion) region).getStores().iterator().next(); - } - } - return null; - } - - private HStore prepareData() throws IOException { - Admin admin = TEST_UTIL.getAdmin(); - if (admin.tableExists(tableName)) { - admin.disableTable(tableName); - admin.deleteTable(tableName); - } - Table table = TEST_UTIL.createTable(tableName, family); - for (int i = 0; i < 10; i++) { - for (int j = 0; j < 10; j++) { - byte[] value = new byte[128 * 1024]; - ThreadLocalRandom.current().nextBytes(value); - table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); - } - admin.flush(tableName); - } - return getStoreWithName(tableName); - } - - private long testCompactionWithThroughputLimit() throws Exception { - long throughputLimit = 1024L * 1024; - Configuration conf = TEST_UTIL.getConfiguration(); - conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); - conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); - conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); - conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); - conf.setLong( - 
PressureAwareCompactionThroughputController - .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, - throughputLimit); - conf.setLong( - PressureAwareCompactionThroughputController - .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, - throughputLimit); - conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, - PressureAwareCompactionThroughputController.class.getName()); - TEST_UTIL.startMiniCluster(1); - try { - HStore store = prepareData(); - assertEquals(10, store.getStorefilesCount()); - long startTime = System.currentTimeMillis(); - TEST_UTIL.getAdmin().majorCompact(tableName); - while (store.getStorefilesCount() != 1) { - Thread.sleep(20); - } - long duration = System.currentTimeMillis() - startTime; - double throughput = (double) store.getStorefilesSize() / duration * 1000; - // confirm that the speed limit work properly(not too fast, and also not too slow) - // 20% is the max acceptable error rate. - assertTrue(throughput < throughputLimit * 1.2); - assertTrue(throughput > throughputLimit * 0.8); - return System.currentTimeMillis() - startTime; - } finally { - TEST_UTIL.shutdownMiniCluster(); - } - } - - private long testCompactionWithoutThroughputLimit() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); - conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); - conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); - conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); - conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, - NoLimitThroughputController.class.getName()); - TEST_UTIL.startMiniCluster(1); - try { - HStore store = prepareData(); - assertEquals(10, store.getStorefilesCount()); - long startTime = System.currentTimeMillis(); - TEST_UTIL.getAdmin().majorCompact(tableName); - while (store.getStorefilesCount() != 1) { - Thread.sleep(20); - } - return System.currentTimeMillis() - startTime; - } finally { - TEST_UTIL.shutdownMiniCluster(); - } - } - @Test public void testCompaction() throws Exception { long limitTime = testCompactionWithThroughputLimit(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputControllerBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputControllerBase.java new file mode 100644 index 000000000000..f369329dc2e8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputControllerBase.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.StoreEngine;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+
+public class TestCompactionWithThroughputControllerBase {
+  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  protected static final double EPSILON = 1E-6;
+
+  protected final TableName tableName = TableName.valueOf(getClass().getSimpleName());
+
+  protected final byte[] family = Bytes.toBytes("f");
+
+  protected final byte[] qualifier = Bytes.toBytes("q");
+
+  protected HStore getStoreWithName(TableName tableName) {
+    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
+    for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
+      HRegionServer hrs = rsts.get(i).getRegionServer();
+      for (Region region : hrs.getRegions(tableName)) {
+        return ((HRegion) region).getStores().iterator().next();
+      }
+    }
+    return null;
+  }
+
+  protected Table createTable() throws IOException {
+    return TEST_UTIL.createTable(tableName, family);
+  }
+
+  protected HStore prepareData() throws IOException {
+    Admin admin = TEST_UTIL.getAdmin();
+    if (admin.tableExists(tableName)) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+    Table table = createTable();
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        byte[] value = new byte[128 * 1024];
+        ThreadLocalRandom.current().nextBytes(value);
+        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
+      }
+      admin.flush(tableName);
+    }
+    return getStoreWithName(tableName);
+  }
+
+  protected void startMiniCluster() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  protected void shutdownMiniCluster() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  protected void setThroughputLimitConf(long throughputLimit) {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(
+      PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
+      throughputLimit);
+    conf.setLong(
+      PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
+      throughputLimit);
+  }
+
+  protected long testCompactionWithThroughputLimit() throws Exception {
+    long throughputLimit = 1024L * 1024;
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
+    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
+    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+    setThroughputLimitConf(throughputLimit);
+    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
+      PressureAwareCompactionThroughputController.class.getName());
+    startMiniCluster();
+    try {
+      HStore store = prepareData();
+      assertEquals(10, store.getStorefilesCount());
+      long startTime = System.currentTimeMillis();
+      TEST_UTIL.getAdmin().majorCompact(tableName);
+      while (store.getStorefilesCount() != 1) {
+        Thread.sleep(20);
+      }
+      long duration = System.currentTimeMillis() - startTime;
+      double throughput = (double) store.getStorefilesSize() / duration * 1000;
+      // confirm that the speed limit works properly (not too fast, and also not too slow);
+      // 20% is the max acceptable error rate.
+      assertTrue(throughput < throughputLimit * 1.2);
+      assertTrue(throughput > throughputLimit * 0.8);
+      return duration;
+    } finally {
+      shutdownMiniCluster();
+    }
+  }
+
+  protected long testCompactionWithoutThroughputLimit() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
+    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
+    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
+      NoLimitThroughputController.class.getName());
+    startMiniCluster();
+    try {
+      HStore store = prepareData();
+      assertEquals(10, store.getStorefilesCount());
+      long startTime = System.currentTimeMillis();
+      TEST_UTIL.getAdmin().majorCompact(tableName);
+      while (store.getStorefilesCount() != 1) {
+        Thread.sleep(20);
+      }
+      return System.currentTimeMillis() - startTime;
+    } finally {
+      shutdownMiniCluster();
+    }
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
index d2f62bae982d..418e5dcbb3a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
@@ -728,6 +728,11 @@ public boolean switchCompactionOffload(boolean enable) throws IOException {
     return admin.switchCompactionOffload(enable);
   }
 
+  public Map<String, Long> updateCompactionServerTotalThroughput(long upperBound, long lowerBound,
+    long offPeak) throws IOException {
+    return admin.updateCompactionServerTotalThroughput(upperBound, lowerBound, offPeak);
+  }
+
   public boolean isCompactionOffloadEnabled() throws IOException {
     return admin.isCompactionOffloadEnabled();
   }
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index ab0f7ef9e761..090e6c6663a7 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -121,6 +121,13 @@ def compaction_switch(on_or_off, regionserver_names)
       @admin.compactionSwitch(java.lang.Boolean.valueOf(on_or_off), servers)
     end
 
+    # Update the total throughput bounds of all compaction servers
+    def update_compaction_server_total_throughput(upper_bound, lower_bound, offpeak)
+      @admin.updateCompactionServerTotalThroughput(java.lang.Long.value_of(upper_bound),
+                                                   java.lang.Long.value_of(lower_bound),
+                                                   java.lang.Long.value_of(offpeak))
+    end
+
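The test base class above deliberately leaves startMiniCluster(), shutdownMiniCluster() and setThroughputLimitConf() overridable so that concrete tests can reuse the two timing helpers unchanged. A minimal sketch of the intended reuse follows; the subclass name and the three-server override are illustrative assumptions, and the HBaseClassTestRule/category boilerplate a real HBase test class carries is omitted:

import static org.junit.Assert.assertTrue;
import org.junit.Test;

public class TestCompactionThroughputOnLargerCluster
    extends TestCompactionWithThroughputControllerBase {

  @Override
  protected void startMiniCluster() throws Exception {
    // Illustrative override: run the same helpers against three region servers.
    TEST_UTIL.startMiniCluster(3);
  }

  @Test
  public void testCompaction() throws Exception {
    // Mirrors the original test: the throttled compaction must take longer
    // than the unthrottled one.
    long limitTime = testCompactionWithThroughputLimit();
    long noLimitTime = testCompactionWithoutThroughputLimit();
    assertTrue(limitTime > noLimitTime);
  }
}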
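For context, here is a minimal sketch of invoking the new Admin API directly from client code, which the shell commands below wrap one bound at a time. The connection boilerplate, class name and the bytes-per-second values are assumptions; the method signature and the "UpperBound"/"LowerBound"/"OffPeak" result keys come from the patch itself:

import java.util.Map;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class UpdateCompactionServerThroughputExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        Admin admin = conn.getAdmin()) {
      // All three bounds are in bytes/sec; the values are illustrative only.
      Map<String, Long> current = admin.updateCompactionServerTotalThroughput(
        100L * 1024 * 1024, // upperBound
        50L * 1024 * 1024, // lowerBound
        200L * 1024 * 1024); // offPeak
      // The master replies with the now-effective totals.
      current.forEach((bound, value) -> System.out.println(bound + " = " + value));
    }
  }
}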
     #----------------------------------------------------------------------------------------------
     # Gets compaction state for specified table
     def getCompactionState(table_name)
diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb
index 6b2a195233e2..c13957cfb732 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -489,6 +489,9 @@ def self.exception_handler(hide_traceback)
     list_decommissioned_regionservers
     decommission_regionservers
     recommission_regionserver
+    set_compaction_server_throughput_lower_bound
+    set_compaction_server_throughput_upper_bound
+    set_compaction_server_throughput_offpeak
   ],
   # TODO: remove older hlog_roll command
   aliases: {
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_compaction_server_throughput_lower_bound.rb b/hbase-shell/src/main/ruby/shell/commands/set_compaction_server_throughput_lower_bound.rb
new file mode 100644
index 000000000000..c58a66437194
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/set_compaction_server_throughput_lower_bound.rb
@@ -0,0 +1,43 @@
+#
+# Copyright The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class SetCompactionServerThroughputLowerBound < Command
+      def help
+        <<-EOF
+Set the compaction server total throughput lower bound.
+
+Examples:
+
+  # set bandwidth=2MB for total throughput lower bound.
+  hbase> set_compaction_server_throughput_lower_bound 2097152
+
+        EOF
+      end
+
+      def command(bandwidth = 0)
+        formatter.header(%w[BOUND BANDWIDTH])
+        throughput = admin.update_compaction_server_total_throughput(0, bandwidth, 0)
+        throughput.each { |k, v| formatter.row([k, java.lang.String.valueOf(v)]) }
+      end
+    end
+  end
+end
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_compaction_server_throughput_offpeak.rb b/hbase-shell/src/main/ruby/shell/commands/set_compaction_server_throughput_offpeak.rb
new file mode 100644
index 000000000000..6a9d196d56eb
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/set_compaction_server_throughput_offpeak.rb
@@ -0,0 +1,43 @@
+#
+# Copyright The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class SetCompactionServerThroughputOffpeak < Command
+      def help
+        <<-EOF
+Set the compaction server total throughput offpeak.
+
+Examples:
+
+  # set bandwidth=2MB for total throughput offpeak.
+  hbase> set_compaction_server_throughput_offpeak 2097152
+
+        EOF
+      end
+
+      def command(bandwidth = 0)
+        formatter.header(%w[BOUND BANDWIDTH])
+        throughput = admin.update_compaction_server_total_throughput(0, 0, bandwidth)
+        throughput.each { |k, v| formatter.row([k, java.lang.String.valueOf(v)]) }
+      end
+    end
+  end
+end
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_compaction_server_throughput_upper_bound.rb b/hbase-shell/src/main/ruby/shell/commands/set_compaction_server_throughput_upper_bound.rb
new file mode 100644
index 000000000000..34e656b4c800
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/set_compaction_server_throughput_upper_bound.rb
@@ -0,0 +1,43 @@
+#
+# Copyright The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class SetCompactionServerThroughputUpperBound < Command
+      def help
+        <<-EOF
+Set the compaction server total throughput upper bound.
+
+Examples:
+
+  # set bandwidth=2MB for total throughput upper bound.
+  hbase> set_compaction_server_throughput_upper_bound 2097152
+
+        EOF
+      end
+
+      def command(bandwidth = 0)
+        formatter.header(%w[BOUND BANDWIDTH])
+        throughput = admin.update_compaction_server_total_throughput(bandwidth, 0, 0)
+        throughput.each { |k, v| formatter.row([k, java.lang.String.valueOf(v)]) }
+      end
+    end
+  end
+end
diff --git a/hbase-shell/src/test/ruby/hbase/admin_test.rb b/hbase-shell/src/test/ruby/hbase/admin_test.rb
index 09e659b9ff06..07caa0be6bab 100644
--- a/hbase-shell/src/test/ruby/hbase/admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/admin_test.rb
@@ -512,6 +512,24 @@ def teardown
 
     #-------------------------------------------------------------------------------
 
+    define_test 'set compaction server throughput lower bound' do
+      output = capture_stdout { command(:set_compaction_server_throughput_lower_bound, 500) }
+      assert(output.include?("LowerBound 500"))
+      output = capture_stdout { command(:set_compaction_server_throughput_lower_bound, 1500) }
+      assert(output.include?("LowerBound 1500"))
+    end
+    define_test 'set compaction server throughput upper bound' do
+      output = capture_stdout { command(:set_compaction_server_throughput_upper_bound, 5000) }
+      assert(output.include?("UpperBound 5000"))
+      output = capture_stdout { command(:set_compaction_server_throughput_upper_bound, 15000) }
+      assert(output.include?("UpperBound 15000"))
+    end
+    define_test 'set compaction server throughput offPeak' do
+      output = capture_stdout { command(:set_compaction_server_throughput_offpeak, 5000) }
+      assert(output.include?("OffPeak 5000"))
+      output = capture_stdout { command(:set_compaction_server_throughput_offpeak, 15000) }
+      assert(output.include?("OffPeak 15000"))
+    end
     define_test "describe should fail for non-existent tables" do
       assert_raise(ArgumentError) do
         admin.describe('NOT.EXISTS')
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
index 7997e4d82e38..e01c7df726e0 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
@@ -458,6 +458,13 @@ public boolean switchCompactionOffload(boolean enable) throws IOException {
       "switchCompactionOffload by pattern not supported in ThriftAdmin");
   }
 
+  @Override
+  public Map<String, Long> updateCompactionServerTotalThroughput(long upperBound, long lowerBound,
+    long offPeak) throws IOException {
+    throw new NotImplementedException(
+      "updateCompactionServerTotalThroughput not supported in ThriftAdmin");
+  }
+
   @Override
   public boolean isCompactionOffloadEnabled() throws IOException {
     throw new NotImplementedException(