diff --git a/src/main/java/org/tikv/common/importer/ImporterClient.java b/src/main/java/org/tikv/common/importer/ImporterClient.java
index 3c86eaf7a27..d0f17f304a0 100644
--- a/src/main/java/org/tikv/common/importer/ImporterClient.java
+++ b/src/main/java/org/tikv/common/importer/ImporterClient.java
@@ -17,7 +17,11 @@
 
 package org.tikv.common.importer;
 
+import static org.tikv.common.operation.RegionErrorHandler.NO_LEADER_STORE_ID;
+
 import com.google.protobuf.ByteString;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -28,11 +32,16 @@
 import org.tikv.common.codec.Codec;
 import org.tikv.common.codec.CodecDataOutput;
 import org.tikv.common.exception.GrpcException;
+import org.tikv.common.exception.RegionException;
 import org.tikv.common.exception.TiKVException;
 import org.tikv.common.key.Key;
 import org.tikv.common.region.TiRegion;
 import org.tikv.common.region.TiStore;
+import org.tikv.common.util.BackOffFunction;
+import org.tikv.common.util.BackOffer;
+import org.tikv.common.util.ConcreteBackOffer;
 import org.tikv.common.util.Pair;
+import org.tikv.kvproto.Errorpb.Error;
 import org.tikv.kvproto.ImportSstpb;
 import org.tikv.kvproto.Metapb;
 
@@ -249,6 +258,98 @@ private void ingest() throws GrpcException {
       }
     }
 
-    clientLeader.multiIngest(region.getLeaderContext());
+    Object writeResponse = clientLeader.getWriteResponse();
+    BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(BackOffer.INGEST_BACKOFF);
+    ingestWithRetry(writeResponse, backOffer);
+  }
+
+  /**
+   * Runs the multi-ingest on the current leader, retrying while the failure is recoverable.
+   *
+   * @param writeResponse write response fetched from the leader client before the first attempt
+   *     and passed unchanged to every attempt
+   * @param backOffer back-off state shared by all attempts
+   * @throws RegionException if the failure carries no region error or a non-retryable one
+   */
+  private void ingestWithRetry(Object writeResponse, BackOffer backOffer) {
+    // Loop instead of recursing so that repeated retries do not deepen the call stack.
+    while (true) {
+      try {
+        clientLeader.multiIngest(region.getLeaderContext(), writeResponse);
+        return;
+      } catch (RegionException e) {
+        logger.warn("ingest failed.", e);
+        if (!recoverFromRegionError(e, backOffer)) {
+          throw e;
+        }
+      }
+    }
+  }
+
+  /**
+   * Reacts to the region error carried by a failed ingest and tells the caller whether to retry.
+   *
+   * <p>{@code NotLeader}: refreshes {@code region} via the region manager, backs off and calls
+   * {@code init()}. {@code ServerIsBusy}: backs off only. Otherwise the region is invalidated.
+   *
+   * @param e the failure thrown by the ingest call
+   * @param backOffer back-off state shared by all attempts
+   * @return {@code true} if the ingest should be attempted again, {@code false} to give up
+   */
+  private boolean recoverFromRegionError(RegionException e, BackOffer backOffer) {
+    Error error = e.getRegionErr();
+    if (error == null) {
+      return false;
+    }
+
+    if (error.hasNotLeader()) {
+      long newStoreId = error.getNotLeader().getLeader().getStoreId();
+      logger.warn(
+          String.format(
+              "NotLeader Error with region id %d and store id %d, new store id %d",
+              region.getId(), region.getLeader().getStoreId(), newStoreId));
+
+      BackOffFunction.BackOffFuncType backOffFuncType;
+      if (newStoreId != NO_LEADER_STORE_ID) {
+        // A new leader store was reported: let the region manager record it.
+        long regionId = region.getId();
+        region = tiSession.getRegionManager().updateLeader(region, newStoreId);
+        if (region == null) {
+          // epoch is not changed, getRegionById is faster than getRegionByKey
+          region = tiSession.getRegionManager().getRegionById(regionId);
+        }
+        backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader;
+      } else {
+        // No leader was reported (store id 0): invalidate the region and fetch it again by id.
+        logger.info(
+            String.format(
+                "Received zero store id, from region %d try next time", region.getId()));
+        tiSession.getRegionManager().invalidateRegion(region);
+        region = tiSession.getRegionManager().getRegionById(region.getId());
+        backOffFuncType = BackOffFunction.BackOffFuncType.BoRegionMiss;
+      }
+
+      backOffer.doBackOff(backOffFuncType, e);
+      init();
+      return true;
+    }
+
+    if (error.hasServerIsBusy()) {
+      // this error is reported from kv:
+      // will occur when write pressure is high. Please try later.
+      logger.warn(
+          String.format(
+              "Server is busy for region [%s], reason: %s",
+              region, error.getServerIsBusy().getReason()));
+      backOffer.doBackOff(
+          BackOffFunction.BackOffFuncType.BoServerBusy,
+          new StatusRuntimeException(
+              Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString())));
+      return true;
+    }
+
+    // Any other region error is not retried here; drop the cached region before giving up.
+    tiSession.getRegionManager().invalidateRegion(region);
+    return false;
   }
 }
