From feb4b55a9f0056ed54ffeeec41e687b1c3baa03b Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Wed, 9 Apr 2025 08:36:10 -0400 Subject: [PATCH] HBASE-29229 Throttles should support specific restrictions for atomic workloads (#6866) (#6882) Signed-off-by: Nick Dimiduk Co-authored-by: Ray Mattingly --- .../hbase/quotas/RpcThrottlingException.java | 30 +++- .../hadoop/hbase/quotas/ThrottleSettings.java | 3 + .../hadoop/hbase/quotas/ThrottleType.java | 9 + .../hbase/shaded/protobuf/ProtobufUtil.java | 12 ++ .../src/main/protobuf/Quota.proto | 7 + .../hbase/quotas/DefaultOperationQuota.java | 23 ++- .../hbase/quotas/ExceedOperationQuota.java | 21 +-- .../hbase/quotas/GlobalQuotaSettingsImpl.java | 36 ++++ .../hbase/quotas/NoopOperationQuota.java | 3 +- .../hadoop/hbase/quotas/NoopQuotaLimiter.java | 10 +- .../hadoop/hbase/quotas/OperationQuota.java | 2 +- .../hadoop/hbase/quotas/QuotaCache.java | 4 + .../hadoop/hbase/quotas/QuotaLimiter.java | 9 +- .../apache/hadoop/hbase/quotas/QuotaUtil.java | 12 ++ .../quotas/RegionServerRpcQuotaManager.java | 17 +- .../hadoop/hbase/quotas/RpcQuotaManager.java | 2 +- .../hadoop/hbase/quotas/TimeBasedLimiter.java | 80 ++++++++- .../regionserver/RegionCoprocessorHost.java | 2 +- .../hbase/quotas/TestAtomicReadQuota.java | 163 ++++++++++++++---- .../hbase/quotas/TestDefaultAtomicQuota.java | 160 +++++++++++++++++ .../quotas/TestDefaultOperationQuota.java | 42 ++--- .../hbase/quotas/TestNoopOperationQuota.java | 3 +- .../hadoop/hbase/quotas/TestQuotaAdmin.java | 8 + .../hadoop/hbase/quotas/TestQuotaState.java | 8 +- .../hbase/quotas/ThrottleQuotaTestUtil.java | 20 ++- .../regionserver/TestScannerLeaseCount.java | 4 +- 26 files changed, 576 insertions(+), 114 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultAtomicQuota.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java index 2c1f13e94e66..dfa8eacb13b9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java @@ -40,14 +40,17 @@ public enum Type { ReadSizeExceeded, RequestCapacityUnitExceeded, ReadCapacityUnitExceeded, - WriteCapacityUnitExceeded + WriteCapacityUnitExceeded, + AtomicRequestNumberExceeded, + AtomicReadSizeExceeded, + AtomicWriteSizeExceeded, } - private static final String[] MSG_TYPE = - new String[] { "number of requests exceeded", "request size limit exceeded", - "number of read requests exceeded", "number of write requests exceeded", - "write size limit exceeded", "read size limit exceeded", "request capacity unit exceeded", - "read capacity unit exceeded", "write capacity unit exceeded" }; + private static final String[] MSG_TYPE = new String[] { "number of requests exceeded", + "request size limit exceeded", "number of read requests exceeded", + "number of write requests exceeded", "write size limit exceeded", "read size limit exceeded", + "request capacity unit exceeded", "read capacity unit exceeded", "write capacity unit exceeded", + "atomic request number exceeded", "atomic read size exceeded", "atomic write size exceeded" }; private static final String MSG_WAIT = " - wait "; @@ -127,6 +130,21 @@ public static void throwWriteCapacityUnitExceeded(final long waitInterval) throwThrottlingException(Type.WriteCapacityUnitExceeded, waitInterval); } + public static void throwAtomicRequestNumberExceeded(final long waitInterval) + throws RpcThrottlingException { + throwThrottlingException(Type.AtomicRequestNumberExceeded, waitInterval); + } + + public static void throwAtomicReadSizeExceeded(final long waitInterval) + throws RpcThrottlingException { + throwThrottlingException(Type.AtomicReadSizeExceeded, waitInterval); + } + + public static void throwAtomicWriteSizeExceeded(final long waitInterval) + throws RpcThrottlingException { + throwThrottlingException(Type.AtomicWriteSizeExceeded, waitInterval); + } + private static void throwThrottlingException(final Type type, final long waitInterval) throws RpcThrottlingException { String msg = MSG_TYPE[type.ordinal()] + MSG_WAIT + stringFromMillis(waitInterval); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java index 01dfc3709ae6..efde451c1222 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java @@ -93,11 +93,14 @@ public String toString() { case REQUEST_NUMBER: case WRITE_NUMBER: case READ_NUMBER: + case ATOMIC_REQUEST_NUMBER: builder.append(String.format("%dreq", timedQuota.getSoftLimit())); break; case REQUEST_SIZE: case WRITE_SIZE: case READ_SIZE: + case ATOMIC_READ_SIZE: + case ATOMIC_WRITE_SIZE: builder.append(sizeToString(timedQuota.getSoftLimit())); break; case REQUEST_CAPACITY_UNIT: diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java index 80827dafe6d5..2c5a25acc2c4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java @@ -50,4 +50,13 @@ public enum ThrottleType { /** Throttling based on the read data capacity unit */ READ_CAPACITY_UNIT, + + /** Throttling based on the IO footprint of an atomic request */ + ATOMIC_READ_SIZE, + + /** Throttling based on the number of atomic requests per time-unit */ + ATOMIC_REQUEST_NUMBER, + + /** Throttling based on the size of atomic write requests */ + ATOMIC_WRITE_SIZE, } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index c104fdcfa33a..46eb86aeb336 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -2433,6 +2433,12 @@ public static ThrottleType toThrottleType(final QuotaProtos.ThrottleType proto) return ThrottleType.READ_CAPACITY_UNIT; case WRITE_CAPACITY_UNIT: return ThrottleType.WRITE_CAPACITY_UNIT; + case ATOMIC_READ_SIZE: + return ThrottleType.ATOMIC_READ_SIZE; + case ATOMIC_REQUEST_NUMBER: + return ThrottleType.ATOMIC_REQUEST_NUMBER; + case ATOMIC_WRITE_SIZE: + return ThrottleType.ATOMIC_WRITE_SIZE; default: throw new RuntimeException("Invalid ThrottleType " + proto); } @@ -2462,6 +2468,12 @@ public static QuotaProtos.ThrottleType toProtoThrottleType(final ThrottleType ty return QuotaProtos.ThrottleType.READ_CAPACITY_UNIT; case WRITE_CAPACITY_UNIT: return QuotaProtos.ThrottleType.WRITE_CAPACITY_UNIT; + case ATOMIC_READ_SIZE: + return QuotaProtos.ThrottleType.ATOMIC_READ_SIZE; + case ATOMIC_REQUEST_NUMBER: + return QuotaProtos.ThrottleType.ATOMIC_REQUEST_NUMBER; + case ATOMIC_WRITE_SIZE: + return QuotaProtos.ThrottleType.ATOMIC_WRITE_SIZE; default: throw new RuntimeException("Invalid ThrottleType " + type); } diff --git a/hbase-protocol-shaded/src/main/protobuf/Quota.proto b/hbase-protocol-shaded/src/main/protobuf/Quota.proto index 5b00d74980b5..e524e015b625 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Quota.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Quota.proto @@ -49,6 +49,9 @@ enum ThrottleType { REQUEST_CAPACITY_UNIT = 7; WRITE_CAPACITY_UNIT = 8; READ_CAPACITY_UNIT = 9; + ATOMIC_READ_SIZE = 10; + ATOMIC_REQUEST_NUMBER = 11; + ATOMIC_WRITE_SIZE = 12; } message Throttle { @@ -64,6 +67,10 @@ message Throttle { optional TimedQuota req_capacity_unit = 7; optional TimedQuota write_capacity_unit = 8; optional TimedQuota read_capacity_unit = 9; + + optional TimedQuota atomic_read_size = 10; + optional TimedQuota atomic_req_num = 11; + optional TimedQuota atomic_write_size = 12; } message ThrottleRequest { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index 29c3667fb352..f153eca2e5a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -62,6 +62,7 @@ public class DefaultOperationQuota implements OperationQuota { private boolean useResultSizeBytes; private long blockSizeBytes; private long maxScanEstimate; + private boolean isAtomic = false; public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes, final QuotaLimiter... limiters) { @@ -92,9 +93,10 @@ public DefaultOperationQuota(final Configuration conf, final List } @Override - public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException { + public void checkBatchQuota(int numWrites, int numReads, boolean isAtomic) + throws RpcThrottlingException { updateEstimateConsumeBatchQuota(numWrites, numReads); - checkQuota(numWrites, numReads); + checkQuota(numWrites, numReads, isAtomic); } @Override @@ -102,10 +104,15 @@ public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScanner long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException { updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned, prevBlockBytesScannedDifference); - checkQuota(0, 1); + checkQuota(0, 1, false); } - private void checkQuota(long numWrites, long numReads) throws RpcThrottlingException { + private void checkQuota(long numWrites, long numReads, boolean isAtomic) + throws RpcThrottlingException { + if (isAtomic) { + // Remember this flag for later use in close() + this.isAtomic = true; + } readAvailable = Long.MAX_VALUE; for (final QuotaLimiter limiter : limiters) { if (limiter.isBypass()) { @@ -121,13 +128,13 @@ private void checkQuota(long numWrites, long numReads) throws RpcThrottlingExcep limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites), Math.min(maxWriteSizeToEstimate, writeConsumed), Math.min(maxReadsToEstimate, numReads), Math.min(maxReadSizeToEstimate, readConsumed), writeCapacityUnitConsumed, - readCapacityUnitConsumed); + readCapacityUnitConsumed, isAtomic); readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); } for (final QuotaLimiter limiter : limiters) { limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed, - readCapacityUnitConsumed); + readCapacityUnitConsumed, isAtomic); } } @@ -154,10 +161,10 @@ public void close() { for (final QuotaLimiter limiter : limiters) { if (writeDiff != 0) { - limiter.consumeWrite(writeDiff, writeCapacityUnitDiff); + limiter.consumeWrite(writeDiff, writeCapacityUnitDiff, isAtomic); } if (readDiff != 0) { - limiter.consumeRead(readDiff, readCapacityUnitDiff); + limiter.consumeRead(readDiff, readCapacityUnitDiff, isAtomic); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java index 3077d6dac537..7dcfec6b0623 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java @@ -49,10 +49,11 @@ public ExceedOperationQuota(final Configuration conf, int blockSizeBytes, } @Override - public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException { + public void checkBatchQuota(int numWrites, int numReads, boolean isAtomic) + throws RpcThrottlingException { Runnable estimateQuota = () -> updateEstimateConsumeBatchQuota(numWrites, numReads); - CheckQuotaRunnable checkQuota = () -> super.checkBatchQuota(numWrites, numReads); - checkQuota(estimateQuota, checkQuota, numWrites, numReads, 0); + CheckQuotaRunnable checkQuota = () -> super.checkBatchQuota(numWrites, numReads, isAtomic); + checkQuota(estimateQuota, checkQuota, numWrites, numReads, 0, isAtomic); } @Override @@ -62,11 +63,11 @@ public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScanner maxBlockBytesScanned, prevBlockBytesScannedDifference); CheckQuotaRunnable checkQuota = () -> super.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned, prevBlockBytesScannedDifference); - checkQuota(estimateQuota, checkQuota, 0, 0, 1); + checkQuota(estimateQuota, checkQuota, 0, 0, 1, false); } private void checkQuota(Runnable estimateQuota, CheckQuotaRunnable checkQuota, int numWrites, - int numReads, int numScans) throws RpcThrottlingException { + int numReads, int numScans, boolean isAtomic) throws RpcThrottlingException { if (regionServerLimiter.isBypass()) { // If region server limiter is bypass, which means no region server quota is set, check and // throttle by all other quotas. In this condition, exceed throttle quota will not work. @@ -77,7 +78,7 @@ private void checkQuota(Runnable estimateQuota, CheckQuotaRunnable checkQuota, i estimateQuota.run(); // 2. Check if region server limiter is enough. If not, throw RpcThrottlingException. regionServerLimiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, - writeCapacityUnitConsumed, readCapacityUnitConsumed); + writeCapacityUnitConsumed, readCapacityUnitConsumed, isAtomic); // 3. Check if other limiters are enough. If not, exceed other limiters because region server // limiter is enough. boolean exceed = false; @@ -93,13 +94,13 @@ private void checkQuota(Runnable estimateQuota, CheckQuotaRunnable checkQuota, i // 4. Region server limiter is enough and grab estimated consume quota. readAvailable = Math.max(readAvailable, regionServerLimiter.getReadAvailable()); regionServerLimiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, - writeCapacityUnitConsumed, writeCapacityUnitConsumed); + writeCapacityUnitConsumed, writeCapacityUnitConsumed, isAtomic); if (exceed) { // 5. Other quota limiter is exceeded and has not been grabbed (because throw // RpcThrottlingException in Step 3), so grab it. for (final QuotaLimiter limiter : limiters) { limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, - writeCapacityUnitConsumed, writeCapacityUnitConsumed); + writeCapacityUnitConsumed, writeCapacityUnitConsumed, isAtomic); } } } @@ -109,10 +110,10 @@ private void checkQuota(Runnable estimateQuota, CheckQuotaRunnable checkQuota, i public void close() { super.close(); if (writeDiff != 0) { - regionServerLimiter.consumeWrite(writeDiff, writeCapacityUnitDiff); + regionServerLimiter.consumeWrite(writeDiff, writeCapacityUnitDiff, false); } if (readDiff != 0) { - regionServerLimiter.consumeRead(readDiff, readCapacityUnitDiff); + regionServerLimiter.consumeRead(readDiff, readCapacityUnitDiff, false); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java index ebde3ed80dc9..6afbebc6e861 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java @@ -159,6 +159,21 @@ private boolean hasThrottle(QuotaProtos.ThrottleType quotaType, hasThrottle = true; } break; + case ATOMIC_READ_SIZE: + if (throttleBuilder.hasAtomicReadSize()) { + hasThrottle = true; + } + break; + case ATOMIC_REQUEST_NUMBER: + if (throttleBuilder.hasAtomicReqNum()) { + hasThrottle = true; + } + break; + case ATOMIC_WRITE_SIZE: + if (throttleBuilder.hasAtomicWriteSize()) { + hasThrottle = true; + } + break; default: } return hasThrottle; @@ -212,6 +227,15 @@ protected GlobalQuotaSettingsImpl merge(QuotaSettings other) throws IOException case WRITE_CAPACITY_UNIT: throttleBuilder.clearWriteCapacityUnit(); break; + case ATOMIC_READ_SIZE: + throttleBuilder.clearAtomicReadSize(); + break; + case ATOMIC_REQUEST_NUMBER: + throttleBuilder.clearAtomicReqNum(); + break; + case ATOMIC_WRITE_SIZE: + throttleBuilder.clearAtomicWriteSize(); + break; default: } boolean hasThrottle = false; @@ -262,6 +286,15 @@ protected GlobalQuotaSettingsImpl merge(QuotaSettings other) throws IOException case WRITE_CAPACITY_UNIT: throttleBuilder.setWriteCapacityUnit(otherProto.getTimedQuota()); break; + case ATOMIC_READ_SIZE: + throttleBuilder.setAtomicReadSize(otherProto.getTimedQuota()); + break; + case ATOMIC_REQUEST_NUMBER: + throttleBuilder.setAtomicReqNum(otherProto.getTimedQuota()); + break; + case ATOMIC_WRITE_SIZE: + throttleBuilder.setAtomicWriteSize(otherProto.getTimedQuota()); + break; default: } } @@ -341,11 +374,14 @@ public String toString() { case REQUEST_NUMBER: case WRITE_NUMBER: case READ_NUMBER: + case ATOMIC_REQUEST_NUMBER: builder.append(String.format("%dreq", timedQuota.getSoftLimit())); break; case REQUEST_SIZE: case WRITE_SIZE: case READ_SIZE: + case ATOMIC_READ_SIZE: + case ATOMIC_WRITE_SIZE: builder.append(sizeToString(timedQuota.getSoftLimit())); break; case REQUEST_CAPACITY_UNIT: diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java index 63cf97188d86..9143e12de004 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java @@ -43,7 +43,8 @@ public static OperationQuota get() { } @Override - public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException { + public void checkBatchQuota(int numWrites, int numReads, boolean isAtomic) + throws RpcThrottlingException { // no-op } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java index 5ece0be2b5aa..7c02dbc1134f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java @@ -34,24 +34,24 @@ private NoopQuotaLimiter() { @Override public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, - long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit) - throws RpcThrottlingException { + long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit, + boolean isAtomic) throws RpcThrottlingException { // no-op } @Override public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize, - long writeCapacityUnit, long readCapacityUnit) { + long writeCapacityUnit, long readCapacityUnit, boolean isAtomic) { // no-op } @Override - public void consumeWrite(final long size, long capacityUnit) { + public void consumeWrite(final long size, long capacityUnit, boolean isAtomic) { // no-op } @Override - public void consumeRead(final long size, long capacityUnit) { + public void consumeRead(final long size, long capacityUnit, boolean isAtomic) { // no-op } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java index 0d9b48b6074b..b95a617e127f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java @@ -57,7 +57,7 @@ public enum OperationType { * @throws RpcThrottlingException if the operation cannot be performed because RPC quota is * exceeded. */ - void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException; + void checkBatchQuota(int numWrites, int numReads, boolean isAtomic) throws RpcThrottlingException; /** * Checks if it is possible to execute the scan. The quota will be estimated based on the diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java index 760703a428b2..cecda2a154c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java @@ -218,6 +218,10 @@ void triggerCacheRefresh() { refreshChore.triggerNow(); } + void forceSynchronousCacheRefresh() { + refreshChore.chore(); + } + long getLastUpdate() { return refreshChore.lastUpdate; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java index 12e4c4a7c6a9..1b5a1302a207 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java @@ -42,7 +42,8 @@ public interface QuotaLimiter { * @throws RpcThrottlingException thrown if not enough available resources to perform operation. */ void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize, - long estimateWriteCapacityUnit, long estimateReadCapacityUnit) throws RpcThrottlingException; + long estimateWriteCapacityUnit, long estimateReadCapacityUnit, boolean isAtomic) + throws RpcThrottlingException; /** * Removes the specified write and read amount from the quota. At this point the write and read @@ -56,19 +57,19 @@ void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long esti * @param readCapacityUnit the read capacity unit num that will be removed from the current quota */ void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize, - long writeCapacityUnit, long readCapacityUnit); + long writeCapacityUnit, long readCapacityUnit, boolean isAtomic); /** * Removes or add back some write amount to the quota. (called at the end of an operation in case * the estimate quota was off) */ - void consumeWrite(long size, long capacityUnit); + void consumeWrite(long size, long capacityUnit, boolean isAtomic); /** * Removes or add back some read amount to the quota. (called at the end of an operation in case * the estimate quota was off) */ - void consumeRead(long size, long capacityUnit); + void consumeRead(long size, long capacityUnit, boolean isAtomic); /** Returns true if the limiter is a noop */ boolean isBypass(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java index b4887392196d..ba65cec01d7e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java @@ -95,6 +95,12 @@ public class QuotaUtil extends QuotaTableUtil { "hbase.quota.default.user.machine.write.num"; public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE = "hbase.quota.default.user.machine.write.size"; + public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE = + "hbase.quota.default.user.machine.atomic.read.size"; + public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM = + "hbase.quota.default.user.machine.atomic.request.num"; + public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE = + "hbase.quota.default.user.machine.atomic.write.size"; /** Table descriptor for Quota internal table */ public static final HTableDescriptor QUOTA_TABLE_DESC = new HTableDescriptor(QUOTA_TABLE_NAME); @@ -388,6 +394,12 @@ protected static UserQuotaState buildDefaultUserQuotaState(Configuration conf, l .ifPresent(throttleBuilder::setWriteNum); buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE) .ifPresent(throttleBuilder::setWriteSize); + buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE) + .ifPresent(throttleBuilder::setAtomicReadSize); + buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM) + .ifPresent(throttleBuilder::setAtomicReqNum); + buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE) + .ifPresent(throttleBuilder::setAtomicWriteSize); UserQuotaState state = new UserQuotaState(nowTs); QuotaProtos.Quotas defaultQuotas = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java index f9a7ccba401b..d847a9eb3dc2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java @@ -186,11 +186,11 @@ public OperationQuota checkBatchQuota(final Region region, final OperationQuota.OperationType type) throws IOException, RpcThrottlingException { switch (type) { case GET: - return this.checkBatchQuota(region, 0, 1); + return this.checkBatchQuota(region, 0, 1, false); case MUTATE: - return this.checkBatchQuota(region, 1, 0); + return this.checkBatchQuota(region, 1, 0, false); case CHECK_AND_MUTATE: - return this.checkBatchQuota(region, 1, 1); + return this.checkBatchQuota(region, 1, 1, true); } throw new RuntimeException("Invalid operation type: " + type); } @@ -201,6 +201,7 @@ public OperationQuota checkBatchQuota(final Region region, throws IOException, RpcThrottlingException { int numWrites = 0; int numReads = 0; + boolean isAtomic = false; for (final ClientProtos.Action action : actions) { if (action.hasMutation()) { numWrites++; @@ -208,12 +209,16 @@ public OperationQuota checkBatchQuota(final Region region, QuotaUtil.getQuotaOperationType(action, hasCondition); if (operationType == OperationQuota.OperationType.CHECK_AND_MUTATE) { numReads++; + // If any mutations in this batch are atomic, we will count the entire batch as atomic. + // This is a conservative approach, but it is the best that we can do without knowing + // the block bytes scanned of each individual action. + isAtomic = true; } } else if (action.hasGet()) { numReads++; } } - return checkBatchQuota(region, numWrites, numReads); + return checkBatchQuota(region, numWrites, numReads, isAtomic); } /** @@ -227,7 +232,7 @@ public OperationQuota checkBatchQuota(final Region region, */ @Override public OperationQuota checkBatchQuota(final Region region, final int numWrites, - final int numReads) throws IOException, RpcThrottlingException { + final int numReads, boolean isAtomic) throws IOException, RpcThrottlingException { Optional user = RpcServer.getRequestUser(); UserGroupInformation ugi; if (user.isPresent()) { @@ -240,7 +245,7 @@ public OperationQuota checkBatchQuota(final Region region, final int numWrites, OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes()); try { - quota.checkBatchQuota(numWrites, numReads); + quota.checkBatchQuota(numWrites, numReads, isAtomic); } catch (RpcThrottlingException e) { LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table + " numWrites=" + numWrites + " numReads=" + numReads + ": " + e.getMessage()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java index 60392ca3b3f6..3f84f11a7e5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java @@ -87,6 +87,6 @@ OperationQuota checkBatchQuota(final Region region, final List 0) { RpcThrottlingException.throwNumRequestsExceeded(waitInterval); @@ -156,6 +184,12 @@ public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, if (waitInterval > 0) { RpcThrottlingException.throwRequestCapacityUnitExceeded(waitInterval); } + if (isAtomic) { + waitInterval = atomicReqLimiter.getWaitIntervalMs(writeReqs + readReqs); + if (waitInterval > 0) { + RpcThrottlingException.throwAtomicRequestNumberExceeded(waitInterval); + } + } if (estimateWriteSize > 0) { waitInterval = writeReqsLimiter.getWaitIntervalMs(writeReqs); @@ -170,6 +204,12 @@ public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, if (waitInterval > 0) { RpcThrottlingException.throwWriteCapacityUnitExceeded(waitInterval); } + if (isAtomic) { + waitInterval = atomicWriteSizeLimiter.getWaitIntervalMs(writeReqs); + if (waitInterval > 0) { + RpcThrottlingException.throwAtomicWriteSizeExceeded(waitInterval); + } + } } if (estimateReadSize > 0) { @@ -185,12 +225,18 @@ public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, if (waitInterval > 0) { RpcThrottlingException.throwReadCapacityUnitExceeded(waitInterval); } + if (isAtomic) { + waitInterval = atomicReadSizeLimiter.getWaitIntervalMs(writeReqs + readReqs); + if (waitInterval > 0) { + RpcThrottlingException.throwAtomicReadSizeExceeded(waitInterval); + } + } } } @Override public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize, - long writeCapacityUnit, long readCapacityUnit) { + long writeCapacityUnit, long readCapacityUnit, boolean isAtomic) { assert writeSize != 0 || readSize != 0; reqsLimiter.consume(writeReqs + readReqs); @@ -212,22 +258,37 @@ public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSi reqCapacityUnitLimiter.consume(readCapacityUnit); readCapacityUnitLimiter.consume(readCapacityUnit); } + if (isAtomic) { + atomicReqLimiter.consume(writeReqs + readReqs); + if (readSize > 0) { + atomicReadSizeLimiter.consume(readSize); + } + if (writeSize > 0) { + atomicWriteSizeLimiter.consume(writeSize); + } + } } @Override - public void consumeWrite(final long size, long capacityUnit) { + public void consumeWrite(final long size, long capacityUnit, boolean isAtomic) { reqSizeLimiter.consume(size); writeSizeLimiter.consume(size); reqCapacityUnitLimiter.consume(capacityUnit); writeCapacityUnitLimiter.consume(capacityUnit); + if (isAtomic) { + atomicWriteSizeLimiter.consume(size); + } } @Override - public void consumeRead(final long size, long capacityUnit) { + public void consumeRead(final long size, long capacityUnit, boolean isAtomic) { reqSizeLimiter.consume(size); readSizeLimiter.consume(size); reqCapacityUnitLimiter.consume(capacityUnit); readCapacityUnitLimiter.consume(capacityUnit); + if (isAtomic) { + atomicReadSizeLimiter.consume(size); + } } @Override @@ -307,6 +368,15 @@ public String toString() { if (!readCapacityUnitLimiter.isBypass()) { builder.append(" readCapacityUnit=" + readCapacityUnitLimiter); } + if (!atomicReqLimiter.isBypass()) { + builder.append(" atomicReqLimiter=" + atomicReqLimiter); + } + if (!atomicReadSizeLimiter.isBypass()) { + builder.append(" atomicReadSizeLimiter=" + atomicReadSizeLimiter); + } + if (!atomicWriteSizeLimiter.isBypass()) { + builder.append(" atomicWriteSizeLimiter=" + atomicWriteSizeLimiter); + } builder.append(')'); return builder.toString(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 929b24e521a2..52b3b54f4b24 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -229,7 +229,7 @@ public OperationQuota checkBatchQuota(Region region, OperationQuota.OperationTyp @Override public OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads) throws IOException, RpcThrottlingException { - return rpcQuotaManager.checkBatchQuota(region, numWrites, numReads); + return rpcQuotaManager.checkBatchQuota(region, numWrites, numReads, false); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java index f2beb8f5d27f..12bbc26d364a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java @@ -28,8 +28,10 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Table; @@ -81,7 +83,7 @@ public static void setUpBeforeClass() throws Exception { @Test public void testIncrementCountedAgainstReadCapacity() throws Exception { - setupQuota(); + setupGenericQuota(); Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString())); inc.addColumn(FAMILY, QUALIFIER, 1); @@ -90,7 +92,7 @@ public void testIncrementCountedAgainstReadCapacity() throws Exception { @Test public void testConditionalRowMutationsCountedAgainstReadCapacity() throws Exception { - setupQuota(); + setupGenericQuota(); byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); Increment inc = new Increment(row); @@ -106,7 +108,7 @@ public void testConditionalRowMutationsCountedAgainstReadCapacity() throws Excep @Test public void testNonConditionalRowMutationsOmittedFromReadCapacity() throws Exception { - setupQuota(); + setupGenericQuota(); byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); Put put = new Put(row); @@ -123,44 +125,19 @@ public void testNonConditionalRowMutationsOmittedFromReadCapacity() throws Excep @Test public void testNonAtomicPutOmittedFromReadCapacity() throws Exception { - setupQuota(); - - byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); - Put put = new Put(row); - put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); - try (Table table = getTable()) { - for (int i = 0; i < 100; i++) { - table.put(put); - } - } + setupGenericQuota(); + runNonAtomicPuts(); } @Test public void testNonAtomicMultiPutOmittedFromReadCapacity() throws Exception { - setupQuota(); - - Put put1 = new Put(Bytes.toBytes(UUID.randomUUID().toString())); - put1.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); - Put put2 = new Put(Bytes.toBytes(UUID.randomUUID().toString())); - put2.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); - - Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString())); - inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); - - List puts = new ArrayList<>(2); - puts.add(put1); - puts.add(put2); - - try (Table table = getTable()) { - for (int i = 0; i < 100; i++) { - table.put(puts); - } - } + setupGenericQuota(); + runNonAtomicPuts(); } @Test public void testCheckAndMutateCountedAgainstReadCapacity() throws Exception { - setupQuota(); + setupGenericQuota(); byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); byte[] value = Bytes.toBytes("v"); @@ -174,7 +151,49 @@ public void testCheckAndMutateCountedAgainstReadCapacity() throws Exception { @Test public void testAtomicBatchCountedAgainstReadCapacity() throws Exception { - setupQuota(); + setupGenericQuota(); + + byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); + Increment inc = new Increment(row); + inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); + + List incs = new ArrayList<>(2); + incs.add(inc); + incs.add(inc); + + testThrottle(table -> { + List results = new ArrayList<>(incs.size()); + for (Increment increment : incs) { + results.add(table.increment(increment)); + } + return results; + }); + } + + @Test + public void testAtomicBatchCountedAgainstAtomicOnlyReqNum() throws Exception { + setupAtomicOnlyReqNumQuota(); + + byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); + Increment inc = new Increment(row); + inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); + + List incs = new ArrayList<>(2); + incs.add(inc); + incs.add(inc); + + testThrottle(table -> { + List results = new ArrayList<>(incs.size()); + for (Increment increment : incs) { + results.add(table.increment(increment)); + } + return results; + }); + } + + @Test + public void testAtomicBatchCountedAgainstAtomicOnlyReadSize() throws Exception { + setupAtomicOnlyReadSizeQuota(); byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); Increment inc = new Increment(row); @@ -185,13 +204,67 @@ public void testAtomicBatchCountedAgainstReadCapacity() throws Exception { incs.add(inc); testThrottle(table -> { - Object[] results = new Object[incs.size()]; - table.batch(incs, results); + List results = new ArrayList<>(incs.size()); + for (Increment increment : incs) { + results.add(table.increment(increment)); + } return results; }); } - private void setupQuota() throws Exception { + @Test + public void testNonAtomicWritesIgnoredByAtomicOnlyReqNum() throws Exception { + setupAtomicOnlyReqNumQuota(); + runNonAtomicPuts(); + } + + @Test + public void testNonAtomicWritesIgnoredByAtomicOnlyReadSize() throws Exception { + setupAtomicOnlyReadSizeQuota(); + runNonAtomicPuts(); + } + + @Test + public void testNonAtomicReadsIgnoredByAtomicOnlyReqNum() throws Exception { + setupAtomicOnlyReqNumQuota(); + runNonAtomicReads(); + } + + @Test + public void testNonAtomicReadsIgnoredByAtomicOnlyReadSize() throws Exception { + setupAtomicOnlyReadSizeQuota(); + runNonAtomicReads(); + } + + private void runNonAtomicPuts() throws Exception { + Put put1 = new Put(Bytes.toBytes(UUID.randomUUID().toString())); + put1.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); + Put put2 = new Put(Bytes.toBytes(UUID.randomUUID().toString())); + put2.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); + + Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString())); + inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); + + List puts = new ArrayList<>(2); + puts.add(put1); + puts.add(put2); + + try (Table table = getTable()) { + for (int i = 0; i < 100; i++) { + table.put(puts); + } + } + } + + private void runNonAtomicReads() throws Exception { + try (Table table = getTable()) { + byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); + Get get = new Get(row); + table.get(get); + } + } + + private void setupGenericQuota() throws Exception { try (Admin admin = TEST_UTIL.getAdmin()) { admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(), ThrottleType.READ_NUMBER, 1, TimeUnit.MINUTES)); @@ -199,6 +272,22 @@ private void setupQuota() throws Exception { ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); } + private void setupAtomicOnlyReqNumQuota() throws Exception { + try (Admin admin = TEST_UTIL.getAdmin()) { + admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(), + ThrottleType.ATOMIC_REQUEST_NUMBER, 1, TimeUnit.MINUTES)); + } + ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + } + + private void setupAtomicOnlyReadSizeQuota() throws Exception { + try (Admin admin = TEST_UTIL.getAdmin()) { + admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(), + ThrottleType.ATOMIC_READ_SIZE, 1, TimeUnit.MINUTES)); + } + ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + } + private void cleanupQuota() throws Exception { try (Admin admin = TEST_UTIL.getAdmin()) { admin.setQuota(QuotaSettingsFactory.unthrottleUser(User.getCurrent().getShortName())); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultAtomicQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultAtomicQuota.java new file mode 100644 index 000000000000..966bce6bcdb9 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultAtomicQuota.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh; +import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota; + +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestDefaultAtomicQuota { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestDefaultAtomicQuota.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString()); + private static final int REFRESH_TIME = 5; + private static final byte[] FAMILY = Bytes.toBytes("cf"); + private static final byte[] QUALIFIER = Bytes.toBytes("q"); + + @AfterClass + public static void tearDown() throws Exception { + ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL); + EnvironmentEdgeManager.reset(); + TEST_UTIL.deleteTable(TABLE_NAME); + TEST_UTIL.shutdownMiniCluster(); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // quotas enabled, using block bytes scanned + TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME); + TEST_UTIL.getConfiguration().setInt(QuotaUtil.QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE, 1); + TEST_UTIL.getConfiguration().setInt(QuotaUtil.QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM, 1); + TEST_UTIL.getConfiguration().setInt(QuotaUtil.QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE, 1); + + // don't cache blocks to make IO predictable + TEST_UTIL.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); + + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + QuotaCache.TEST_FORCE_REFRESH = true; + TEST_UTIL.flush(TABLE_NAME); + } + + @Test + public void testDefaultAtomicReadLimits() throws Exception { + // No write throttling + configureLenientThrottle(ThrottleType.ATOMIC_WRITE_SIZE); + refreshQuotas(); + + // Should have a strict throttle by default + TEST_UTIL.waitFor(60_000, () -> runIncTest(100) < 100); + + // Add big quota and should be effectively unlimited + configureLenientThrottle(ThrottleType.ATOMIC_READ_SIZE); + configureLenientThrottle(ThrottleType.ATOMIC_REQUEST_NUMBER); + refreshQuotas(); + // Should run without error + TEST_UTIL.waitFor(60_000, () -> runIncTest(100) == 100); + + // Remove all the limits, and should revert to strict default + unsetQuota(); + TEST_UTIL.waitFor(60_000, () -> runIncTest(100) < 100); + } + + @Test + public void testDefaultAtomicWriteLimits() throws Exception { + // No read throttling + configureLenientThrottle(ThrottleType.ATOMIC_REQUEST_NUMBER); + configureLenientThrottle(ThrottleType.ATOMIC_READ_SIZE); + refreshQuotas(); + + // Should have a strict throttle by default + TEST_UTIL.waitFor(60_000, () -> runIncTest(100) < 100); + + // Add big quota and should be effectively unlimited + configureLenientThrottle(ThrottleType.ATOMIC_WRITE_SIZE); + refreshQuotas(); + // Should run without error + TEST_UTIL.waitFor(60_000, () -> runIncTest(100) == 100); + + // Remove all the limits, and should revert to strict default + unsetQuota(); + TEST_UTIL.waitFor(60_000, () -> runIncTest(100) < 100); + } + + private void configureLenientThrottle(ThrottleType throttleType) throws IOException { + try (Admin admin = TEST_UTIL.getAdmin()) { + admin.setQuota( + QuotaSettingsFactory.throttleUser(getUserName(), throttleType, 100_000, TimeUnit.SECONDS)); + } + } + + private static String getUserName() throws IOException { + return User.getCurrent().getShortName(); + } + + private void refreshQuotas() throws Exception { + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + waitMinuteQuota(); + } + + private void unsetQuota() throws Exception { + try (Admin admin = TEST_UTIL.getAdmin()) { + admin.setQuota(QuotaSettingsFactory.unthrottleUser(getUserName())); + } + refreshQuotas(); + } + + private long runIncTest(int attempts) throws Exception { + refreshQuotas(); + try (Table table = getTable()) { + return ThrottleQuotaTestUtil.doIncrements(attempts, FAMILY, QUALIFIER, table); + } + } + + private Table getTable() throws IOException { + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250) + .build(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java index a6b7ba6fee59..beeab8aef5c4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java @@ -153,14 +153,14 @@ public void testLargeBatchSaturatesReadNumLimit() DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); // use the whole limit - quota.checkBatchQuota(0, limit); + quota.checkBatchQuota(0, limit, false); // the next request should be rejected - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); envEdge.incValue(1000); // after the TimeUnit, the limit should be refilled - quota.checkBatchQuota(0, limit); + quota.checkBatchQuota(0, limit, false); } @Test @@ -174,14 +174,14 @@ public void testLargeBatchSaturatesReadWriteLimit() DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); // use the whole limit - quota.checkBatchQuota(limit, 0); + quota.checkBatchQuota(limit, 0, false); // the next request should be rejected - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false)); envEdge.incValue(1000); // after the TimeUnit, the limit should be refilled - quota.checkBatchQuota(limit, 0); + quota.checkBatchQuota(limit, 0, false); } @Test @@ -195,14 +195,14 @@ public void testTooLargeReadBatchIsNotBlocked() DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); // use more than the limit, which should succeed rather than being indefinitely blocked - quota.checkBatchQuota(0, 10 + limit); + quota.checkBatchQuota(0, 10 + limit, false); // the next request should be blocked - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); envEdge.incValue(1000); // even after the TimeUnit, the limit should not be refilled because we oversubscribed - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit, false)); } @Test @@ -216,14 +216,14 @@ public void testTooLargeWriteBatchIsNotBlocked() DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); // use more than the limit, which should succeed rather than being indefinitely blocked - quota.checkBatchQuota(10 + limit, 0); + quota.checkBatchQuota(10 + limit, 0, false); // the next request should be blocked - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false)); envEdge.incValue(1000); // even after the TimeUnit, the limit should not be refilled because we oversubscribed - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0, false)); } @Test @@ -237,14 +237,14 @@ public void testTooLargeWriteSizeIsNotBlocked() DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); // writes are estimated a 100 bytes, so this will use 2x the limit but should not be blocked - quota.checkBatchQuota(1, 0); + quota.checkBatchQuota(1, 0, false); // the next request should be blocked - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false)); envEdge.incValue(1000); // even after the TimeUnit, the limit should not be refilled because we oversubscribed - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0, false)); } @Test @@ -260,14 +260,14 @@ public void testTooLargeReadSizeIsNotBlocked() new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter); // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked - quota.checkBatchQuota(0, 1); + quota.checkBatchQuota(0, 1, false); // the next request should be blocked - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); envEdge.incValue(1000); // even after the TimeUnit, the limit should not be refilled because we oversubscribed - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1, false)); } @Test @@ -283,13 +283,13 @@ public void testTooLargeRequestSizeIsNotBlocked() new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter); // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked - quota.checkBatchQuota(0, 1); + quota.checkBatchQuota(0, 1, false); // the next request should be blocked - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); envEdge.incValue(1000); // even after the TimeUnit, the limit should not be refilled because we oversubscribed - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1, false)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNoopOperationQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNoopOperationQuota.java index ad2b79075a31..7fd686de94b8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNoopOperationQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNoopOperationQuota.java @@ -28,7 +28,8 @@ public class TestNoopOperationQuota implements OperationQuota { public static final TestNoopOperationQuota INSTANCE = new TestNoopOperationQuota(); @Override - public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException { + public void checkBatchQuota(int numWrites, int numReads, boolean isAtomic) + throws RpcThrottlingException { } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java index 5b560129ecea..ac037909bbc8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java @@ -773,6 +773,14 @@ private void assertRPCQuota(ThrottleType type, long limit, TimeUnit tu, QuotaSco assertTrue(rpcQuota.hasWriteCapacityUnit()); t = rpcQuota.getWriteCapacityUnit(); break; + case ATOMIC_READ_SIZE: + assertTrue(rpcQuota.hasAtomicReadSize()); + t = rpcQuota.getAtomicReadSize(); + break; + case ATOMIC_REQUEST_NUMBER: + assertTrue(rpcQuota.hasAtomicReqNum()); + t = rpcQuota.getAtomicReqNum(); + break; default: } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java index cbd40f7bd81c..d64b1002b1e5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java @@ -224,7 +224,7 @@ public void testTableThrottleWithBatch() { assertFalse(quotaInfo.isBypass()); QuotaLimiter limiter = quotaInfo.getTableLimiter(TABLE_A); try { - limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0, 1, 0); + limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0, 1, 0, false); fail("Should have thrown RpcThrottlingException"); } catch (RpcThrottlingException e) { // expected @@ -241,7 +241,7 @@ private Quotas buildReqNumThrottle(final long limit) { private void assertThrottleException(final QuotaLimiter limiter, final int availReqs) { assertNoThrottleException(limiter, availReqs); try { - limiter.checkQuota(1, 1, 0, 0, 1, 0); + limiter.checkQuota(1, 1, 0, 0, 1, 0, false); fail("Should have thrown RpcThrottlingException"); } catch (RpcThrottlingException e) { // expected @@ -251,11 +251,11 @@ private void assertThrottleException(final QuotaLimiter limiter, final int avail private void assertNoThrottleException(final QuotaLimiter limiter, final int availReqs) { for (int i = 0; i < availReqs; ++i) { try { - limiter.checkQuota(1, 1, 0, 0, 1, 0); + limiter.checkQuota(1, 1, 0, 0, 1, 0, false); } catch (RpcThrottlingException e) { fail("Unexpected RpcThrottlingException after " + i + " requests. limit=" + availReqs); } - limiter.grabQuota(1, 1, 0, 0, 1, 0); + limiter.grabQuota(1, 1, 0, 0, 1, 0, false); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java index adfc46bb4a57..b343799b89db 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -129,6 +130,23 @@ static long doGets(int maxOps, byte[] family, byte[] qualifier, final Table... t return count; } + static long doIncrements(int maxOps, byte[] family, byte[] qualifier, final Table... tables) { + int count = 0; + try { + while (count < maxOps) { + Increment inc = new Increment(Bytes.toBytes("row-" + count)); + inc.addColumn(family, qualifier, 1L); + for (final Table table : tables) { + table.increment(inc); + } + count += tables.length; + } + } catch (IOException e) { + LOG.error("increment failed after nRetries=" + count, e); + } + return count; + } + static long doMultiGets(int maxOps, int batchSize, int rowCount, byte[] family, byte[] qualifier, final Table... tables) { int opCount = 0; @@ -202,7 +220,7 @@ private static void triggerCacheRefresh(HBaseTestingUtility testUtil, boolean by RegionServerRpcQuotaManager quotaManager = rst.getRegionServer().getRegionServerRpcQuotaManager(); QuotaCache quotaCache = quotaManager.getQuotaCache(); - quotaCache.triggerCacheRefresh(); + quotaCache.forceSynchronousCacheRefresh(); Thread.sleep(250); testUtil.waitFor(60000, 250, new ExplainingPredicate() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerLeaseCount.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerLeaseCount.java index cf99c53e1d9f..fc7387b48069 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerLeaseCount.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerLeaseCount.java @@ -199,8 +199,8 @@ public OperationQuota checkBatchQuota(Region region, List a } @Override - public OperationQuota checkBatchQuota(Region region, int numWrites, int numReads) - throws IOException, RpcThrottlingException { + public OperationQuota checkBatchQuota(Region region, int numWrites, int numReads, + boolean isAtomic) throws IOException, RpcThrottlingException { if (SHOULD_THROW) { throw EX; }