Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/main/java/org/tikv/common/operation/RegionErrorHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,19 @@ private boolean onRegionEpochNotMatch(BackOffer backOffer, List<Metapb.Region> c
@Override
public boolean handleRequestError(BackOffer backOffer, Exception e) {
if (recv.onStoreUnreachable()) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoTiKVRPC, e);
if (!backOffer.canRetryAfterSleep(BackOffFunction.BackOffFuncType.BoTiKVRPC)) {
regionManager.onRequestFail(recv.getRegion());
throw new GrpcException("retry is exhausted.", e);
}
return true;
}

logger.warn("request failed because of: " + e.getMessage());
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoTiKVRPC,
new GrpcException(
"send tikv request error: " + e.getMessage() + ", try next peer later", e));
if (!backOffer.canRetryAfterSleep(BackOffFunction.BackOffFuncType.BoTiKVRPC)) {
regionManager.onRequestFail(recv.getRegion());
throw new GrpcException(
"send tikv request error: " + e.getMessage() + ", try next peer later", e);
}
// TiKV maybe down, so do not retry in `callWithRetry`
// should re-fetch the new leader from PD and send request to it
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ public boolean onStoreUnreachable() {
// so that we can
// reduce the latency cost by fail requests.
if (targetStore.canForwardFirst()) {
if (conf.getEnableGrpcForward() && retryForwardTimes <= region.getFollowerList().size()) {
return retryOtherStoreByProxyForward();
if (retryOtherStoreByProxyForward()) {
return true;
}
if (retryOtherStoreLeader()) {
return true;
Expand All @@ -153,12 +153,10 @@ public boolean onStoreUnreachable() {
if (retryOtherStoreLeader()) {
return true;
}
if (conf.getEnableGrpcForward() && retryForwardTimes <= region.getFollowerList().size()) {
return retryOtherStoreByProxyForward();
if (retryOtherStoreByProxyForward()) {
return true;
}
return true;
}

logger.warn(
String.format(
"retry time exceed for region[%d], invalid this region[%d]",
Expand Down Expand Up @@ -262,6 +260,9 @@ private void updateClientStub() {
}

private boolean retryOtherStoreByProxyForward() {
if (!conf.getEnableGrpcForward() || retryForwardTimes > region.getFollowerList().size()) {
return false;
}
TiStore proxyStore = switchProxyStore();
if (proxyStore == null) {
logger.warn(
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/tikv/common/util/BackOffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public interface BackOffer {
* max back off time exceeded and throw an exception to the caller.
*/
void doBackOff(BackOffFunction.BackOffFuncType funcType, Exception err);
/**
* canRetryAfterSleep sleeps a while base on the BackOffType and records the error message. Will
* stop until max back off time exceeded and throw an exception to the caller. It will return
* false if the total sleep time has exceed some limit condition.
*/
boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType);

/**
* BackoffWithMaxSleep sleeps a while base on the backoffType and records the error message and
Expand Down
76 changes: 42 additions & 34 deletions src/main/java/org/tikv/common/util/ConcreteBackOffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,47 +146,55 @@ public void doBackOff(BackOffFunction.BackOffFuncType funcType, Exception err) {
}

@Override
public void doBackOffWithMaxSleep(
BackOffFunction.BackOffFuncType funcType, long maxSleepMs, Exception err) {
public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType) {
return canRetryAfterSleep(funcType, -1);
}

public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long maxSleepMs) {
Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer();
try {
BackOffFunction backOffFunction =
backOffFunctionMap.computeIfAbsent(funcType, this::createBackOffFunc);

// Back off will not be done here
long sleep = backOffFunction.getSleepMs(maxSleepMs);
totalSleep += sleep;

logger.debug(
String.format(
"%s, retry later(totalSleep %dms, maxSleep %dms)",
err.getMessage(), totalSleep, maxSleep));
errors.add(err);

// Check deadline
if (deadline > 0) {
long currentMs = System.currentTimeMillis();
if (currentMs + sleep >= deadline) {
logThrowError(String.format("Deadline %d is exceeded, errors:", deadline), err);
}
BackOffFunction backOffFunction =
backOffFunctionMap.computeIfAbsent(funcType, this::createBackOffFunc);

// Back off will not be done here
long sleep = backOffFunction.getSleepMs(maxSleepMs);
totalSleep += sleep;
// Check deadline
if (deadline > 0) {
long currentMs = System.currentTimeMillis();
if (currentMs + sleep >= deadline) {
logger.warn(String.format("Deadline %d is exceeded, errors:", deadline));
return false;
}
}

try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
throw new GrpcException(e);
}
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
throw new GrpcException(e);
}
backOffTimer.observeDuration();
if (maxSleep > 0 && totalSleep >= maxSleep) {
logger.warn(String.format("BackOffer.maxSleep %dms is exceeded, errors:", maxSleep));
return false;
}
return true;
}

if (maxSleep > 0 && totalSleep >= maxSleep) {
logThrowError(String.format("BackOffer.maxSleep %dms is exceeded, errors:", maxSleep), err);
}
} finally {
backOffTimer.observeDuration();
@Override
public void doBackOffWithMaxSleep(
BackOffFunction.BackOffFuncType funcType, long maxSleepMs, Exception err) {
logger.debug(
String.format(
"%s, retry later(totalSleep %dms, maxSleep %dms)",
err.getMessage(), totalSleep, maxSleep));
errors.add(err);
if (!canRetryAfterSleep(funcType, maxSleepMs)) {
logThrowError(err);
}
}

private void logThrowError(String msg, Exception err) {
StringBuilder errMsg = new StringBuilder(msg);
private void logThrowError(Exception err) {
StringBuilder errMsg = new StringBuilder();
for (int i = 0; i < errors.size(); i++) {
Exception curErr = errors.get(i);
// Print only last 3 errors for non-DEBUG log levels.
Expand Down