diff --git a/README.md b/README.md index 4f48e93bc8d..7243d78bc47 100644 --- a/README.md +++ b/README.md @@ -136,6 +136,30 @@ The following includes ThreadPool related parameters, which can be passed in thr - RawKV default backoff in milliseconds - default: 20000 (20 seconds) +#### tikv.rawkv.read_timeout_in_ms +- RawKV read timeout in milliseconds. This parameter controls the timeout of `get` `getKeyTTL`. +- default: 2000 (2 seconds) + +#### tikv.rawkv.write_timeout_in_ms +- RawKV write timeout in milliseconds. This parameter controls the timeout of `put` `putAtomic` `putIfAbsent` `delete` `deleteAtomic`. +- default: 2000 (2 seconds) + +#### tikv.rawkv.batch_read_timeout_in_ms +- RawKV batch read timeout in milliseconds. This parameter controls the timeout of `batchGet`. +- default: 2000 (2 seconds) + +#### tikv.rawkv.batch_write_timeout_in_ms +- RawKV batch write timeout in milliseconds. This parameter controls the timeout of `batchPut` `batchDelete` `batchDeleteAtomic`. +- default: 2000 (2 seconds) + +#### tikv.rawkv.scan_timeout_in_ms +- RawKV scan timeout in milliseconds. This parameter controls the timeout of `batchScan` `scan` `scanPrefix`. +- default: 10000 (10 seconds) + +#### tikv.rawkv.clean_timeout_in_ms +- RawKV clean timeout in milliseconds. This parameter controls the timeout of `deleteRange` `deletePrefix`. +- default: 600000 (10 minutes) + ## Metrics Client Java supports exporting metrics to Prometheus using poll mode and viewing on Grafana. The following steps shows how to enable this function. diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 33c17f11c7b..33c1d4baea7 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -56,6 +56,14 @@ public class ConfigUtils { "tikv.health_check_period_duration"; public static final String TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS = "tikv.rawkv.default_backoff_in_ms"; + public static final String TIKV_RAWKV_READ_TIMEOUT_IN_MS = "tikv.rawkv.read_timeout_in_ms"; + public static final String TIKV_RAWKV_WRITE_TIMEOUT_IN_MS = "tikv.rawkv.write_timeout_in_ms"; + public static final String TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS = + "tikv.rawkv.batch_read_timeout_in_ms"; + public static final String TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS = + "tikv.rawkv.batch_write_timeout_in_ms"; + public static final String TIKV_RAWKV_SCAN_TIMEOUT_IN_MS = "tikv.rawkv.scan_timeout_in_ms"; + public static final String TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS = "tikv.rawkv.clean_timeout_in_ms"; public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; public static final String DEF_TIMEOUT = "200ms"; @@ -91,6 +99,13 @@ public class ConfigUtils { public static final int DEF_TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS = BackOffer.RAWKV_MAX_BACKOFF; + public static final int DEF_TIKV_RAWKV_READ_TIMEOUT_IN_MS = 2000; + public static final int DEF_TIKV_RAWKV_WRITE_TIMEOUT_IN_MS = 2000; + public static final int DEF_TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS = 2000; + public static final int DEF_TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS = 2000; + public static final int DEF_TIKV_RAWKV_SCAN_TIMEOUT_IN_MS = 10000; + public static final int DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS = 600000; + public static final String NORMAL_COMMAND_PRIORITY = "NORMAL"; public static final String LOW_COMMAND_PRIORITY = "LOW"; public static final String HIGH_COMMAND_PRIORITY = "HIGH"; diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 57d4b11877b..fa74e65eabc 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -82,6 +82,12 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT); setIfMissing(TIKV_HEALTH_CHECK_PERIOD_DURATION, DEF_HEALTH_CHECK_PERIOD_DURATION); setIfMissing(TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS, DEF_TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS); + setIfMissing(TIKV_RAWKV_READ_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_READ_TIMEOUT_IN_MS); + setIfMissing(TIKV_RAWKV_WRITE_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_WRITE_TIMEOUT_IN_MS); + setIfMissing(TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS); + setIfMissing(TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS); + setIfMissing(TIKV_RAWKV_SCAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_SCAN_TIMEOUT_IN_MS); + setIfMissing(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS); } public static void listAll() { @@ -272,6 +278,12 @@ private static ReplicaRead getReplicaRead(String key) { private HostMapping hostMapping = null; private int rawKVDefaultBackoffInMS = getInt(TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS); + private int rawKVReadTimeoutInMS = getInt(TIKV_RAWKV_READ_TIMEOUT_IN_MS); + private int rawKVWriteTimeoutInMS = getInt(TIKV_RAWKV_WRITE_TIMEOUT_IN_MS); + private int rawKVBatchReadTimeoutInMS = getInt(TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS); + private int rawKVBatchWriteTimeoutInMS = getInt(TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS); + private int rawKVScanTimeoutInMS = getInt(TIKV_RAWKV_SCAN_TIMEOUT_IN_MS); + private int rawKVCleanTimeoutInMS = getInt(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS); public enum KVMode { TXN, @@ -586,4 +598,52 @@ public int getRawKVDefaultBackoffInMS() { public void setRawKVDefaultBackoffInMS(int rawKVDefaultBackoffInMS) { this.rawKVDefaultBackoffInMS = rawKVDefaultBackoffInMS; } + + public int getRawKVReadTimeoutInMS() { + return rawKVReadTimeoutInMS; + } + + public void setRawKVReadTimeoutInMS(int rawKVReadTimeoutInMS) { + this.rawKVReadTimeoutInMS = rawKVReadTimeoutInMS; + } + + public int getRawKVWriteTimeoutInMS() { + return rawKVWriteTimeoutInMS; + } + + public void setRawKVWriteTimeoutInMS(int rawKVWriteTimeoutInMS) { + this.rawKVWriteTimeoutInMS = rawKVWriteTimeoutInMS; + } + + public int getRawKVBatchReadTimeoutInMS() { + return rawKVBatchReadTimeoutInMS; + } + + public void setRawKVBatchReadTimeoutInMS(int rawKVBatchReadTimeoutInMS) { + this.rawKVBatchReadTimeoutInMS = rawKVBatchReadTimeoutInMS; + } + + public int getRawKVBatchWriteTimeoutInMS() { + return rawKVBatchWriteTimeoutInMS; + } + + public void setRawKVBatchWriteTimeoutInMS(int rawKVBatchWriteTimeoutInMS) { + this.rawKVBatchWriteTimeoutInMS = rawKVBatchWriteTimeoutInMS; + } + + public int getRawKVScanTimeoutInMS() { + return rawKVScanTimeoutInMS; + } + + public void setRawKVScanTimeoutInMS(int rawKVScanTimeoutInMS) { + this.rawKVScanTimeoutInMS = rawKVScanTimeoutInMS; + } + + public int getRawKVCleanTimeoutInMS() { + return rawKVCleanTimeoutInMS; + } + + public void setRawKVCleanTimeoutInMS(int rawKVCleanTimeoutInMS) { + this.rawKVCleanTimeoutInMS = rawKVCleanTimeoutInMS; + } } diff --git a/src/main/java/org/tikv/common/operation/iterator/RawScanIterator.java b/src/main/java/org/tikv/common/operation/iterator/RawScanIterator.java index 9c2fbb724e1..5cdfe90d954 100644 --- a/src/main/java/org/tikv/common/operation/iterator/RawScanIterator.java +++ b/src/main/java/org/tikv/common/operation/iterator/RawScanIterator.java @@ -25,10 +25,10 @@ import org.tikv.common.region.TiRegion; import org.tikv.common.util.BackOffFunction; import org.tikv.common.util.BackOffer; -import org.tikv.common.util.ConcreteBackOffer; import org.tikv.kvproto.Kvrpcpb; public class RawScanIterator extends ScanIterator { + private final BackOffer scanBackOffer; public RawScanIterator( TiConfiguration conf, @@ -36,15 +36,19 @@ public RawScanIterator( ByteString startKey, ByteString endKey, int limit, - boolean keyOnly) { + boolean keyOnly, + BackOffer scanBackOffer) { super(conf, builder, startKey, endKey, limit, keyOnly); + + this.scanBackOffer = scanBackOffer; } + @Override TiRegion loadCurrentRegionToCache() throws GrpcException { - BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff(); + BackOffer backOffer = scanBackOffer; while (true) { - try (RegionStoreClient client = builder.build(startKey)) { - client.setTimeout(conf.getScanTimeout()); + try (RegionStoreClient client = builder.build(startKey, backOffer)) { + client.setTimeout(conf.getRawKVScanTimeoutInMS()); TiRegion region = client.getRegion(); if (limit <= 0) { currentCache = null; diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index ee41ee1786c..7ab25ea8b74 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -188,7 +188,10 @@ private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffe } private List getRegionStore(List peers, BackOffer backOffer) { - return peers.stream().map(p -> getStoreById(p.getStoreId())).collect(Collectors.toList()); + return peers + .stream() + .map(p -> getStoreById(p.getStoreId(), backOffer)) + .collect(Collectors.toList()); } public TiStore getStoreById(long id, BackOffer backOffer) { diff --git a/src/main/java/org/tikv/common/util/BackOffFunction.java b/src/main/java/org/tikv/common/util/BackOffFunction.java index 702fdd47d63..b80e9e8ec5e 100644 --- a/src/main/java/org/tikv/common/util/BackOffFunction.java +++ b/src/main/java/org/tikv/common/util/BackOffFunction.java @@ -1,7 +1,6 @@ package org.tikv.common.util; import java.util.concurrent.ThreadLocalRandom; -import org.tikv.common.exception.GrpcException; public class BackOffFunction { private final int base; @@ -25,7 +24,7 @@ public static BackOffFunction create(int base, int cap, BackOffer.BackOffStrateg * Do back off in exponential with optional jitters according to different back off strategies. * See http://www.awsarchitectureblog.com/2015/03/backoff.html */ - long doBackOff(long maxSleepMs) { + long getSleepMs(long maxSleepMs) { long sleep = 0; long v = expo(base, cap, attempts); switch (strategy) { @@ -47,11 +46,6 @@ long doBackOff(long maxSleepMs) { sleep = maxSleepMs; } - try { - Thread.sleep(sleep); - } catch (InterruptedException e) { - throw new GrpcException(e); - } attempts++; lastSleep = sleep; return lastSleep; diff --git a/src/main/java/org/tikv/common/util/ClientUtils.java b/src/main/java/org/tikv/common/util/ClientUtils.java index e85babf608b..6365c1325af 100644 --- a/src/main/java/org/tikv/common/util/ClientUtils.java +++ b/src/main/java/org/tikv/common/util/ClientUtils.java @@ -19,6 +19,7 @@ import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.tikv.common.exception.TiKVException; @@ -170,10 +171,14 @@ public static void getTasks( ExecutorCompletionService> completionService, Queue> taskQueue, List batches, - int backOff) { + long backOff) { try { for (int i = 0; i < batches.size(); i++) { - List task = completionService.take().get(backOff, TimeUnit.MILLISECONDS); + Future> future = completionService.poll(backOff, TimeUnit.MILLISECONDS); + if (future == null) { + throw new TiKVException("TimeOut Exceeded for current operation."); + } + List task = future.get(); if (!task.isEmpty()) { taskQueue.offer(task); } @@ -181,8 +186,6 @@ public static void getTasks( } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new TiKVException("Current thread interrupted.", e); - } catch (TimeoutException e) { - throw new TiKVException("TimeOut Exceeded for current operation. ", e); } catch (ExecutionException e) { throw new TiKVException("Execution exception met.", e); } @@ -192,11 +195,16 @@ public static List getTasksWithOutput( ExecutorCompletionService, List>> completionService, Queue> taskQueue, List batches, - int backOff) { + long backOff) { try { List result = new ArrayList<>(); for (int i = 0; i < batches.size(); i++) { - Pair, List> task = completionService.take().get(backOff, TimeUnit.MILLISECONDS); + Future, List>> future = + completionService.poll(backOff, TimeUnit.MILLISECONDS); + if (future == null) { + throw new TiKVException("TimeOut Exceeded for current operation."); + } + Pair, List> task = future.get(); if (!task.first.isEmpty()) { taskQueue.offer(task.first); } else { @@ -207,8 +215,6 @@ public static List getTasksWithOutput( } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new TiKVException("Current thread interrupted.", e); - } catch (TimeoutException e) { - throw new TiKVException("TimeOut Exceeded for current operation. ", e); } catch (ExecutionException e) { throw new TiKVException("Execution exception met.", e); } diff --git a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java index a7af12b85c1..6f6f9b12173 100644 --- a/src/main/java/org/tikv/common/util/ConcreteBackOffer.java +++ b/src/main/java/org/tikv/common/util/ConcreteBackOffer.java @@ -32,12 +32,17 @@ public class ConcreteBackOffer implements BackOffer { private final Map backOffFunctionMap; private final List errors; private int totalSleep; + private long deadline; - private ConcreteBackOffer(int maxSleep) { + private ConcreteBackOffer(int maxSleep, long deadline) { + Preconditions.checkArgument( + maxSleep == 0 || deadline == 0, "Max sleep time should be 0 or Deadline should be 0."); Preconditions.checkArgument(maxSleep >= 0, "Max sleep time cannot be less than 0."); + Preconditions.checkArgument(deadline >= 0, "Deadline cannot be less than 0."); this.maxSleep = maxSleep; this.errors = new ArrayList<>(); this.backOffFunctionMap = new HashMap<>(); + this.deadline = deadline; } private ConcreteBackOffer(ConcreteBackOffer source) { @@ -45,34 +50,40 @@ private ConcreteBackOffer(ConcreteBackOffer source) { this.totalSleep = source.totalSleep; this.errors = source.errors; this.backOffFunctionMap = source.backOffFunctionMap; + this.deadline = source.deadline; + } + + public static ConcreteBackOffer newDeadlineBackOff(int timeoutInMs) { + long deadline = System.currentTimeMillis() + timeoutInMs; + return new ConcreteBackOffer(0, deadline); } public static ConcreteBackOffer newCustomBackOff(int maxSleep) { - return new ConcreteBackOffer(maxSleep); + return new ConcreteBackOffer(maxSleep, 0); } public static ConcreteBackOffer newScannerNextMaxBackOff() { - return new ConcreteBackOffer(SCANNER_NEXT_MAX_BACKOFF); + return new ConcreteBackOffer(SCANNER_NEXT_MAX_BACKOFF, 0); } public static ConcreteBackOffer newBatchGetMaxBackOff() { - return new ConcreteBackOffer(BATCH_GET_MAX_BACKOFF); + return new ConcreteBackOffer(BATCH_GET_MAX_BACKOFF, 0); } public static ConcreteBackOffer newCopNextMaxBackOff() { - return new ConcreteBackOffer(COP_NEXT_MAX_BACKOFF); + return new ConcreteBackOffer(COP_NEXT_MAX_BACKOFF, 0); } public static ConcreteBackOffer newGetBackOff() { - return new ConcreteBackOffer(GET_MAX_BACKOFF); + return new ConcreteBackOffer(GET_MAX_BACKOFF, 0); } public static ConcreteBackOffer newRawKVBackOff() { - return new ConcreteBackOffer(RAWKV_MAX_BACKOFF); + return new ConcreteBackOffer(RAWKV_MAX_BACKOFF, 0); } public static ConcreteBackOffer newTsoBackOff() { - return new ConcreteBackOffer(TSO_MAX_BACKOFF); + return new ConcreteBackOffer(TSO_MAX_BACKOFF, 0); } public static ConcreteBackOffer create(BackOffer source) { @@ -125,27 +136,46 @@ public void doBackOffWithMaxSleep( BackOffFunction backOffFunction = backOffFunctionMap.computeIfAbsent(funcType, this::createBackOffFunc); - // Back off will be done here - totalSleep += backOffFunction.doBackOff(maxSleepMs); + // Back off will not be done here + long sleep = backOffFunction.getSleepMs(maxSleepMs); + totalSleep += sleep; + logger.debug( String.format( "%s, retry later(totalSleep %dms, maxSleep %dms)", err.getMessage(), totalSleep, maxSleep)); errors.add(err); + + // Check deadline + if (deadline > 0) { + long currentMs = System.currentTimeMillis(); + if (currentMs + sleep >= deadline) { + logThrowError(String.format("Deadline %d is exceeded, errors:", deadline), err); + } + } + + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + throw new GrpcException(e); + } + if (maxSleep > 0 && totalSleep >= maxSleep) { - StringBuilder errMsg = - new StringBuilder( - String.format("BackOffer.maxSleep %dms is exceeded, errors:", maxSleep)); - for (int i = 0; i < errors.size(); i++) { - Exception curErr = errors.get(i); - // Print only last 3 errors for non-DEBUG log levels. - if (logger.isDebugEnabled() || i >= errors.size() - 3) { - errMsg.append("\n").append(i).append(".").append(curErr.toString()); - } + logThrowError(String.format("BackOffer.maxSleep %dms is exceeded, errors:", maxSleep), err); + } + } + + private void logThrowError(String msg, Exception err) { + StringBuilder errMsg = new StringBuilder(msg); + for (int i = 0; i < errors.size(); i++) { + Exception curErr = errors.get(i); + // Print only last 3 errors for non-DEBUG log levels. + if (logger.isDebugEnabled() || i >= errors.size() - 3) { + errMsg.append("\n").append(i).append(".").append(curErr.toString()); } - logger.warn(errMsg.toString()); - // Use the last backoff type to generate an exception - throw new GrpcException("retry is exhausted.", err); } + logger.warn(errMsg.toString()); + // Use the last backoff type to generate an exception + throw new GrpcException("retry is exhausted.", err); } } diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index dbd7d3d80a8..fdd96600b96 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -130,7 +130,7 @@ private void put(ByteString key, ByteString value, long ttl, boolean atomic) { String label = "client_raw_put"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { - BackOffer backOffer = defaultBackOff(); + BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS()); while (true) { RegionStoreClient client = clientBuilder.build(key, backOffer); try { @@ -139,6 +139,7 @@ private void put(ByteString key, ByteString value, long ttl, boolean atomic) { return; } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); + logger.warn("Retry for put error", e); } } } catch (Exception e) { @@ -174,7 +175,7 @@ public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) { String label = "client_raw_put_if_absent"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { - BackOffer backOffer = defaultBackOff(); + BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS()); while (true) { RegionStoreClient client = clientBuilder.build(key, backOffer); try { @@ -183,6 +184,7 @@ public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) { return result; } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); + logger.warn("Retry for putIfAbsent error", e); } } } catch (Exception e) { @@ -235,7 +237,10 @@ private void batchPut(Map kvPairs, long ttl, boolean ato String label = "client_raw_batch_put"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { - doSendBatchPut(defaultBackOff(), kvPairs, ttl, atomic); + BackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS()); + long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS(); + doSendBatchPut(backOffer, kvPairs, ttl, atomic, deadline); RAW_REQUEST_SUCCESS.labels(label).inc(); } catch (Exception e) { RAW_REQUEST_FAILURE.labels(label).inc(); @@ -255,15 +260,16 @@ public ByteString get(ByteString key) { String label = "client_raw_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { - BackOffer backOffer = defaultBackOff(); + BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS()); while (true) { RegionStoreClient client = clientBuilder.build(key, backOffer); try { - ByteString result = client.rawGet(defaultBackOff(), key); + ByteString result = client.rawGet(backOffer, key); RAW_REQUEST_SUCCESS.labels(label).inc(); return result; } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); + logger.warn("Retry for get error", e); } } } catch (Exception e) { @@ -284,8 +290,10 @@ public List batchGet(List keys) { String label = "client_raw_batch_get"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { - BackOffer backOffer = defaultBackOff(); - List result = doSendBatchGet(backOffer, keys); + BackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchReadTimeoutInMS()); + long deadline = System.currentTimeMillis() + conf.getRawKVBatchReadTimeoutInMS(); + List result = doSendBatchGet(backOffer, keys, deadline); RAW_REQUEST_SUCCESS.labels(label).inc(); return result; } catch (Exception e) { @@ -318,8 +326,10 @@ private void batchDelete(List keys, boolean atomic) { String label = "client_raw_batch_delete"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { - BackOffer backOffer = defaultBackOff(); - doSendBatchDelete(backOffer, keys, atomic); + BackOffer backOffer = + ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS()); + long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS(); + doSendBatchDelete(backOffer, keys, atomic, deadline); RAW_REQUEST_SUCCESS.labels(label).inc(); return; } catch (Exception e) { @@ -341,15 +351,16 @@ public Long getKeyTTL(ByteString key) { String label = "client_raw_get_key_ttl"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { - BackOffer backOffer = defaultBackOff(); + BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS()); while (true) { RegionStoreClient client = clientBuilder.build(key, backOffer); try { - Long result = client.rawGetKeyTTL(defaultBackOff(), key); + Long result = client.rawGetKeyTTL(backOffer, key); RAW_REQUEST_SUCCESS.labels(label).inc(); return result; } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); + logger.warn("Retry for getKeyTTL error", e); } } } catch (Exception e) { @@ -363,6 +374,8 @@ public Long getKeyTTL(ByteString key) { public List> batchScan(List ranges) { String label = "client_raw_batch_scan"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); + long deadline = System.currentTimeMillis() + conf.getRawKVScanTimeoutInMS(); + List>>> futureList = new ArrayList<>(); try { if (ranges.isEmpty()) { return new ArrayList<>(); @@ -372,7 +385,7 @@ public List> batchScan(List ranges) { int num = 0; for (ScanOption scanOption : ranges) { int i = num; - completionService.submit(() -> Pair.create(i, scan(scanOption))); + futureList.add(completionService.submit(() -> Pair.create(i, scan(scanOption)))); ++num; } List> scanResults = new ArrayList<>(); @@ -381,14 +394,16 @@ public List> batchScan(List ranges) { } for (int i = 0; i < num; i++) { try { - Pair> scanResult = - completionService.take().get(BackOffer.RAWKV_MAX_BACKOFF, TimeUnit.SECONDS); + Future>> future = + completionService.poll(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + if (future == null) { + throw new TiKVException("TimeOut Exceeded for current operation."); + } + Pair> scanResult = future.get(); scanResults.set(scanResult.first, scanResult.second); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new TiKVException("Current thread interrupted.", e); - } catch (TimeoutException e) { - throw new TiKVException("TimeOut Exceeded for current operation. ", e); } catch (ExecutionException e) { throw new TiKVException("Execution exception met.", e); } @@ -397,6 +412,9 @@ public List> batchScan(List ranges) { return scanResults; } catch (Exception e) { RAW_REQUEST_FAILURE.labels(label).inc(); + for (Future>> future : futureList) { + future.cancel(true); + } throw e; } finally { requestTimer.observeDuration(); @@ -428,8 +446,9 @@ public List scan(ByteString startKey, ByteString endKey, int limit, bool String label = "client_raw_scan"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { + BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS()); Iterator iterator = - rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly); + rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly, backOffer); List result = new ArrayList<>(); iterator.forEachRemaining(result::add); RAW_REQUEST_SUCCESS.labels(label).inc(); @@ -488,11 +507,12 @@ public List scan(ByteString startKey, ByteString endKey, boolean keyOnly String label = "client_raw_scan_without_limit"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { + BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS()); List result = new ArrayList<>(); while (true) { Iterator iterator = rawScanIterator( - conf, clientBuilder, startKey, endKey, conf.getScanBatchSize(), keyOnly); + conf, clientBuilder, startKey, endKey, conf.getScanBatchSize(), keyOnly, backOffer); if (!iterator.hasNext()) { break; } @@ -559,15 +579,16 @@ private void delete(ByteString key, boolean atomic) { String label = "client_raw_delete"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { - BackOffer backOffer = defaultBackOff(); + BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS()); while (true) { RegionStoreClient client = clientBuilder.build(key, backOffer); try { - client.rawDelete(defaultBackOff(), key, atomic); + client.rawDelete(backOffer, key, atomic); RAW_REQUEST_SUCCESS.labels(label).inc(); return; } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); + logger.warn("Retry for delete error", e); } } } catch (Exception e) { @@ -591,8 +612,9 @@ public synchronized void deleteRange(ByteString startKey, ByteString endKey) { String label = "client_raw_delete_range"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { - BackOffer backOffer = defaultBackOff(); - doSendDeleteRange(backOffer, startKey, endKey); + BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVCleanTimeoutInMS()); + long deadline = System.currentTimeMillis() + conf.getRawKVCleanTimeoutInMS(); + doSendDeleteRange(backOffer, startKey, endKey, deadline); RAW_REQUEST_SUCCESS.labels(label).inc(); } catch (Exception e) { RAW_REQUEST_FAILURE.labels(label).inc(); @@ -616,10 +638,16 @@ public synchronized void deletePrefix(ByteString key) { } private void doSendBatchPut( - BackOffer backOffer, Map kvPairs, long ttl, boolean atomic) { + BackOffer backOffer, + Map kvPairs, + long ttl, + boolean atomic, + long deadline) { ExecutorCompletionService> completionService = new ExecutorCompletionService<>(batchPutThreadPool); + List>> futureList = new ArrayList<>(); + Map> groupKeys = groupKeysByRegion(clientBuilder.getRegionManager(), kvPairs.keySet(), backOffer); List batches = new ArrayList<>(); @@ -640,23 +668,32 @@ private void doSendBatchPut( while (!taskQueue.isEmpty()) { List task = taskQueue.poll(); for (Batch batch : task) { - completionService.submit( - () -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl, atomic)); + futureList.add( + completionService.submit( + () -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl, atomic))); + } + + try { + getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis()); + } catch (Exception e) { + for (Future> future : futureList) { + future.cancel(true); + } + throw e; } - getTasks(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF); } } private List doSendBatchPutInBatchesWithRetry( BackOffer backOffer, Batch batch, long ttl, boolean atomic) { - try (RegionStoreClient client = clientBuilder.build(batch.getRegion())) { - client.setTimeout(conf.getScanTimeout()); + try (RegionStoreClient client = clientBuilder.build(batch.getRegion(), backOffer)) { + client.setTimeout(conf.getRawKVBatchWriteTimeoutInMS()); client.rawBatchPut(backOffer, batch, ttl, atomic); return new ArrayList<>(); } catch (final TiKVException e) { // TODO: any elegant way to re-split the ranges if fails? backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); - logger.debug("ReSplitting ranges for BatchPutRequest"); + logger.warn("ReSplitting ranges for BatchPutRequest", e); // retry return doSendBatchPutWithRefetchRegion(backOffer, batch); } @@ -681,10 +718,12 @@ private List doSendBatchPutWithRefetchRegion(BackOffer backOffer, Batch b return retryBatches; } - private List doSendBatchGet(BackOffer backOffer, List keys) { + private List doSendBatchGet(BackOffer backOffer, List keys, long deadline) { ExecutorCompletionService, List>> completionService = new ExecutorCompletionService<>(batchGetThreadPool); + List, List>>> futureList = new ArrayList<>(); + List batches = getBatches(backOffer, keys, RAW_BATCH_GET_SIZE, MAX_RAW_BATCH_LIMIT, this.clientBuilder); @@ -695,11 +734,20 @@ private List doSendBatchGet(BackOffer backOffer, List keys) while (!taskQueue.isEmpty()) { List task = taskQueue.poll(); for (Batch batch : task) { - completionService.submit( - () -> doSendBatchGetInBatchesWithRetry(batch.getBackOffer(), batch)); + futureList.add( + completionService.submit( + () -> doSendBatchGetInBatchesWithRetry(batch.getBackOffer(), batch))); + } + try { + result.addAll( + getTasksWithOutput( + completionService, taskQueue, task, deadline - System.currentTimeMillis())); + } catch (Exception e) { + for (Future, List>> future : futureList) { + future.cancel(true); + } + throw e; } - result.addAll( - getTasksWithOutput(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF)); } return result; @@ -714,7 +762,7 @@ private Pair, List> doSendBatchGetInBatchesWithRetry( } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); clientBuilder.getRegionManager().invalidateRegion(batch.getRegion()); - logger.debug("ReSplitting ranges for BatchGetRequest", e); + logger.warn("ReSplitting ranges for BatchGetRequest", e); // retry return Pair.create(doSendBatchGetWithRefetchRegion(backOffer, batch), new ArrayList<>()); @@ -726,10 +774,13 @@ private List doSendBatchGetWithRefetchRegion(BackOffer backOffer, Batch b backOffer, batch.getKeys(), RAW_BATCH_GET_SIZE, MAX_RAW_BATCH_LIMIT, clientBuilder); } - private void doSendBatchDelete(BackOffer backOffer, List keys, boolean atomic) { + private void doSendBatchDelete( + BackOffer backOffer, List keys, boolean atomic, long deadline) { ExecutorCompletionService> completionService = new ExecutorCompletionService<>(batchDeleteThreadPool); + List>> futureList = new ArrayList<>(); + List batches = getBatches(backOffer, keys, RAW_BATCH_DELETE_SIZE, MAX_RAW_BATCH_LIMIT, this.clientBuilder); @@ -739,10 +790,18 @@ private void doSendBatchDelete(BackOffer backOffer, List keys, boole while (!taskQueue.isEmpty()) { List task = taskQueue.poll(); for (Batch batch : task) { - completionService.submit( - () -> doSendBatchDeleteInBatchesWithRetry(batch.getBackOffer(), batch, atomic)); + futureList.add( + completionService.submit( + () -> doSendBatchDeleteInBatchesWithRetry(batch.getBackOffer(), batch, atomic))); + } + try { + getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis()); + } catch (Exception e) { + for (Future> future : futureList) { + future.cancel(true); + } + throw e; } - getTasks(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF); } } @@ -755,7 +814,7 @@ private List doSendBatchDeleteInBatchesWithRetry( } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); clientBuilder.getRegionManager().invalidateRegion(batch.getRegion()); - logger.debug("ReSplitting ranges for BatchGetRequest", e); + logger.warn("ReSplitting ranges for BatchGetRequest", e); // retry return doSendBatchDeleteWithRefetchRegion(backOffer, batch); @@ -774,10 +833,13 @@ private ByteString calcKeyByCondition(boolean condition, ByteString key1, ByteSt return key2; } - private void doSendDeleteRange(BackOffer backOffer, ByteString startKey, ByteString endKey) { + private void doSendDeleteRange( + BackOffer backOffer, ByteString startKey, ByteString endKey, long deadline) { ExecutorCompletionService> completionService = new ExecutorCompletionService<>(deleteRangeThreadPool); + List>> futureList = new ArrayList<>(); + List regions = fetchRegionsFromRange(backOffer, startKey, endKey); List ranges = new ArrayList<>(); for (int i = 0; i < regions.size(); i++) { @@ -791,9 +853,18 @@ private void doSendDeleteRange(BackOffer backOffer, ByteString startKey, ByteStr while (!taskQueue.isEmpty()) { List task = taskQueue.poll(); for (DeleteRange range : task) { - completionService.submit(() -> doSendDeleteRangeWithRetry(range.getBackOffer(), range)); + futureList.add( + completionService.submit( + () -> doSendDeleteRangeWithRetry(range.getBackOffer(), range))); + } + try { + getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis()); + } catch (Exception e) { + for (Future> future : futureList) { + future.cancel(true); + } + throw e; } - getTasks(completionService, taskQueue, task, BackOffer.RAWKV_MAX_BACKOFF); } } @@ -805,7 +876,7 @@ private List doSendDeleteRangeWithRetry(BackOffer backOffer, Delete } catch (final TiKVException e) { backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); clientBuilder.getRegionManager().invalidateRegion(range.getRegion()); - logger.debug("ReSplitting ranges for BatchDeleteRangeRequest", e); + logger.warn("ReSplitting ranges for BatchDeleteRangeRequest", e); // retry return doSendDeleteRangeWithRefetchRegion(backOffer, range); @@ -858,14 +929,11 @@ private Iterator rawScanIterator( ByteString startKey, ByteString endKey, int limit, - boolean keyOnly) { + boolean keyOnly, + BackOffer backOffer) { if (limit > MAX_RAW_SCAN_LIMIT) { throw ERR_MAX_SCAN_LIMIT_EXCEEDED; } - return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly); - } - - private BackOffer defaultBackOff() { - return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS()); + return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly, backOffer); } } diff --git a/src/test/java/org/tikv/raw/RawKVClientTest.java b/src/test/java/org/tikv/raw/RawKVClientTest.java index ba2f8d40a5f..26fa171e471 100644 --- a/src/test/java/org/tikv/raw/RawKVClientTest.java +++ b/src/test/java/org/tikv/raw/RawKVClientTest.java @@ -19,6 +19,9 @@ import org.tikv.common.codec.KeyUtils; import org.tikv.common.exception.TiKVException; import org.tikv.common.key.Key; +import org.tikv.common.util.BackOffFunction; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.ConcreteBackOffer; import org.tikv.common.util.FastByteComparisons; import org.tikv.common.util.ScanOption; import org.tikv.kvproto.Kvrpcpb; @@ -160,6 +163,46 @@ private String generateType() { "%s%02d", RandomStringUtils.randomAlphabetic(3).toUpperCase(Locale.ROOT), r.nextInt(10000)); } + @Test + public void testCustomBackOff() { + int timeout = 2000; + int sleep = 150; + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(timeout); + long s = System.currentTimeMillis(); + try { + while (true) { + Thread.sleep(sleep); + backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, new Exception("t")); + } + } catch (Exception ignored) { + } finally { + long e = System.currentTimeMillis(); + long duration = e - s; + logger.info("duration = " + duration); + assert (duration >= 2900); + } + } + + @Test + public void testDeadlineBackOff() { + int timeout = 2000; + int sleep = 150; + BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(timeout); + long s = System.currentTimeMillis(); + try { + while (true) { + Thread.sleep(sleep); + backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, new Exception("t")); + } + } catch (Exception ignored) { + } finally { + long e = System.currentTimeMillis(); + long duration = e - s; + logger.info("duration = " + duration); + assert (duration <= timeout + sleep); + } + } + @Test public void batchPutTest() { if (!initialized) return;