diff --git a/src/main/java/org/tikv/common/importer/ImporterStoreClient.java b/src/main/java/org/tikv/common/importer/ImporterStoreClient.java
index 79c302c314c..87c57853608 100644
--- a/src/main/java/org/tikv/common/importer/ImporterStoreClient.java
+++ b/src/main/java/org/tikv/common/importer/ImporterStoreClient.java
@@ -29,6 +29,7 @@
 import org.tikv.common.PDClient;
 import org.tikv.common.TiConfiguration;
 import org.tikv.common.exception.GrpcException;
+import org.tikv.common.exception.RegionException;
 import org.tikv.common.operation.NoopHandler;
 import org.tikv.common.region.RegionManager;
 import org.tikv.common.region.TiStore;
@@ -61,7 +62,7 @@ public synchronized boolean isWriteResponseReceived() {
     return writeResponse != null;
   }
 
-  private synchronized ResponseClass getWriteResponse() {
+  public synchronized ResponseClass getWriteResponse() {
     return writeResponse;
   }
 
@@ -133,15 +134,19 @@ public void finishWrite() {
    * This API should be called after `finishWrite`. This API should be called on leader only.
    *
    * @param ctx
+   * @param writeResponse write response to read the SST metas from; must be an
+   *     {@code ImportSstpb.RawWriteResponse} or an {@code ImportSstpb.WriteResponse}
+   * @throws RegionException if the ingest response carries an error
+   * @throws IllegalArgumentException if {@code writeResponse} is of neither supported type
    */
-  public void multiIngest(Kvrpcpb.Context ctx) {
+  public void multiIngest(Kvrpcpb.Context ctx, Object writeResponse) throws RegionException {
     List metasList;
     if (writeResponse instanceof ImportSstpb.RawWriteResponse) {
-      metasList = ((ImportSstpb.RawWriteResponse) getWriteResponse()).getMetasList();
+      metasList = ((ImportSstpb.RawWriteResponse) writeResponse).getMetasList();
     } else if (writeResponse instanceof ImportSstpb.WriteResponse) {
-      metasList = ((ImportSstpb.WriteResponse) getWriteResponse()).getMetasList();
+      metasList = ((ImportSstpb.WriteResponse) writeResponse).getMetasList();
     } else {
-      throw new IllegalArgumentException("Wrong response type");
+      throw new IllegalArgumentException("Wrong response type: " + writeResponse);
     }
 
     ImportSstpb.MultiIngestRequest request =
@@ -149,7 +154,7 @@ public void multiIngest(Kvrpcpb.Context ctx) {
 
     ImportSstpb.IngestResponse response = getBlockingStub().multiIngest(request);
     if (response.hasError()) {
-      throw new GrpcException("" + response.getError());
+      throw new RegionException(response.getError());
     }
   }
 
diff --git a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java
index 9aa678b41e7..452aad13d06 100644
--- a/src/main/java/org/tikv/common/operation/RegionErrorHandler.java
+++ b/src/main/java/org/tikv/common/operation/RegionErrorHandler.java
@@ -18,7 +18,7 @@
 public class RegionErrorHandler implements ErrorHandler {
   private static final Logger logger = LoggerFactory.getLogger(RegionErrorHandler.class);
   // if a store does not have leader currently, store id is set to 0
-  private static final int NO_LEADER_STORE_ID = 0;
+  public static final int NO_LEADER_STORE_ID = 0;
   private final Function getRegionError;
   private final RegionManager regionManager;
   private final RegionErrorReceiver recv;
diff --git a/src/main/java/org/tikv/common/util/BackOffer.java b/src/main/java/org/tikv/common/util/BackOffer.java
index 9cdf39ba0d7..01926d6d42b 100644
--- a/src/main/java/org/tikv/common/util/BackOffer.java
+++ b/src/main/java/org/tikv/common/util/BackOffer.java
@@ -31,6 +31,7 @@ public interface BackOffer {
   int TIKV_SWITCH_MODE_BACKOFF = seconds;
   int SPLIT_REGION_BACKOFF = 12000;
   int SCATTER_REGION_BACKOFF = 30000;
+  int INGEST_BACKOFF = 30000;
 
   /**
    * doBackOff sleeps a while base on the BackOffType and records the error message. Will stop until