Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,17 @@ public enum Type {
ReadSizeExceeded,
RequestCapacityUnitExceeded,
ReadCapacityUnitExceeded,
WriteCapacityUnitExceeded
WriteCapacityUnitExceeded,
AtomicRequestNumberExceeded,
AtomicReadSizeExceeded,
AtomicWriteSizeExceeded,
}

private static final String[] MSG_TYPE =
new String[] { "number of requests exceeded", "request size limit exceeded",
"number of read requests exceeded", "number of write requests exceeded",
"write size limit exceeded", "read size limit exceeded", "request capacity unit exceeded",
"read capacity unit exceeded", "write capacity unit exceeded" };
private static final String[] MSG_TYPE = new String[] { "number of requests exceeded",
"request size limit exceeded", "number of read requests exceeded",
"number of write requests exceeded", "write size limit exceeded", "read size limit exceeded",
"request capacity unit exceeded", "read capacity unit exceeded", "write capacity unit exceeded",
"atomic request number exceeded", "atomic read size exceeded", "atomic write size exceeded" };

private static final String MSG_WAIT = " - wait ";

Expand Down Expand Up @@ -127,6 +130,21 @@ public static void throwWriteCapacityUnitExceeded(final long waitInterval)
throwThrottlingException(Type.WriteCapacityUnitExceeded, waitInterval);
}

public static void throwAtomicRequestNumberExceeded(final long waitInterval)
throws RpcThrottlingException {
throwThrottlingException(Type.AtomicRequestNumberExceeded, waitInterval);
}

public static void throwAtomicReadSizeExceeded(final long waitInterval)
throws RpcThrottlingException {
throwThrottlingException(Type.AtomicReadSizeExceeded, waitInterval);
}

public static void throwAtomicWriteSizeExceeded(final long waitInterval)
throws RpcThrottlingException {
throwThrottlingException(Type.AtomicWriteSizeExceeded, waitInterval);
}

