From c58ece7d1b6d81aa28cb5a0b1a5ec49859f5a0b6 Mon Sep 17 00:00:00 2001 From: abingcbc Date: Tue, 17 Aug 2021 16:12:57 +0800 Subject: [PATCH 1/2] feat: txn ingest Signed-off-by: abingcbc --- src/main/java/org/tikv/common/TiSession.java | 23 ++- .../common/importer/TxnImporterClient.java | 186 ++++++++++++++++++ .../importer/TxnImporterStoreClient.java | 167 ++++++++++++++++ src/main/java/org/tikv/txn/KVClient.java | 86 +++++++- .../tikv/common/importer/TxnKVIngestTest.java | 69 +++++++ 5 files changed, 525 insertions(+), 6 deletions(-) create mode 100644 src/main/java/org/tikv/common/importer/TxnImporterClient.java create mode 100644 src/main/java/org/tikv/common/importer/TxnImporterStoreClient.java create mode 100644 src/test/java/org/tikv/common/importer/TxnKVIngestTest.java diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 9618562b101..729e8821c7a 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -33,6 +33,7 @@ import org.tikv.common.exception.TiKVException; import org.tikv.common.importer.ImporterStoreClient; import org.tikv.common.importer.SwitchTiKVModeClient; +import org.tikv.common.importer.TxnImporterStoreClient; import org.tikv.common.key.Key; import org.tikv.common.meta.TiTimestamp; import org.tikv.common.region.RegionManager; @@ -70,6 +71,8 @@ public class TiSession implements AutoCloseable { private volatile boolean enableGrpcForward; private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder; private volatile ImporterStoreClient.ImporterStoreClientBuilder importerClientBuilder; + private volatile TxnImporterStoreClient.TxnImporterStoreClientBuilder + txnImporterStoreClientBuilder; private volatile boolean isClosed = false; private volatile SwitchTiKVModeClient switchTiKVModeClient; private MetricsServer metricsServer; @@ -119,7 +122,7 @@ public KVClient createKVClient() { RegionStoreClientBuilder builder = new RegionStoreClientBuilder(conf, channelFactory, this.getRegionManager(), client); - return new KVClient(conf, builder); + return new KVClient(conf, builder, this); } public TxnKVClient createTxnClient() { @@ -162,6 +165,24 @@ public ImporterStoreClient.ImporterStoreClientBuilder getImporterRegionStoreClie return res; } + public TxnImporterStoreClient.TxnImporterStoreClientBuilder + getTxnImporterRegionStoreClientBuilder() { + checkIsClosed(); + + TxnImporterStoreClient.TxnImporterStoreClientBuilder res = txnImporterStoreClientBuilder; + if (res == null) { + synchronized (this) { + if (txnImporterStoreClientBuilder == null) { + txnImporterStoreClientBuilder = + new TxnImporterStoreClient.TxnImporterStoreClientBuilder( + conf, this.channelFactory, this.getRegionManager(), this.getPDClient()); + } + res = txnImporterStoreClientBuilder; + } + } + return res; + } + public TiConfiguration getConf() { return conf; } diff --git a/src/main/java/org/tikv/common/importer/TxnImporterClient.java b/src/main/java/org/tikv/common/importer/TxnImporterClient.java new file mode 100644 index 00000000000..eef09aad208 --- /dev/null +++ b/src/main/java/org/tikv/common/importer/TxnImporterClient.java @@ -0,0 +1,186 @@ +/* + * + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.importer; + +import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; +import org.tikv.common.codec.Codec; +import org.tikv.common.codec.CodecDataOutput; +import org.tikv.common.exception.GrpcException; +import org.tikv.common.key.Key; +import org.tikv.common.region.TiRegion; +import org.tikv.common.region.TiStore; +import org.tikv.common.util.Pair; +import org.tikv.kvproto.ImportSstpb; +import org.tikv.kvproto.Metapb; + +public class TxnImporterClient { + private TiConfiguration tiConf; + private TiSession tiSession; + private ByteString uuid; + private Key minKey; + private Key maxKey; + private TiRegion region; + + private boolean streamOpened = false; + private ImportSstpb.SSTMeta sstMeta; + private List clientList; + private TxnImporterStoreClient clientLeader; + + public TxnImporterClient( + TiSession tiSession, ByteString uuid, Key minKey, Key maxKey, TiRegion region) { + this.uuid = uuid; + this.tiConf = tiSession.getConf(); + this.tiSession = tiSession; + this.minKey = minKey; + this.maxKey = maxKey; + this.region = region; + } + + public void txnWrite(Iterator> iterator) throws GrpcException { + + streamOpened = false; + + int maxKVBatchSize = tiConf.getImporterMaxKVBatchSize(); + int maxKVBatchBytes = tiConf.getImporterMaxKVBatchBytes(); + int totalBytes = 0; + while (iterator.hasNext()) { + ArrayList pairs = new ArrayList<>(maxKVBatchSize); + for (int i = 0; i < maxKVBatchSize; i++) { + if (iterator.hasNext()) { + Pair pair = iterator.next(); + pairs.add(ImportSstpb.Pair.newBuilder().setKey(pair.first).setValue(pair.second).build()); + totalBytes += (pair.first.size() + pair.second.size()); + } + if (totalBytes > maxKVBatchBytes) { + break; + } + } + if (!streamOpened) { + init(); + startTxnWrite(); + txnWriteMeta(); + streamOpened = true; + } + txnWriteBatch(pairs); + } + + if (streamOpened) { + finishTxnWrite(); + ingest(); + } + } + + private ByteString encode(ByteString key) { + CodecDataOutput cdo = new CodecDataOutput(); + Codec.BytesCodec.writeBytes(cdo, key.toByteArray()); + ByteString key2 = cdo.toByteString(); + return key2; + } + + private void init() { + long regionId = region.getId(); + Metapb.RegionEpoch regionEpoch = region.getRegionEpoch(); + ImportSstpb.Range range = + ImportSstpb.Range.newBuilder() + .setStart(encode(minKey.toByteString())) + .setEnd(encode(maxKey.toByteString())) + .build(); + + sstMeta = + ImportSstpb.SSTMeta.newBuilder() + .setUuid(uuid) + .setRegionId(regionId) + .setRegionEpoch(regionEpoch) + .setRange(range) + .build(); + + clientList = new ArrayList<>(); + for (Metapb.Peer peer : region.getPeersList()) { + long storeId = peer.getStoreId(); + TiStore store = tiSession.getRegionManager().getStoreById(storeId); + TxnImporterStoreClient importerStoreClient = + tiSession.getTxnImporterRegionStoreClientBuilder().build(store); + clientList.add(importerStoreClient); + + if (region.getLeader().getStoreId() == storeId) { + clientLeader = importerStoreClient; + } + } + } + + private void startTxnWrite() { + for (TxnImporterStoreClient client : clientList) { + client.startTxnWrite(); + } + } + + private void txnWriteMeta() { + ImportSstpb.WriteRequest request = + ImportSstpb.WriteRequest.newBuilder().setMeta(sstMeta).build(); + for (TxnImporterStoreClient client : clientList) { + client.txnWriteBatch(request); + } + } + + private void txnWriteBatch(List pairs) { + ImportSstpb.WriteBatch batch = + ImportSstpb.WriteBatch.newBuilder().addAllPairs(pairs).setCommitTs(10).build(); + + ImportSstpb.WriteRequest request = + ImportSstpb.WriteRequest.newBuilder().setMeta(sstMeta).setBatch(batch).build(); + for (TxnImporterStoreClient client : clientList) { + client.txnWriteBatch(request); + } + } + + private void finishTxnWrite() { + for (TxnImporterStoreClient client : clientList) { + client.finishTxnWrite(); + } + } + + private void ingest() throws GrpcException { + List workingClients = new ArrayList<>(clientList); + while (!workingClients.isEmpty()) { + Iterator itor = workingClients.iterator(); + while (itor.hasNext()) { + TxnImporterStoreClient client = itor.next(); + if (client.isTxnWriteResponseReceived()) { + itor.remove(); + } else if (client.hasTxnWriteResponseError()) { + throw new GrpcException(client.getTxnWriteError()); + } + } + + if (!workingClients.isEmpty()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + clientLeader.multiIngest(region.getLeaderContext()); + } +} diff --git a/src/main/java/org/tikv/common/importer/TxnImporterStoreClient.java b/src/main/java/org/tikv/common/importer/TxnImporterStoreClient.java new file mode 100644 index 00000000000..4a70efa1d12 --- /dev/null +++ b/src/main/java/org/tikv/common/importer/TxnImporterStoreClient.java @@ -0,0 +1,167 @@ +/* + * + * Copyright 2021 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.tikv.common.importer; + +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.AbstractGRPCClient; +import org.tikv.common.PDClient; +import org.tikv.common.TiConfiguration; +import org.tikv.common.exception.GrpcException; +import org.tikv.common.region.RegionManager; +import org.tikv.common.region.TiStore; +import org.tikv.common.util.ChannelFactory; +import org.tikv.kvproto.ImportSSTGrpc; +import org.tikv.kvproto.ImportSstpb; +import org.tikv.kvproto.Kvrpcpb; + +public class TxnImporterStoreClient + extends AbstractGRPCClient + implements StreamObserver { + + private static final Logger logger = LoggerFactory.getLogger(TxnImporterStoreClient.class); + + protected TxnImporterStoreClient( + TiConfiguration conf, + ChannelFactory channelFactory, + ImportSSTGrpc.ImportSSTBlockingStub blockingStub, + ImportSSTGrpc.ImportSSTStub asyncStub) { + super(conf, channelFactory, blockingStub, asyncStub); + } + + private StreamObserver streamObserverRequest; + private ImportSstpb.WriteResponse txnWriteResponse; + private Throwable txnWriteError; + + public synchronized boolean isTxnWriteResponseReceived() { + return txnWriteResponse != null; + } + + private synchronized ImportSstpb.WriteResponse getTxnWriteResponse() { + return txnWriteResponse; + } + + private synchronized void setTxnWriteResponse(ImportSstpb.WriteResponse txnWriteResponse) { + this.txnWriteResponse = txnWriteResponse; + } + + public synchronized boolean hasTxnWriteResponseError() { + return this.txnWriteResponse != null; + } + + public synchronized Throwable getTxnWriteError() { + return this.txnWriteError; + } + + private synchronized void setTxnWriteError(Throwable t) { + this.txnWriteError = t; + } + + @Override + public void onNext(ImportSstpb.WriteResponse writeResponse) { + setTxnWriteResponse(writeResponse); + } + + @Override + public void onError(Throwable throwable) { + setTxnWriteError(throwable); + logger.error("Error during txn write!", throwable); + } + + @Override + public void onCompleted() { + // do nothing + } + + @Override + public void close() throws Exception {} + + public void startTxnWrite() { + streamObserverRequest = getAsyncStub().write(this); + } + + public void txnWriteBatch(ImportSstpb.WriteRequest request) { + streamObserverRequest.onNext(request); + } + + public void finishTxnWrite() { + streamObserverRequest.onCompleted(); + } + + public void multiIngest(Kvrpcpb.Context ctx) { + List metasList = getTxnWriteResponse().getMetasList(); + logger.info(metasList.get(0).getCfName()); + + ImportSstpb.MultiIngestRequest request = + ImportSstpb.MultiIngestRequest.newBuilder().setContext(ctx).addAllSsts(metasList).build(); + + ImportSstpb.IngestResponse response = getBlockingStub().multiIngest(request); + if (response.hasError()) { + throw new GrpcException("" + response.getError()); + } + } + + public static class TxnImporterStoreClientBuilder { + private final TiConfiguration conf; + private final ChannelFactory channelFactory; + private final RegionManager regionManager; + private final PDClient pdClient; + + public TxnImporterStoreClientBuilder( + TiConfiguration conf, + ChannelFactory channelFactory, + RegionManager regionManager, + PDClient pdClient) { + Objects.requireNonNull(conf, "conf is null"); + Objects.requireNonNull(channelFactory, "channelFactory is null"); + Objects.requireNonNull(regionManager, "regionManager is null"); + this.conf = conf; + this.channelFactory = channelFactory; + this.regionManager = regionManager; + this.pdClient = pdClient; + } + + public synchronized TxnImporterStoreClient build(TiStore store) throws GrpcException { + Objects.requireNonNull(store, "store is null"); + + String addressStr = store.getStore().getAddress(); + logger.debug(String.format("Create region store client on address %s", addressStr)); + + ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); + ImportSSTGrpc.ImportSSTBlockingStub blockingStub = ImportSSTGrpc.newBlockingStub(channel); + ImportSSTGrpc.ImportSSTStub asyncStub = ImportSSTGrpc.newStub(channel); + + return new TxnImporterStoreClient(conf, channelFactory, blockingStub, asyncStub); + } + } + + @Override + protected ImportSSTGrpc.ImportSSTBlockingStub getBlockingStub() { + return blockingStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); + } + + @Override + protected ImportSSTGrpc.ImportSSTStub getAsyncStub() { + return asyncStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); + } +} diff --git a/src/main/java/org/tikv/txn/KVClient.java b/src/main/java/org/tikv/txn/KVClient.java index 28b29f4ee17..817c2297094 100644 --- a/src/main/java/org/tikv/txn/KVClient.java +++ b/src/main/java/org/tikv/txn/KVClient.java @@ -23,22 +23,25 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.TiKVException; +import org.tikv.common.importer.SwitchTiKVModeClient; +import org.tikv.common.importer.TxnImporterClient; +import org.tikv.common.key.Key; import org.tikv.common.operation.iterator.ConcreteScanIterator; import org.tikv.common.region.RegionStoreClient; import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder; import org.tikv.common.region.TiRegion; -import org.tikv.common.util.BackOffFunction; -import org.tikv.common.util.BackOffer; -import org.tikv.common.util.Batch; -import org.tikv.common.util.ConcreteBackOffer; +import org.tikv.common.util.*; import org.tikv.kvproto.Kvrpcpb; public class KVClient implements AutoCloseable { + private final TiSession tiSession; private static final Logger logger = LoggerFactory.getLogger(KVClient.class); private static final int MAX_BATCH_LIMIT = 1024; private static final int BATCH_GET_SIZE = 16 * 1024; @@ -46,9 +49,10 @@ public class KVClient implements AutoCloseable { private final TiConfiguration conf; private final ExecutorService executorService; - public KVClient(TiConfiguration conf, RegionStoreClientBuilder clientBuilder) { + public KVClient(TiConfiguration conf, RegionStoreClientBuilder clientBuilder, TiSession session) { Objects.requireNonNull(conf, "conf is null"); Objects.requireNonNull(clientBuilder, "clientBuilder is null"); + this.tiSession = session; this.conf = conf; this.clientBuilder = clientBuilder; executorService = @@ -131,6 +135,64 @@ public List scan(ByteString startKey, long version) throws GrpcE return scan(startKey, version, Integer.MAX_VALUE); } + public synchronized void ingest(List> list) throws GrpcException { + if (list.isEmpty()) { + return; + } + + Key min = Key.MAX; + Key max = Key.MIN; + Map map = new HashMap<>(list.size()); + + for (Pair pair : list) { + map.put(pair.first, pair.second); + Key key = Key.toRawKey(pair.first.toByteArray()); + if (key.compareTo(min) < 0) { + min = key; + } + if (key.compareTo(max) > 0) { + max = key; + } + } + + SwitchTiKVModeClient switchTiKVModeClient = tiSession.getSwitchTiKVModeClient(); + + try { + // switch to normal mode + switchTiKVModeClient.switchTiKVToNormalMode(); + + // region split + List splitKeys = new ArrayList<>(2); + splitKeys.add(min.getBytes()); + splitKeys.add(max.next().getBytes()); + + tiSession.splitRegionAndScatter(splitKeys); + tiSession.getRegionManager().invalidateAll(); + + // switch to import mode + switchTiKVModeClient.keepTiKVToImportMode(); + + // group keys by region + List keyList = list.stream().map(pair -> pair.first).collect(Collectors.toList()); + Map> groupKeys = + groupKeysByRegion( + clientBuilder.getRegionManager(), keyList, ConcreteBackOffer.newRawKVBackOff()); + + // ingest for each region + for (Map.Entry> entry : groupKeys.entrySet()) { + TiRegion region = entry.getKey(); + List keys = entry.getValue(); + List> kvs = + keys.stream().map(k -> Pair.create(k, map.get(k))).collect(Collectors.toList()); + doIngest(region, kvs); + } + } finally { + // swith tikv to normal mode + switchTiKVModeClient.stopKeepTiKVToImportMode(); + switchTiKVModeClient.switchTiKVToNormalMode(); + } + } + private List doSendBatchGet( BackOffer backOffer, List keys, long version) { ExecutorCompletionService> completionService = @@ -202,4 +264,18 @@ private Iterator scanIterator( int limit) { return new ConcreteScanIterator(conf, builder, startKey, version, limit); } + + private void doIngest(TiRegion region, List> sortedList) + throws GrpcException { + if (sortedList.isEmpty()) { + return; + } + + ByteString uuid = ByteString.copyFrom(genUUID()); + Key minKey = Key.toRawKey(sortedList.get(0).first); + Key maxKey = Key.toRawKey(sortedList.get(sortedList.size() - 1).first); + TxnImporterClient txnImporterClient = + new TxnImporterClient(tiSession, uuid, minKey, maxKey, region); + txnImporterClient.txnWrite(sortedList.iterator()); + } } diff --git a/src/test/java/org/tikv/common/importer/TxnKVIngestTest.java b/src/test/java/org/tikv/common/importer/TxnKVIngestTest.java new file mode 100644 index 00000000000..8a558aca787 --- /dev/null +++ b/src/test/java/org/tikv/common/importer/TxnKVIngestTest.java @@ -0,0 +1,69 @@ +package org.tikv.common.importer; + +import static org.junit.Assert.assertEquals; + +import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.List; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; +import org.tikv.common.key.Key; +import org.tikv.common.util.Pair; +import org.tikv.txn.KVClient; +import org.tikv.util.TestUtils; + +public class TxnKVIngestTest { + private TiSession session; + + private static final int KEY_NUMBER = 16; + private static final String KEY_PREFIX = "prefix_txn_ingest_test_"; + private static final int KEY_LENGTH = KEY_PREFIX.length() + 10; + private static final int VALUE_LENGTH = 16; + + @Before + public void setup() { + TiConfiguration conf = TiConfiguration.createDefault(); + session = TiSession.create(conf); + } + + @After + public void tearDown() throws Exception { + if (session != null) { + session.close(); + } + } + + @Test + public void txnIngestTest() throws InterruptedException { + KVClient client = session.createKVClient(); + + // gen test data + List> sortedList = new ArrayList<>(); + for (int i = 0; i < KEY_NUMBER; i++) { + byte[] key = TestUtils.genRandomKey(KEY_PREFIX, KEY_LENGTH); + byte[] value = TestUtils.genRandomValue(VALUE_LENGTH); + sortedList.add(Pair.create(ByteString.copyFrom(key), ByteString.copyFrom(value))); + } + sortedList.sort( + (o1, o2) -> { + Key k1 = Key.toRawKey(o1.first.toByteArray()); + Key k2 = Key.toRawKey(o2.first.toByteArray()); + return k1.compareTo(k2); + }); + + // ingest + client.ingest(sortedList); + + // assert + long version = session.getTimestamp().getVersion(); + for (Pair pair : sortedList) { + ByteString key = pair.first; + ByteString v = client.get(key, version); + System.out.println("get " + key.toStringUtf8() + "\t" + v.toStringUtf8()); + assertEquals(v, pair.second); + } + } +} From 7fa706a609c01ddf38223ba9fc5697cc828329f6 Mon Sep 17 00:00:00 2001 From: abingcbc Date: Wed, 18 Aug 2021 21:25:23 +0800 Subject: [PATCH 2/2] refactor: merge TxnImporterClient into Importerclient Signed-off-by: abingcbc --- src/main/java/org/tikv/common/TiSession.java | 36 ++-- .../tikv/common/importer/ImporterClient.java | 104 ++++++---- .../common/importer/ImporterStoreClient.java | 82 ++++---- .../common/importer/TxnImporterClient.java | 186 ------------------ .../importer/TxnImporterStoreClient.java | 167 ---------------- src/main/java/org/tikv/raw/RawKVClient.java | 2 +- src/main/java/org/tikv/txn/KVClient.java | 7 +- .../tikv/common/importer/TxnKVIngestTest.java | 2 +- 8 files changed, 135 insertions(+), 451 deletions(-) delete mode 100644 src/main/java/org/tikv/common/importer/TxnImporterClient.java delete mode 100644 src/main/java/org/tikv/common/importer/TxnImporterStoreClient.java diff --git a/src/main/java/org/tikv/common/TiSession.java b/src/main/java/org/tikv/common/TiSession.java index 729e8821c7a..12a6cefabd6 100644 --- a/src/main/java/org/tikv/common/TiSession.java +++ b/src/main/java/org/tikv/common/TiSession.java @@ -33,7 +33,6 @@ import org.tikv.common.exception.TiKVException; import org.tikv.common.importer.ImporterStoreClient; import org.tikv.common.importer.SwitchTiKVModeClient; -import org.tikv.common.importer.TxnImporterStoreClient; import org.tikv.common.key.Key; import org.tikv.common.meta.TiTimestamp; import org.tikv.common.region.RegionManager; @@ -42,6 +41,7 @@ import org.tikv.common.region.TiRegion; import org.tikv.common.region.TiStore; import org.tikv.common.util.*; +import org.tikv.kvproto.ImportSstpb; import org.tikv.kvproto.Metapb; import org.tikv.raw.RawKVClient; import org.tikv.txn.KVClient; @@ -71,8 +71,6 @@ public class TiSession implements AutoCloseable { private volatile boolean enableGrpcForward; private volatile RegionStoreClient.RegionStoreClientBuilder clientBuilder; private volatile ImporterStoreClient.ImporterStoreClientBuilder importerClientBuilder; - private volatile TxnImporterStoreClient.TxnImporterStoreClientBuilder - txnImporterStoreClientBuilder; private volatile boolean isClosed = false; private volatile SwitchTiKVModeClient switchTiKVModeClient; private MetricsServer metricsServer; @@ -155,9 +153,17 @@ public ImporterStoreClient.ImporterStoreClientBuilder getImporterRegionStoreClie if (res == null) { synchronized (this) { if (importerClientBuilder == null) { - importerClientBuilder = - new ImporterStoreClient.ImporterStoreClientBuilder( - conf, this.channelFactory, this.getRegionManager(), this.getPDClient()); + if (conf.isTxnKVMode()) { + importerClientBuilder = + new ImporterStoreClient.ImporterStoreClientBuilder< + ImportSstpb.WriteRequest, ImportSstpb.WriteRequest>( + conf, this.channelFactory, this.getRegionManager(), this.getPDClient()); + } else { + importerClientBuilder = + new ImporterStoreClient.ImporterStoreClientBuilder< + ImportSstpb.RawWriteRequest, ImportSstpb.RawWriteResponse>( + conf, this.channelFactory, this.getRegionManager(), this.getPDClient()); + } } res = importerClientBuilder; } @@ -165,24 +171,6 @@ public ImporterStoreClient.ImporterStoreClientBuilder getImporterRegionStoreClie return res; } - public TxnImporterStoreClient.TxnImporterStoreClientBuilder - getTxnImporterRegionStoreClientBuilder() { - checkIsClosed(); - - TxnImporterStoreClient.TxnImporterStoreClientBuilder res = txnImporterStoreClientBuilder; - if (res == null) { - synchronized (this) { - if (txnImporterStoreClientBuilder == null) { - txnImporterStoreClientBuilder = - new TxnImporterStoreClient.TxnImporterStoreClientBuilder( - conf, this.channelFactory, this.getRegionManager(), this.getPDClient()); - } - res = txnImporterStoreClientBuilder; - } - } - return res; - } - public TiConfiguration getConf() { return conf; } diff --git a/src/main/java/org/tikv/common/importer/ImporterClient.java b/src/main/java/org/tikv/common/importer/ImporterClient.java index 99494b5205f..d2822a3dac2 100644 --- a/src/main/java/org/tikv/common/importer/ImporterClient.java +++ b/src/main/java/org/tikv/common/importer/ImporterClient.java @@ -23,6 +23,8 @@ import java.util.List; import org.tikv.common.TiConfiguration; import org.tikv.common.TiSession; +import org.tikv.common.codec.Codec; +import org.tikv.common.codec.CodecDataOutput; import org.tikv.common.exception.GrpcException; import org.tikv.common.key.Key; import org.tikv.common.region.TiRegion; @@ -57,14 +59,11 @@ public ImporterClient( } /** - * write KV pairs to RawKV using KVStream interface + * write KV pairs to RawKV/Txn using KVStream interface * * @param iterator */ - public void rawWrite(Iterator> iterator) throws GrpcException { - if (!tiConf.isRawKVMode()) { - throw new IllegalArgumentException("KVMode is not RAW in TiConfiguration!"); - } + public void write(Iterator> iterator) throws GrpcException { streamOpened = false; @@ -85,15 +84,15 @@ public void rawWrite(Iterator> iterator) throws Grp } if (!streamOpened) { init(); - startRawWrite(); - rawWriteMeta(); + startWrite(); + writeMeta(); streamOpened = true; } - rawWriteBatch(pairs); + writeBatch(pairs); } if (streamOpened) { - finishRawWrite(); + finishWrite(); ingest(); } } @@ -102,10 +101,15 @@ private void init() { long regionId = region.getId(); Metapb.RegionEpoch regionEpoch = region.getRegionEpoch(); ImportSstpb.Range range = - ImportSstpb.Range.newBuilder() - .setStart(minKey.toByteString()) - .setEnd(maxKey.toByteString()) - .build(); + tiConf.isTxnKVMode() + ? ImportSstpb.Range.newBuilder() + .setStart(encode(minKey.toByteString())) + .setEnd(encode(maxKey.toByteString())) + .build() + : ImportSstpb.Range.newBuilder() + .setStart(minKey.toByteString()) + .setEnd(maxKey.toByteString()) + .build(); sstMeta = ImportSstpb.SSTMeta.newBuilder() @@ -129,39 +133,69 @@ private void init() { } } - private void startRawWrite() { + private ByteString encode(ByteString key) { + CodecDataOutput cdo = new CodecDataOutput(); + Codec.BytesCodec.writeBytes(cdo, key.toByteArray()); + return cdo.toByteString(); + } + + private void startWrite() { for (ImporterStoreClient client : clientList) { - client.startRawWrite(); + client.startWrite(); } } - private void rawWriteMeta() { - ImportSstpb.RawWriteRequest request = - ImportSstpb.RawWriteRequest.newBuilder().setMeta(sstMeta).build(); - for (ImporterStoreClient client : clientList) { - client.rawWriteBatch(request); + private void writeMeta() { + if (tiConf.isTxnKVMode()) { + ImportSstpb.WriteRequest request = + ImportSstpb.WriteRequest.newBuilder().setMeta(sstMeta).build(); + for (ImporterStoreClient client : clientList) { + client.writeBatch(request); + } + } else { + ImportSstpb.RawWriteRequest request = + ImportSstpb.RawWriteRequest.newBuilder().setMeta(sstMeta).build(); + for (ImporterStoreClient client : clientList) { + client.writeBatch(request); + } } } - private void rawWriteBatch(List pairs) { - ImportSstpb.RawWriteBatch batch; + private void writeBatch(List pairs) { + if (tiConf.isTxnKVMode()) { + ImportSstpb.WriteBatch batch; - if (ttl == null || ttl <= 0) { - batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).build(); + batch = + ImportSstpb.WriteBatch.newBuilder() + .addAllPairs(pairs) + .setCommitTs(tiSession.getTimestamp().getVersion()) + .build(); + + ImportSstpb.WriteRequest request = + ImportSstpb.WriteRequest.newBuilder().setBatch(batch).build(); + for (ImporterStoreClient client : clientList) { + client.writeBatch(request); + } } else { - batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).setTtl(ttl).build(); - } + ImportSstpb.RawWriteBatch batch; - ImportSstpb.RawWriteRequest request = - ImportSstpb.RawWriteRequest.newBuilder().setBatch(batch).build(); - for (ImporterStoreClient client : clientList) { - client.rawWriteBatch(request); + if (ttl == null || ttl <= 0) { + batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).build(); + } else { + batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).setTtl(ttl).build(); + } + + ImportSstpb.RawWriteRequest request = + ImportSstpb.RawWriteRequest.newBuilder().setBatch(batch).build(); + for (ImporterStoreClient client : clientList) { + client.writeBatch(request); + } } } - private void finishRawWrite() { + private void finishWrite() { for (ImporterStoreClient client : clientList) { - client.finishRawWrite(); + client.finishWrite(); } } @@ -171,10 +205,10 @@ private void ingest() throws GrpcException { Iterator itor = workingClients.iterator(); while (itor.hasNext()) { ImporterStoreClient client = itor.next(); - if (client.isRawWriteResponseReceived()) { + if (client.isWriteResponseReceived()) { itor.remove(); - } else if (client.hasRawWriteResponseError()) { - throw new GrpcException(client.getRawWriteError()); + } else if (client.hasWriteResponseError()) { + throw new GrpcException(client.getWriteError()); } } diff --git a/src/main/java/org/tikv/common/importer/ImporterStoreClient.java b/src/main/java/org/tikv/common/importer/ImporterStoreClient.java index 96534b778ab..006bf517334 100644 --- a/src/main/java/org/tikv/common/importer/ImporterStoreClient.java +++ b/src/main/java/org/tikv/common/importer/ImporterStoreClient.java @@ -39,9 +39,9 @@ import org.tikv.kvproto.ImportSstpb; import org.tikv.kvproto.Kvrpcpb; -public class ImporterStoreClient +public class ImporterStoreClient extends AbstractGRPCClient - implements StreamObserver { + implements StreamObserver { private static final Logger logger = LoggerFactory.getLogger(ImporterStoreClient.class); @@ -53,43 +53,43 @@ protected ImporterStoreClient( super(conf, channelFactory, blockingStub, asyncStub); } - private StreamObserver streamObserverRequest; - private ImportSstpb.RawWriteResponse rawWriteResponse; - private Throwable rawWriteError; + private StreamObserver streamObserverRequest; + private ResponseClass writeResponse; + private Throwable writeError; - public synchronized boolean isRawWriteResponseReceived() { - return rawWriteResponse != null; + public synchronized boolean isWriteResponseReceived() { + return writeResponse != null; } - private synchronized ImportSstpb.RawWriteResponse getRawWriteResponse() { - return rawWriteResponse; + private synchronized ResponseClass getWriteResponse() { + return writeResponse; } - private synchronized void setRawWriteResponse(ImportSstpb.RawWriteResponse rawWriteResponse) { - this.rawWriteResponse = rawWriteResponse; + private synchronized void setWriteResponse(ResponseClass writeResponse) { + this.writeResponse = writeResponse; } - public synchronized boolean hasRawWriteResponseError() { - return this.rawWriteError != null; + public synchronized boolean hasWriteResponseError() { + return this.writeError != null; } - public synchronized Throwable getRawWriteError() { - return this.rawWriteError; + public synchronized Throwable getWriteError() { + return this.writeError; } - private synchronized void setRawWriteError(Throwable t) { - this.rawWriteError = t; + private synchronized void setWriteError(Throwable t) { + this.writeError = t; } @Override - public void onNext(ImportSstpb.RawWriteResponse value) { - setRawWriteResponse(value); + public void onNext(ResponseClass response) { + setWriteResponse(response); } @Override public void onError(Throwable t) { - setRawWriteError(t); - logger.error("Error during raw write!", t); + setWriteError(t); + logger.error("Error during write!", t); } @Override @@ -98,36 +98,51 @@ public void onCompleted() { } /** - * Ingest KV pairs to RawKV using gRPC streaming mode. This API should be called on both leader - * and followers. + * Ingest KV pairs to RawKV/Txn using gRPC streaming mode. This API should be called on both + * leader and followers. * * @return */ - public void startRawWrite() { - streamObserverRequest = getAsyncStub().rawWrite(this); + public void startWrite() { + if (conf.isRawKVMode()) { + streamObserverRequest = + (StreamObserver) + getAsyncStub().rawWrite((StreamObserver) this); + } else { + streamObserverRequest = + (StreamObserver) + getAsyncStub().write((StreamObserver) this); + } } /** - * This API should be called after `startRawWrite`. + * This API should be called after `startWrite`. * * @param request */ - public void rawWriteBatch(ImportSstpb.RawWriteRequest request) { + public void writeBatch(RequestClass request) { streamObserverRequest.onNext(request); } - /** This API should be called after `rawWriteBatch`. */ - public void finishRawWrite() { + /** This API should be called after `writeBatch`. */ + public void finishWrite() { streamObserverRequest.onCompleted(); } /** - * This API should be called after `finishRawWrite`. This API should be called on leader only. + * This API should be called after `finishWrite`. This API should be called on leader only. * * @param ctx */ public void multiIngest(Kvrpcpb.Context ctx) { - List metasList = getRawWriteResponse().getMetasList(); + List metasList; + if (writeResponse instanceof ImportSstpb.RawWriteResponse) { + metasList = ((ImportSstpb.RawWriteResponse) getWriteResponse()).getMetasList(); + } else if (writeResponse instanceof ImportSstpb.WriteResponse) { + metasList = ((ImportSstpb.WriteResponse) getWriteResponse()).getMetasList(); + } else { + throw new IllegalArgumentException("Wrong response type"); + } ImportSstpb.MultiIngestRequest request = ImportSstpb.MultiIngestRequest.newBuilder().setContext(ctx).addAllSsts(metasList).build(); @@ -163,7 +178,7 @@ protected ImportSSTGrpc.ImportSSTStub getAsyncStub() { @Override public void close() throws Exception {} - public static class ImporterStoreClientBuilder { + public static class ImporterStoreClientBuilder { private final TiConfiguration conf; private final ChannelFactory channelFactory; private final RegionManager regionManager; @@ -193,7 +208,8 @@ public synchronized ImporterStoreClient build(TiStore store) throws GrpcExceptio ImportSSTGrpc.ImportSSTBlockingStub blockingStub = ImportSSTGrpc.newBlockingStub(channel); ImportSSTGrpc.ImportSSTStub asyncStub = ImportSSTGrpc.newStub(channel); - return new ImporterStoreClient(conf, channelFactory, blockingStub, asyncStub); + return new ImporterStoreClient( + conf, channelFactory, blockingStub, asyncStub); } } } diff --git a/src/main/java/org/tikv/common/importer/TxnImporterClient.java b/src/main/java/org/tikv/common/importer/TxnImporterClient.java deleted file mode 100644 index eef09aad208..00000000000 --- a/src/main/java/org/tikv/common/importer/TxnImporterClient.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * - * Copyright 2021 PingCAP, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.tikv.common.importer; - -import com.google.protobuf.ByteString; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import org.tikv.common.TiConfiguration; -import org.tikv.common.TiSession; -import org.tikv.common.codec.Codec; -import org.tikv.common.codec.CodecDataOutput; -import org.tikv.common.exception.GrpcException; -import org.tikv.common.key.Key; -import org.tikv.common.region.TiRegion; -import org.tikv.common.region.TiStore; -import org.tikv.common.util.Pair; -import org.tikv.kvproto.ImportSstpb; -import org.tikv.kvproto.Metapb; - -public class TxnImporterClient { - private TiConfiguration tiConf; - private TiSession tiSession; - private ByteString uuid; - private Key minKey; - private Key maxKey; - private TiRegion region; - - private boolean streamOpened = false; - private ImportSstpb.SSTMeta sstMeta; - private List clientList; - private TxnImporterStoreClient clientLeader; - - public TxnImporterClient( - TiSession tiSession, ByteString uuid, Key minKey, Key maxKey, TiRegion region) { - this.uuid = uuid; - this.tiConf = tiSession.getConf(); - this.tiSession = tiSession; - this.minKey = minKey; - this.maxKey = maxKey; - this.region = region; - } - - public void txnWrite(Iterator> iterator) throws GrpcException { - - streamOpened = false; - - int maxKVBatchSize = tiConf.getImporterMaxKVBatchSize(); - int maxKVBatchBytes = tiConf.getImporterMaxKVBatchBytes(); - int totalBytes = 0; - while (iterator.hasNext()) { - ArrayList pairs = new ArrayList<>(maxKVBatchSize); - for (int i = 0; i < maxKVBatchSize; i++) { - if (iterator.hasNext()) { - Pair pair = iterator.next(); - pairs.add(ImportSstpb.Pair.newBuilder().setKey(pair.first).setValue(pair.second).build()); - totalBytes += (pair.first.size() + pair.second.size()); - } - if (totalBytes > maxKVBatchBytes) { - break; - } - } - if (!streamOpened) { - init(); - startTxnWrite(); - txnWriteMeta(); - streamOpened = true; - } - txnWriteBatch(pairs); - } - - if (streamOpened) { - finishTxnWrite(); - ingest(); - } - } - - private ByteString encode(ByteString key) { - CodecDataOutput cdo = new CodecDataOutput(); - Codec.BytesCodec.writeBytes(cdo, key.toByteArray()); - ByteString key2 = cdo.toByteString(); - return key2; - } - - private void init() { - long regionId = region.getId(); - Metapb.RegionEpoch regionEpoch = region.getRegionEpoch(); - ImportSstpb.Range range = - ImportSstpb.Range.newBuilder() - .setStart(encode(minKey.toByteString())) - .setEnd(encode(maxKey.toByteString())) - .build(); - - sstMeta = - ImportSstpb.SSTMeta.newBuilder() - .setUuid(uuid) - .setRegionId(regionId) - .setRegionEpoch(regionEpoch) - .setRange(range) - .build(); - - clientList = new ArrayList<>(); - for (Metapb.Peer peer : region.getPeersList()) { - long storeId = peer.getStoreId(); - TiStore store = tiSession.getRegionManager().getStoreById(storeId); - TxnImporterStoreClient importerStoreClient = - tiSession.getTxnImporterRegionStoreClientBuilder().build(store); - clientList.add(importerStoreClient); - - if (region.getLeader().getStoreId() == storeId) { - clientLeader = importerStoreClient; - } - } - } - - private void startTxnWrite() { - for (TxnImporterStoreClient client : clientList) { - client.startTxnWrite(); - } - } - - private void txnWriteMeta() { - ImportSstpb.WriteRequest request = - ImportSstpb.WriteRequest.newBuilder().setMeta(sstMeta).build(); - for (TxnImporterStoreClient client : clientList) { - client.txnWriteBatch(request); - } - } - - private void txnWriteBatch(List pairs) { - ImportSstpb.WriteBatch batch = - ImportSstpb.WriteBatch.newBuilder().addAllPairs(pairs).setCommitTs(10).build(); - - ImportSstpb.WriteRequest request = - ImportSstpb.WriteRequest.newBuilder().setMeta(sstMeta).setBatch(batch).build(); - for (TxnImporterStoreClient client : clientList) { - client.txnWriteBatch(request); - } - } - - private void finishTxnWrite() { - for (TxnImporterStoreClient client : clientList) { - client.finishTxnWrite(); - } - } - - private void ingest() throws GrpcException { - List workingClients = new ArrayList<>(clientList); - while (!workingClients.isEmpty()) { - Iterator itor = workingClients.iterator(); - while (itor.hasNext()) { - TxnImporterStoreClient client = itor.next(); - if (client.isTxnWriteResponseReceived()) { - itor.remove(); - } else if (client.hasTxnWriteResponseError()) { - throw new GrpcException(client.getTxnWriteError()); - } - } - - if (!workingClients.isEmpty()) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - clientLeader.multiIngest(region.getLeaderContext()); - } -} diff --git a/src/main/java/org/tikv/common/importer/TxnImporterStoreClient.java b/src/main/java/org/tikv/common/importer/TxnImporterStoreClient.java deleted file mode 100644 index 4a70efa1d12..00000000000 --- a/src/main/java/org/tikv/common/importer/TxnImporterStoreClient.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * - * Copyright 2021 PingCAP, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.tikv.common.importer; - -import io.grpc.ManagedChannel; -import io.grpc.stub.StreamObserver; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.tikv.common.AbstractGRPCClient; -import org.tikv.common.PDClient; -import org.tikv.common.TiConfiguration; -import org.tikv.common.exception.GrpcException; -import org.tikv.common.region.RegionManager; -import org.tikv.common.region.TiStore; -import org.tikv.common.util.ChannelFactory; -import org.tikv.kvproto.ImportSSTGrpc; -import org.tikv.kvproto.ImportSstpb; -import org.tikv.kvproto.Kvrpcpb; - -public class TxnImporterStoreClient - extends AbstractGRPCClient - implements StreamObserver { - - private static final Logger logger = LoggerFactory.getLogger(TxnImporterStoreClient.class); - - protected TxnImporterStoreClient( - TiConfiguration conf, - ChannelFactory channelFactory, - ImportSSTGrpc.ImportSSTBlockingStub blockingStub, - ImportSSTGrpc.ImportSSTStub asyncStub) { - super(conf, channelFactory, blockingStub, asyncStub); - } - - private StreamObserver streamObserverRequest; - private ImportSstpb.WriteResponse txnWriteResponse; - private Throwable txnWriteError; - - public synchronized boolean isTxnWriteResponseReceived() { - return txnWriteResponse != null; - } - - private synchronized ImportSstpb.WriteResponse getTxnWriteResponse() { - return txnWriteResponse; - } - - private synchronized void setTxnWriteResponse(ImportSstpb.WriteResponse txnWriteResponse) { - this.txnWriteResponse = txnWriteResponse; - } - - public synchronized boolean hasTxnWriteResponseError() { - return this.txnWriteResponse != null; - } - - public synchronized Throwable getTxnWriteError() { - return this.txnWriteError; - } - - private synchronized void setTxnWriteError(Throwable t) { - this.txnWriteError = t; - } - - @Override - public void onNext(ImportSstpb.WriteResponse writeResponse) { - setTxnWriteResponse(writeResponse); - } - - @Override - public void onError(Throwable throwable) { - setTxnWriteError(throwable); - logger.error("Error during txn write!", throwable); - } - - @Override - public void onCompleted() { - // do nothing - } - - @Override - public void close() throws Exception {} - - public void startTxnWrite() { - streamObserverRequest = getAsyncStub().write(this); - } - - public void txnWriteBatch(ImportSstpb.WriteRequest request) { - streamObserverRequest.onNext(request); - } - - public void finishTxnWrite() { - streamObserverRequest.onCompleted(); - } - - public void multiIngest(Kvrpcpb.Context ctx) { - List metasList = getTxnWriteResponse().getMetasList(); - logger.info(metasList.get(0).getCfName()); - - ImportSstpb.MultiIngestRequest request = - ImportSstpb.MultiIngestRequest.newBuilder().setContext(ctx).addAllSsts(metasList).build(); - - ImportSstpb.IngestResponse response = getBlockingStub().multiIngest(request); - if (response.hasError()) { - throw new GrpcException("" + response.getError()); - } - } - - public static class TxnImporterStoreClientBuilder { - private final TiConfiguration conf; - private final ChannelFactory channelFactory; - private final RegionManager regionManager; - private final PDClient pdClient; - - public TxnImporterStoreClientBuilder( - TiConfiguration conf, - ChannelFactory channelFactory, - RegionManager regionManager, - PDClient pdClient) { - Objects.requireNonNull(conf, "conf is null"); - Objects.requireNonNull(channelFactory, "channelFactory is null"); - Objects.requireNonNull(regionManager, "regionManager is null"); - this.conf = conf; - this.channelFactory = channelFactory; - this.regionManager = regionManager; - this.pdClient = pdClient; - } - - public synchronized TxnImporterStoreClient build(TiStore store) throws GrpcException { - Objects.requireNonNull(store, "store is null"); - - String addressStr = store.getStore().getAddress(); - logger.debug(String.format("Create region store client on address %s", addressStr)); - - ManagedChannel channel = channelFactory.getChannel(addressStr, pdClient.getHostMapping()); - ImportSSTGrpc.ImportSSTBlockingStub blockingStub = ImportSSTGrpc.newBlockingStub(channel); - ImportSSTGrpc.ImportSSTStub asyncStub = ImportSSTGrpc.newStub(channel); - - return new TxnImporterStoreClient(conf, channelFactory, blockingStub, asyncStub); - } - } - - @Override - protected ImportSSTGrpc.ImportSSTBlockingStub getBlockingStub() { - return blockingStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); - } - - @Override - protected ImportSSTGrpc.ImportSSTStub getAsyncStub() { - return asyncStub.withDeadlineAfter(getTimeout(), TimeUnit.MILLISECONDS); - } -} diff --git a/src/main/java/org/tikv/raw/RawKVClient.java b/src/main/java/org/tikv/raw/RawKVClient.java index c6eb45ed782..af502d86723 100644 --- a/src/main/java/org/tikv/raw/RawKVClient.java +++ b/src/main/java/org/tikv/raw/RawKVClient.java @@ -727,7 +727,7 @@ private void doIngest(TiRegion region, List> sorted Key maxKey = Key.toRawKey(sortedList.get(sortedList.size() - 1).first); ImporterClient importerClient = new ImporterClient(tiSession, uuid, minKey, maxKey, region, ttl); - importerClient.rawWrite(sortedList.iterator()); + importerClient.write(sortedList.iterator()); } private void doSendBatchPut(BackOffer backOffer, Map kvPairs, long ttl) { diff --git a/src/main/java/org/tikv/txn/KVClient.java b/src/main/java/org/tikv/txn/KVClient.java index 817c2297094..6b7b63e4dde 100644 --- a/src/main/java/org/tikv/txn/KVClient.java +++ b/src/main/java/org/tikv/txn/KVClient.java @@ -30,8 +30,8 @@ import org.tikv.common.TiSession; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.TiKVException; +import org.tikv.common.importer.ImporterClient; import org.tikv.common.importer.SwitchTiKVModeClient; -import org.tikv.common.importer.TxnImporterClient; import org.tikv.common.key.Key; import org.tikv.common.operation.iterator.ConcreteScanIterator; import org.tikv.common.region.RegionStoreClient; @@ -274,8 +274,7 @@ private void doIngest(TiRegion region, List> sorted ByteString uuid = ByteString.copyFrom(genUUID()); Key minKey = Key.toRawKey(sortedList.get(0).first); Key maxKey = Key.toRawKey(sortedList.get(sortedList.size() - 1).first); - TxnImporterClient txnImporterClient = - new TxnImporterClient(tiSession, uuid, minKey, maxKey, region); - txnImporterClient.txnWrite(sortedList.iterator()); + ImporterClient importerClient = new ImporterClient(tiSession, uuid, minKey, maxKey, region, 0L); + importerClient.write(sortedList.iterator()); } } diff --git a/src/test/java/org/tikv/common/importer/TxnKVIngestTest.java b/src/test/java/org/tikv/common/importer/TxnKVIngestTest.java index 8a558aca787..fc0935977c7 100644 --- a/src/test/java/org/tikv/common/importer/TxnKVIngestTest.java +++ b/src/test/java/org/tikv/common/importer/TxnKVIngestTest.java @@ -26,6 +26,7 @@ public class TxnKVIngestTest { @Before public void setup() { TiConfiguration conf = TiConfiguration.createDefault(); + conf.setTest(true); session = TiSession.create(conf); } @@ -62,7 +63,6 @@ public void txnIngestTest() throws InterruptedException { for (Pair pair : sortedList) { ByteString key = pair.first; ByteString v = client.get(key, version); - System.out.println("get " + key.toStringUtf8() + "\t" + v.toStringUtf8()); assertEquals(v, pair.second); } }