Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ The following includes JVM related parameters.
- time to wait for scattering regions
- default: 300 (5min)

#### tikv.rawkv.default_backoff_in_ms
- RawKV default backoff in milliseconds
- default: 20000 (20 seconds)

### Metrics Parameter

#### tikv.metrics.enable
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/tikv/common/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package org.tikv.common;

import org.tikv.common.util.BackOffer;
import org.tikv.kvproto.Kvrpcpb;

public class ConfigUtils {
Expand Down Expand Up @@ -61,6 +62,8 @@ public class ConfigUtils {

public static final String TIKV_SCATTER_WAIT_SECONDS = "tikv.scatter_wait_seconds";

public static final String TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS = "tikv.rawkv.default_backoff_in_ms";

public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "200ms";
public static final String DEF_FORWARD_TIMEOUT = "300ms";
Expand Down Expand Up @@ -97,6 +100,7 @@ public class ConfigUtils {
public static final int DEF_TIKV_IMPORTER_MAX_KV_BATCH_BYTES = 1024 * 1024;
public static final int DEF_TIKV_IMPORTER_MAX_KV_BATCH_SIZE = 1024 * 32;
public static final int DEF_TIKV_SCATTER_WAIT_SECONDS = 300;
public static final int DEF_TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS = BackOffer.RAWKV_MAX_BACKOFF;

public static final String NORMAL_COMMAND_PRIORITY = "NORMAL";
public static final String LOW_COMMAND_PRIORITY = "LOW";
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ private static void loadFromDefaultProperties() {
setIfMissing(TIKV_IMPORTER_MAX_KV_BATCH_BYTES, DEF_TIKV_IMPORTER_MAX_KV_BATCH_BYTES);
setIfMissing(TIKV_IMPORTER_MAX_KV_BATCH_SIZE, DEF_TIKV_IMPORTER_MAX_KV_BATCH_SIZE);
setIfMissing(TIKV_SCATTER_WAIT_SECONDS, DEF_TIKV_SCATTER_WAIT_SECONDS);
setIfMissing(TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS, DEF_TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS);
}

public static void listAll() {
Expand Down Expand Up @@ -282,6 +283,8 @@ private static ReplicaRead getReplicaRead(String key) {

private int scatterWaitSeconds = getInt(TIKV_SCATTER_WAIT_SECONDS);

private int rawKVDefaultBackoffInMS = getInt(TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS);

public enum KVMode {
TXN,
RAW
Expand Down Expand Up @@ -627,4 +630,12 @@ public int getScatterWaitSeconds() {
public void setScatterWaitSeconds(int scatterWaitSeconds) {
this.scatterWaitSeconds = scatterWaitSeconds;
}

public int getRawKVDefaultBackoffInMS() {
return rawKVDefaultBackoffInMS;
}

public void setRawKVDefaultBackoffInMS(int rawKVDefaultBackoffInMS) {
this.rawKVDefaultBackoffInMS = rawKVDefaultBackoffInMS;
}
}
7 changes: 3 additions & 4 deletions src/main/java/org/tikv/raw/RawKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public void batchPut(Map<ByteString, ByteString> kvPairs, long ttl) {
String label = "client_raw_batch_put";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
doSendBatchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs, ttl);
doSendBatchPut(defaultBackOff(), kvPairs, ttl);
RAW_REQUEST_SUCCESS.labels(label).inc();
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
Expand Down Expand Up @@ -699,8 +699,7 @@ public synchronized void ingest(List<Pair<ByteString, ByteString>> list, Long tt
// group keys by region
List<ByteString> keyList = list.stream().map(pair -> pair.first).collect(Collectors.toList());
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(
clientBuilder.getRegionManager(), keyList, ConcreteBackOffer.newRawKVBackOff());
groupKeysByRegion(clientBuilder.getRegionManager(), keyList, defaultBackOff());

// ingest for each region
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
Expand Down Expand Up @@ -979,7 +978,7 @@ private Iterator<KvPair> rawScanIterator(
}

private BackOffer defaultBackOff() {
return ConcreteBackOffer.newRawKVBackOff();
return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS());
}

/**
Expand Down