Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,30 @@ The following includes ThreadPool related parameters, which can be passed in thr
- RawKV default backoff in milliseconds
- default: 20000 (20 seconds)

#### tikv.rawkv.read_timeout_in_ms
- RawKV read timeout in milliseconds. This parameter controls the timeout of `get` `getKeyTTL`.
- default: 2000 (2 seconds)

#### tikv.rawkv.write_timeout_in_ms
- RawKV write timeout in milliseconds. This parameter controls the timeout of `put` `putAtomic` `putIfAbsent` `delete` `deleteAtomic`.
- default: 2000 (2 seconds)

#### tikv.rawkv.batch_read_timeout_in_ms
- RawKV batch read timeout in milliseconds. This parameter controls the timeout of `batchGet`.
- default: 2000 (2 seconds)

#### tikv.rawkv.batch_write_timeout_in_ms
- RawKV batch write timeout in milliseconds. This parameter controls the timeout of `batchPut` `batchDelete` `batchDeleteAtomic`.
- default: 2000 (2 seconds)

#### tikv.rawkv.scan_timeout_in_ms
- RawKV scan timeout in milliseconds. This parameter controls the timeout of `batchScan` `scan` `scanPrefix`.
- default: 10000 (10 seconds)

#### tikv.rawkv.clean_timeout_in_ms
- RawKV clean timeout in milliseconds. This parameter controls the timeout of `deleteRange` `deletePrefix`.
- default: 600000 (10 minutes)

## Metrics

