From 5cbf9add1c8df7593618309b05430cd164ea76e7 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Mon, 26 Feb 2024 11:29:56 -0500 Subject: [PATCH 1/7] track BBS in RSH, use it in scan quota estimation --- .../hbase/quotas/DefaultOperationQuota.java | 76 ++++++++++++++++--- .../hbase/quotas/ExceedOperationQuota.java | 61 ++++++++++++--- .../hbase/quotas/NoopOperationQuota.java | 10 ++- .../hadoop/hbase/quotas/OperationQuota.java | 17 ++++- .../hadoop/hbase/quotas/RateLimiter.java | 7 +- .../quotas/RegionServerRpcQuotaManager.java | 75 +++++++++++++----- .../hbase/regionserver/RSRpcServices.java | 21 +++-- .../quotas/TestBlockBytesScannedQuota.java | 31 +++++--- .../hbase/quotas/ThrottleQuotaTestUtil.java | 7 +- 9 files changed, 246 insertions(+), 59 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index a4ff8b2a859e..ea188f8e4acc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -27,6 +27,8 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + @InterfaceAudience.Private @InterfaceStability.Evolving public class DefaultOperationQuota implements OperationQuota { @@ -80,21 +82,41 @@ public DefaultOperationQuota(final Configuration conf, final List } @Override - public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException { - updateEstimateConsumeQuota(numWrites, numReads, numScans); + public void checkQuota(int numWrites, int numReads) throws RpcThrottlingException { + updateEstimateConsumeBatchQuota(numWrites, numReads); readAvailable = Long.MAX_VALUE; for (final QuotaLimiter limiter : limiters) { if (limiter.isBypass()) continue; - limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, + limiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed, readCapacityUnitConsumed); readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); } for (final QuotaLimiter limiter : limiters) { - limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, - writeCapacityUnitConsumed, readCapacityUnitConsumed); + limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed, + readCapacityUnitConsumed); + } + } + + @Override + public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, + long maxBlockBytesScanned) throws RpcThrottlingException { + updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned); + + readAvailable = Long.MAX_VALUE; + for (final QuotaLimiter limiter : limiters) { + if (limiter.isBypass()) continue; + + limiter.checkQuota(0, writeConsumed, 1, readConsumed, writeCapacityUnitConsumed, + readCapacityUnitConsumed); + readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); + } + + for (final QuotaLimiter limiter : limiters) { + limiter.grabQuota(0, writeConsumed, 1, readConsumed, writeCapacityUnitConsumed, + readCapacityUnitConsumed); } } @@ -158,24 +180,60 @@ public void addMutation(final Mutation mutation) { * Update estimate quota(read/write size/capacityUnits) which will be consumed * @param numWrites the number of write requests * @param numReads 
the number of read requests - * @param numScans the number of scan requests */ - protected void updateEstimateConsumeQuota(int numWrites, int numReads, int numScans) { + protected void updateEstimateConsumeBatchQuota(int numWrites, int numReads) { writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100); if (useResultSizeBytes) { readConsumed = estimateConsume(OperationType.GET, numReads, 100); - readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000); } else { // assume 1 block required for reads. this is probably a low estimate, which is okay readConsumed = numReads > 0 ? blockSizeBytes : 0; - readConsumed += numScans > 0 ? blockSizeBytes : 0; } writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed); readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed); } + /** + * Update estimate quota(read/write size/capacityUnits) which will be consumed + * @param scanRequest the scan to be executed + * @param maxScannerResultSize the maximum bytes to be returned by the scanner + * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the scanner + */ + protected void updateEstimateConsumeScanQuota(ClientProtos.ScanRequest scanRequest, + long maxScannerResultSize, long maxBlockBytesScanned) { + if (useResultSizeBytes) { + readConsumed = estimateConsume(OperationType.GET, 1, 1000); + } else { + /* + * Estimating scan workload is more complicated, and if we severely underestimate workloads + * then throttled clients will exhaust retries too quickly, and could saturate the RPC layer. + * We have access to the ScanRequest's nextCallSeq number, the maxScannerResultSize, and the + * maxBlockBytesScanned by every relevant Scanner#next call. With these inputs we can make a + * more informed estimate about the scan's workload. + */ + long estimate; + if (scanRequest.getNextCallSeq() == 0) { + // start scanners with an optimistic 1 block IO estimate + // it is better to underestimate a large scan in the beginning + // than to overestimate, and block, a small scan + estimate = blockSizeBytes; + } else { + // scanner result sizes will be limited by quota availability, regardless of + // maxScannerResultSize. This means that we cannot safely assume that a long-running + // scan with a small maxBlockBytesScanned would not prefer to pull down + // a larger payload. So we should estimate with the assumption that long-running scans + // are appropriately configured to approach their maxScannerResultSize per RPC call + estimate = + Math.min(maxScannerResultSize, scanRequest.getNextCallSeq() * maxBlockBytesScanned); + } + readConsumed = estimate; + } + + readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed); + } + private long estimateConsume(final OperationType type, int numReqs, long avgSize) { if (numReqs > 0) { return avgSize * numReqs; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java index 1788e550f22a..972584449f0c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java @@ -23,6 +23,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + /* * Internal class used to check and consume quota if exceed throttle quota is enabled. 
Exceed * throttle quota means, user can over consume user/namespace/table quota if region server has @@ -47,45 +49,86 @@ public ExceedOperationQuota(final Configuration conf, int blockSizeBytes, } @Override - public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException { + public void checkQuota(int numWrites, int numReads) throws RpcThrottlingException { if (regionServerLimiter.isBypass()) { // If region server limiter is bypass, which means no region server quota is set, check and // throttle by all other quotas. In this condition, exceed throttle quota will not work. LOG.warn("Exceed throttle quota is enabled but no region server quotas found"); - super.checkQuota(numWrites, numReads, numScans); + super.checkQuota(numWrites, numReads); } else { // 1. Update estimate quota which will be consumed - updateEstimateConsumeQuota(numWrites, numReads, numScans); + updateEstimateConsumeBatchQuota(numWrites, numReads); // 2. Check if region server limiter is enough. If not, throw RpcThrottlingException. - regionServerLimiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, + regionServerLimiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed, readCapacityUnitConsumed); // 3. Check if other limiters are enough. If not, exceed other limiters because region server // limiter is enough. boolean exceed = false; try { - super.checkQuota(numWrites, numReads, numScans); + super.checkQuota(numWrites, numReads); } catch (RpcThrottlingException e) { exceed = true; if (LOG.isDebugEnabled()) { - LOG.debug("Read/Write requests num exceeds quota: writes:{} reads:{} scan:{}, " - + "try use region server quota", numWrites, numReads, numScans); + LOG.debug("Read/Write requests num exceeds quota: writes:{} reads:{} scans:0, " + + "try use region server quota", numWrites, numReads); } } // 4. Region server limiter is enough and grab estimated consume quota. readAvailable = Math.max(readAvailable, regionServerLimiter.getReadAvailable()); - regionServerLimiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, + regionServerLimiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed, writeCapacityUnitConsumed); if (exceed) { // 5. Other quota limiter is exceeded and has not been grabbed (because throw // RpcThrottlingException in Step 3), so grab it. for (final QuotaLimiter limiter : limiters) { - limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, + limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed, writeCapacityUnitConsumed); } } } } + @Override + public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, + long maxBlockBytesScanned) throws RpcThrottlingException { + if (regionServerLimiter.isBypass()) { + // If region server limiter is bypass, which means no region server quota is set, check and + // throttle by all other quotas. In this condition, exceed throttle quota will not work. + LOG.warn("Exceed throttle quota is enabled but no region server quotas found"); + super.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned); + } else { + // 1. Update estimate quota which will be consumed + updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned); + // 2. Check if region server limiter is enough. If not, throw RpcThrottlingException. 
+ regionServerLimiter.checkQuota(0, writeConsumed, 1, readConsumed, writeCapacityUnitConsumed, + readCapacityUnitConsumed); + // 3. Check if other limiters are enough. If not, exceed other limiters because region server + // limiter is enough. + boolean exceed = false; + try { + super.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned); + } catch (RpcThrottlingException e) { + exceed = true; + if (LOG.isDebugEnabled()) { + LOG.debug("Read/Write requests num exceeds quota: writes:0 reads:0, scans:1, " + + "try use region server quota"); + } + } + // 4. Region server limiter is enough and grab estimated consume quota. + readAvailable = Math.max(readAvailable, regionServerLimiter.getReadAvailable()); + regionServerLimiter.grabQuota(0, writeConsumed, 1, readConsumed, writeCapacityUnitConsumed, + writeCapacityUnitConsumed); + if (exceed) { + // 5. Other quota limiter is exceeded and has not been grabbed (because throw + // RpcThrottlingException in Step 3), so grab it. + for (final QuotaLimiter limiter : limiters) { + limiter.grabQuota(0, writeConsumed, 1, readConsumed, writeCapacityUnitConsumed, + writeCapacityUnitConsumed); + } + } + } + } + @Override public void close() { super.close(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java index b64429d9adc8..f5bb0827cdf4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java @@ -23,6 +23,8 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + /** * Noop operation quota returned when no quota is associated to the user/table */ @@ -40,7 +42,13 @@ public static OperationQuota get() { } @Override - public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException { + public void checkQuota(int numWrites, int numReads) throws RpcThrottlingException { + // no-op + } + + @Override + public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, + long maxBlockBytesScanned) throws RpcThrottlingException { // no-op } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java index bedad5e98673..4a9fac977700 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java @@ -23,6 +23,8 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + /** * Interface that allows to check the quota available for an operation. */ @@ -51,11 +53,22 @@ public enum OperationType { * on the number of operations to perform and the average size accumulated during time. * @param numWrites number of write operation that will be performed * @param numReads number of small-read operation that will be performed - * @param numScans number of long-read operation that will be performed * @throws RpcThrottlingException if the operation cannot be performed because RPC quota is * exceeded. 
*/ - void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException; + void checkQuota(int numWrites, int numReads) throws RpcThrottlingException; + + /** + * Checks if it is possible to execute the scan. The quota will be estimated based on the + * composition of the scan. + * @param scanRequest the given scan operation + * @param maxScannerResultSize the maximum bytes to be returned by the scanner + * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the scanner + * @throws RpcThrottlingException if the operation cannot be performed because RPC quota is + * exceeded. + */ + void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, + long maxBlockBytesScanned) throws RpcThrottlingException; /** Cleanup method on operation completion */ void close(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java index 5c69ad5d6cd5..273ab85838da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java @@ -35,6 +35,7 @@ + "are mostly synchronized...but to me it looks like they are totally synchronized") public abstract class RateLimiter { public static final String QUOTA_RATE_LIMITER_CONF_KEY = "hbase.quota.rate.limiter"; + private static final long QUOTA_RATE_LIMITER_MINIMUM_WAIT_INTERVAL_MS = 100L; private long tunit = 1000; // Timeunit factor for translating to ms. private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to. private long avail = Long.MAX_VALUE; // Currently available resource units @@ -217,7 +218,11 @@ public long waitInterval() { */ public synchronized long waitInterval(final long amount) { // TODO Handle over quota? - return (amount <= avail) ? 0 : getWaitInterval(getLimit(), avail, amount); + long waitInterval = (amount <= avail) ? 0 : getWaitInterval(getLimit(), avail, amount); + if (waitInterval > 0) { + return Math.max(waitInterval, QUOTA_RATE_LIMITER_MINIMUM_WAIT_INTERVAL_MS); + } + return waitInterval; } // These two method are for strictly testing purpose only diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java index 3c72c662887b..7ed95555c28f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java @@ -156,38 +156,77 @@ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName t /** * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the - * available quota and to report the data/usage of the operation. + * available quota and to report the data/usage of the operation. This method is specific to scans + * because estimating a scan's workload is more complicated than estimating the workload of a + * get/put. 
+ * @param region the region where the operation will be performed + * @param scanRequest the scan to be estimated against the quota + * @param maxScannerResultSize the maximum bytes to be returned by the scanner + * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the scanner + * @return the OperationQuota + * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. + */ + public OperationQuota checkScanQuota(final Region region, + final ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, + long maxBlockBytesScanned) throws IOException, RpcThrottlingException { + Optional user = RpcServer.getRequestUser(); + UserGroupInformation ugi; + if (user.isPresent()) { + ugi = user.get().getUGI(); + } else { + ugi = User.getCurrent().getUGI(); + } + TableDescriptor tableDescriptor = region.getTableDescriptor(); + TableName table = tableDescriptor.getTableName(); + + OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes()); + try { + quota.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned); + } catch (RpcThrottlingException e) { + LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table + " scan=" + + scanRequest.getScannerId() + ": " + e.getMessage()); + throw e; + } + return quota; + } + + /** + * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the + * available quota and to report the data/usage of the operation. This method does not support + * scans because estimating a scan's workload is more complicated than estimating the workload of + * a get/put. * @param region the region where the operation will be performed * @param type the operation type * @return the OperationQuota * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. */ - public OperationQuota checkQuota(final Region region, final OperationQuota.OperationType type) - throws IOException, RpcThrottlingException { + public OperationQuota checkBatchQuota(final Region region, + final OperationQuota.OperationType type) throws IOException, RpcThrottlingException { switch (type) { - case SCAN: - return checkQuota(region, 0, 0, 1); case GET: - return checkQuota(region, 0, 1, 0); + return this.checkBatchQuota(region, 0, 1); case MUTATE: - return checkQuota(region, 1, 0, 0); + return this.checkBatchQuota(region, 1, 0); case CHECK_AND_MUTATE: - return checkQuota(region, 1, 1, 0); + return this.checkBatchQuota(region, 1, 1); } throw new RuntimeException("Invalid operation type: " + type); } /** * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the - * available quota and to report the data/usage of the operation. + * available quota and to report the data/usage of the operation. This method does not support + * scans because estimating a scan's workload is more complicated than estimating the workload of + * a get/put. * @param region the region where the operation will be performed * @param actions the "multi" actions to perform * @param hasCondition whether the RegionAction has a condition * @return the OperationQuota * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. 
*/ - public OperationQuota checkQuota(final Region region, final List actions, - boolean hasCondition) throws IOException, RpcThrottlingException { + public OperationQuota checkBatchQuota(final Region region, + final List actions, boolean hasCondition) + throws IOException, RpcThrottlingException { int numWrites = 0; int numReads = 0; for (final ClientProtos.Action action : actions) { @@ -202,7 +241,7 @@ public OperationQuota checkQuota(final Region region, final List user = RpcServer.getRequestUser(); UserGroupInformation ugi; if (user.isPresent()) { @@ -229,11 +267,10 @@ private OperationQuota checkQuota(final Region region, final int numWrites, fina OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes()); try { - quota.checkQuota(numWrites, numReads, numScans); + quota.checkQuota(numWrites, numReads); } catch (RpcThrottlingException e) { - LOG.debug( - "Throttling exception for user=" + ugi.getUserName() + " table=" + table + " numWrites=" - + numWrites + " numReads=" + numReads + " numScans=" + numScans + ": " + e.getMessage()); + LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table + + " numWrites=" + numWrites + " numReads=" + numReads + ": " + e.getMessage()); throw e; } return quota; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 25b229fd0c4e..998fe67f4622 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -429,6 +429,7 @@ static final class RegionScannerHolder { private boolean fullRegionScan; private final String clientIPAndPort; private final String userName; + private final AtomicLong maxBlockBytesScanned = new AtomicLong(0); RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan, @@ -452,6 +453,14 @@ boolean incNextCallSeq(long currentSeq) { return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1); } + long getMaxBlockBytesScanned() { + return maxBlockBytesScanned.get(); + } + + synchronized void setMaxBlockBytesScanned(long blockBytesScanned) { + maxBlockBytesScanned.set(Math.max(getMaxBlockBytesScanned(), blockBytesScanned)); + } + // Should be called only when we need to print lease expired messages otherwise // cache the String once made. 
@Override @@ -2464,7 +2473,7 @@ public GetResponse get(final RpcController controller, final GetRequest request) } Boolean existence = null; Result r = null; - quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET); + quota = getRpcQuotaManager().checkBatchQuota(region, OperationQuota.OperationType.GET); Get clientGet = ProtobufUtil.toGet(get); if (get.getExistenceOnly() && region.getCoprocessorHost() != null) { @@ -2681,7 +2690,7 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request) try { region = getRegion(regionSpecifier); - quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(), + quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(), regionAction.hasCondition()); } catch (IOException e) { failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e); @@ -2744,7 +2753,7 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request) try { region = getRegion(regionSpecifier); - quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(), + quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(), regionAction.hasCondition()); } catch (IOException e) { failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e); @@ -2929,7 +2938,7 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque } long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request); - quota = getRpcQuotaManager().checkQuota(region, operationType); + quota = getRpcQuotaManager().checkBatchQuota(region, operationType); ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements(); @@ -3485,6 +3494,7 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan if (rpcCall != null) { responseCellSize = rpcCall.getResponseCellSize(); blockBytesScanned = rpcCall.getBlockBytesScanned(); + rsh.setMaxBlockBytesScanned(blockBytesScanned); } region.getMetrics().updateScan(); final MetricsRegionServer metricsRegionServer = server.getMetrics(); @@ -3588,7 +3598,8 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque } OperationQuota quota; try { - quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN); + quota = getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, + rsh.getMaxBlockBytesScanned()); } catch (IOException e) { addScannerLeaseBack(lease); throw new ServiceException(e); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java index 5de9a2d1a900..3afa15d96211 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java @@ -140,27 +140,41 @@ public void testBBSScan() throws Exception { waitMinuteQuota(); // should execute 1 request - testTraffic(() -> doScans(5, table), 1, 0); + testTraffic(() -> doScans(5, table, 1), 1, 0); // Remove all the limits admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); - testTraffic(() -> doScans(100, table), 100, 0); - testTraffic(() -> doScans(100, table), 
100, 0); + testTraffic(() -> doScans(100, table, 1), 100, 0); + testTraffic(() -> doScans(100, table, 1), 100, 0); // Add ~3 block/sec limit. This should support >1 scans admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, Math.round(3.1 * blockSize), TimeUnit.SECONDS)); triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + waitMinuteQuota(); + + // Add 50 block/sec limit. This should support >1 scans + admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, + Math.round(50 * blockSize), TimeUnit.SECONDS)); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + waitMinuteQuota(); - // should execute some requests, but not all - testTraffic(() -> doScans(100, table), 100, 90); + // This will produce some throttling exceptions, but all/most should succeed within the timeout + testTraffic(() -> doScans(100, table, 1), 75, 25); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + waitMinuteQuota(); + + // With large caching, a big scan should succeed + testTraffic(() -> doScans(10_000, table, 10_000), 10_000, 0); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + waitMinuteQuota(); // Remove all the limits admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); - testTraffic(() -> doScans(100, table), 100, 0); - testTraffic(() -> doScans(100, table), 100, 0); + testTraffic(() -> doScans(100, table, 1), 100, 0); + testTraffic(() -> doScans(100, table, 1), 100, 0); } @Test @@ -223,9 +237,8 @@ private void testTraffic(Callable trafficCallable, long expectedSuccess, l boolean success = (actualSuccess >= expectedSuccess - marginOfError) && (actualSuccess <= expectedSuccess + marginOfError); if (!success) { - triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); waitMinuteQuota(); - Thread.sleep(15_000L); } return success; }); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java index ff34c52386bf..8da2989921aa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java @@ -152,22 +152,21 @@ static long doMultiGets(int maxOps, int batchSize, int rowCount, byte[] family, return opCount; } - static long doScans(int maxOps, Table table) { + static long doScans(int desiredRows, Table table, int caching) { int count = 0; - int caching = 100; try { Scan scan = new Scan(); scan.setCaching(caching); scan.setCacheBlocks(false); ResultScanner scanner = table.getScanner(scan); - while (count < (maxOps * caching)) { + while (count < desiredRows) { scanner.next(); count += 1; } } catch (IOException e) { LOG.error("scan failed after nRetries=" + count, e); } - return count / caching; + return count; } static void triggerUserCacheRefresh(HBaseTestingUtil testUtil, boolean bypass, From f6bce7709dd58e96175b38479786e19ce61d4704 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Tue, 27 Feb 2024 13:39:09 -0500 Subject: [PATCH 2/7] maxScanEstimate --- .../hbase/quotas/DefaultOperationQuota.java | 13 ++++++- .../hadoop/hbase/quotas/NoopQuotaLimiter.java | 5 +++ .../hadoop/hbase/quotas/QuotaLimiter.java | 3 ++ .../hadoop/hbase/quotas/TimeBasedLimiter.java | 5 +++ .../quotas/TestBlockBytesScannedQuota.java | 38 +++++++++++++++++++ 5 files changed, 63 insertions(+), 1 
deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index ea188f8e4acc..dcc71e6bc7ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -33,6 +33,11 @@ @InterfaceStability.Evolving public class DefaultOperationQuota implements OperationQuota { + // a single scan estimate can consume no more than this proportion of the limiter's limit + // this prevents a long-running scan from being estimated at, say, 100MB of IO against + // a <100MB/IO throttle (because this would never succeed) + private static final double MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION = 0.9; + protected final List limiters; private final long writeCapacityUnit; private final long readCapacityUnit; @@ -55,6 +60,7 @@ public class DefaultOperationQuota implements OperationQuota { protected long readCapacityUnitDiff = 0; private boolean useResultSizeBytes; private long blockSizeBytes; + private long maxScanEstimate; public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes, final QuotaLimiter... limiters) { @@ -62,6 +68,11 @@ public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes, this.useResultSizeBytes = conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT); this.blockSizeBytes = blockSizeBytes; + long readSizeLimit = Arrays.stream(limiters) + .mapToLong(QuotaLimiter::getReadLimit) + .min() + .orElse(Long.MAX_VALUE); + maxScanEstimate = Math.round(MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION * readSizeLimit); } /** @@ -228,7 +239,7 @@ protected void updateEstimateConsumeScanQuota(ClientProtos.ScanRequest scanReque estimate = Math.min(maxScannerResultSize, scanRequest.getNextCallSeq() * maxBlockBytesScanned); } - readConsumed = estimate; + readConsumed = Math.min(maxScanEstimate, estimate); } readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java index 63d7610115af..cf1e49c12e5c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java @@ -70,6 +70,11 @@ public long getReadAvailable() { throw new UnsupportedOperationException(); } + @Override + public long getReadLimit() { + return Long.MAX_VALUE; + } + @Override public String toString() { return "NoopQuotaLimiter"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java index 14326e4e0d25..8d00a702e253 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java @@ -76,6 +76,9 @@ void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize, /** Returns the number of bytes available to read to avoid exceeding the quota */ long getReadAvailable(); + /** Returns the maximum number of bytes ever available to read */ + long getReadLimit(); + /** Returns the number of bytes available to write to avoid exceeding the quota */ long getWriteAvailable(); } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index 8ae2cae01881..483edbcd3a4f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -243,6 +243,11 @@ public long getReadAvailable() { return readSizeLimiter.getAvailable(); } + @Override + public long getReadLimit() { + return Math.min(readSizeLimiter.getLimit(), reqSizeLimiter.getLimit()); + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java index 3afa15d96211..d67459ec68b6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doScans; import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh; import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota; +import static org.junit.Assert.assertTrue; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -60,12 +61,16 @@ public class TestBlockBytesScannedQuota { private static final byte[] QUALIFIER = Bytes.toBytes("q"); private static final TableName TABLE_NAME = TableName.valueOf("BlockBytesScannedQuotaTest"); + private static final long MAX_SCANNER_RESULT_SIZE = 100 * 1024 * 1024; @BeforeClass public static void setUpBeforeClass() throws Exception { // client should fail fast TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 10); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, + MAX_SCANNER_RESULT_SIZE); + TEST_UTIL.getConfiguration().setClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, AverageIntervalRateLimiter.class, RateLimiter.class); // quotas enabled, using block bytes scanned TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); @@ -177,6 +182,39 @@ public void testBBSScan() throws Exception { testTraffic(() -> doScans(100, table, 1), 100, 0); } + @Test + public void testSmallScanNeverBlockedByLargeEstimate() throws Exception { + final Admin admin = TEST_UTIL.getAdmin(); + final String userName = User.getCurrent().getShortName(); + Table table = admin.getConnection().getTable(TABLE_NAME); + + doPuts(10_000, FAMILY, QUALIFIER, table); + TEST_UTIL.flush(TABLE_NAME); + + // Add 99MB/sec limit. + // This should never be blocked, but with a sequence number approaching 10k, without + // other intervention, we would estimate a scan workload approaching 625MB or the + // maxScannerResultSize (both larger than the 90MB limit). 
This test ensures that all + // requests succeed, so the estimate never becomes large enough to cause read downtime + long limit = 99 * 1024 * 1024; + assertTrue(limit <= MAX_SCANNER_RESULT_SIZE); // always true, but protecting against code changes + admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, limit, + TimeUnit.SECONDS)); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + waitMinuteQuota(); + + // should execute all requests + testTraffic(() -> doScans(10_000, table, 1), 10_000, 0); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + waitMinuteQuota(); + + // Remove all the limits + admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); + triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); + testTraffic(() -> doScans(100, table, 1), 100, 0); + testTraffic(() -> doScans(100, table, 1), 100, 0); + } + @Test public void testBBSMultiGet() throws Exception { final Admin admin = TEST_UTIL.getAdmin(); From f4a04e5ff6862c114e9bcbf81768086b1a9a3789 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Wed, 28 Feb 2024 11:44:59 -0500 Subject: [PATCH 3/7] checkstyle etc --- .../hadoop/hbase/quotas/DefaultOperationQuota.java | 14 ++++++++------ .../hadoop/hbase/regionserver/RSRpcServices.java | 2 +- .../hbase/quotas/TestBlockBytesScannedQuota.java | 8 +++++--- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index dcc71e6bc7ab..5e37d9c39fe2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -68,10 +68,8 @@ public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes, this.useResultSizeBytes = conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT); this.blockSizeBytes = blockSizeBytes; - long readSizeLimit = Arrays.stream(limiters) - .mapToLong(QuotaLimiter::getReadLimit) - .min() - .orElse(Long.MAX_VALUE); + long readSizeLimit = + Arrays.stream(limiters).mapToLong(QuotaLimiter::getReadLimit).min().orElse(Long.MAX_VALUE); maxScanEstimate = Math.round(MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION * readSizeLimit); } @@ -98,7 +96,9 @@ public void checkQuota(int numWrites, int numReads) throws RpcThrottlingExceptio readAvailable = Long.MAX_VALUE; for (final QuotaLimiter limiter : limiters) { - if (limiter.isBypass()) continue; + if (limiter.isBypass()) { + continue; + } limiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed, readCapacityUnitConsumed); @@ -118,7 +118,9 @@ public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScanner readAvailable = Long.MAX_VALUE; for (final QuotaLimiter limiter : limiters) { - if (limiter.isBypass()) continue; + if (limiter.isBypass()) { + continue; + } limiter.checkQuota(0, writeConsumed, 1, readConsumed, writeCapacityUnitConsumed, readCapacityUnitConsumed); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 998fe67f4622..eac20e77901f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -453,7 +453,7 @@ boolean 
incNextCallSeq(long currentSeq) { return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1); } - long getMaxBlockBytesScanned() { + synchronized long getMaxBlockBytesScanned() { return maxBlockBytesScanned.get(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java index d67459ec68b6..c058abe214c5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java @@ -70,7 +70,8 @@ public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, MAX_SCANNER_RESULT_SIZE); - TEST_UTIL.getConfiguration().setClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, AverageIntervalRateLimiter.class, RateLimiter.class); + TEST_UTIL.getConfiguration().setClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, + AverageIntervalRateLimiter.class, RateLimiter.class); // quotas enabled, using block bytes scanned TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); @@ -161,7 +162,7 @@ public void testBBSScan() throws Exception { // Add 50 block/sec limit. This should support >1 scans admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, - Math.round(50 * blockSize), TimeUnit.SECONDS)); + Math.round(50.1 * blockSize), TimeUnit.SECONDS)); triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); waitMinuteQuota(); @@ -197,7 +198,8 @@ public void testSmallScanNeverBlockedByLargeEstimate() throws Exception { // maxScannerResultSize (both larger than the 90MB limit). This test ensures that all // requests succeed, so the estimate never becomes large enough to cause read downtime long limit = 99 * 1024 * 1024; - assertTrue(limit <= MAX_SCANNER_RESULT_SIZE); // always true, but protecting against code changes + assertTrue(limit <= MAX_SCANNER_RESULT_SIZE); // always true, but protecting against code + // changes admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, limit, TimeUnit.SECONDS)); triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); From a2e58b408b27660da110ed7923a55152e1053163 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Fri, 8 Mar 2024 09:35:59 -0500 Subject: [PATCH 4/7] remove min wait interval, to be addressed separately --- .../java/org/apache/hadoop/hbase/quotas/RateLimiter.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java index 273ab85838da..5c69ad5d6cd5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java @@ -35,7 +35,6 @@ + "are mostly synchronized...but to me it looks like they are totally synchronized") public abstract class RateLimiter { public static final String QUOTA_RATE_LIMITER_CONF_KEY = "hbase.quota.rate.limiter"; - private static final long QUOTA_RATE_LIMITER_MINIMUM_WAIT_INTERVAL_MS = 100L; private long tunit = 1000; // Timeunit factor for translating to ms. private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to. 
private long avail = Long.MAX_VALUE; // Currently available resource units @@ -218,11 +217,7 @@ public long waitInterval() { */ public synchronized long waitInterval(final long amount) { // TODO Handle over quota? - long waitInterval = (amount <= avail) ? 0 : getWaitInterval(getLimit(), avail, amount); - if (waitInterval > 0) { - return Math.max(waitInterval, QUOTA_RATE_LIMITER_MINIMUM_WAIT_INTERVAL_MS); - } - return waitInterval; + return (amount <= avail) ? 0 : getWaitInterval(getLimit(), avail, amount); } // These two method are for strictly testing purpose only From 6eac2c791d61c7f971ffad882cfbb0d2361e91d7 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Sun, 10 Mar 2024 10:59:06 -0400 Subject: [PATCH 5/7] DRY cleanup --- .../hbase/quotas/DefaultOperationQuota.java | 28 +++----- .../hbase/quotas/ExceedOperationQuota.java | 72 +++++++------------ .../hbase/quotas/NoopOperationQuota.java | 2 +- .../hadoop/hbase/quotas/OperationQuota.java | 2 +- .../quotas/RegionServerRpcQuotaManager.java | 2 +- 5 files changed, 37 insertions(+), 69 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index 5e37d9c39fe2..50b2b76839bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -91,44 +91,32 @@ public DefaultOperationQuota(final Configuration conf, final List } @Override - public void checkQuota(int numWrites, int numReads) throws RpcThrottlingException { + public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException { updateEstimateConsumeBatchQuota(numWrites, numReads); - - readAvailable = Long.MAX_VALUE; - for (final QuotaLimiter limiter : limiters) { - if (limiter.isBypass()) { - continue; - } - - limiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed, - writeCapacityUnitConsumed, readCapacityUnitConsumed); - readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); - } - - for (final QuotaLimiter limiter : limiters) { - limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed, - readCapacityUnitConsumed); - } + checkQuota(numWrites, numReads); } @Override public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, long maxBlockBytesScanned) throws RpcThrottlingException { updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned); + checkQuota(0, 1); + } + private void checkQuota(long numWrites, long numReads) throws RpcThrottlingException { readAvailable = Long.MAX_VALUE; for (final QuotaLimiter limiter : limiters) { if (limiter.isBypass()) { continue; } - limiter.checkQuota(0, writeConsumed, 1, readConsumed, writeCapacityUnitConsumed, - readCapacityUnitConsumed); + limiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed, + writeCapacityUnitConsumed, readCapacityUnitConsumed); readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); } for (final QuotaLimiter limiter : limiters) { - limiter.grabQuota(0, writeConsumed, 1, readConsumed, writeCapacityUnitConsumed, + limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed, readCapacityUnitConsumed); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java index 972584449f0c..009f8d22042d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java @@ -49,64 +49,40 @@ public ExceedOperationQuota(final Configuration conf, int blockSizeBytes, } @Override - public void checkQuota(int numWrites, int numReads) throws RpcThrottlingException { - if (regionServerLimiter.isBypass()) { - // If region server limiter is bypass, which means no region server quota is set, check and - // throttle by all other quotas. In this condition, exceed throttle quota will not work. - LOG.warn("Exceed throttle quota is enabled but no region server quotas found"); - super.checkQuota(numWrites, numReads); - } else { - // 1. Update estimate quota which will be consumed - updateEstimateConsumeBatchQuota(numWrites, numReads); - // 2. Check if region server limiter is enough. If not, throw RpcThrottlingException. - regionServerLimiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed, - writeCapacityUnitConsumed, readCapacityUnitConsumed); - // 3. Check if other limiters are enough. If not, exceed other limiters because region server - // limiter is enough. - boolean exceed = false; - try { - super.checkQuota(numWrites, numReads); - } catch (RpcThrottlingException e) { - exceed = true; - if (LOG.isDebugEnabled()) { - LOG.debug("Read/Write requests num exceeds quota: writes:{} reads:{} scans:0, " - + "try use region server quota", numWrites, numReads); - } - } - // 4. Region server limiter is enough and grab estimated consume quota. - readAvailable = Math.max(readAvailable, regionServerLimiter.getReadAvailable()); - regionServerLimiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, - writeCapacityUnitConsumed, writeCapacityUnitConsumed); - if (exceed) { - // 5. Other quota limiter is exceeded and has not been grabbed (because throw - // RpcThrottlingException in Step 3), so grab it. - for (final QuotaLimiter limiter : limiters) { - limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, - writeCapacityUnitConsumed, writeCapacityUnitConsumed); - } - } - } + public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException { + Runnable estimateQuota = () -> updateEstimateConsumeBatchQuota(numWrites, numReads); + CheckQuotaRunnable checkQuota = () -> super.checkBatchQuota(numWrites, numReads); + checkQuota(estimateQuota, checkQuota, numWrites, numReads); } @Override public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, long maxBlockBytesScanned) throws RpcThrottlingException { + Runnable estimateQuota = + () -> updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned); + CheckQuotaRunnable checkQuota = + () -> super.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned); + checkQuota(estimateQuota, checkQuota, 0, 1); + } + + private void checkQuota(Runnable estimateQuota, CheckQuotaRunnable checkQuota, int numWrites, + int numReads) throws RpcThrottlingException { if (regionServerLimiter.isBypass()) { // If region server limiter is bypass, which means no region server quota is set, check and // throttle by all other quotas. In this condition, exceed throttle quota will not work. 
LOG.warn("Exceed throttle quota is enabled but no region server quotas found"); - super.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned); + checkQuota.run(); } else { // 1. Update estimate quota which will be consumed - updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned); + estimateQuota.run(); // 2. Check if region server limiter is enough. If not, throw RpcThrottlingException. - regionServerLimiter.checkQuota(0, writeConsumed, 1, readConsumed, writeCapacityUnitConsumed, - readCapacityUnitConsumed); + regionServerLimiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed, + writeCapacityUnitConsumed, readCapacityUnitConsumed); // 3. Check if other limiters are enough. If not, exceed other limiters because region server // limiter is enough. boolean exceed = false; try { - super.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned); + checkQuota.run(); } catch (RpcThrottlingException e) { exceed = true; if (LOG.isDebugEnabled()) { @@ -116,14 +92,14 @@ public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScanner } // 4. Region server limiter is enough and grab estimated consume quota. readAvailable = Math.max(readAvailable, regionServerLimiter.getReadAvailable()); - regionServerLimiter.grabQuota(0, writeConsumed, 1, readConsumed, writeCapacityUnitConsumed, - writeCapacityUnitConsumed); + regionServerLimiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, + writeCapacityUnitConsumed, writeCapacityUnitConsumed); if (exceed) { // 5. Other quota limiter is exceeded and has not been grabbed (because throw // RpcThrottlingException in Step 3), so grab it. for (final QuotaLimiter limiter : limiters) { - limiter.grabQuota(0, writeConsumed, 1, readConsumed, writeCapacityUnitConsumed, - writeCapacityUnitConsumed); + limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, + writeCapacityUnitConsumed, writeCapacityUnitConsumed); } } } @@ -139,4 +115,8 @@ public void close() { regionServerLimiter.consumeRead(readDiff, readCapacityUnitDiff); } } + + private interface CheckQuotaRunnable { + void run() throws RpcThrottlingException; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java index f5bb0827cdf4..ecb3388cf695 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java @@ -42,7 +42,7 @@ public static OperationQuota get() { } @Override - public void checkQuota(int numWrites, int numReads) throws RpcThrottlingException { + public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException { // no-op } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java index 4a9fac977700..4dc7b967f7d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java @@ -56,7 +56,7 @@ public enum OperationType { * @throws RpcThrottlingException if the operation cannot be performed because RPC quota is * exceeded. 
*/ - void checkQuota(int numWrites, int numReads) throws RpcThrottlingException; + void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException; /** * Checks if it is possible to execute the scan. The quota will be estimated based on the diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java index 7ed95555c28f..19e56f6a0ec1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java @@ -267,7 +267,7 @@ private OperationQuota checkBatchQuota(final Region region, final int numWrites, OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes()); try { - quota.checkQuota(numWrites, numReads); + quota.checkBatchQuota(numWrites, numReads); } catch (RpcThrottlingException e) { LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table + " numWrites=" + numWrites + " numReads=" + numReads + ": " + e.getMessage()); From 02745009555f9b9adc78cf0ad8c04c333cd56b35 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Mon, 11 Mar 2024 15:43:10 -0400 Subject: [PATCH 6/7] consider growing/shrinking/flat scan trend in estimate --- .../hbase/quotas/DefaultOperationQuota.java | 68 ++++++---- .../hbase/quotas/ExceedOperationQuota.java | 10 +- .../hbase/quotas/NoopOperationQuota.java | 2 +- .../hadoop/hbase/quotas/OperationQuota.java | 11 +- .../quotas/RegionServerRpcQuotaManager.java | 17 ++- .../hbase/regionserver/RSRpcServices.java | 24 +++- .../quotas/TestDefaultOperationQuota.java | 128 ++++++++++++++++++ 7 files changed, 208 insertions(+), 52 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index 50b2b76839bb..2e26765a6a19 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -98,8 +98,9 @@ public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingExc @Override public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, - long maxBlockBytesScanned) throws RpcThrottlingException { - updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned); + long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException { + updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned, + prevBlockBytesScannedDifference); checkQuota(0, 1); } @@ -198,43 +199,52 @@ protected void updateEstimateConsumeBatchQuota(int numWrites, int numReads) { /** * Update estimate quota(read/write size/capacityUnits) which will be consumed - * @param scanRequest the scan to be executed - * @param maxScannerResultSize the maximum bytes to be returned by the scanner - * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the scanner + * @param scanRequest the scan to be executed + * @param maxScannerResultSize the maximum bytes to be returned by the scanner + * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the + * scanner + * @param 
prevBlockBytesScannedDifference the difference between BBS of the previous two next + * calls */ protected void updateEstimateConsumeScanQuota(ClientProtos.ScanRequest scanRequest, - long maxScannerResultSize, long maxBlockBytesScanned) { + long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) { if (useResultSizeBytes) { - readConsumed = estimateConsume(OperationType.GET, 1, 1000); + readConsumed = estimateConsume(OperationType.SCAN, 1, 1000); } else { - /* - * Estimating scan workload is more complicated, and if we severely underestimate workloads - * then throttled clients will exhaust retries too quickly, and could saturate the RPC layer. - * We have access to the ScanRequest's nextCallSeq number, the maxScannerResultSize, and the - * maxBlockBytesScanned by every relevant Scanner#next call. With these inputs we can make a - * more informed estimate about the scan's workload. - */ - long estimate; - if (scanRequest.getNextCallSeq() == 0) { - // start scanners with an optimistic 1 block IO estimate - // it is better to underestimate a large scan in the beginning - // than to overestimate, and block, a small scan - estimate = blockSizeBytes; - } else { - // scanner result sizes will be limited by quota availability, regardless of - // maxScannerResultSize. This means that we cannot safely assume that a long-running - // scan with a small maxBlockBytesScanned would not prefer to pull down - // a larger payload. So we should estimate with the assumption that long-running scans - // are appropriately configured to approach their maxScannerResultSize per RPC call - estimate = - Math.min(maxScannerResultSize, scanRequest.getNextCallSeq() * maxBlockBytesScanned); - } + long estimate = getScanReadConsumeEstimate(blockSizeBytes, scanRequest.getNextCallSeq(), + maxScannerResultSize, maxBlockBytesScanned, prevBlockBytesScannedDifference); readConsumed = Math.min(maxScanEstimate, estimate); } readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed); } + protected static long getScanReadConsumeEstimate(long blockSizeBytes, long nextCallSeq, + long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) { + /* + * Estimating scan workload is more complicated, and if we severely underestimate workloads then + * throttled clients will exhaust retries too quickly, and could saturate the RPC layer + */ + if (nextCallSeq == 0) { + // start scanners with an optimistic 1 block IO estimate + // it is better to underestimate a large scan in the beginning + // than to overestimate, and block, a small scan + return blockSizeBytes; + } + + boolean isWorkloadGrowing = prevBlockBytesScannedDifference > blockSizeBytes; + if (isWorkloadGrowing) { + // if nextCallSeq > 0 and the workload is growing then our estimate + // should consider that the workload may continue to increase + return Math.min(maxScannerResultSize, nextCallSeq * maxBlockBytesScanned); + } else { + // if nextCallSeq > 0 and the workload is shrinking or flat + // then our workload has likely plateaued. We can just rely on the existing + // maxBlockBytesScanned as our estimate in this case. 
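+      // for example (illustrative numbers, not taken from this patch): with a 64 KiB block
+      // size, a scan that has been reading roughly 10 blocks per next call, with per-call
+      // changes of less than one block, keeps an estimate of about 640 KiB here rather than
+      // the nextCallSeq * maxBlockBytesScanned figure used for a growing workload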
+ return maxBlockBytesScanned; + } + } + private long estimateConsume(final OperationType type, int numReqs, long avgSize) { if (numReqs > 0) { return avgSize * numReqs; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java index 009f8d22042d..cd271c82ff7c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java @@ -57,11 +57,11 @@ public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingExc @Override public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, - long maxBlockBytesScanned) throws RpcThrottlingException { - Runnable estimateQuota = - () -> updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned); - CheckQuotaRunnable checkQuota = - () -> super.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned); + long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException { + Runnable estimateQuota = () -> updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, + maxBlockBytesScanned, prevBlockBytesScannedDifference); + CheckQuotaRunnable checkQuota = () -> super.checkScanQuota(scanRequest, maxScannerResultSize, + maxBlockBytesScanned, prevBlockBytesScannedDifference); checkQuota(estimateQuota, checkQuota, 0, 1); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java index ecb3388cf695..736560e6fd17 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java @@ -48,7 +48,7 @@ public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingExc @Override public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, - long maxBlockBytesScanned) throws RpcThrottlingException { + long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException { // no-op } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java index 4dc7b967f7d3..ef0a35fa5892 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java @@ -61,14 +61,17 @@ public enum OperationType { /** * Checks if it is possible to execute the scan. The quota will be estimated based on the * composition of the scan. - * @param scanRequest the given scan operation - * @param maxScannerResultSize the maximum bytes to be returned by the scanner - * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the scanner + * @param scanRequest the given scan operation + * @param maxScannerResultSize the maximum bytes to be returned by the scanner + * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the + * scanner + * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next + * calls * @throws RpcThrottlingException if the operation cannot be performed because RPC quota is * exceeded. 
*/ void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, - long maxBlockBytesScanned) throws RpcThrottlingException; + long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException; /** Cleanup method on operation completion */ void close(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java index 19e56f6a0ec1..92a0cfd5c135 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java @@ -159,16 +159,20 @@ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName t * available quota and to report the data/usage of the operation. This method is specific to scans * because estimating a scan's workload is more complicated than estimating the workload of a * get/put. - * @param region the region where the operation will be performed - * @param scanRequest the scan to be estimated against the quota - * @param maxScannerResultSize the maximum bytes to be returned by the scanner - * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the scanner + * @param region the region where the operation will be performed + * @param scanRequest the scan to be estimated against the quota + * @param maxScannerResultSize the maximum bytes to be returned by the scanner + * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the + * scanner + * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next + * calls * @return the OperationQuota * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. 
*/ public OperationQuota checkScanQuota(final Region region, final ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, - long maxBlockBytesScanned) throws IOException, RpcThrottlingException { + long maxBlockBytesScanned, long prevBlockBytesScannedDifference) + throws IOException, RpcThrottlingException { Optional user = RpcServer.getRequestUser(); UserGroupInformation ugi; if (user.isPresent()) { @@ -181,7 +185,8 @@ public OperationQuota checkScanQuota(final Region region, OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes()); try { - quota.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned); + quota.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned, + prevBlockBytesScannedDifference); } catch (RpcThrottlingException e) { LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table + " scan=" + scanRequest.getScannerId() + ": " + e.getMessage()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index eac20e77901f..b8709db0da21 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -429,7 +429,9 @@ static final class RegionScannerHolder { private boolean fullRegionScan; private final String clientIPAndPort; private final String userName; - private final AtomicLong maxBlockBytesScanned = new AtomicLong(0); + private volatile long maxBlockBytesScanned = 0; + private volatile long prevBlockBytesScanned = 0; + private volatile long prevBlockBytesScannedDifference = 0; RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan, @@ -453,12 +455,20 @@ boolean incNextCallSeq(long currentSeq) { return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1); } - synchronized long getMaxBlockBytesScanned() { - return maxBlockBytesScanned.get(); + long getMaxBlockBytesScanned() { + return maxBlockBytesScanned; } - synchronized void setMaxBlockBytesScanned(long blockBytesScanned) { - maxBlockBytesScanned.set(Math.max(getMaxBlockBytesScanned(), blockBytesScanned)); + long getPrevBlockBytesScannedDifference() { + return prevBlockBytesScannedDifference; + } + + void updateBlockBytesScanned(long blockBytesScanned) { + prevBlockBytesScannedDifference = blockBytesScanned - prevBlockBytesScanned; + prevBlockBytesScanned = blockBytesScanned; + if (blockBytesScanned > maxBlockBytesScanned) { + maxBlockBytesScanned = blockBytesScanned; + } } // Should be called only when we need to print lease expired messages otherwise @@ -3494,7 +3504,7 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan if (rpcCall != null) { responseCellSize = rpcCall.getResponseCellSize(); blockBytesScanned = rpcCall.getBlockBytesScanned(); - rsh.setMaxBlockBytesScanned(blockBytesScanned); + rsh.updateBlockBytesScanned(blockBytesScanned); } region.getMetrics().updateScan(); final MetricsRegionServer metricsRegionServer = server.getMetrics(); @@ -3599,7 +3609,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque OperationQuota quota; try { quota = getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, - rsh.getMaxBlockBytesScanned()); + rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference()); } catch 
(IOException e) { addScannerLeaseBack(lease); throw new ServiceException(e); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java new file mode 100644 index 000000000000..4684be02d69d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestDefaultOperationQuota { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestDefaultOperationQuota.class); + + @Test + public void testScanEstimateNewScanner() { + long blockSize = 64 * 1024; + long nextCallSeq = 0; + long maxScannerResultSize = 100 * 1024 * 1024; + long maxBlockBytesScanned = 0; + long prevBBSDifference = 0; + long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, + maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); + + // new scanner should estimate scan read as 1 block + assertEquals(blockSize, estimate); + } + + @Test + public void testScanEstimateSecondNextCall() { + long blockSize = 64 * 1024; + long nextCallSeq = 1; + long maxScannerResultSize = 100 * 1024 * 1024; + long maxBlockBytesScanned = 10 * blockSize; + long prevBBSDifference = 10 * blockSize; + long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, + maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); + + // 2nd next call should be estimated at maxBBS + assertEquals(maxBlockBytesScanned, estimate); + } + + @Test + public void testScanEstimateFlatWorkload() { + long blockSize = 64 * 1024; + long nextCallSeq = 100; + long maxScannerResultSize = 100 * 1024 * 1024; + long maxBlockBytesScanned = 10 * blockSize; + long prevBBSDifference = 0; + long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, + maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); + + // flat workload should not overestimate + assertEquals(maxBlockBytesScanned, estimate); + } + + @Test + public void testScanEstimateVariableFlatWorkload() { + long blockSize = 64 * 1024; + long nextCallSeq = 1; + long maxScannerResultSize = 
100 * 1024 * 1024; + long maxBlockBytesScanned = 10 * blockSize; + long prevBBSDifference = 0; + for (int i = 0; i < 100; i++) { + long variation = Math.round(Math.random() * blockSize); + if (variation % 2 == 0) { + variation *= -1; + } + // despite +/- <1 block variation, we consider this workload flat + prevBBSDifference = variation; + + long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq + i, + maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); + + // flat workload should not overestimate + assertEquals(maxBlockBytesScanned, estimate); + } + } + + @Test + public void testScanEstimateGrowingWorkload() { + long blockSize = 64 * 1024; + long nextCallSeq = 100; + long maxScannerResultSize = 100 * 1024 * 1024; + long maxBlockBytesScanned = 20 * blockSize; + long prevBBSDifference = 10 * blockSize; + long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, + maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); + + // growing workload should overestimate + assertTrue(nextCallSeq * maxBlockBytesScanned == estimate || maxScannerResultSize == estimate); + } + + @Test + public void testScanEstimateShrinkingWorkload() { + long blockSize = 64 * 1024; + long nextCallSeq = 100; + long maxScannerResultSize = 100 * 1024 * 1024; + long maxBlockBytesScanned = 20 * blockSize; + long prevBBSDifference = -10 * blockSize; + long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, + maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); + + // shrinking workload should only shrink estimate to maxBBS + assertEquals(maxBlockBytesScanned, estimate); + } +} From 324855cfc2afd70c8eaf94050b7f005738a1672d Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Wed, 13 Mar 2024 16:13:10 -0500 Subject: [PATCH 7/7] Update debug line --- .../hbase/quotas/ExceedOperationQuota.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java index cd271c82ff7c..3077d6dac537 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java @@ -52,7 +52,7 @@ public ExceedOperationQuota(final Configuration conf, int blockSizeBytes, public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException { Runnable estimateQuota = () -> updateEstimateConsumeBatchQuota(numWrites, numReads); CheckQuotaRunnable checkQuota = () -> super.checkBatchQuota(numWrites, numReads); - checkQuota(estimateQuota, checkQuota, numWrites, numReads); + checkQuota(estimateQuota, checkQuota, numWrites, numReads, 0); } @Override @@ -62,11 +62,11 @@ public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScanner maxBlockBytesScanned, prevBlockBytesScannedDifference); CheckQuotaRunnable checkQuota = () -> super.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned, prevBlockBytesScannedDifference); - checkQuota(estimateQuota, checkQuota, 0, 1); + checkQuota(estimateQuota, checkQuota, 0, 0, 1); } private void checkQuota(Runnable estimateQuota, CheckQuotaRunnable checkQuota, int numWrites, - int numReads) throws RpcThrottlingException { + int numReads, int numScans) throws RpcThrottlingException { if (regionServerLimiter.isBypass()) { // If region server limiter is bypass, 
which means no region server quota is set, check and // throttle by all other quotas. In this condition, exceed throttle quota will not work. @@ -76,7 +76,7 @@ private void checkQuota(Runnable estimateQuota, CheckQuotaRunnable checkQuota, i // 1. Update estimate quota which will be consumed estimateQuota.run(); // 2. Check if region server limiter is enough. If not, throw RpcThrottlingException. - regionServerLimiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed, + regionServerLimiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, writeCapacityUnitConsumed, readCapacityUnitConsumed); // 3. Check if other limiters are enough. If not, exceed other limiters because region server // limiter is enough. @@ -86,19 +86,19 @@ private void checkQuota(Runnable estimateQuota, CheckQuotaRunnable checkQuota, i } catch (RpcThrottlingException e) { exceed = true; if (LOG.isDebugEnabled()) { - LOG.debug("Read/Write requests num exceeds quota: writes:0 reads:0, scans:1, " - + "try use region server quota"); + LOG.debug("Read/Write requests num exceeds quota: writes:{} reads:{}, scans:{}, " + + "try use region server quota", numWrites, numReads, numScans); } } // 4. Region server limiter is enough and grab estimated consume quota. readAvailable = Math.max(readAvailable, regionServerLimiter.getReadAvailable()); - regionServerLimiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, + regionServerLimiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, writeCapacityUnitConsumed, writeCapacityUnitConsumed); if (exceed) { // 5. Other quota limiter is exceeded and has not been grabbed (because throw // RpcThrottlingException in Step 3), so grab it. for (final QuotaLimiter limiter : limiters) { - limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, + limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, writeCapacityUnitConsumed, writeCapacityUnitConsumed); } }
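
To make the trend-aware scan estimate concrete, the following minimal sketch (not part of the patch; the class name and standalone main method are illustrative only) mirrors the decision structure of DefaultOperationQuota#getScanReadConsumeEstimate and prints estimates for the same scenarios the new unit tests cover:

/** Illustrative only; mirrors the decision structure of getScanReadConsumeEstimate. */
public final class ScanEstimateSketch {

  static long estimate(long blockSizeBytes, long nextCallSeq, long maxScannerResultSize,
    long maxBlockBytesScanned, long prevBlockBytesScannedDifference) {
    if (nextCallSeq == 0) {
      // brand new scanner: optimistic single-block estimate
      return blockSizeBytes;
    }
    if (prevBlockBytesScannedDifference > blockSizeBytes) {
      // growing workload: assume it keeps growing, capped by maxScannerResultSize
      return Math.min(maxScannerResultSize, nextCallSeq * maxBlockBytesScanned);
    }
    // flat or shrinking workload: the observed per-call maximum is already a good estimate
    return maxBlockBytesScanned;
  }

  public static void main(String[] args) {
    long blockSize = 64 * 1024;
    long maxScannerResultSize = 100L * 1024 * 1024;
    long maxBlockBytesScanned = 10 * blockSize;

    // new scanner -> one block (65536)
    System.out.println(estimate(blockSize, 0, maxScannerResultSize, 0, 0));
    // flat trend on the 100th call -> stick with the observed maximum (655360)
    System.out.println(estimate(blockSize, 100, maxScannerResultSize, maxBlockBytesScanned, 0));
    // growing trend on the 100th call -> min(maxScannerResultSize, 100 * maxBlockBytesScanned)
    System.out.println(
      estimate(blockSize, 100, maxScannerResultSize, maxBlockBytesScanned, 5 * blockSize));
  }
}

The one-block cutoff for "growing" (prevBlockBytesScannedDifference > blockSizeBytes) is what lets testScanEstimateVariableFlatWorkload treat sub-block jitter as noise, while a genuine ramp-up still scales the estimate toward maxScannerResultSize.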