From 81b9815324874fd45eceab6af19093236c4dac59 Mon Sep 17 00:00:00 2001 From: Liangliang Gu Date: Thu, 5 Aug 2021 17:42:42 +0800 Subject: [PATCH] cherry pick #246 to release-3.1 Signed-off-by: ti-srebot --- README.md | 19 ++++ .../java/org/tikv/common/ConfigUtils.java | 22 +++++ .../java/org/tikv/common/TiConfiguration.java | 64 +++++++++++++ src/main/java/org/tikv/raw/RawKVClient.java | 96 ++++++++++++++++++- 4 files changed, 200 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 7617101fc91..aab5bfd720f 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,25 @@ The following includes JVM related parameters. - timeout of scan/delete range grpc request - default: 20s +<<<<<<< HEAD +======= +#### tikv.importer.max_kv_batch_bytes +- Maximal package size transporting from clients to TiKV Server (ingest API) +- default: 1048576 (1M) + +#### tikv.importer.max_kv_batch_size +- Maximal batch size transporting from clients to TiKV Server (ingest API) +- default: 32768 (32K) + +#### tikv.scatter_wait_seconds +- time to wait for scattering regions +- default: 300 (5min) + +#### tikv.rawkv.default_backoff_in_ms +- RawKV default backoff in milliseconds +- default: 20000 (20 seconds) + +>>>>>>> 5aebd12... add configration parameter for RawKV timeout (#246) ### Metrics Parameter #### tikv.metrics.enable diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index e7c497cb015..1bb7d013529 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -15,6 +15,7 @@ package org.tikv.common; +import org.tikv.common.util.BackOffer; import org.tikv.kvproto.Kvrpcpb; public class ConfigUtils { @@ -54,6 +55,18 @@ public class ConfigUtils { public static final String TIKV_HEALTH_CHECK_PERIOD_DURATION = "tikv.health_check_period_duration"; +<<<<<<< HEAD +======= + public static final String TIKV_ENABLE_ATOMIC_FOR_CAS = "tikv.enable_atomic_for_cas"; + + public static final String TIKV_IMPORTER_MAX_KV_BATCH_BYTES = "tikv.importer.max_kv_batch_bytes"; + public static final String TIKV_IMPORTER_MAX_KV_BATCH_SIZE = "tikv.importer.max_kv_batch_size"; + + public static final String TIKV_SCATTER_WAIT_SECONDS = "tikv.scatter_wait_seconds"; + + public static final String TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS = "tikv.rawkv.default_backoff_in_ms"; + +>>>>>>> 5aebd12... add configration parameter for RawKV timeout (#246) public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; public static final String DEF_TIMEOUT = "200ms"; public static final String DEF_FORWARD_TIMEOUT = "300ms"; @@ -85,6 +98,15 @@ public class ConfigUtils { public static final int DEF_METRICS_PORT = 3140; public static final String DEF_TIKV_NETWORK_MAPPING_NAME = ""; public static final boolean DEF_GRPC_FORWARD_ENABLE = true; +<<<<<<< HEAD +======= + public static final boolean DEF_TIKV_ENABLE_ATOMIC_FOR_CAS = false; + + public static final int DEF_TIKV_IMPORTER_MAX_KV_BATCH_BYTES = 1024 * 1024; + public static final int DEF_TIKV_IMPORTER_MAX_KV_BATCH_SIZE = 1024 * 32; + public static final int DEF_TIKV_SCATTER_WAIT_SECONDS = 300; + public static final int DEF_TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS = BackOffer.RAWKV_MAX_BACKOFF; +>>>>>>> 5aebd12... add configration parameter for RawKV timeout (#246) public static final String NORMAL_COMMAND_PRIORITY = "NORMAL"; public static final String LOW_COMMAND_PRIORITY = "LOW"; diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 578086a53f1..bf024b354e2 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -81,6 +81,14 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_ENABLE_GRPC_FORWARD, DEF_GRPC_FORWARD_ENABLE); setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT); setIfMissing(TIKV_HEALTH_CHECK_PERIOD_DURATION, DEF_HEALTH_CHECK_PERIOD_DURATION); +<<<<<<< HEAD +======= + setIfMissing(TIKV_ENABLE_ATOMIC_FOR_CAS, DEF_TIKV_ENABLE_ATOMIC_FOR_CAS); + setIfMissing(TIKV_IMPORTER_MAX_KV_BATCH_BYTES, DEF_TIKV_IMPORTER_MAX_KV_BATCH_BYTES); + setIfMissing(TIKV_IMPORTER_MAX_KV_BATCH_SIZE, DEF_TIKV_IMPORTER_MAX_KV_BATCH_SIZE); + setIfMissing(TIKV_SCATTER_WAIT_SECONDS, DEF_TIKV_SCATTER_WAIT_SECONDS); + setIfMissing(TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS, DEF_TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS); +>>>>>>> 5aebd12... add configration parameter for RawKV timeout (#246) } public static void listAll() { @@ -270,6 +278,19 @@ private static ReplicaRead getReplicaRead(String key) { private final String networkMappingName = get(TIKV_NETWORK_MAPPING_NAME); private HostMapping hostMapping = null; +<<<<<<< HEAD +======= + private boolean enableAtomicForCAS = getBoolean(TIKV_ENABLE_ATOMIC_FOR_CAS); + + private int importerMaxKVBatchBytes = getInt(TIKV_IMPORTER_MAX_KV_BATCH_BYTES); + + private int importerMaxKVBatchSize = getInt(TIKV_IMPORTER_MAX_KV_BATCH_SIZE); + + private int scatterWaitSeconds = getInt(TIKV_SCATTER_WAIT_SECONDS); + + private int rawKVDefaultBackoffInMS = getInt(TIKV_RAWKV_DEFAULT_BACKOFF_IN_MS); + +>>>>>>> 5aebd12... add configration parameter for RawKV timeout (#246) public enum KVMode { TXN, RAW @@ -575,4 +596,47 @@ public long getGrpcHealthCheckTimeout() { public long getHealthCheckPeriodDuration() { return this.healthCheckPeriodDuration; } +<<<<<<< HEAD +======= + + public boolean isEnableAtomicForCAS() { + return enableAtomicForCAS; + } + + public void setEnableAtomicForCAS(boolean enableAtomicForCAS) { + this.enableAtomicForCAS = enableAtomicForCAS; + } + + public int getImporterMaxKVBatchBytes() { + return importerMaxKVBatchBytes; + } + + public void setImporterMaxKVBatchBytes(int importerMaxKVBatchBytes) { + this.importerMaxKVBatchBytes = importerMaxKVBatchBytes; + } + + public int getImporterMaxKVBatchSize() { + return importerMaxKVBatchSize; + } + + public void setImporterMaxKVBatchSize(int importerMaxKVBatchSize) { + this.importerMaxKVBatchSize = importerMaxKVBatchSize; + } + + public int getScatterWaitSeconds() { + return scatterWaitSeconds; + } + + public void setScatterWaitSeconds(int scatterWaitSeconds) { + this.scatterWaitSeconds = scatterWaitSeconds; + } + + public int getRawKVDefaultBackoffInMS() { + return rawKVDefaultBackoffInMS; + } + + public void setRawKVDefaultBackoffInMS(int rawKVDefaultBackoffInMS) { + this.rawKVDefaultBackoffInMS = rawKVDefaultBackoffInMS; + } +>>>>>>> 5aebd12... add configration parameter for RawKV timeout (#246) } diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 12bfcaab5d3..bb02ca9422f 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -235,7 +235,11 @@ private void batchPut(Map kvPairs, long ttl, boolean ato String label = "client_raw_batch_put"; Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); try { +<<<<<<< HEAD doSendBatchPut(ConcreteBackOffer.newRawKVBackOff(), kvPairs, ttl, atomic); +======= + doSendBatchPut(defaultBackOff(), kvPairs, ttl); +>>>>>>> 5aebd12... add configration parameter for RawKV timeout (#246) RAW_REQUEST_SUCCESS.labels(label).inc(); } catch (Exception e) { RAW_REQUEST_FAILURE.labels(label).inc(); @@ -615,8 +619,98 @@ public synchronized void deletePrefix(ByteString key) { deleteRange(key, endKey); } +<<<<<<< HEAD private void doSendBatchPut( BackOffer backOffer, Map kvPairs, long ttl, boolean atomic) { +======= + /** + * Ingest KV pairs to RawKV using StreamKV API. + * + * @param list + */ + public synchronized void ingest(List> list) { + ingest(list, null); + } + + /** + * Ingest KV pairs to RawKV using StreamKV API. + * + * @param list + * @param ttl the ttl of the key (in seconds), 0 means the key will never be outdated + */ + public synchronized void ingest(List> list, Long ttl) + throws GrpcException { + if (list.isEmpty()) { + return; + } + + Key min = Key.MAX; + Key max = Key.MIN; + Map map = new HashMap<>(list.size()); + + for (Pair pair : list) { + map.put(pair.first, pair.second); + Key key = Key.toRawKey(pair.first.toByteArray()); + if (key.compareTo(min) < 0) { + min = key; + } + if (key.compareTo(max) > 0) { + max = key; + } + } + + SwitchTiKVModeClient switchTiKVModeClient = tiSession.getSwitchTiKVModeClient(); + + try { + // switch to normal mode + switchTiKVModeClient.switchTiKVToNormalMode(); + + // region split + List splitKeys = new ArrayList<>(2); + splitKeys.add(min.getBytes()); + splitKeys.add(max.next().getBytes()); + tiSession.splitRegionAndScatter(splitKeys); + tiSession.getRegionManager().invalidateAll(); + + // switch to import mode + switchTiKVModeClient.keepTiKVToImportMode(); + + // group keys by region + List keyList = list.stream().map(pair -> pair.first).collect(Collectors.toList()); + Map> groupKeys = + groupKeysByRegion(clientBuilder.getRegionManager(), keyList, defaultBackOff()); + + // ingest for each region + for (Map.Entry> entry : groupKeys.entrySet()) { + TiRegion region = entry.getKey(); + List keys = entry.getValue(); + List> kvs = + keys.stream().map(k -> Pair.create(k, map.get(k))).collect(Collectors.toList()); + doIngest(region, kvs, ttl); + } + } finally { + // swith tikv to normal mode + switchTiKVModeClient.stopKeepTiKVToImportMode(); + switchTiKVModeClient.switchTiKVToNormalMode(); + } + } + + private void doIngest(TiRegion region, List> sortedList, Long ttl) + throws GrpcException { + if (sortedList.isEmpty()) { + return; + } + + ByteString uuid = ByteString.copyFrom(genUUID()); + Key minKey = Key.toRawKey(sortedList.get(0).first); + Key maxKey = Key.toRawKey(sortedList.get(sortedList.size() - 1).first); + ImporterClient importerClient = + new ImporterClient(tiSession, uuid, minKey, maxKey, region, ttl); + importerClient.rawWrite(sortedList.iterator()); + } + + private void doSendBatchPut(BackOffer backOffer, Map kvPairs, long ttl) { +>>>>>>> 5aebd12... add configration parameter for RawKV timeout (#246) ExecutorCompletionService> completionService = new ExecutorCompletionService<>(batchPutThreadPool); @@ -866,6 +960,6 @@ private Iterator rawScanIterator( } private BackOffer defaultBackOff() { - return ConcreteBackOffer.newRawKVBackOff(); + return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS()); } }