Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,25 @@ The following includes JVM related parameters.
- timeout of scan/delete range grpc request
- default: 20s

<<<<<<< HEAD
=======
#### tikv.importer.max_kv_batch_bytes
- Maximal package size transporting from clients to TiKV Server (ingest API)
- default: 1048576 (1M)

#### tikv.importer.max_kv_batch_size
- Maximal batch size transporting from clients to TiKV Server (ingest API)
- default: 32768 (32K)

#### tikv.scatter_wait_seconds
- time to wait for scattering regions
- default: 300 (5min)

#### tikv.rawkv.default_backoff_in_ms
- RawKV default backoff in milliseconds
- default: 20000 (20 seconds)

>>>>>>> 5aebd12... add configration parameter for RawKV timeout (#246)
### Metrics Parameter

#### tikv.metrics.enable
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/org/tikv/common/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package org.tikv.common;

import org.tikv.common.util.BackOffer;
import org.tikv.kvproto.Kvrpcpb;

public class ConfigUtils {
Expand Down Expand Up @@ -54,6 +55,18 @@ public class ConfigUtils {
public static final String TIKV_HEALTH_CHECK_PERIOD_DURATION =
"tikv.health_check_period_duration";

<<<<<<< HEAD
=======
public static final String TIKV_ENABLE_ATOMIC_FOR_CAS = "tikv.enable_atomic_for_cas";

public static final String TIKV_IMPORTER_MAX_KV_BATCH_BYTES = "tikv.importer.max_kv_batch_bytes";
public static final String TIKV_IMPORTER_MAX_KV_BATCH_SIZE = "tikv.importer.max_kv_batch_size";

public static final String TIKV_SCATTER_WAIT_SECONDS = "tikv.scatter_wait_seconds";

public static final String TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS = "tikv.rawkv.default_backoff_in_ms";

>>>>>>> 5aebd12... add configration parameter for RawKV timeout (#246)
public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "200ms";
public static final String DEF_FORWARD_TIMEOUT = "300ms";
Expand Down Expand Up @@ -85,6 +98,15 @@ public class ConfigUtils {
public static final int DEF_METRICS_PORT = 3140;
public static final String DEF_TIKV_NETWORK_MAPPING_NAME = "";
public static final boolean DEF_GRPC_FORWARD_ENABLE = true;
<<<<<<< HEAD
=======
public static final boolean DEF_TIKV_ENABLE_ATOMIC_FOR_CAS = false;

public static final int DEF_TIKV_IMPORTER_MAX_KV_BATCH_BYTES = 1024 * 1024;
public static final int DEF_TIKV_IMPORTER_MAX_KV_BATCH_SIZE = 1024 * 32;
public static final int DEF_TIKV_SCATTER_WAIT_SECONDS = 300;
public static final int DEF_TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS = BackOffer.RAWKV_MAX_BACKOFF;
>>>>>>> 5aebd12... add configration parameter for RawKV timeout (#246)

public static final String NORMAL_COMMAND_PRIORITY = "NORMAL";
public static final String LOW_COMMAND_PRIORITY = "LOW";
Expand Down
64 changes: 64 additions & 0 deletions src/main/java/org/tikv/common/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ private static void loadFromDefaultProperties() {
setIfMissing(TIKV_ENABLE_GRPC_FORWARD, DEF_GRPC_FORWARD_ENABLE);
setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT);
setIfMissing(TIKV_HEALTH_CHECK_PERIOD_DURATION, DEF_HEALTH_CHECK_PERIOD_DURATION);
<<<<<<< HEAD
=======
setIfMissing(TIKV_ENABLE_ATOMIC_FOR_CAS, DEF_TIKV_ENABLE_ATOMIC_FOR_CAS);
setIfMissing(TIKV_IMPORTER_MAX_KV_BATCH_BYTES, DEF_TIKV_IMPORTER_MAX_KV_BATCH_BYTES);
setIfMissing(TIKV_IMPORTER_MAX_KV_BATCH_SIZE, DEF_TIKV_IMPORTER_MAX_KV_BATCH_SIZE);
setIfMissing(TIKV_SCATTER_WAIT_SECONDS, DEF_TIKV_SCATTER_WAIT_SECONDS);
setIfMissing(TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS, DEF_TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS);
>>>>>>> 5aebd12... add configration parameter for RawKV timeout (#246)
}

public static void listAll() {
Expand Down Expand Up @@ -270,6 +278,19 @@ private static ReplicaRead getReplicaRead(String key) {
private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME);
private HostMapping hostMapping = null;

<<<<<<< HEAD
=======
private boolean enableAtomicForCAS = getBoolean(TIKV_ENABLE_ATOMIC_FOR_CAS);

private int importerMaxKVBatchBytes = getInt(TIKV_IMPORTER_MAX_KV_BATCH_BYTES);

private int importerMaxKVBatchSize = getInt(TIKV_IMPORTER_MAX_KV_BATCH_SIZE);

private int scatterWaitSeconds = getInt(TIKV_SCATTER_WAIT_SECONDS);

private int rawKVDefaultBackoffInMS = getInt(TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS);

>>>>>>> 5aebd12... add configration parameter for RawKV timeout (#246)
public enum KVMode {
TXN,
RAW
Expand Down Expand Up @@ -575,4 +596,47 @@ public long getGrpcHealthCheckTimeout() {
public long getHealthCheckPeriodDuration() {
return this.healthCheckPeriodDuration;
}
<<<<<<< HEAD
=======

public boolean isEnableAtomicForCAS() {
return enableAtomicForCAS;
}

public void setEnableAtomicForCAS(boolean enableAtomicForCAS) {
this.enableAtomicForCAS = enableAtomicForCAS;
}

public int getImporterMaxKVBatchBytes() {
return importerMaxKVBatchBytes;
}

public void setImporterMaxKVBatchBytes(int importerMaxKVBatchBytes) {
this.importerMaxKVBatchBytes = importerMaxKVBatchBytes;
}

public int getImporterMaxKVBatchSize() {
return importerMaxKVBatchSize;
}

public void setImporterMaxKVBatchSize(int importerMaxKVBatchSize) {
this.importerMaxKVBatchSize = importerMaxKVBatchSize;
}

public int getScatterWaitSeconds() {
return scatterWaitSeconds;
}

public void setScatterWaitSeconds(int scatterWaitSeconds) {
this.scatterWaitSeconds = scatterWaitSeconds;
}

public int getRawKVDefaultBackoffInMS() {
return rawKVDefaultBackoffInMS;
}

public void setRawKVDefaultBackoffInMS(int rawKVDefaultBackoffInMS) {
this.rawKVDefaultBackoffInMS = rawKVDefaultBackoffInMS;
}
>>>>>>> 5aebd12... add configration parameter for RawKV timeout (#246)
}
96 changes: 95 additions & 1 deletion src/main/java/org/tikv/raw/RawKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,11 @@ private void batchPut(Map<ByteString, ByteString> kvPairs, long ttl, boolean ato
String label = "client_raw_batch_put";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
<<<<<<< HEAD
doSendBatchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs, ttl, atomic);
=======
doSendBatchPut(defaultBackOff(), kvPairs, ttl);
>>>>>>> 5aebd12... add configration parameter for RawKV timeout (#246)
RAW_REQUEST_SUCCESS.labels(label).inc();
} catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc();
Expand Down Expand Up @@ -615,8 +619,98 @@ public synchronized void deletePrefix(ByteString key) {
deleteRange(key, endKey);
}

<<<<<<< HEAD
private void doSendBatchPut(
BackOffer backOffer, Map<ByteString, ByteString> kvPairs, long ttl, boolean atomic) {
=======
/**
* Ingest KV pairs to RawKV using StreamKV API.
*
* @param list
*/
public synchronized void ingest(List<Pair<ByteString, ByteString>> list) {
ingest(list, null);
}

/**
* Ingest KV pairs to RawKV using StreamKV API.
*
* @param list
* @param ttl the ttl of the key (in seconds), 0 means the key will never be outdated
*/
public synchronized void ingest(List<Pair<ByteString, ByteString>> list, Long ttl)
throws GrpcException {
if (list.isEmpty()) {
return;
}

Key min = Key.MAX;
Key max = Key.MIN;
Map<ByteString, ByteString> map = new HashMap<>(list.size());

for (Pair<ByteString, ByteString> pair : list) {
map.put(pair.first, pair.second);
Key key = Key.toRawKey(pair.first.toByteArray());
if (key.compareTo(min) < 0) {
min = key;
}
if (key.compareTo(max) > 0) {
max = key;
}
}

SwitchTiKVModeClient switchTiKVModeClient = tiSession.getSwitchTiKVModeClient();

try {
// switch to normal mode
switchTiKVModeClient.switchTiKVToNormalMode();

// region split
List<byte[]> splitKeys = new ArrayList<>(2);
splitKeys.add(min.getBytes());
splitKeys.add(max.next().getBytes());
tiSession.splitRegionAndScatter(splitKeys);
tiSession.getRegionManager().invalidateAll();

// switch to import mode
switchTiKVModeClient.keepTiKVToImportMode();

// group keys by region
List<ByteString> keyList = list.stream().map(pair -> pair.first).collect(Collectors.toList());
Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion(clientBuilder.getRegionManager(), keyList, defaultBackOff());

// ingest for each region
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
TiRegion region = entry.getKey();
List<ByteString> keys = entry.getValue();
List<Pair<ByteString, ByteString>> kvs =
keys.stream().map(k -> Pair.create(k, map.get(k))).collect(Collectors.toList());
doIngest(region, kvs, ttl);
}
} finally {
// swith tikv to normal mode
switchTiKVModeClient.stopKeepTiKVToImportMode();
switchTiKVModeClient.switchTiKVToNormalMode();
}
}

private void doIngest(TiRegion region, List<Pair<ByteString, ByteString>> sortedList, Long ttl)
throws GrpcException {
if (sortedList.isEmpty()) {
return;
}

ByteString uuid = ByteString.copyFrom(genUUID());
Key minKey = Key.toRawKey(sortedList.get(0).first);
Key maxKey = Key.toRawKey(sortedList.get(sortedList.size() - 1).first);
ImporterClient importerClient =
new ImporterClient(tiSession, uuid, minKey, maxKey, region, ttl);
importerClient.rawWrite(sortedList.iterator());
}

private void doSendBatchPut(BackOffer backOffer, Map<ByteString, ByteString> kvPairs, long ttl) {
>>>>>>> 5aebd12... add configration parameter for RawKV timeout (#246)
ExecutorCompletionService<List<Batch>> completionService =
new ExecutorCompletionService<>(batchPutThreadPool);

Expand Down Expand Up @@ -866,6 +960,6 @@ private Iterator<KvPair> rawScanIterator(
}

private BackOffer defaultBackOff() {
return ConcreteBackOffer.newRawKVBackOff();
return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS());
}
}