From e15661ac6cc988f09fb9e8619c38c77c7f221152 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Mon, 19 Jul 2021 16:22:30 +0800 Subject: [PATCH 1/6] support RawKV ingest API Signed-off-by: marsishandsome --- README.md | 12 ++ scripts/proto.sh | 4 +- .../java/org/tikv/common/ConfigUtils.java | 9 + src/main/java/org/tikv/common/PDClient.java | 5 +- .../java/org/tikv/common/TiConfiguration.java | 41 ++++ src/main/java/org/tikv/common/TiSession.java | 64 +++++- .../tikv/common/importer/ImporterClient.java | 191 +++++++++++++++++ .../common/importer/ImporterStoreClient.java | 199 ++++++++++++++++++ .../common/importer/SwitchTiKVModeClient.java | 79 +++++++ .../org/tikv/common/region/RegionCache.java | 6 + .../tikv/common/region/RegionStoreClient.java | 1 + .../java/org/tikv/common/region/TiRegion.java | 8 +- .../java/org/tikv/common/util/BackOffer.java | 12 +- .../org/tikv/common/util/ClientUtils.java | 15 ++ src/main/java/org/tikv/raw/RawKVClient.java | 93 ++++++++ .../tikv/common/importer/RawKVIngestTest.java | 108 ++++++++++ .../tikv/common/importer/RegionSplitTest.java | 52 +++++ .../common/importer/SwitchTiKVModeTest.java | 33 +++ src/test/java/org/tikv/util/TestUtils.java | 33 +++ 19 files changed, 943 insertions(+), 22 deletions(-) create mode 100644 src/main/java/org/tikv/common/importer/ImporterClient.java create mode 100644 src/main/java/org/tikv/common/importer/ImporterStoreClient.java create mode 100644 src/main/java/org/tikv/common/importer/SwitchTiKVModeClient.java create mode 100644 src/test/java/org/tikv/common/importer/RawKVIngestTest.java create mode 100644 src/test/java/org/tikv/common/importer/RegionSplitTest.java create mode 100644 src/test/java/org/tikv/common/importer/SwitchTiKVModeTest.java create mode 100644 src/test/java/org/tikv/util/TestUtils.java diff --git a/README.md b/README.md index 832b3223702..543331ec17c 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,18 @@ The following includes JVM related parameters. - timeout of scan/delete range grpc request - default: 20s +#### tikv.importer.max_kv_batch_bytes +- Maximal package size transporting from clients to TiKV Server (ingest API) +- default: 1048576 (1M) + +#### tikv.importer.max_kv_batch_size +- Maximal batch size transporting from clients to TiKV Server (ingest API) +- default: 32768 (32K) + +#### tikv.scatter_wait_seconds +- time to wait for scattering regions +- default: 300 (5min) + ### Metrics Parameter #### tikv.metrics.enable diff --git a/scripts/proto.sh b/scripts/proto.sh index 2ddaa1205c2..b5bfbdac707 100755 --- a/scripts/proto.sh +++ b/scripts/proto.sh @@ -14,7 +14,7 @@ # limitations under the License. # -kvproto_hash=6ed99a08e262d8a32d6355dcba91cf99cb92074a +kvproto_hash=d4c03f6956e1e3f3043d61df1f3c8d30a425b9d4 raft_rs_hash=b9891b673573fad77ebcf9bbe0969cf945841926 tipb_hash=c4d518eb1d60c21f05b028b36729e64610346dac @@ -32,7 +32,7 @@ cd "$TIKV_CLIENT_HOME" || exit if [ -d "$kvproto_dir" ]; then git -C ${kvproto_dir} fetch -p else - git clone https://github.com/pingcap/kvproto ${kvproto_dir} + git clone https://github.com/andylokandy/kvproto ${kvproto_dir} fi git -C ${kvproto_dir} checkout ${kvproto_hash} diff --git a/src/main/java/org/tikv/common/ConfigUtils.java b/src/main/java/org/tikv/common/ConfigUtils.java index 8a2f8e3c99d..6616a090d5c 100644 --- a/src/main/java/org/tikv/common/ConfigUtils.java +++ b/src/main/java/org/tikv/common/ConfigUtils.java @@ -56,6 +56,11 @@ public class ConfigUtils { public static final String TIKV_ENABLE_ATOMIC_FOR_CAS = "tikv.enable_atomic_for_cas"; + public static final String TIKV_IMPORTER_MAX_KV_BATCH_BYTES = "tikv.importer.max_kv_batch_bytes"; + public static final String TIKV_IMPORTER_MAX_KV_BATCH_SIZE = "tikv.importer.max_kv_batch_size"; + + public static final String TIKV_SCATTER_WAIT_SECONDS = "tikv.scatter_wait_seconds"; + public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379"; public static final String DEF_TIMEOUT = "200ms"; public static final String DEF_FORWARD_TIMEOUT = "300ms"; @@ -89,6 +94,10 @@ public class ConfigUtils { public static final boolean DEF_GRPC_FORWARD_ENABLE = true; public static final boolean DEF_TIKV_ENABLE_ATOMIC_FOR_CAS = false; + public static final int DEF_TIKV_IMPORTER_MAX_KV_BATCH_BYTES = 1024 * 1024; + public static final int DEF_TIKV_IMPORTER_MAX_KV_BATCH_SIZE = 1024 * 32; + public static final int DEF_TIKV_SCATTER_WAIT_SECONDS = 300; + public static final String NORMAL_COMMAND_PRIORITY = "NORMAL"; public static final String LOW_COMMAND_PRIORITY = "LOW"; public static final String HIGH_COMMAND_PRIORITY = "HIGH"; diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 8d108afe2b9..9c7adc4847d 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -48,7 +48,6 @@ import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.tikv.common.TiConfiguration.KVMode; import org.tikv.common.codec.Codec.BytesCodec; import org.tikv.common.codec.CodecDataInput; import org.tikv.common.codec.CodecDataOutput; @@ -230,7 +229,7 @@ private boolean isScatterRegionFinish(GetOperatorResponse resp) { public Pair getRegionByKey(BackOffer backOffer, ByteString key) { Histogram.Timer requestTimer = PD_GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer(); try { - if (conf.getKvMode() == KVMode.TXN) { + if (conf.isTxnKVMode()) { CodecDataOutput cdo = new CodecDataOutput(); BytesCodec.writeBytes(cdo, key.toByteArray()); key = cdo.toByteString(); @@ -679,7 +678,7 @@ public String toString() { } private Metapb.Region decodeRegion(Metapb.Region region) { - final boolean isRawRegion = conf.getKvMode() == KVMode.RAW; + final boolean isRawRegion = conf.isRawKVMode(); Metapb.Region.Builder builder = Metapb.Region.newBuilder() .setId(region.getId()) diff --git a/src/main/java/org/tikv/common/TiConfiguration.java b/src/main/java/org/tikv/common/TiConfiguration.java index 5337c1e7618..168a8aeebda 100644 --- a/src/main/java/org/tikv/common/TiConfiguration.java +++ b/src/main/java/org/tikv/common/TiConfiguration.java @@ -82,6 +82,9 @@ private static void loadFromDefaultProperties() { setIfMissing(TIKV_GRPC_HEALTH_CHECK_TIMEOUT, DEF_CHECK_HEALTH_TIMEOUT); setIfMissing(TIKV_HEALTH_CHECK_PERIOD_DURATION, DEF_HEALTH_CHECK_PERIOD_DURATION); setIfMissing(TIKV_ENABLE_ATOMIC_FOR_CAS, DEF_TIKV_ENABLE_ATOMIC_FOR_CAS); + setIfMissing(TIKV_IMPORTER_MAX_KV_BATCH_BYTES, DEF_TIKV_IMPORTER_MAX_KV_BATCH_BYTES); + setIfMissing(TIKV_IMPORTER_MAX_KV_BATCH_SIZE, DEF_TIKV_IMPORTER_MAX_KV_BATCH_SIZE); + setIfMissing(TIKV_SCATTER_WAIT_SECONDS, DEF_TIKV_SCATTER_WAIT_SECONDS); } public static void listAll() { @@ -273,6 +276,12 @@ private static ReplicaRead getReplicaRead(String key) { private boolean enableAtomicForCAS = getBoolean(TIKV_ENABLE_ATOMIC_FOR_CAS); + private int importerMaxKVBatchBytes = getInt(TIKV_IMPORTER_MAX_KV_BATCH_BYTES); + + private int importerMaxKVBatchSize = getInt(TIKV_IMPORTER_MAX_KV_BATCH_SIZE); + + private int scatterWaitSeconds = getInt(TIKV_SCATTER_WAIT_SECONDS); + public enum KVMode { TXN, RAW @@ -489,6 +498,14 @@ public KVMode getKvMode() { return kvMode; } + public boolean isRawKVMode() { + return getKvMode() == TiConfiguration.KVMode.RAW; + } + + public boolean isTxnKVMode() { + return getKvMode() == KVMode.TXN; + } + public TiConfiguration setKvMode(String kvMode) { this.kvMode = KVMode.valueOf(kvMode); return this; @@ -586,4 +603,28 @@ public boolean isEnableAtomicForCAS() { public void setEnableAtomicForCAS(boolean enableAtomicForCAS) { this.enableAtomicForCAS = enableAtomicForCAS; } + + public int getImporterMaxKVBatchBytes() { + return importerMaxKVBatchBytes; + } + + public void setImporterMaxKVBatchBytes(int importerMaxKVBatchBytes) { + this.importerMaxKVBatchBytes = importerMaxKVBatchBytes; + } + + public int getImporterMaxKVBatchSize() { + return importerMaxKVBatchSize; + } + + public void setImporterMaxKVBatchSize(int importerMaxKVBatchSize) { + this.importerMaxKVBatchSize = importerMaxKVBatchSize; + } + + public int getScatterWaitSeconds() { + return scatterWaitSeconds; + } + + public void setScatterWaitSeconds(int scatterWaitSeconds) { + this.scatterWaitSeconds = scatterWaitSeconds; + } } diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 2bf1e8a94a7..a2eb062e10c 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -31,6 +31,8 @@ import org.slf4j.LoggerFactory; import org.tikv.common.catalog.Catalog; import org.tikv.common.exception.TiKVException; +import org.tikv.common.importer.ImporterStoreClient; +import org.tikv.common.importer.SwitchTiKVModeClient; import org.tikv.common.key.Key; import org.tikv.common.meta.TiTimestamp; import org.tikv.common.region.RegionManager; @@ -67,8 +69,10 @@ public class TiSession implements AutoCloseable { private volatile RegionManager regionManager; private volatile boolean enableGrpcForward; private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder; + private volatile ImporterStoreClient.ImporterStoreClientBuilder importerClientBuilder; private boolean isClosed = false; private MetricsServer metricsServer; + private static final int MAX_SPLIT_REGION_STACK_DEPTH = 6; public TiSession(TiConfiguration conf) { this.conf = conf; @@ -132,6 +136,21 @@ public RegionStoreClient.RegionStoreClientBuilder getRegionStoreClientBuilder() return res; } + public ImporterStoreClient.ImporterStoreClientBuilder getImporterRegionStoreClientBuilder() { + ImporterStoreClient.ImporterStoreClientBuilder res = importerClientBuilder; + if (res == null) { + synchronized (this) { + if (importerClientBuilder == null) { + importerClientBuilder = + new ImporterStoreClient.ImporterStoreClientBuilder( + conf, this.channelFactory, this.getRegionManager(), this.getPDClient()); + } + res = importerClientBuilder; + } + } + return res; + } + public TiConfiguration getConf() { return conf; } @@ -322,10 +341,22 @@ public ChannelFactory getChannelFactory() { return channelFactory; } + /** + * SwitchTiKVModeClient is used for SST Ingest. + * + * @return a SwitchTiKVModeClient + */ + public SwitchTiKVModeClient getSwitchTiKVModeClient() { + return new SwitchTiKVModeClient(getPDClient(), getImporterRegionStoreClientBuilder()); + } + /** * split region and scatter * * @param splitKeys + * @param splitRegionBackoffMS + * @param scatterRegionBackoffMS + * @param scatterWaitMS */ public void splitRegionAndScatter( List splitKeys, @@ -340,7 +371,7 @@ public void splitRegionAndScatter( splitRegion( splitKeys .stream() - .map(k -> Key.toRawKey(k).next().toByteString()) + .map(k -> Key.toRawKey(k).toByteString()) .collect(Collectors.toList()), ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS)); @@ -375,11 +406,28 @@ public void splitRegionAndScatter( logger.info("splitRegionAndScatter cost {} seconds", (endMS - startMS) / 1000); } + /** + * split region and scatter + * + * @param splitKeys + */ + public void splitRegionAndScatter(List splitKeys) { + int splitRegionBackoffMS = BackOffer.SPLIT_REGION_BACKOFF; + int scatterRegionBackoffMS = BackOffer.SCATTER_REGION_BACKOFF; + int scatterWaitMS = conf.getScatterWaitSeconds() * 1000; + splitRegionAndScatter(splitKeys, splitRegionBackoffMS, scatterRegionBackoffMS, scatterWaitMS); + } + private List splitRegion(List splitKeys, BackOffer backOffer) { + return splitRegion(splitKeys, backOffer, 1); + } + + private List splitRegion( + List splitKeys, BackOffer backOffer, int depth) { List regions = new ArrayList<>(); Map> groupKeys = - groupKeysByRegion(regionManager, splitKeys, backOffer); + groupKeysByRegion(getRegionManager(), splitKeys, backOffer); for (Map.Entry> entry : groupKeys.entrySet()) { Pair pair = @@ -404,9 +452,17 @@ private List splitRegion(List splitKeys, BackOffer ba } catch (final TiKVException e) { // retry logger.warn("ReSplitting ranges for splitRegion", e); - clientBuilder.getRegionManager().invalidateRegion(region); + getRegionManager().invalidateRegion(region); backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); - newRegions = splitRegion(splits, backOffer); + if (depth >= MAX_SPLIT_REGION_STACK_DEPTH) { + logger.warn( + String.format( + "Skip split region because MAX_SPLIT_REGION_STACK_DEPTH(%d) reached!", + MAX_SPLIT_REGION_STACK_DEPTH)); + newRegions = new ArrayList<>(); + } else { + newRegions = splitRegion(splits, backOffer, depth + 1); + } } logger.info("region id={}, new region size={}", region.getId(), newRegions.size()); regions.addAll(newRegions); diff --git a/src/main/java/org/tikv/common/importer/ImporterClient.java b/src/main/java/org/tikv/common/importer/ImporterClient.java new file mode 100644 index 00000000000..4bcf8d60c46 --- /dev/null +++ b/src/main/java/org/tikv/common/importer/ImporterClient.java @@ -0,0 +1,191 @@ +/* + * + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.importer; + +import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; +import org.tikv.common.exception.GrpcException; +import org.tikv.common.key.Key; +import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; +import org.tikv.common.util.Pair; +import org.tikv.kvproto.ImportSstpb; +import org.tikv.kvproto.Metapb; + +public class ImporterClient { + private TiConfiguration tiConf; + private TiSession tiSession; + private ByteString uuid; + private Key minKey; + private Key maxKey; + private TiRegion region; + private Long ttl; + + private boolean openStream = false; + private ImportSstpb.SSTMeta sstMeta; + private List clientList; + private ImporterStoreClient clientLeader; + + public ImporterClient( + TiSession tiSession, ByteString uuid, Key minKey, Key maxKey, TiRegion region, Long ttl) { + this.uuid = uuid; + this.tiConf = tiSession.getConf(); + this.tiSession = tiSession; + this.minKey = minKey; + this.maxKey = maxKey; + this.region = region; + this.ttl = ttl; + } + + /** + * write KV pairs to RawKV using KVStream interface + * + * @param iterator + */ + public void rawWrite(Iterator> iterator) throws GrpcException { + if (!tiConf.isRawKVMode()) { + throw new IllegalArgumentException("KVMode is not RAW in TiConfiguration!"); + } + + openStream = false; + + int maxKVBatchSize = tiConf.getImporterMaxKVBatchSize(); + int maxKVBatchBytes = tiConf.getImporterMaxKVBatchBytes(); + int totalBytes = 0; + while (iterator.hasNext()) { + ArrayList pairs = new ArrayList<>(maxKVBatchSize); + for (int i = 0; i < maxKVBatchSize; i++) { + if (iterator.hasNext()) { + Pair pair = iterator.next(); + pairs.add(ImportSstpb.Pair.newBuilder().setKey(pair.first).setValue(pair.second).build()); + totalBytes += (pair.first.size() + pair.second.size()); + } + if (totalBytes > maxKVBatchBytes) { + break; + } + } + if (!openStream) { + init(); + startRawWrite(); + rawWriteMeta(); + openStream = true; + } + rawWriteBatch(pairs); + } + + if (openStream) { + finishRawWrite(); + ingest(); + } + } + + private void init() { + long regionId = region.getId(); + Metapb.RegionEpoch regionEpoch = region.getRegionEpoch(); + ImportSstpb.Range range = + ImportSstpb.Range.newBuilder() + .setStart(minKey.toByteString()) + .setEnd(maxKey.toByteString()) + .build(); + + sstMeta = + ImportSstpb.SSTMeta.newBuilder() + .setUuid(uuid) + .setRegionId(regionId) + .setRegionEpoch(regionEpoch) + .setRange(range) + .build(); + + clientList = new ArrayList<>(); + for (Metapb.Peer peer : region.getPeersList()) { + long storeId = peer.getStoreId(); + TiStore store = tiSession.getRegionManager().getStoreById(storeId); + ImporterStoreClient importerStoreClient = + tiSession.getImporterRegionStoreClientBuilder().build(store); + clientList.add(importerStoreClient); + + if (region.getLeader().getStoreId() == storeId) { + clientLeader = importerStoreClient; + } + } + } + + private void startRawWrite() { + for (ImporterStoreClient client : clientList) { + client.startRawWrite(); + } + } + + private void rawWriteMeta() { + ImportSstpb.RawWriteRequest request = + ImportSstpb.RawWriteRequest.newBuilder().setMeta(sstMeta).build(); + for (ImporterStoreClient client : clientList) { + client.rawWriteBatch(request); + } + } + + private void rawWriteBatch(List pairs) { + ImportSstpb.RawWriteBatch batch; + + if (ttl == null || ttl <= 0) { + batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).build(); + } else { + batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).setTtl(ttl).build(); + } + + ImportSstpb.RawWriteRequest request = + ImportSstpb.RawWriteRequest.newBuilder().setBatch(batch).build(); + for (ImporterStoreClient client : clientList) { + client.rawWriteBatch(request); + } + } + + private void finishRawWrite() { + for (ImporterStoreClient client : clientList) { + client.finishRawWrite(); + } + } + + private void ingest() throws GrpcException { + int returnNumber = 0; + while (returnNumber < clientList.size()) { + returnNumber = 0; + for (ImporterStoreClient client : clientList) { + if (client.isRawWriteResponseReceived()) { + returnNumber++; + } else if (client.hasRawWriteResponseError()) { + throw new GrpcException(client.getRawWriteError()); + } + } + + if (returnNumber < clientList.size()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + clientLeader.multiIngest(region.getLeaderContext()); + } +} diff --git a/src/main/java/org/tikv/common/importer/ImporterStoreClient.java b/src/main/java/org/tikv/common/importer/ImporterStoreClient.java new file mode 100644 index 00000000000..a99195273b6 --- /dev/null +++ b/src/main/java/org/tikv/common/importer/ImporterStoreClient.java @@ -0,0 +1,199 @@ +/* + * + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.importer; + +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.AbstractGRPCClient; +import org.tikv.common.PDClient; +import org.tikv.common.TiConfiguration; +import org.tikv.common.exception.GrpcException; +import org.tikv.common.operation.NoopHandler; +import org.tikv.common.region.RegionManager; +import org.tikv.common.region.TiStore; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.ChannelFactory; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.kvproto.ImportSSTGrpc; +import org.tikv.kvproto.ImportSstpb; +import org.tikv.kvproto.Kvrpcpb; + +public class ImporterStoreClient + extends AbstractGRPCClient + implements StreamObserver { + + private static final Logger logger = LoggerFactory.getLogger(ImporterStoreClient.class); + + protected ImporterStoreClient( + TiConfiguration conf, + ChannelFactory channelFactory, + ImportSSTGrpc.ImportSSTBlockingStub blockingStub, + ImportSSTGrpc.ImportSSTStub asyncStub) { + super(conf, channelFactory, blockingStub, asyncStub); + } + + private StreamObserver streamObserverRequest; + private ImportSstpb.RawWriteResponse rawWriteResponse; + private Throwable rawWriteError; + + public synchronized boolean isRawWriteResponseReceived() { + return rawWriteResponse != null; + } + + private synchronized ImportSstpb.RawWriteResponse getRawWriteResponse() { + return rawWriteResponse; + } + + private synchronized void setRawWriteResponse(ImportSstpb.RawWriteResponse rawWriteResponse) { + this.rawWriteResponse = rawWriteResponse; + } + + public synchronized boolean hasRawWriteResponseError() { + return this.rawWriteError != null; + } + + public synchronized Throwable getRawWriteError() { + return this.rawWriteError; + } + + private synchronized void setRawWriteError(Throwable t) { + this.rawWriteError = t; + } + + @Override + public void onNext(ImportSstpb.RawWriteResponse value) { + setRawWriteResponse(value); + } + + @Override + public void onError(Throwable t) { + setRawWriteError(t); + logger.error("Error during raw write!", t); + } + + @Override + public void onCompleted() { + // do nothing + } + + /** + * Ingest KV pairs to RawKV using gRPC streaming mode. This API should be called on both leader + * and followers. + * + * @return + */ + public void startRawWrite() { + streamObserverRequest = getAsyncStub().rawWrite(this); + } + + /** + * This API should be called after `startRawWrite`. + * + * @param request + */ + public void rawWriteBatch(ImportSstpb.RawWriteRequest request) { + streamObserverRequest.onNext(request); + } + + /** This API should be called after `rawWriteBatch`. */ + public void finishRawWrite() { + streamObserverRequest.onCompleted(); + } + + /** + * This API should be called after `finishRawWrite`. This API should be called onn leader only. + * + * @param ctx + */ + public void multiIngest(Kvrpcpb.Context ctx) { + List metasList = getRawWriteResponse().getMetasList(); + + ImportSstpb.MultiIngestRequest request = + ImportSstpb.MultiIngestRequest.newBuilder().setContext(ctx).addAllSsts(metasList).build(); + + ImportSstpb.IngestResponse response = getBlockingStub().multiIngest(request); + if (response.hasError()) { + throw new GrpcException("" + response.getError()); + } + } + + public void switchMode(ImportSstpb.SwitchMode mode) { + Supplier request = + () -> ImportSstpb.SwitchModeRequest.newBuilder().setMode(mode).build(); + NoopHandler noopHandler = new NoopHandler<>(); + + callWithRetry( + ConcreteBackOffer.newCustomBackOff(BackOffer.TIKV_SWITCH_MODE_BACKOFF), + ImportSSTGrpc.getSwitchModeMethod(), + request, + noopHandler); + } + + @Override + protected ImportSSTGrpc.ImportSSTBlockingStub getBlockingStub() { + return blockingStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); + } + + @Override + protected ImportSSTGrpc.ImportSSTStub getAsyncStub() { + return asyncStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); + } + + @Override + public void close() throws Exception {} + + public static class ImporterStoreClientBuilder { + private final TiConfiguration conf; + private final ChannelFactory channelFactory; + private final RegionManager regionManager; + private final PDClient pdClient; + + public ImporterStoreClientBuilder( + TiConfiguration conf, + ChannelFactory channelFactory, + RegionManager regionManager, + PDClient pdClient) { + Objects.requireNonNull(conf, "conf is null"); + Objects.requireNonNull(channelFactory, "channelFactory is null"); + Objects.requireNonNull(regionManager, "regionManager is null"); + this.conf = conf; + this.channelFactory = channelFactory; + this.regionManager = regionManager; + this.pdClient = pdClient; + } + + public synchronized ImporterStoreClient build(TiStore store) throws GrpcException { + Objects.requireNonNull(store, "store is null"); + + String addressStr = store.getStore().getAddress(); + logger.debug(String.format("Create region store client on address %s", addressStr)); + + ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); + ImportSSTGrpc.ImportSSTBlockingStub blockingStub = ImportSSTGrpc.newBlockingStub(channel); + ImportSSTGrpc.ImportSSTStub asyncStub = ImportSSTGrpc.newStub(channel); + + return new ImporterStoreClient(conf, channelFactory, blockingStub, asyncStub); + } + } +} diff --git a/src/main/java/org/tikv/common/importer/SwitchTiKVModeClient.java b/src/main/java/org/tikv/common/importer/SwitchTiKVModeClient.java new file mode 100644 index 00000000000..e230217e435 --- /dev/null +++ b/src/main/java/org/tikv/common/importer/SwitchTiKVModeClient.java @@ -0,0 +1,79 @@ +/* + * + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.importer; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.tikv.common.PDClient; +import org.tikv.common.region.TiStore; +import org.tikv.common.util.BackOffer; +import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.kvproto.ImportSstpb; +import org.tikv.kvproto.Metapb; + +public class SwitchTiKVModeClient { + private static final int IMPORT_MODE_TIMEOUT = 600; + private static final int KEEP_TIKV_TO_IMPORT_MODE_PERIOD = IMPORT_MODE_TIMEOUT / 5; + + private final PDClient pdClient; + private final ImporterStoreClient.ImporterStoreClientBuilder builder; + + private final ScheduledExecutorService ingestScheduledExecutorService; + + public SwitchTiKVModeClient( + PDClient pdClient, ImporterStoreClient.ImporterStoreClientBuilder builder) { + this.pdClient = pdClient; + this.builder = builder; + + this.ingestScheduledExecutorService = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("switch-tikv-mode-pool-%d") + .setDaemon(true) + .build()); + } + + public void switchTiKVToNormalMode() { + doSwitchTiKVMode(ImportSstpb.SwitchMode.Normal); + } + + public void keepTiKVToImportMode() { + ingestScheduledExecutorService.scheduleAtFixedRate( + this::switchTiKVToImportMode, 0, KEEP_TIKV_TO_IMPORT_MODE_PERIOD, TimeUnit.SECONDS); + } + + public void stopKeepTiKVToImportMode() { + ingestScheduledExecutorService.shutdown(); + } + + private void switchTiKVToImportMode() { + doSwitchTiKVMode(ImportSstpb.SwitchMode.Import); + } + + private void doSwitchTiKVMode(ImportSstpb.SwitchMode mode) { + BackOffer bo = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF); + List allStores = pdClient.getAllStores(bo); + for (Metapb.Store store : allStores) { + ImporterStoreClient client = builder.build(new TiStore(store)); + client.switchMode(mode); + } + } +} diff --git a/src/main/java/org/tikv/common/region/RegionCache.java b/src/main/java/org/tikv/common/region/RegionCache.java index a52fc716f98..77f41010a67 100644 --- a/src/main/java/org/tikv/common/region/RegionCache.java +++ b/src/main/java/org/tikv/common/region/RegionCache.java @@ -29,6 +29,12 @@ public RegionCache() { keyToRegionIdCache = TreeRangeMap.create(); } + public synchronized void invalidateAll() { + regionCache.clear(); + storeCache.clear(); + keyToRegionIdCache.clear(); + } + public synchronized TiRegion getRegionByKey(ByteString key, BackOffer backOffer) { Long regionId; if (key.isEmpty()) { diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 19eed19531c..bfe655e9129 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -753,6 +753,7 @@ public List splitRegion(Iterable splitKeys) { SplitRegionRequest.newBuilder() .setContext(makeContext(storeType)) .addAllSplitKeys(splitKeys) + .setIsRawKv(conf.isRawKVMode()) .build(); KVErrorHandler handler = diff --git a/src/main/java/org/tikv/common/region/TiRegion.java b/src/main/java/org/tikv/common/region/TiRegion.java index b53557f9306..2f34f1064d2 100644 --- a/src/main/java/org/tikv/common/region/TiRegion.java +++ b/src/main/java/org/tikv/common/region/TiRegion.java @@ -108,6 +108,10 @@ public List getLearnerList() { return peers; } + public List getPeersList() { + return getMeta().getPeersList(); + } + public Peer getCurrentReplica() { return replicaList.get(replicaIdx); } @@ -137,10 +141,6 @@ public ByteString getEndKey() { return meta.getEndKey(); } - public Key getRowEndKey() { - return Key.toRawKey(getEndKey()); - } - public Kvrpcpb.Context getLeaderContext() { return getContext(this.leader, java.util.Collections.emptySet(), false); } diff --git a/src/main/java/org/tikv/common/util/BackOffer.java b/src/main/java/org/tikv/common/util/BackOffer.java index 06f73d36e5d..9cdf39ba0d7 100644 --- a/src/main/java/org/tikv/common/util/BackOffer.java +++ b/src/main/java/org/tikv/common/util/BackOffer.java @@ -26,17 +26,11 @@ public interface BackOffer { int BATCH_GET_MAX_BACKOFF = 40 * seconds; int COP_NEXT_MAX_BACKOFF = 40 * seconds; int GET_MAX_BACKOFF = 40 * seconds; - int PREWRITE_MAX_BACKOFF = 20 * seconds; - int CLEANUP_MAX_BACKOFF = 20 * seconds; - int GC_ONE_REGION_MAX_BACKOFF = 20 * seconds; - int GC_RESOLVE_LOCK_MAX_BACKOFF = 100 * seconds; - int GC_DELETE_RANGE_MAX_BACKOFF = 100 * seconds; - int RAWKV_MAX_BACKOFF = 20 * seconds; - - int SPLIT_REGION_BACKOFF = 20 * seconds; - int BATCH_COMMIT_BACKOFF = 10 * seconds; int PD_INFO_BACKOFF = 5 * seconds; + int TIKV_SWITCH_MODE_BACKOFF = seconds; + int SPLIT_REGION_BACKOFF = 12000; + int SCATTER_REGION_BACKOFF = 30000; /** * doBackOff sleeps a while base on the BackOffType and records the error message. Will stop until diff --git a/src/main/java/org/tikv/common/util/ClientUtils.java b/src/main/java/org/tikv/common/util/ClientUtils.java index e85babf608b..22783d7cf34 100644 --- a/src/main/java/org/tikv/common/util/ClientUtils.java +++ b/src/main/java/org/tikv/common/util/ClientUtils.java @@ -213,4 +213,19 @@ public static List getTasksWithOutput( throw new TiKVException("Execution exception met.", e); } } + + public static byte[] genUUID() { + UUID uuid = UUID.randomUUID(); + + byte[] out = new byte[16]; + long msb = uuid.getMostSignificantBits(); + long lsb = uuid.getLeastSignificantBits(); + for (int i = 0; i < 8; i++) { + out[i] = (byte) ((msb >> ((7 - i) * 8)) & 0xff); + } + for (int i = 8; i < 16; i++) { + out[i] = (byte) ((lsb >> ((15 - i) * 8)) & 0xff); + } + return out; + } } diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index 79974b60c98..f80c8e4bd07 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -27,8 +27,11 @@ import org.slf4j.LoggerFactory; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; +import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.RawCASConflictException; import org.tikv.common.exception.TiKVException; +import org.tikv.common.importer.ImporterClient; +import org.tikv.common.importer.SwitchTiKVModeClient; import org.tikv.common.key.Key; import org.tikv.common.operation.iterator.RawScanIterator; import org.tikv.common.region.RegionStoreClient; @@ -38,6 +41,7 @@ import org.tikv.kvproto.Kvrpcpb.KvPair; public class RawKVClient implements AutoCloseable { + private final TiSession tiSession; private final RegionStoreClientBuilder clientBuilder; private final TiConfiguration conf; private final boolean atomicForCAS; @@ -85,6 +89,7 @@ public RawKVClient(TiSession session, RegionStoreClientBuilder clientBuilder) { Objects.requireNonNull(session, "session is null"); Objects.requireNonNull(clientBuilder, "clientBuilder is null"); this.conf = session.getConf(); + this.tiSession = session; this.clientBuilder = clientBuilder; this.batchGetThreadPool = session.getThreadPoolForBatchGet(); this.batchPutThreadPool = session.getThreadPoolForBatchPut(); @@ -639,6 +644,94 @@ public synchronized void deletePrefix(ByteString key) { deleteRange(key, endKey); } + /** + * Ingest KV pairs to RawKV using StreamKV API. + * + * @param list + */ + public synchronized void ingest(List> list) { + ingest(list, null); + } + + /** + * Ingest KV pairs to RawKV using StreamKV API. + * + * @param list + * @param ttl the ttl of the key (in seconds), 0 means the key will never be outdated + */ + public synchronized void ingest(List> list, Long ttl) + throws GrpcException { + if (list.isEmpty()) { + return; + } + + Key min = Key.MAX; + Key max = Key.MIN; + Map map = new HashMap<>(list.size()); + + for (Pair pair : list) { + map.put(pair.first, pair.second); + Key key = Key.toRawKey(pair.first.toByteArray()); + if (key.compareTo(min) < 0) { + min = key; + } + if (key.compareTo(max) > 0) { + max = key; + } + } + + SwitchTiKVModeClient switchTiKVModeClient = tiSession.getSwitchTiKVModeClient(); + + try { + // switch to normal mode + switchTiKVModeClient.switchTiKVToNormalMode(); + + // region split + List splitKeys = new ArrayList<>(2); + splitKeys.add(min.getBytes()); + splitKeys.add(max.next().getBytes()); + tiSession.splitRegionAndScatter(splitKeys); + tiSession.getRegionManager().invalidateAll(); + + // switch to import mode + switchTiKVModeClient.keepTiKVToImportMode(); + + // group keys by region + List sortedKeyList = + list.stream().map(pair -> pair.first).collect(Collectors.toList()); + Map> groupKeys = + groupKeysByRegion( + clientBuilder.getRegionManager(), sortedKeyList, ConcreteBackOffer.newRawKVBackOff()); + + // ingest for each region + for (Map.Entry> entry : groupKeys.entrySet()) { + TiRegion region = entry.getKey(); + List keys = entry.getValue(); + List> kvs = + keys.stream().map(k -> Pair.create(k, map.get(k))).collect(Collectors.toList()); + doIngest(region, kvs, ttl); + } + } finally { + // swith tikv to normal mode + switchTiKVModeClient.stopKeepTiKVToImportMode(); + switchTiKVModeClient.switchTiKVToNormalMode(); + } + } + + private void doIngest(TiRegion region, List> sortedList, Long ttl) + throws GrpcException { + if (sortedList.isEmpty()) { + return; + } + + ByteString uuid = ByteString.copyFrom(genUUID()); + Key minKey = Key.toRawKey(sortedList.get(0).first); + Key maxKey = Key.toRawKey(sortedList.get(sortedList.size() - 1).first); + ImporterClient importerClient = + new ImporterClient(tiSession, uuid, minKey, maxKey, region, ttl); + importerClient.rawWrite(sortedList.iterator()); + } + private void doSendBatchPut(BackOffer backOffer, Map kvPairs, long ttl) { ExecutorCompletionService> completionService = new ExecutorCompletionService<>(batchPutThreadPool); diff --git a/src/test/java/org/tikv/common/importer/RawKVIngestTest.java b/src/test/java/org/tikv/common/importer/RawKVIngestTest.java new file mode 100644 index 00000000000..94bc7856b2a --- /dev/null +++ b/src/test/java/org/tikv/common/importer/RawKVIngestTest.java @@ -0,0 +1,108 @@ +package org.tikv.common.importer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; +import org.tikv.common.key.Key; +import org.tikv.common.util.Pair; +import org.tikv.raw.RawKVClient; +import org.tikv.util.TestUtils; + +public class RawKVIngestTest { + private TiSession session; + + private static final int KEY_NUMBER = 16; + private static final String KEY_PREFIX = "prefix_rawkv_ingest_test_"; + private static final int KEY_LENGTH = KEY_PREFIX.length() + 10; + private static final int VALUE_LENGTH = 16; + + @Before + public void setup() { + TiConfiguration conf = TiConfiguration.createRawDefault(); + session = TiSession.create(conf); + } + + @After + public void tearDown() throws Exception { + if (session != null) { + session.close(); + } + } + + @Test + public void rawKVIngestTest() { + RawKVClient client = session.createRawClient(); + + // gen test data + List> sortedList = new ArrayList<>(); + for (int i = 0; i < KEY_NUMBER; i++) { + byte[] key = TestUtils.genRandomKey(KEY_PREFIX, KEY_LENGTH); + byte[] value = TestUtils.genRandomValue(VALUE_LENGTH); + sortedList.add(Pair.create(ByteString.copyFrom(key), ByteString.copyFrom(value))); + } + sortedList.sort( + (o1, o2) -> { + Key k1 = Key.toRawKey(o1.first.toByteArray()); + Key k2 = Key.toRawKey(o2.first.toByteArray()); + return k1.compareTo(k2); + }); + + // ingest + client.ingest(sortedList); + + // assert + for (Pair pair : sortedList) { + Optional v = client.get(pair.first); + assertTrue(v.isPresent()); + assertEquals(v.get(), pair.second); + } + } + + @Test + public void rawKVIngestTestWithTTL() throws InterruptedException { + long ttl = 10; + RawKVClient client = session.createRawClient(); + + // gen test data + List> sortedList = new ArrayList<>(); + for (int i = 0; i < KEY_NUMBER; i++) { + byte[] key = TestUtils.genRandomKey(KEY_PREFIX, KEY_LENGTH); + byte[] value = TestUtils.genRandomValue(VALUE_LENGTH); + sortedList.add(Pair.create(ByteString.copyFrom(key), ByteString.copyFrom(value))); + } + sortedList.sort( + (o1, o2) -> { + Key k1 = Key.toRawKey(o1.first.toByteArray()); + Key k2 = Key.toRawKey(o2.first.toByteArray()); + return k1.compareTo(k2); + }); + + // ingest + client.ingest(sortedList, ttl); + + // assert + for (Pair pair : sortedList) { + Optional v = client.get(pair.first); + assertTrue(v.isPresent()); + assertEquals(v.get(), pair.second); + } + + // sleep + Thread.sleep(ttl * 2 * 1000); + // assert + for (Pair pair : sortedList) { + Optional v = client.get(pair.first); + assertFalse(v.isPresent()); + } + } +} diff --git a/src/test/java/org/tikv/common/importer/RegionSplitTest.java b/src/test/java/org/tikv/common/importer/RegionSplitTest.java new file mode 100644 index 00000000000..12186956d26 --- /dev/null +++ b/src/test/java/org/tikv/common/importer/RegionSplitTest.java @@ -0,0 +1,52 @@ +package org.tikv.common.importer; + +import static org.junit.Assert.assertArrayEquals; +import static org.tikv.util.TestUtils.genRandomKey; + +import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.List; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; +import org.tikv.common.region.TiRegion; + +public class RegionSplitTest { + private TiSession session; + + private static final int KEY_NUMBER = 10; + private static final String KEY_PREFIX = "prefix_region_split_test_"; + private static final int KEY_LENGTH = KEY_PREFIX.length() + 10; + + @Before + public void setup() { + TiConfiguration conf = TiConfiguration.createRawDefault(); + session = TiSession.create(conf); + } + + @After + public void tearDown() throws Exception { + if (session != null) { + session.close(); + } + } + + @Test + public void rawKVSplitTest() { + List splitKeys = new ArrayList<>(KEY_NUMBER); + for (int i = 0; i < KEY_NUMBER; i++) { + splitKeys.add(genRandomKey(KEY_PREFIX, KEY_LENGTH)); + } + + session.splitRegionAndScatter(splitKeys); + session.getRegionManager().invalidateAll(); + + for (int i = 0; i < KEY_NUMBER; i++) { + byte[] key = splitKeys.get(i); + TiRegion region = session.getRegionManager().getRegionByKey(ByteString.copyFrom(key)); + assertArrayEquals(key, region.getStartKey().toByteArray()); + } + } +} diff --git a/src/test/java/org/tikv/common/importer/SwitchTiKVModeTest.java b/src/test/java/org/tikv/common/importer/SwitchTiKVModeTest.java new file mode 100644 index 00000000000..96d6483501e --- /dev/null +++ b/src/test/java/org/tikv/common/importer/SwitchTiKVModeTest.java @@ -0,0 +1,33 @@ +package org.tikv.common.importer; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; + +public class SwitchTiKVModeTest { + private TiSession session; + + @Before + public void setup() { + TiConfiguration conf = TiConfiguration.createRawDefault(); + session = TiSession.create(conf); + } + + @After + public void tearDown() throws Exception { + if (session != null) { + session.close(); + } + } + + @Test + public void swithTiKVModeTest() throws InterruptedException { + SwitchTiKVModeClient switchTiKVModeClient = session.getSwitchTiKVModeClient(); + switchTiKVModeClient.keepTiKVToImportMode(); + Thread.sleep(6000); + switchTiKVModeClient.stopKeepTiKVToImportMode(); + switchTiKVModeClient.switchTiKVToNormalMode(); + } +} diff --git a/src/test/java/org/tikv/util/TestUtils.java b/src/test/java/org/tikv/util/TestUtils.java new file mode 100644 index 00000000000..1dd59c8f4d5 --- /dev/null +++ b/src/test/java/org/tikv/util/TestUtils.java @@ -0,0 +1,33 @@ +package org.tikv.util; + +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; + +public class TestUtils { + public static byte[] genRandomKey(String keyPrefix, int keyLength) { + int length = keyLength - keyPrefix.length(); + if (length <= 0) { + length = 0; + } + return (keyPrefix + genRandomString(length)).getBytes(); + } + + public static byte[] genRandomValue(int length) { + return genRandomString(length).getBytes(); + } + + private static String genRandomString(int length) { + Random rnd = ThreadLocalRandom.current(); + StringBuilder ret = new StringBuilder(length); + for (int i = 0; i < length; i++) { + boolean isChar = (rnd.nextInt(2) % 2 == 0); + if (isChar) { + int choice = rnd.nextInt(2) % 2 == 0 ? 65 : 97; + ret.append((char) (choice + rnd.nextInt(26))); + } else { + ret.append(rnd.nextInt(10)); + } + } + return ret.toString(); + } +} From 79a6e9e654138bb02384ad109c23b065ee689167 Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Tue, 20 Jul 2021 17:51:58 +0800 Subject: [PATCH 2/6] address code review Signed-off-by: marsishandsome --- .../java/org/tikv/common/importer/ImporterClient.java | 10 +++++----- .../org/tikv/common/importer/ImporterStoreClient.java | 2 +- src/main/java/org/tikv/raw/RawKVClient.java | 5 ++--- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/tikv/common/importer/ImporterClient.java b/src/main/java/org/tikv/common/importer/ImporterClient.java index 4bcf8d60c46..b69b0aa5cb9 100644 --- a/src/main/java/org/tikv/common/importer/ImporterClient.java +++ b/src/main/java/org/tikv/common/importer/ImporterClient.java @@ -40,7 +40,7 @@ public class ImporterClient { private TiRegion region; private Long ttl; - private boolean openStream = false; + private boolean streamOpened = false; private ImportSstpb.SSTMeta sstMeta; private List clientList; private ImporterStoreClient clientLeader; @@ -66,7 +66,7 @@ public void rawWrite(Iterator> iterator) throws Grp throw new IllegalArgumentException("KVMode is not RAW in TiConfiguration!"); } - openStream = false; + streamOpened = false; int maxKVBatchSize = tiConf.getImporterMaxKVBatchSize(); int maxKVBatchBytes = tiConf.getImporterMaxKVBatchBytes(); @@ -83,16 +83,16 @@ public void rawWrite(Iterator> iterator) throws Grp break; } } - if (!openStream) { + if (!streamOpened) { init(); startRawWrite(); rawWriteMeta(); - openStream = true; + streamOpened = true; } rawWriteBatch(pairs); } - if (openStream) { + if (streamOpened) { finishRawWrite(); ingest(); } diff --git a/src/main/java/org/tikv/common/importer/ImporterStoreClient.java b/src/main/java/org/tikv/common/importer/ImporterStoreClient.java index a99195273b6..db36966611d 100644 --- a/src/main/java/org/tikv/common/importer/ImporterStoreClient.java +++ b/src/main/java/org/tikv/common/importer/ImporterStoreClient.java @@ -122,7 +122,7 @@ public void finishRawWrite() { } /** - * This API should be called after `finishRawWrite`. This API should be called onn leader only. + * This API should be called after `finishRawWrite`. This API should be called on leader only. * * @param ctx */ diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index f80c8e4bd07..b87d2fe4582 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -697,11 +697,10 @@ public synchronized void ingest(List> list, Long tt switchTiKVModeClient.keepTiKVToImportMode(); // group keys by region - List sortedKeyList = - list.stream().map(pair -> pair.first).collect(Collectors.toList()); + List keyList = list.stream().map(pair -> pair.first).collect(Collectors.toList()); Map> groupKeys = groupKeysByRegion( - clientBuilder.getRegionManager(), sortedKeyList, ConcreteBackOffer.newRawKVBackOff()); + clientBuilder.getRegionManager(), keyList, ConcreteBackOffer.newRawKVBackOff()); // ingest for each region for (Map.Entry> entry : groupKeys.entrySet()) { From 66cc9794125391d0745f24025561c961e766981a Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Wed, 21 Jul 2021 11:46:04 +0800 Subject: [PATCH 3/6] address code review Signed-off-by: marsishandsome --- .../org/tikv/common/importer/ImporterClient.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/tikv/common/importer/ImporterClient.java b/src/main/java/org/tikv/common/importer/ImporterClient.java index b69b0aa5cb9..99494b5205f 100644 --- a/src/main/java/org/tikv/common/importer/ImporterClient.java +++ b/src/main/java/org/tikv/common/importer/ImporterClient.java @@ -166,18 +166,19 @@ private void finishRawWrite() { } private void ingest() throws GrpcException { - int returnNumber = 0; - while (returnNumber < clientList.size()) { - returnNumber = 0; - for (ImporterStoreClient client : clientList) { + List workingClients = new ArrayList<>(clientList); + while (!workingClients.isEmpty()) { + Iterator itor = workingClients.iterator(); + while (itor.hasNext()) { + ImporterStoreClient client = itor.next(); if (client.isRawWriteResponseReceived()) { - returnNumber++; + itor.remove(); } else if (client.hasRawWriteResponseError()) { throw new GrpcException(client.getRawWriteError()); } } - if (returnNumber < clientList.size()) { + if (!workingClients.isEmpty()) { try { Thread.sleep(1000); } catch (InterruptedException e) { From 92471c2e9250b15ef4d9b9a336a612cc26f5295e Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Wed, 21 Jul 2021 11:51:12 +0800 Subject: [PATCH 4/6] address code review Signed-off-by: marsishandsome --- src/main/java/org/tikv/common/TiSession.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index a2eb062e10c..63a07ed9a40 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -449,6 +449,8 @@ private List splitRegion( List newRegions; try { newRegions = getRegionStoreClientBuilder().build(region, store).splitRegion(splits); + // invalidate old region + getRegionManager().invalidateRegion(region); } catch (final TiKVException e) { // retry logger.warn("ReSplitting ranges for splitRegion", e); From 4945902bb198ec3a03cc96432efc39bd8af1c0bb Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Thu, 22 Jul 2021 10:16:11 +0800 Subject: [PATCH 5/6] fix Signed-off-by: marsishandsome --- scripts/proto.sh | 4 ++-- src/main/java/org/tikv/common/region/RegionManager.java | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/scripts/proto.sh b/scripts/proto.sh index b5bfbdac707..8d65d7dd2f9 100755 --- a/scripts/proto.sh +++ b/scripts/proto.sh @@ -14,7 +14,7 @@ # limitations under the License. # -kvproto_hash=d4c03f6956e1e3f3043d61df1f3c8d30a425b9d4 +kvproto_hash=2ac2a7984b2d01b96ed56fd8474f4bf80fa33c51 raft_rs_hash=b9891b673573fad77ebcf9bbe0969cf945841926 tipb_hash=c4d518eb1d60c21f05b028b36729e64610346dac @@ -32,7 +32,7 @@ cd "$TIKV_CLIENT_HOME" || exit if [ -d "$kvproto_dir" ]; then git -C ${kvproto_dir} fetch -p else - git clone https://github.com/andylokandy/kvproto ${kvproto_dir} + git clone https://github.com/pingcap/kvproto ${kvproto_dir} fi git -C ${kvproto_dir} checkout ${kvproto_hash} diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 36a3cc2474a..8bbff1cf32b 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -89,6 +89,10 @@ public ReadOnlyPDClient getPDClient() { return this.pdClient; } + public void invalidateAll() { + cache.invalidateAll(); + } + public TiRegion getRegionByKey(ByteString key) { return getRegionByKey(key, ConcreteBackOffer.newGetBackOff()); } From 9fc01feeba96ac1bd4f343cfd724109658bc3fbb Mon Sep 17 00:00:00 2001 From: marsishandsome Date: Thu, 22 Jul 2021 10:18:18 +0800 Subject: [PATCH 6/6] test with tikv master branch Signed-off-by: marsishandsome --- .ci/integration_test.groovy | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.ci/integration_test.groovy b/.ci/integration_test.groovy index 2d7a28ff4ea..ebe197b0d36 100644 --- a/.ci/integration_test.groovy +++ b/.ci/integration_test.groovy @@ -1,8 +1,8 @@ def call(ghprbActualCommit, ghprbPullId, ghprbPullTitle, ghprbPullLink, ghprbPullDescription, credentialsId) { - def TIDB_BRANCH = "release-5.0" - def TIKV_BRANCH = "release-5.0" - def PD_BRANCH = "release-5.0" + def TIDB_BRANCH = "master" + def TIKV_BRANCH = "master" + def PD_BRANCH = "master" // parse tidb branch def m1 = ghprbCommentBody =~ /tidb\s*=\s*([^\s\\]+)(\s|\\|$)/