Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ private boolean onRegionEpochNotMatch(BackOffer backOffer, List<Metapb.Region> c

@Override
public boolean handleRequestError(BackOffer backOffer, Exception e) {
if (recv.onStoreUnreachable(backOffer.getSlowLog())) {
if (recv.onStoreUnreachable(backOffer)) {
if (!backOffer.canRetryAfterSleep(BackOffFunction.BackOffFuncType.BoTiKVRPC)) {
regionManager.onRequestFail(recv.getRegion());
throw new GrpcException("retry is exhausted.", e);
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/tikv/common/policy/RetryPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public RespT callWithRetry(Callable<RespT> proc, String methodName, BackOffer ba
} catch (Exception e) {
rethrowNotRecoverableException(e);
// Handle request call error
backOffer.checkTimeout();
boolean retry = handler.handleRequestError(backOffer, e);
if (retry) {
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc();
Expand Down
25 changes: 11 additions & 14 deletions src/main/java/org/tikv/common/region/AbstractRegionStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@
import org.tikv.common.AbstractGRPCClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.log.SlowLog;
import org.tikv.common.log.SlowLogEmptyImpl;
import org.tikv.common.log.SlowLogSpan;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Metapb;
Expand Down Expand Up @@ -81,10 +80,6 @@ protected AbstractRegionStoreClient(
this.store = store;
if (this.store.getProxyStore() != null) {
this.timeout = conf.getForwardTimeout();
} else if (!this.store.isReachable()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was it removed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not need to run Seek Leader and Seek Proxy while initializing RegionStoreClient. The reasons are:

  • Seek Leader and Seek Proxy take time, and we do not want the initialization process to take too long.
  • Seek Leader and Seek Proxy will happen after the first call, when gRPC Deadline Exceeded is received.

// cannot get Deadline or SlowLog instance here
// use SlowLogEmptyImpl instead to skip slow log record
onStoreUnreachable(SlowLogEmptyImpl.INSTANCE);
}
}

Expand Down Expand Up @@ -134,22 +129,24 @@ public boolean onNotLeader(TiRegion newRegion) {
}

@Override
public boolean onStoreUnreachable(SlowLog slowLog) {
public boolean onStoreUnreachable(BackOffer backOffer) {
if (!store.isValid()) {
logger.warn(String.format("store [%d] has been invalid", store.getId()));
store = regionManager.getStoreById(store.getId());
store = regionManager.getStoreById(store.getId(), backOffer);
updateClientStub();
return true;
}

// seek an available leader store to send request
Boolean result = seekLeaderStore(slowLog);
backOffer.checkTimeout();
Boolean result = seekLeaderStore(backOffer);
if (result != null) {
return result;
}
if (conf.getEnableGrpcForward()) {
// seek an available proxy store to forward request
return seekProxyStore(slowLog);
backOffer.checkTimeout();
return seekProxyStore(backOffer);
}
return false;
}
Expand Down Expand Up @@ -182,9 +179,9 @@ private void updateClientStub() {
}
}

private Boolean seekLeaderStore(SlowLog slowLog) {
private Boolean seekLeaderStore(BackOffer backOffer) {
Histogram.Timer switchLeaderDurationTimer = SEEK_LEADER_STORE_DURATION.startTimer();
SlowLogSpan slowLogSpan = slowLog.start("seekLeaderStore");
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekLeaderStore");
try {
List<Metapb.Peer> peers = region.getFollowerList();
if (peers.isEmpty()) {
Expand Down Expand Up @@ -226,8 +223,8 @@ private Boolean seekLeaderStore(SlowLog slowLog) {
return null;
}

private boolean seekProxyStore(SlowLog slowLog) {
SlowLogSpan slowLogSpan = slowLog.start("seekProxyStore");
private boolean seekProxyStore(BackOffer backOffer) {
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekProxyStore");
Histogram.Timer grpcForwardDurationTimer = SEEK_PROXY_STORE_DURATION.startTimer();
try {
logger.info(String.format("try grpc forward: region[%d]", region.getId()));
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/tikv/common/region/RegionErrorReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

package org.tikv.common.region;

import org.tikv.common.log.SlowLog;
import org.tikv.common.util.BackOffer;

public interface RegionErrorReceiver {
boolean onNotLeader(TiRegion region);

/// return whether we need to retry this request.
boolean onStoreUnreachable(SlowLog slowLog);
boolean onStoreUnreachable(BackOffer backOffer);

TiRegion getRegion();
}
3 changes: 3 additions & 0 deletions src/main/java/org/tikv/common/region/RegionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.InvalidStoreException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.log.SlowLogSpan;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer;
Expand Down Expand Up @@ -96,6 +97,7 @@ public TiRegion getRegionByKey(ByteString key) {

public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) {
Histogram.Timer requestTimer = GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer();
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("getRegionByKey");
TiRegion region = cache.getRegionByKey(key, backOffer);
try {
if (region == null) {
Expand All @@ -106,6 +108,7 @@ public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) {
}
} finally {
requestTimer.observeDuration();
slowLogSpan.end();
}

return region;
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/tikv/common/util/BackOffFunction.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public enum BackOffFuncType {
BoRegionMiss,
BoUpdateLeader,
BoServerBusy,
BoTxnNotFound
BoTxnNotFound,
BoCheckTimeout
}
}
4 changes: 4 additions & 0 deletions src/main/java/org/tikv/common/util/BackOffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public interface BackOffer {
* max back off time exceeded and throw an exception to the caller.
*/
void doBackOff(BackOffFunction.BackOffFuncType funcType, Exception err);

/** check if deadline exceeded. */
void checkTimeout();

/**
* canRetryAfterSleep sleeps a while based on the BackOffType and records the error message. Will
* stop until max back off time exceeded and throw an exception to the caller. It will return
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/org/tikv/common/util/ConcreteBackOffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.log.SlowLog;
import org.tikv.common.log.SlowLogEmptyImpl;
import org.tikv.common.log.SlowLogSpan;
Expand Down Expand Up @@ -142,6 +143,9 @@ private BackOffFunction createBackOffFunc(BackOffFunction.BackOffFuncType funcTy
case BoTxnNotFound:
backOffFunction = BackOffFunction.create(2, 500, BackOffStrategy.NoJitter);
break;
case BoCheckTimeout:
backOffFunction = BackOffFunction.create(0, 0, BackOffStrategy.NoJitter);
break;
}
return backOffFunction;
}
Expand All @@ -151,6 +155,13 @@ public void doBackOff(BackOffFunction.BackOffFuncType funcType, Exception err) {
doBackOffWithMaxSleep(funcType, -1, err);
}

/**
 * Checks whether this back-offer is still allowed to continue, using the {@code BoCheckTimeout}
 * back-off type (created in this class via {@code BackOffFunction.create(0, 0, NoJitter)}).
 *
 * <p>If {@code canRetryAfterSleep} reports that no further retry is permitted, a {@code
 * TiKVException} carrying the message "Request Timeout" is passed to {@code logThrowError}.
 * NOTE(review): judging by its name, {@code logThrowError} logs and then throws the exception —
 * confirm against its definition.
 */
@Override
public void checkTimeout() {
  boolean mayContinue = canRetryAfterSleep(BackOffFunction.BackOffFuncType.BoCheckTimeout);
  if (mayContinue) {
    return;
  }
  logThrowError(new TiKVException("Request Timeout"));
}

@Override
public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType) {
return canRetryAfterSleep(funcType, -1);
Expand Down
25 changes: 21 additions & 4 deletions src/test/java/org/tikv/raw/RawKVClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ public void testCustomBackOff() {
} finally {
long e = System.currentTimeMillis();
long duration = e - s;
logger.info("duration = " + duration);
assert (duration >= 2900);
assertTrue(duration >= 2900);
}
}

Expand All @@ -199,8 +198,26 @@ public void testDeadlineBackOff() {
} finally {
long e = System.currentTimeMillis();
long duration = e - s;
logger.info("duration = " + duration);
assert (duration <= timeout + sleep);
assertTrue(duration <= timeout + sleep);
}
}

/**
 * Repeatedly sleeps and calls {@code checkTimeout()} on a back-offer built with {@code
 * newDeadlineBackOff(timeout, ...)}, then asserts that the loop was terminated (by an exception)
 * no later than {@code timeout + sleep} milliseconds after it started.
 */
@Test
public void testBackoffTimeout() {
  int timeout = 500;
  int sleep = 150;
  BackOffer deadlineBackOffer =
      ConcreteBackOffer.newDeadlineBackOff(timeout, SlowLogEmptyImpl.INSTANCE);
  long startMillis = System.currentTimeMillis();
  try {
    // The loop has no normal exit; it can only be left through an exception.
    for (; ; ) {
      Thread.sleep(sleep);
      deadlineBackOffer.checkTimeout();
    }
  } catch (Exception ignored) {
    // Intentionally ignored: only the elapsed time is verified below.
  } finally {
    long elapsedMillis = System.currentTimeMillis() - startMillis;
    assertTrue(elapsedMillis <= timeout + sleep);
  }
}

Expand Down