Client Java supports exporting metrics to Prometheus using poll mode and viewing on Grafana. The following steps shows how to enable this function.
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/org/tikv/common/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ public class ConfigUtils {
"tikv.health_check_period_duration";

public static final String TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS = "tikv.rawkv.default_backoff_in_ms";
public static final String TIKV_RAWKV_READ_TIMEOUT_IN_MS = "tikv.rawkv.read_timeout_in_ms";
public static final String TIKV_RAWKV_WRITE_TIMEOUT_IN_MS = "tikv.rawkv.write_timeout_in_ms";
public static final String TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS =
"tikv.rawkv.batch_read_timeout_in_ms";
public static final String TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS =
"tikv.rawkv.batch_write_timeout_in_ms";
public static final String TIKV_RAWKV_SCAN_TIMEOUT_IN_MS = "tikv.rawkv.scan_timeout_in_ms";
public static final String TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS = "tikv.rawkv.clean_timeout_in_ms";

public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "200ms";
Expand Down Expand Up @@ -91,6 +99,13 @@ public class ConfigUtils {

public static final int DEF_TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS = BackOffer.RAWKV_MAX_BACKOFF;

public static final int DEF_TIKV_RAWKV_READ_TIMEOUT_IN_MS = 2000;
public static final int DEF_TIKV_RAWKV_WRITE_TIMEOUT_IN_MS = 2000;
public static final int DEF_TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS = 2000;
public static final int DEF_TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS = 2000;
public static final int DEF_TIKV_RAWKV_SCAN_TIMEOUT_IN_MS = 10000;
public static final int DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS = 600000;

public static final String NORMAL_COMMAND_PRIORITY = "NORMAL";
public static final String LOW_COMMAND_PRIORITY = "LOW";
public static final String HIGH_COMMAND_PRIORITY = "HIGH";
Expand Down
60 changes: 60 additions & 0 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ private static void loadFromDefaultProperties() {
setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT);
setIfMissing(TIKV_HEALTH_CHECK_PERIOD_DURATION, DEF_HEALTH_CHECK_PERIOD_DURATION);
setIfMissing(TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS, DEF_TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS);
setIfMissing(TIKV_RAWKV_READ_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_READ_TIMEOUT_IN_MS);
setIfMissing(TIKV_RAWKV_WRITE_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_WRITE_TIMEOUT_IN_MS);
setIfMissing(TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS);
setIfMissing(TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS);
setIfMissing(TIKV_RAWKV_SCAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_SCAN_TIMEOUT_IN_MS);
setIfMissing(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS);
}

public static void listAll() {
Expand Down Expand Up @@ -272,6 +278,12 @@ private static ReplicaRead getReplicaRead(String key) {
private HostMapping hostMapping = null;

private int rawKVDefaultBackoffInMS = getInt(TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS);
private int rawKVReadTimeoutInMS = getInt(TIKV_RAWKV_READ_TIMEOUT_IN_MS);
private int rawKVWriteTimeoutInMS = getInt(TIKV_RAWKV_WRITE_TIMEOUT_IN_MS);
private int rawKVBatchReadTimeoutInMS = getInt(TIKV_RAWKV_BATCH_READ_TIMEOUT_IN_MS);
private int rawKVBatchWriteTimeoutInMS = getInt(TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS);
private int rawKVScanTimeoutInMS = getInt(TIKV_RAWKV_SCAN_TIMEOUT_IN_MS);
private int rawKVCleanTimeoutInMS = getInt(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS);

public enum KVMode {
TXN,
Expand Down Expand Up @@ -586,4 +598,52 @@ public int getRawKVDefaultBackoffInMS() {
public void setRawKVDefaultBackoffInMS(int rawKVDefaultBackoffInMS) {
this.rawKVDefaultBackoffInMS = rawKVDefaultBackoffInMS;
}

public int getRawKVReadTimeoutInMS() {
return rawKVReadTimeoutInMS;
}

public void setRawKVReadTimeoutInMS(int rawKVReadTimeoutInMS) {
this.rawKVReadTimeoutInMS = rawKVReadTimeoutInMS;
}

public int getRawKVWriteTimeoutInMS() {
return rawKVWriteTimeoutInMS;
}

public void setRawKVWriteTimeoutInMS(int rawKVWriteTimeoutInMS) {
this.rawKVWriteTimeoutInMS = rawKVWriteTimeoutInMS;
}

public int getRawKVBatchReadTimeoutInMS() {
return rawKVBatchReadTimeoutInMS;
}

public void setRawKVBatchReadTimeoutInMS(int rawKVBatchReadTimeoutInMS) {
this.rawKVBatchReadTimeoutInMS = rawKVBatchReadTimeoutInMS;
}

public int getRawKVBatchWriteTimeoutInMS() {
return rawKVBatchWriteTimeoutInMS;
}

public void setRawKVBatchWriteTimeoutInMS(int rawKVBatchWriteTimeoutInMS) {
this.rawKVBatchWriteTimeoutInMS = rawKVBatchWriteTimeoutInMS;
}

public int getRawKVScanTimeoutInMS() {
return rawKVScanTimeoutInMS;
}

public void setRawKVScanTimeoutInMS(int rawKVScanTimeoutInMS) {
this.rawKVScanTimeoutInMS = rawKVScanTimeoutInMS;
}

public int getRawKVCleanTimeoutInMS() {
return rawKVCleanTimeoutInMS;
}

public void setRawKVCleanTimeoutInMS(int rawKVCleanTimeoutInMS) {
this.rawKVCleanTimeoutInMS = rawKVCleanTimeoutInMS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,30 @@
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Kvrpcpb;

public class RawScanIterator extends ScanIterator {
private final BackOffer scanBackOffer;

public RawScanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
int limit,
boolean keyOnly) {
boolean keyOnly,
BackOffer scanBackOffer) {
super(conf, builder, startKey, endKey, limit, keyOnly);

this.scanBackOffer = scanBackOffer;
}

@Override
TiRegion loadCurrentRegionToCache() throws GrpcException {
BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff();
BackOffer backOffer = scanBackOffer;
while (true) {
try (RegionStoreClient client = builder.build(startKey)) {
client.setTimeout(conf.getScanTimeout());
try (RegionStoreClient client = builder.build(startKey, backOffer)) {
client.setTimeout(conf.getRawKVScanTimeoutInMS());
TiRegion region = client.getRegion();
if (limit <= 0) {
currentCache = null;
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/org/tikv/common/region/RegionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,10 @@ private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffe
}

private List<TiStore> getRegionStore(List<Metapb.Peer> peers, BackOffer backOffer) {
return peers.stream().map(p -> getStoreById(p.getStoreId())).collect(Collectors.toList());
return peers
.stream()
.map(p -> getStoreById(p.getStoreId(), backOffer))
.collect(Collectors.toList());
}

public TiStore getStoreById(long id, BackOffer backOffer) {
Expand Down
8 changes: 1 addition & 7 deletions src/main/java/org/tikv/common/util/BackOffFunction.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.tikv.common.util;

import java.util.concurrent.ThreadLocalRandom;
import org.tikv.common.exception.GrpcException;

public class BackOffFunction {
private final int base;
Expand All @@ -25,7 +24,7 @@ public static BackOffFunction create(int base, int cap, BackOffer.BackOffStrateg
* Do back off in exponential with optional jitters according to different back off strategies.
* See http://www.awsarchitectureblog.com/2015/03/backoff.html
*/
long doBackOff(long maxSleepMs) {
long getSleepMs(long maxSleepMs) {
long sleep = 0;
long v = expo(base, cap, attempts);
switch (strategy) {
Expand All @@ -47,11 +46,6 @@ long doBackOff(long maxSleepMs) {
sleep = maxSleepMs;
}

try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
throw new GrpcException(e);
}
attempts++;
lastSleep = sleep;
return lastSleep;
Expand Down
22 changes: 14 additions & 8 deletions src/main/java/org/tikv/common/util/ClientUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.tikv.common.exception.TiKVException;
Expand Down Expand Up @@ -170,19 +171,21 @@ public static <T> void getTasks(
ExecutorCompletionService<List<T>> completionService,
Queue<List<T>> taskQueue,
List<T> batches,
int backOff) {
long backOff) {
try {
for (int i = 0; i < batches.size(); i++) {
List<T> task = completionService.take().get(backOff, TimeUnit.MILLISECONDS);
Future<List<T>> future = completionService.poll(backOff, TimeUnit.MILLISECONDS);
if (future == null) {
throw new TiKVException("TimeOut Exceeded for current operation.");
}
List<T> task = future.get();
if (!task.isEmpty()) {
taskQueue.offer(task);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TiKVException("Current thread interrupted.", e);
} catch (TimeoutException e) {
throw new TiKVException("TimeOut Exceeded for current operation. ", e);
} catch (ExecutionException e) {
throw new TiKVException("Execution exception met.", e);
}
Expand All @@ -192,11 +195,16 @@ public static <T, U> List<U> getTasksWithOutput(
ExecutorCompletionService<Pair<List<T>, List<U>>> completionService,
Queue<List<T>> taskQueue,
List<T> batches,
int backOff) {
long backOff) {
try {
List<U> result = new ArrayList<>();
for (int i = 0; i < batches.size(); i++) {
Pair<List<T>, List<U>> task = completionService.take().get(backOff, TimeUnit.MILLISECONDS);
Future<Pair<List<T>, List<U>>> future =
completionService.poll(backOff, TimeUnit.MILLISECONDS);
if (future == null) {
throw new TiKVException("TimeOut Exceeded for current operation.");
}
Pair<List<T>, List<U>> task = future.get();
if (!task.first.isEmpty()) {
taskQueue.offer(task.first);
} else {
Expand All @@ -207,8 +215,6 @@ public static <T, U> List<U> getTasksWithOutput(
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TiKVException("Current thread interrupted.", e);
} catch (TimeoutException e) {
throw new TiKVException("TimeOut Exceeded for current operation. ", e);
} catch (ExecutionException e) {
throw new TiKVException("Execution exception met.", e);
}
Expand Down
Loading