Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 57 additions & 5 deletions src/main/java/org/tikv/common/operation/RegionErrorHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@
import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.region.RegionErrorReceiver;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.Metapb;

public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
private static final Logger logger = LoggerFactory.getLogger(RegionErrorHandler.class);
Expand Down Expand Up @@ -115,11 +119,11 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
// throwing it out.
return false;
} else if (error.hasEpochNotMatch()) {
// this error is reported from raftstore:
// region has outdated version,please try later.
logger.warn(String.format("Stale Epoch encountered for region [%s]", recv.getRegion()));
this.regionManager.onRegionStale(recv.getRegion());
return false;
logger.warn(
String.format(
"tikv reports `EpochNotMatch` retry later, region: %s, EpochNotMatch: %s",
recv.getRegion(), error.getEpochNotMatch()));
return onRegionEpochNotMatch(backOffer, error.getEpochNotMatch().getCurrentRegionsList());
} else if (error.hasServerIsBusy()) {
// this error is reported from kv:
// will occur when write pressure is high. Please try later.
Expand Down Expand Up @@ -171,6 +175,54 @@ public boolean handleRegionError(BackOffer backOffer, Errorpb.Error error) {
return false;
}

// ref: https://github.com/tikv/client-go/blob/tidb-5.2/internal/locate/region_request.go#L985
// OnRegionEpochNotMatch removes the old region and inserts new regions into the cache.
// It returns whether retries the request because it's possible the region epoch is ahead of
// TiKV's due to slow appling.
private boolean onRegionEpochNotMatch(BackOffer backOffer, List<Metapb.Region> currentRegions) {
if (currentRegions.size() == 0) {
this.regionManager.onRegionStale(recv.getRegion());
return false;
}

// Find whether the region epoch in `ctx` is ahead of TiKV's. If so, backoff.
for (Metapb.Region meta : currentRegions) {
if (meta.getId() == recv.getRegion().getId()
&& (meta.getRegionEpoch().getConfVer() < recv.getRegion().getVerID().getConfVer()
|| meta.getRegionEpoch().getVersion() < recv.getRegion().getVerID().getVer())) {
String errorMsg =
String.format(
"region epoch is ahead of tikv, region: %s, currentRegions: %s",
recv.getRegion(), currentRegions);
logger.info(errorMsg);
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoRegionMiss, new TiKVException(errorMsg));
return true;
}
}

boolean needInvalidateOld = true;
List<TiRegion> newRegions = new ArrayList<>(currentRegions.size());
// If the region epoch is not ahead of TiKV's, replace region meta in region cache.
for (Metapb.Region meta : currentRegions) {
TiRegion region = regionManager.createRegion(meta, backOffer);
newRegions.add(region);
if (recv.getRegion().getVerID() == region.getVerID()) {
needInvalidateOld = false;
}
}

if (needInvalidateOld) {
this.regionManager.onRegionStale(recv.getRegion());
}

for (TiRegion region : newRegions) {
regionManager.insertRegionToCache(region);
}

return false;
}

@Override
public boolean handleRequestError(BackOffer backOffer, Exception e) {
if (recv.onStoreUnreachable()) {
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/org/tikv/common/region/RegionCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,18 @@ public synchronized void invalidateRegion(TiRegion region) {
}
}

public synchronized void insertRegionToCache(TiRegion region) {
try {
TiRegion oldRegion = regionCache.get(region.getId());
if (oldRegion != null) {
keyToRegionIdCache.remove(makeRange(oldRegion.getStartKey(), oldRegion.getEndKey()));
}
regionCache.put(region.getId(), region);
keyToRegionIdCache.put(makeRange(region.getStartKey(), region.getEndKey()), region.getId());
} catch (Exception ignore) {
}
}

public synchronized boolean updateRegion(TiRegion expected, TiRegion region) {
try {
if (logger.isDebugEnabled()) {
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/org/tikv/common/region/RegionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ public Pair<TiRegion, TiStore> getRegionStorePairByKey(
return Pair.create(region, store);
}

public TiRegion createRegion(Metapb.Region region, BackOffer backOffer) {
List<Metapb.Peer> peers = region.getPeersList();
List<TiStore> stores = getRegionStore(peers, backOffer);
return new TiRegion(conf, region, null, peers, stores);
}

private TiRegion createRegion(Metapb.Region region, Metapb.Peer leader, BackOffer backOffer) {
List<Metapb.Peer> peers = region.getPeersList();
List<TiStore> stores = getRegionStore(peers, backOffer);
Expand Down Expand Up @@ -262,6 +268,10 @@ public void invalidateRegion(TiRegion region) {
cache.invalidateRegion(region);
}

public void insertRegionToCache(TiRegion region) {
cache.insertRegionToCache(region);
}

private BackOffer defaultBackOff() {
return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS());
}
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/tikv/common/region/TiRegion.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ public TiRegion(
replicaIdx = 0;
}

public TiConfiguration getConf() {
return conf;
}

public Peer getLeader() {
return leader;
}
Expand Down Expand Up @@ -271,6 +275,18 @@ public class RegionVerID {
this.ver = ver;
}

public long getId() {
return id;
}

public long getConfVer() {
return confVer;
}

public long getVer() {
return ver;
}

@Override
public boolean equals(Object other) {
if (this == other) {
Expand Down