private static void throwThrottlingException(final Type type, final long waitInterval)
throws RpcThrottlingException {
String msg = MSG_TYPE[type.ordinal()] + MSG_WAIT + stringFromMillis(waitInterval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,14 @@ public String toString() {
case REQUEST_NUMBER:
case WRITE_NUMBER:
case READ_NUMBER:
case ATOMIC_REQUEST_NUMBER:
builder.append(String.format("%dreq", timedQuota.getSoftLimit()));
break;
case REQUEST_SIZE:
case WRITE_SIZE:
case READ_SIZE:
case ATOMIC_READ_SIZE:
case ATOMIC_WRITE_SIZE:
builder.append(sizeToString(timedQuota.getSoftLimit()));
break;
case REQUEST_CAPACITY_UNIT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,13 @@ public enum ThrottleType {

/** Throttling based on the read data capacity unit */
READ_CAPACITY_UNIT,

/** Throttling based on the IO footprint of an atomic request */
ATOMIC_READ_SIZE,

/** Throttling based on the number of atomic requests per time-unit */
ATOMIC_REQUEST_NUMBER,

/** Throttling based on the size of atomic write requests */
ATOMIC_WRITE_SIZE,
}
Original file line number Diff line number Diff line change
Expand Up @@ -2433,6 +2433,12 @@ public static ThrottleType toThrottleType(final QuotaProtos.ThrottleType proto)
return ThrottleType.READ_CAPACITY_UNIT;
case WRITE_CAPACITY_UNIT:
return ThrottleType.WRITE_CAPACITY_UNIT;
case ATOMIC_READ_SIZE:
return ThrottleType.ATOMIC_READ_SIZE;
case ATOMIC_REQUEST_NUMBER:
return ThrottleType.ATOMIC_REQUEST_NUMBER;
case ATOMIC_WRITE_SIZE:
return ThrottleType.ATOMIC_WRITE_SIZE;
default:
throw new RuntimeException("Invalid ThrottleType " + proto);
}
Expand Down Expand Up @@ -2462,6 +2468,12 @@ public static QuotaProtos.ThrottleType toProtoThrottleType(final ThrottleType ty
return QuotaProtos.ThrottleType.READ_CAPACITY_UNIT;
case WRITE_CAPACITY_UNIT:
return QuotaProtos.ThrottleType.WRITE_CAPACITY_UNIT;
case ATOMIC_READ_SIZE:
return QuotaProtos.ThrottleType.ATOMIC_READ_SIZE;
case ATOMIC_REQUEST_NUMBER:
return QuotaProtos.ThrottleType.ATOMIC_REQUEST_NUMBER;
case ATOMIC_WRITE_SIZE:
return QuotaProtos.ThrottleType.ATOMIC_WRITE_SIZE;
default:
throw new RuntimeException("Invalid ThrottleType " + type);
}
Expand Down
7 changes: 7 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/Quota.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ enum ThrottleType {
REQUEST_CAPACITY_UNIT = 7;
WRITE_CAPACITY_UNIT = 8;
READ_CAPACITY_UNIT = 9;
ATOMIC_READ_SIZE = 10;
ATOMIC_REQUEST_NUMBER = 11;
ATOMIC_WRITE_SIZE = 12;
}

message Throttle {
Expand All @@ -64,6 +67,10 @@ message Throttle {
optional TimedQuota req_capacity_unit = 7;
optional TimedQuota write_capacity_unit = 8;
optional TimedQuota read_capacity_unit = 9;

optional TimedQuota atomic_read_size = 10;
optional TimedQuota atomic_req_num = 11;
optional TimedQuota atomic_write_size = 12;
}

message ThrottleRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class DefaultOperationQuota implements OperationQuota {
private boolean useResultSizeBytes;
private long blockSizeBytes;
private long maxScanEstimate;
private boolean isAtomic = false;

public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes,
final QuotaLimiter... limiters) {
Expand Down Expand Up @@ -92,20 +93,26 @@ public DefaultOperationQuota(final Configuration conf, final List<QuotaLimiter>
}

@Override
public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException {
public void checkBatchQuota(int numWrites, int numReads, boolean isAtomic)
throws RpcThrottlingException {
updateEstimateConsumeBatchQuota(numWrites, numReads);
checkQuota(numWrites, numReads);
checkQuota(numWrites, numReads, isAtomic);
}

@Override
public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException {
updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned,
prevBlockBytesScannedDifference);
checkQuota(0, 1);
checkQuota(0, 1, false);
}

private void checkQuota(long numWrites, long numReads) throws RpcThrottlingException {
private void checkQuota(long numWrites, long numReads, boolean isAtomic)
throws RpcThrottlingException {
if (isAtomic) {
// Remember this flag for later use in close()
this.isAtomic = true;
}
readAvailable = Long.MAX_VALUE;
for (final QuotaLimiter limiter : limiters) {
if (limiter.isBypass()) {
Expand All @@ -121,13 +128,13 @@ private void checkQuota(long numWrites, long numReads) throws RpcThrottlingExcep
limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites),
Math.min(maxWriteSizeToEstimate, writeConsumed), Math.min(maxReadsToEstimate, numReads),
Math.min(maxReadSizeToEstimate, readConsumed), writeCapacityUnitConsumed,
readCapacityUnitConsumed);
readCapacityUnitConsumed, isAtomic);
readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
}

for (final QuotaLimiter limiter : limiters) {
limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed,
readCapacityUnitConsumed);
readCapacityUnitConsumed, isAtomic);
}
}

Expand All @@ -154,10 +161,10 @@ public void close() {

for (final QuotaLimiter limiter : limiters) {
if (writeDiff != 0) {
limiter.consumeWrite(writeDiff, writeCapacityUnitDiff);
limiter.consumeWrite(writeDiff, writeCapacityUnitDiff, isAtomic);
}
if (readDiff != 0) {
limiter.consumeRead(readDiff, readCapacityUnitDiff);
limiter.consumeRead(readDiff, readCapacityUnitDiff, isAtomic);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ public ExceedOperationQuota(final Configuration conf, int blockSizeBytes,
}

@Override
public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException {
public void checkBatchQuota(int numWrites, int numReads, boolean isAtomic)
throws RpcThrottlingException {
Runnable estimateQuota = () -> updateEstimateConsumeBatchQuota(numWrites, numReads);
CheckQuotaRunnable checkQuota = () -> super.checkBatchQuota(numWrites, numReads);
checkQuota(estimateQuota, checkQuota, numWrites, numReads, 0);
CheckQuotaRunnable checkQuota = () -> super.checkBatchQuota(numWrites, numReads, isAtomic);
checkQuota(estimateQuota, checkQuota, numWrites, numReads, 0, isAtomic);
}

@Override
Expand All @@ -62,11 +63,11 @@ public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScanner
maxBlockBytesScanned, prevBlockBytesScannedDifference);
CheckQuotaRunnable checkQuota = () -> super.checkScanQuota(scanRequest, maxScannerResultSize,
maxBlockBytesScanned, prevBlockBytesScannedDifference);
checkQuota(estimateQuota, checkQuota, 0, 0, 1);
checkQuota(estimateQuota, checkQuota, 0, 0, 1, false);
}

private void checkQuota(Runnable estimateQuota, CheckQuotaRunnable checkQuota, int numWrites,
int numReads, int numScans) throws RpcThrottlingException {
int numReads, int numScans, boolean isAtomic) throws RpcThrottlingException {
if (regionServerLimiter.isBypass()) {
// If region server limiter is bypass, which means no region server quota is set, check and
// throttle by all other quotas. In this condition, exceed throttle quota will not work.
Expand All @@ -77,7 +78,7 @@ private void checkQuota(Runnable estimateQuota, CheckQuotaRunnable checkQuota, i
estimateQuota.run();
// 2. Check if region server limiter is enough. If not, throw RpcThrottlingException.
regionServerLimiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
writeCapacityUnitConsumed, readCapacityUnitConsumed);
writeCapacityUnitConsumed, readCapacityUnitConsumed, isAtomic);
// 3. Check if other limiters are enough. If not, exceed other limiters because region server
// limiter is enough.
boolean exceed = false;
Expand All @@ -93,13 +94,13 @@ private void checkQuota(Runnable estimateQuota, CheckQuotaRunnable checkQuota, i
// 4. Region server limiter is enough and grab estimated consume quota.
readAvailable = Math.max(readAvailable, regionServerLimiter.getReadAvailable());
regionServerLimiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
writeCapacityUnitConsumed, writeCapacityUnitConsumed);
writeCapacityUnitConsumed, writeCapacityUnitConsumed, isAtomic);
if (exceed) {
// 5. Other quota limiter is exceeded and has not been grabbed (because throw
// RpcThrottlingException in Step 3), so grab it.
for (final QuotaLimiter limiter : limiters) {
limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
writeCapacityUnitConsumed, writeCapacityUnitConsumed);
writeCapacityUnitConsumed, writeCapacityUnitConsumed, isAtomic);
}
}
}
Expand All @@ -109,10 +110,10 @@ private void checkQuota(Runnable estimateQuota, CheckQuotaRunnable checkQuota, i
public void close() {
super.close();
if (writeDiff != 0) {
regionServerLimiter.consumeWrite(writeDiff, writeCapacityUnitDiff);
regionServerLimiter.consumeWrite(writeDiff, writeCapacityUnitDiff, false);
}
if (readDiff != 0) {
regionServerLimiter.consumeRead(readDiff, readCapacityUnitDiff);
regionServerLimiter.consumeRead(readDiff, readCapacityUnitDiff, false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,21 @@ private boolean hasThrottle(QuotaProtos.ThrottleType quotaType,
hasThrottle = true;
}
break;
case ATOMIC_READ_SIZE:
if (throttleBuilder.hasAtomicReadSize()) {
hasThrottle = true;
}
break;
case ATOMIC_REQUEST_NUMBER:
if (throttleBuilder.hasAtomicReqNum()) {
hasThrottle = true;
}
break;
case ATOMIC_WRITE_SIZE:
if (throttleBuilder.hasAtomicWriteSize()) {
hasThrottle = true;
}
break;
default:
}
return hasThrottle;
Expand Down Expand Up @@ -212,6 +227,15 @@ protected GlobalQuotaSettingsImpl merge(QuotaSettings other) throws IOException
case WRITE_CAPACITY_UNIT:
throttleBuilder.clearWriteCapacityUnit();
break;
case ATOMIC_READ_SIZE:
throttleBuilder.clearAtomicReadSize();
break;
case ATOMIC_REQUEST_NUMBER:
throttleBuilder.clearAtomicReqNum();
break;
case ATOMIC_WRITE_SIZE:
throttleBuilder.clearAtomicWriteSize();
break;
default:
}
boolean hasThrottle = false;
Expand Down Expand Up @@ -262,6 +286,15 @@ protected GlobalQuotaSettingsImpl merge(QuotaSettings other) throws IOException
case WRITE_CAPACITY_UNIT:
throttleBuilder.setWriteCapacityUnit(otherProto.getTimedQuota());
break;
case ATOMIC_READ_SIZE:
throttleBuilder.setAtomicReadSize(otherProto.getTimedQuota());
break;
case ATOMIC_REQUEST_NUMBER:
throttleBuilder.setAtomicReqNum(otherProto.getTimedQuota());
break;
case ATOMIC_WRITE_SIZE:
throttleBuilder.setAtomicWriteSize(otherProto.getTimedQuota());
break;
default:
}
}
Expand Down Expand Up @@ -341,11 +374,14 @@ public String toString() {
case REQUEST_NUMBER:
case WRITE_NUMBER:
case READ_NUMBER:
case ATOMIC_REQUEST_NUMBER:
builder.append(String.format("%dreq", timedQuota.getSoftLimit()));
break;
case REQUEST_SIZE:
case WRITE_SIZE:
case READ_SIZE:
case ATOMIC_READ_SIZE:
case ATOMIC_WRITE_SIZE:
builder.append(sizeToString(timedQuota.getSoftLimit()));
break;
case REQUEST_CAPACITY_UNIT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public static OperationQuota get() {
}

@Override
public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException {
public void checkBatchQuota(int numWrites, int numReads, boolean isAtomic)
throws RpcThrottlingException {
// no-op
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,24 @@ private NoopQuotaLimiter() {

@Override
public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit)
throws RpcThrottlingException {
long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit,
boolean isAtomic) throws RpcThrottlingException {
// no-op
}

@Override
public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
long writeCapacityUnit, long readCapacityUnit) {
long writeCapacityUnit, long readCapacityUnit, boolean isAtomic) {
// no-op
}

@Override
public void consumeWrite(final long size, long capacityUnit) {
public void consumeWrite(final long size, long capacityUnit, boolean isAtomic) {
// no-op
}

@Override
public void consumeRead(final long size, long capacityUnit) {
public void consumeRead(final long size, long capacityUnit, boolean isAtomic) {
// no-op
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public enum OperationType {
* @throws RpcThrottlingException if the operation cannot be performed because RPC quota is
* exceeded.
*/
void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException;
void checkBatchQuota(int numWrites, int numReads, boolean isAtomic) throws RpcThrottlingException;

/**
* Checks if it is possible to execute the scan. The quota will be estimated based on the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ void triggerCacheRefresh() {
refreshChore.triggerNow();
}

void forceSynchronousCacheRefresh() {
refreshChore.chore();
}

long getLastUpdate() {
return refreshChore.lastUpdate;
}
Expand Down
Loading