Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 75 additions & 1 deletion src/main/java/org/tikv/common/importer/ImporterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package org.tikv.common.importer;

import static org.tikv.common.operation.RegionErrorHandler.NO_LEADER_STORE_ID;

import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand All @@ -28,11 +32,16 @@
import org.tikv.common.codec.Codec;
import org.tikv.common.codec.CodecDataOutput;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.RegionException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.key.Key;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Errorpb.Error;
import org.tikv.kvproto.ImportSstpb;
import org.tikv.kvproto.Metapb;

Expand Down Expand Up @@ -249,6 +258,71 @@ private void ingest() throws GrpcException {
}
}

clientLeader.multiIngest(region.getLeaderContext());
Object writeResponse = clientLeader.getWriteResponse();
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(BackOffer.INGEST_BACKOFF);
ingestWithRetry(writeResponse, backOffer);
}

private void ingestWithRetry(Object writeResponse, BackOffer backOffer) {
try {
clientLeader.multiIngest(region.getLeaderContext(), writeResponse);
} catch (RegionException e) {
logger.warn("ingest failed.", e);
boolean retry = false;
Error error = e.getRegionErr();
if (error != null) {
if (error.hasNotLeader()) {
retry = true;
long newStoreId = error.getNotLeader().getLeader().getStoreId();

// update Leader here
logger.warn(
String.format(
"NotLeader Error with region id %d and store id %d, new store id %d",
region.getId(), region.getLeader().getStoreId(), newStoreId));

BackOffFunction.BackOffFuncType backOffFuncType;
if (newStoreId != NO_LEADER_STORE_ID) {
long regionId = region.getId();
region = tiSession.getRegionManager().updateLeader(region, newStoreId);
if (region == null) {
// epoch is not changed, getRegionById is faster than getRegionByKey
region = tiSession.getRegionManager().getRegionById(regionId);
}
backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader;
} else {
logger.info(
String.format(
"Received zero store id, from region %d try next time", region.getId()));
tiSession.getRegionManager().invalidateRegion(region);
region = tiSession.getRegionManager().getRegionById(region.getId());
backOffFuncType = BackOffFunction.BackOffFuncType.BoRegionMiss;
}

backOffer.doBackOff(backOffFuncType, e);
init();
} else if (error.hasServerIsBusy()) {
retry = true;
// this error is reported from kv:
// will occur when write pressure is high. Please try later.
logger.warn(
String.format(
"Server is busy for region [%s], reason: %s",
region, error.getServerIsBusy().getReason()));
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoServerBusy,
new StatusRuntimeException(
Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString())));
} else {
tiSession.getRegionManager().invalidateRegion(region);
}
}

if (retry) {
ingestWithRetry(writeResponse, backOffer);
} else {
throw e;
}
}
}
}
15 changes: 9 additions & 6 deletions src/main/java/org/tikv/common/importer/ImporterStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.tikv.common.PDClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.RegionException;
import org.tikv.common.operation.NoopHandler;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiStore;
Expand Down Expand Up @@ -61,7 +62,7 @@ public synchronized boolean isWriteResponseReceived() {
return writeResponse != null;
}

private synchronized ResponseClass getWriteResponse() {
public synchronized ResponseClass getWriteResponse() {
return writeResponse;
}

Expand Down Expand Up @@ -133,23 +134,25 @@ public void finishWrite() {
* This API should be called after `finishWrite`. This API should be called on leader only.
*
* @param ctx
* @param writeResponse
* @throws RegionException
*/
public void multiIngest(Kvrpcpb.Context ctx) {
public void multiIngest(Kvrpcpb.Context ctx, Object writeResponse) throws RegionException {
List<ImportSstpb.SSTMeta> metasList;
if (writeResponse instanceof ImportSstpb.RawWriteResponse) {
metasList = ((ImportSstpb.RawWriteResponse) getWriteResponse()).getMetasList();
metasList = ((ImportSstpb.RawWriteResponse) writeResponse).getMetasList();
} else if (writeResponse instanceof ImportSstpb.WriteResponse) {
metasList = ((ImportSstpb.WriteResponse) getWriteResponse()).getMetasList();
metasList = ((ImportSstpb.WriteResponse) writeResponse).getMetasList();
} else {
throw new IllegalArgumentException("Wrong response type");
throw new IllegalArgumentException("Wrong response type: " + writeResponse);
}

ImportSstpb.MultiIngestRequest request =
ImportSstpb.MultiIngestRequest.newBuilder().setContext(ctx).addAllSsts(metasList).build();

ImportSstpb.IngestResponse response = getBlockingStub().multiIngest(request);
if (response.hasError()) {
throw new GrpcException("" + response.getError());
throw new RegionException(response.getError());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
private static final Logger logger = LoggerFactory.getLogger(RegionErrorHandler.class);
// if a store does not have leader currently, store id is set to 0
private static final int NO_LEADER_STORE_ID = 0;
public static final int NO_LEADER_STORE_ID = 0;
private final Function<RespT, Errorpb.Error> getRegionError;
private final RegionManager regionManager;
private final RegionErrorReceiver recv;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/tikv/common/util/BackOffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public interface BackOffer {
int TIKV_SWITCH_MODE_BACKOFF = seconds;
int SPLIT_REGION_BACKOFF = 12000;
int SCATTER_REGION_BACKOFF = 30000;
int INGEST_BACKOFF = 30000;

/**
* doBackOff sleeps a while base on the BackOffType and records the error message. Will stop until
Expand Down