Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 38 additions & 26 deletions src/main/java/org/tikv/common/policy/RetryPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public abstract class RetryPolicy<RespT> {
.help("grpc request latency.")
.labelNames("type")
.register();
public static final Histogram CALL_WITH_RETRY_DURATION =
Histogram.build()
.name("client_java_call_with_retry_duration")
.help("callWithRetry duration.")
.labelNames("type")
.register();
public static final Counter GRPC_REQUEST_RETRY_NUM =
Counter.build()
.name("client_java_grpc_requests_retry_num")
Expand Down Expand Up @@ -62,37 +68,43 @@ private void rethrowNotRecoverableException(Exception e) {
}

public RespT callWithRetry(Callable<RespT> proc, String methodName) {
while (true) {
RespT result = null;
try {
// add single request duration histogram
Histogram.Timer requestTimer = GRPC_SINGLE_REQUEST_LATENCY.labels(methodName).startTimer();
Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(methodName).startTimer();
try {
while (true) {
RespT result = null;
try {
result = proc.call();
} finally {
requestTimer.observeDuration();
// add single request duration histogram
Histogram.Timer requestTimer =
GRPC_SINGLE_REQUEST_LATENCY.labels(methodName).startTimer();
try {
result = proc.call();
} finally {
requestTimer.observeDuration();
}
} catch (Exception e) {
rethrowNotRecoverableException(e);
// Handle request call error
boolean retry = handler.handleRequestError(backOffer, e);
if (retry) {
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc();
continue;
} else {
return result;
}
}
} catch (Exception e) {
rethrowNotRecoverableException(e);
// Handle request call error
boolean retry = handler.handleRequestError(backOffer, e);
if (retry) {
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc();
continue;
} else {
return result;
}
}

// Handle response error
if (handler != null) {
boolean retry = handler.handleResponseError(backOffer, result);
if (retry) {
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc();
continue;
// Handle response error
if (handler != null) {
boolean retry = handler.handleResponseError(backOffer, result);
if (retry) {
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc();
continue;
}
}
return result;
}
return result;
} finally {
callWithRetryTimer.observeDuration();
}
}

Expand Down
67 changes: 40 additions & 27 deletions src/main/java/org/tikv/common/util/ConcreteBackOffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.tikv.common.ConfigUtils.TIKV_BO_REGION_MISS_BASE_IN_MS;

import com.google.common.base.Preconditions;
import io.prometheus.client.Histogram;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -35,7 +36,14 @@ public class ConcreteBackOffer implements BackOffer {
private final Map<BackOffFunction.BackOffFuncType, BackOffFunction> backOffFunctionMap;
private final List<Exception> errors;
private int totalSleep;
private long deadline;
private final long deadline;

public static final Histogram BACKOFF_DURATION =
Histogram.build()
.name("client_java_backoff_duration")
.help("backoff duration.")
.labelNames("type")
.register();

private ConcreteBackOffer(int maxSleep, long deadline) {
Preconditions.checkArgument(
Expand Down Expand Up @@ -140,35 +148,40 @@ public void doBackOff(BackOffFunction.BackOffFuncType funcType, Exception err) {
@Override
public void doBackOffWithMaxSleep(
BackOffFunction.BackOffFuncType funcType, long maxSleepMs, Exception err) {
BackOffFunction backOffFunction =
backOffFunctionMap.computeIfAbsent(funcType, this::createBackOffFunc);

// Back off will not be done here
long sleep = backOffFunction.getSleepMs(maxSleepMs);
totalSleep += sleep;

logger.debug(
String.format(
"%s, retry later(totalSleep %dms, maxSleep %dms)",
err.getMessage(), totalSleep, maxSleep));
errors.add(err);

// Check deadline
if (deadline > 0) {
long currentMs = System.currentTimeMillis();
if (currentMs + sleep >= deadline) {
logThrowError(String.format("Deadline %d is exceeded, errors:", deadline), err);
Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer();
try {
BackOffFunction backOffFunction =
backOffFunctionMap.computeIfAbsent(funcType, this::createBackOffFunc);

// Back off will not be done here
long sleep = backOffFunction.getSleepMs(maxSleepMs);
totalSleep += sleep;

logger.debug(
String.format(
"%s, retry later(totalSleep %dms, maxSleep %dms)",
err.getMessage(), totalSleep, maxSleep));
errors.add(err);

// Check deadline
if (deadline > 0) {
long currentMs = System.currentTimeMillis();
if (currentMs + sleep >= deadline) {
logThrowError(String.format("Deadline %d is exceeded, errors:", deadline), err);
}
}
}

try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
throw new GrpcException(e);
}
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
throw new GrpcException(e);
}

if (maxSleep > 0 && totalSleep >= maxSleep) {
logThrowError(String.format("BackOffer.maxSleep %dms is exceeded, errors:", maxSleep), err);
if (maxSleep > 0 && totalSleep >= maxSleep) {
logThrowError(String.format("BackOffer.maxSleep %dms is exceeded, errors:", maxSleep), err);
}
} finally {
backOffTimer.observeDuration();
}
}

Expand Down