Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions src/main/java/org/tikv/common/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.*;
import org.tikv.kvproto.ImportSstpb;
import org.tikv.kvproto.Metapb;
import org.tikv.raw.RawKVClient;
import org.tikv.txn.KVClient;
Expand Down Expand Up @@ -119,7 +120,7 @@ public KVClient createKVClient() {

RegionStoreClientBuilder builder =
new RegionStoreClientBuilder(conf, channelFactory, this.getRegionManager(), client);
return new KVClient(conf, builder);
return new KVClient(conf, builder, this);
}

public TxnKVClient createTxnClient() {
Expand Down Expand Up @@ -152,9 +153,17 @@ public ImporterStoreClient.ImporterStoreClientBuilder getImporterRegionStoreClie
if (res == null) {
synchronized (this) {
if (importerClientBuilder == null) {
importerClientBuilder =
new ImporterStoreClient.ImporterStoreClientBuilder(
conf, this.channelFactory, this.getRegionManager(), this.getPDClient());
if (conf.isTxnKVMode()) {
importerClientBuilder =
new ImporterStoreClient.ImporterStoreClientBuilder<
ImportSstpb.WriteRequest, ImportSstpb.WriteRequest>(
conf, this.channelFactory, this.getRegionManager(), this.getPDClient());
} else {
importerClientBuilder =
new ImporterStoreClient.ImporterStoreClientBuilder<
ImportSstpb.RawWriteRequest, ImportSstpb.RawWriteResponse>(
conf, this.channelFactory, this.getRegionManager(), this.getPDClient());
}
}
res = importerClientBuilder;
}
Expand Down
104 changes: 69 additions & 35 deletions src/main/java/org/tikv/common/importer/ImporterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.List;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.codec.Codec;
import org.tikv.common.codec.CodecDataOutput;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.key.Key;
import org.tikv.common.region.TiRegion;
Expand Down Expand Up @@ -57,14 +59,11 @@ public ImporterClient(
}

/**
* write KV pairs to RawKV using KVStream interface
* write KV pairs to RawKV/Txn using KVStream interface
*
* @param iterator
*/
public void rawWrite(Iterator<Pair<ByteString, ByteString>> iterator) throws GrpcException {
if (!tiConf.isRawKVMode()) {
throw new IllegalArgumentException("KVMode is not RAW in TiConfiguration!");
}
public void write(Iterator<Pair<ByteString, ByteString>> iterator) throws GrpcException {

streamOpened = false;

Expand All @@ -85,15 +84,15 @@ public void rawWrite(Iterator<Pair<ByteString, ByteString>> iterator) throws Grp
}
if (!streamOpened) {
init();
startRawWrite();
rawWriteMeta();
startWrite();
writeMeta();
streamOpened = true;
}
rawWriteBatch(pairs);
writeBatch(pairs);
}

if (streamOpened) {
finishRawWrite();
finishWrite();
ingest();
}
}
Expand All @@ -102,10 +101,15 @@ private void init() {
long regionId = region.getId();
Metapb.RegionEpoch regionEpoch = region.getRegionEpoch();
ImportSstpb.Range range =
ImportSstpb.Range.newBuilder()
.setStart(minKey.toByteString())
.setEnd(maxKey.toByteString())
.build();
tiConf.isTxnKVMode()
? ImportSstpb.Range.newBuilder()
.setStart(encode(minKey.toByteString()))
.setEnd(encode(maxKey.toByteString()))
.build()
: ImportSstpb.Range.newBuilder()
.setStart(minKey.toByteString())
.setEnd(maxKey.toByteString())
.build();

sstMeta =
ImportSstpb.SSTMeta.newBuilder()
Expand All @@ -129,39 +133,69 @@ private void init() {
}
}

private void startRawWrite() {
private ByteString encode(ByteString key) {
CodecDataOutput cdo = new CodecDataOutput();
Codec.BytesCodec.writeBytes(cdo, key.toByteArray());
return cdo.toByteString();
}

private void startWrite() {
for (ImporterStoreClient client : clientList) {
client.startRawWrite();
client.startWrite();
}
}

private void rawWriteMeta() {
ImportSstpb.RawWriteRequest request =
ImportSstpb.RawWriteRequest.newBuilder().setMeta(sstMeta).build();
for (ImporterStoreClient client : clientList) {
client.rawWriteBatch(request);
private void writeMeta() {
if (tiConf.isTxnKVMode()) {
ImportSstpb.WriteRequest request =
ImportSstpb.WriteRequest.newBuilder().setMeta(sstMeta).build();
for (ImporterStoreClient client : clientList) {
client.writeBatch(request);
}
} else {
ImportSstpb.RawWriteRequest request =
ImportSstpb.RawWriteRequest.newBuilder().setMeta(sstMeta).build();
for (ImporterStoreClient client : clientList) {
client.writeBatch(request);
}
}
}

private void rawWriteBatch(List<ImportSstpb.Pair> pairs) {
ImportSstpb.RawWriteBatch batch;
private void writeBatch(List<ImportSstpb.Pair> pairs) {
if (tiConf.isTxnKVMode()) {
ImportSstpb.WriteBatch batch;

if (ttl == null || ttl <= 0) {
batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).build();
batch =
ImportSstpb.WriteBatch.newBuilder()
.addAllPairs(pairs)
.setCommitTs(tiSession.getTimestamp().getVersion())
.build();

ImportSstpb.WriteRequest request =
ImportSstpb.WriteRequest.newBuilder().setBatch(batch).build();
for (ImporterStoreClient client : clientList) {
client.writeBatch(request);
}
} else {
batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).setTtl(ttl).build();
}
ImportSstpb.RawWriteBatch batch;

ImportSstpb.RawWriteRequest request =
ImportSstpb.RawWriteRequest.newBuilder().setBatch(batch).build();
for (ImporterStoreClient client : clientList) {
client.rawWriteBatch(request);
if (ttl == null || ttl <= 0) {
batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).build();
} else {
batch = ImportSstpb.RawWriteBatch.newBuilder().addAllPairs(pairs).setTtl(ttl).build();
}

ImportSstpb.RawWriteRequest request =
ImportSstpb.RawWriteRequest.newBuilder().setBatch(batch).build();
for (ImporterStoreClient client : clientList) {
client.writeBatch(request);
}
}
}

private void finishRawWrite() {
private void finishWrite() {
for (ImporterStoreClient client : clientList) {
client.finishRawWrite();
client.finishWrite();
}
}

Expand All @@ -171,10 +205,10 @@ private void ingest() throws GrpcException {
Iterator<ImporterStoreClient> itor = workingClients.iterator();
while (itor.hasNext()) {
ImporterStoreClient client = itor.next();
if (client.isRawWriteResponseReceived()) {
if (client.isWriteResponseReceived()) {
itor.remove();
} else if (client.hasRawWriteResponseError()) {
throw new GrpcException(client.getRawWriteError());
} else if (client.hasWriteResponseError()) {
throw new GrpcException(client.getWriteError());
}
}

Expand Down
82 changes: 49 additions & 33 deletions src/main/java/org/tikv/common/importer/ImporterStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
import org.tikv.kvproto.ImportSstpb;
import org.tikv.kvproto.Kvrpcpb;

public class ImporterStoreClient
public class ImporterStoreClient<RequestClass, ResponseClass>
extends AbstractGRPCClient<ImportSSTGrpc.ImportSSTBlockingStub, ImportSSTGrpc.ImportSSTStub>
implements StreamObserver<ImportSstpb.RawWriteResponse> {
implements StreamObserver<ResponseClass> {

private static final Logger logger = LoggerFactory.getLogger(ImporterStoreClient.class);

Expand All @@ -53,43 +53,43 @@ protected ImporterStoreClient(
super(conf, channelFactory, blockingStub, asyncStub);
}

private StreamObserver<ImportSstpb.RawWriteRequest> streamObserverRequest;
private ImportSstpb.RawWriteResponse rawWriteResponse;
private Throwable rawWriteError;
private StreamObserver<RequestClass> streamObserverRequest;
private ResponseClass writeResponse;
private Throwable writeError;

public synchronized boolean isRawWriteResponseReceived() {
return rawWriteResponse != null;
public synchronized boolean isWriteResponseReceived() {
return writeResponse != null;
}

private synchronized ImportSstpb.RawWriteResponse getRawWriteResponse() {
return rawWriteResponse;
private synchronized ResponseClass getWriteResponse() {
return writeResponse;
}

private synchronized void setRawWriteResponse(ImportSstpb.RawWriteResponse rawWriteResponse) {
this.rawWriteResponse = rawWriteResponse;
private synchronized void setWriteResponse(ResponseClass writeResponse) {
this.writeResponse = writeResponse;
}

public synchronized boolean hasRawWriteResponseError() {
return this.rawWriteError != null;
public synchronized boolean hasWriteResponseError() {
return this.writeError != null;
}

public synchronized Throwable getRawWriteError() {
return this.rawWriteError;
public synchronized Throwable getWriteError() {
return this.writeError;
}

private synchronized void setRawWriteError(Throwable t) {
this.rawWriteError = t;
private synchronized void setWriteError(Throwable t) {
this.writeError = t;
}

@Override
public void onNext(ImportSstpb.RawWriteResponse value) {
setRawWriteResponse(value);
public void onNext(ResponseClass response) {
setWriteResponse(response);
}

@Override
public void onError(Throwable t) {
setRawWriteError(t);
logger.error("Error during raw write!", t);
setWriteError(t);
logger.error("Error during write!", t);
}

@Override
Expand All @@ -98,36 +98,51 @@ public void onCompleted() {
}

/**
* Ingest KV pairs to RawKV using gRPC streaming mode. This API should be called on both leader
* and followers.
* Ingest KV pairs to RawKV/Txn using gRPC streaming mode. This API should be called on both
* leader and followers.
*
* @return
*/
public void startRawWrite() {
streamObserverRequest = getAsyncStub().rawWrite(this);
public void startWrite() {
if (conf.isRawKVMode()) {
streamObserverRequest =
(StreamObserver<RequestClass>)
getAsyncStub().rawWrite((StreamObserver<ImportSstpb.RawWriteResponse>) this);
} else {
streamObserverRequest =
(StreamObserver<RequestClass>)
getAsyncStub().write((StreamObserver<ImportSstpb.WriteResponse>) this);
}
}

/**
* This API should be called after `startRawWrite`.
* This API should be called after `startWrite`.
*
* @param request
*/
public void rawWriteBatch(ImportSstpb.RawWriteRequest request) {
public void writeBatch(RequestClass request) {
streamObserverRequest.onNext(request);
}

/** This API should be called after `rawWriteBatch`. */
public void finishRawWrite() {
/** This API should be called after `writeBatch`. */
public void finishWrite() {
streamObserverRequest.onCompleted();
}

/**
* This API should be called after `finishRawWrite`. This API should be called on leader only.
* This API should be called after `finishWrite`. This API should be called on leader only.
*
* @param ctx
*/
public void multiIngest(Kvrpcpb.Context ctx) {
List<ImportSstpb.SSTMeta> metasList = getRawWriteResponse().getMetasList();
List<ImportSstpb.SSTMeta> metasList;
if (writeResponse instanceof ImportSstpb.RawWriteResponse) {
metasList = ((ImportSstpb.RawWriteResponse) getWriteResponse()).getMetasList();
} else if (writeResponse instanceof ImportSstpb.WriteResponse) {
metasList = ((ImportSstpb.WriteResponse) getWriteResponse()).getMetasList();
} else {
throw new IllegalArgumentException("Wrong response type");
}

ImportSstpb.MultiIngestRequest request =
ImportSstpb.MultiIngestRequest.newBuilder().setContext(ctx).addAllSsts(metasList).build();
Expand Down Expand Up @@ -163,7 +178,7 @@ protected ImportSSTGrpc.ImportSSTStub getAsyncStub() {
@Override
public void close() throws Exception {}

public static class ImporterStoreClientBuilder {
public static class ImporterStoreClientBuilder<RequestClass, ResponseClass> {
private final TiConfiguration conf;
private final ChannelFactory channelFactory;
private final RegionManager regionManager;
Expand Down Expand Up @@ -193,7 +208,8 @@ public synchronized ImporterStoreClient build(TiStore store) throws GrpcExceptio
ImportSSTGrpc.ImportSSTBlockingStub blockingStub = ImportSSTGrpc.newBlockingStub(channel);
ImportSSTGrpc.ImportSSTStub asyncStub = ImportSSTGrpc.newStub(channel);

return new ImporterStoreClient(conf, channelFactory, blockingStub, asyncStub);
return new ImporterStoreClient<RequestClass, ResponseClass>(
conf, channelFactory, blockingStub, asyncStub);
}
}
}
2 changes: 1 addition & 1 deletion src/main/java/org/tikv/raw/RawKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ private void doIngest(TiRegion region, List<Pair<ByteString, ByteString>> sorted
Key maxKey = Key.toRawKey(sortedList.get(sortedList.size() - 1).first);
ImporterClient importerClient =
new ImporterClient(tiSession, uuid, minKey, maxKey, region, ttl);
importerClient.rawWrite(sortedList.iterator());
importerClient.write(sortedList.iterator());
}

private void doSendBatchPut(BackOffer backOffer, Map<ByteString, ByteString> kvPairs, long ttl) {
Expand Down
Loading