Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.tracing.ApiTracer;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -137,14 +138,23 @@ void clearAttemptServiceData() {

// "super." is used here to avoid infinite loops of callback chains
void handleAttempt(Throwable throwable, ResponseT response) {
ApiTracer tracer = retryingContext.getTracer();

synchronized (lock) {
try {
clearAttemptServiceData();
if (throwable instanceof CancellationException) {
// An attempt triggered cancellation.
// In almost all cases, the operation caller caused the attempt to trigger the
// cancellation by invoking cancel() on the CallbackChainRetryingFuture, which cancelled
// the current attempt.
// In a theoretical scenario, the attempt callable might've thrown the exception on its
// own volition. However it's currently impossible to disambiguate the 2 scenarios.
tracer.attemptCancelled();

This comment was marked as spam.

This comment was marked as spam.

super.cancel(false);
} else if (throwable instanceof RejectedExecutionException) {
// external executor cannot continue retrying
tracer.attemptPermanentFailure(throwable);
super.setException(throwable);
}
if (isDone()) {
Expand All @@ -155,21 +165,30 @@ void handleAttempt(Throwable throwable, ResponseT response) {
retryAlgorithm.createNextAttempt(throwable, response, attemptSettings);
boolean shouldRetry = retryAlgorithm.shouldRetry(throwable, response, nextAttemptSettings);
if (shouldRetry) {
tracer.attemptFailed(throwable, nextAttemptSettings.getRandomizedRetryDelay());
attemptSettings = nextAttemptSettings;
setAttemptResult(throwable, response, true);
// a new attempt will be (must be) scheduled by an external executor
} else if (throwable != null) {
if (retryAlgorithm.getResultAlgorithm().shouldRetry(throwable, response)) {
tracer.attemptFailedRetriesExhausted(throwable);
} else {
tracer.attemptPermanentFailure(throwable);
}
super.setException(throwable);
} else {
tracer.attemptSucceeded();
super.set(response);
}
} catch (CancellationException e) {
// A retry algorithm triggered cancellation.
tracer.attemptFailedRetriesExhausted(e);
super.cancel(false);
} catch (Exception e) {
// Should never happen, but still possible in case of buggy retry algorithm implementation.
// Any bugs/exceptions (except CancellationException) in retry algorithms immediately
// terminate retrying future and set the result to the thrown exception.
tracer.attemptPermanentFailure(e);
super.setException(e);
}
}
Expand Down
5 changes: 5 additions & 0 deletions gax/src/main/java/com/google/api/gax/rpc/AttemptCallable.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public ResponseT call() {
if (externalFuture.isDone()) {
return null;
}

callContext
.getTracer()
.attemptStarted(externalFuture.getAttemptSettings().getOverallAttemptCount());

ApiFuture<ResponseT> internalFuture = callable.futureCall(request, callContext);
externalFuture.setAttemptFuture(internalFuture);
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public ResponseT call() {
if (externalFuture.isDone()) {
return null;
}

callContext
.getTracer()
.attemptStarted(externalFuture.getAttemptSettings().getOverallAttemptCount());

// NOTE: The callable here is an OperationCheckingCallable, which will compose its own
// request using a resolved operation name and ignore anything that we pass here for the
// request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ public Void call() {
outerRetryingFuture.getAttemptSettings().getRpcTimeout());
}

attemptContext
.getTracer()
.attemptStarted(outerRetryingFuture.getAttemptSettings().getOverallAttemptCount());

innerCallable.call(
request,
new StateCheckingResponseObserver<ResponseT>() {
Expand Down
7 changes: 6 additions & 1 deletion gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ public interface ApiTracer {
/** Adds an annotation that the attempt succeeded. */
void attemptSucceeded();

/** Add an annotation that the attempt was cancelled by the user. */
void attemptCancelled();

/**
* Adds an annotation that the attempt failed, but another attempt will be made after the delay.
*
Expand All @@ -93,8 +96,10 @@ public interface ApiTracer {
/**
* Adds an annotation that the attempt failed and that no further attempts will be made because
* retry limits have been reached.
*
* @param error the last error received before retries were exhausted.
*/
void attemptFailedRetriesExhausted();
void attemptFailedRetriesExhausted(Throwable error);

/**
* Adds an annotation that the attempt failed and that no further attempts will be made because
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,18 @@ public void attemptSucceeded() {
// noop
}

@Override
public void attemptCancelled() {
// noop
}

@Override
public void attemptFailed(Throwable error, Duration delay) {
// noop
}

@Override
public void attemptFailedRetriesExhausted() {
public void attemptFailedRetriesExhausted(Throwable error) {
// noop
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,58 +36,86 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import com.google.api.core.ApiFuture;
import com.google.api.core.NanoClock;
import com.google.api.gax.retrying.FailingCallable.CustomException;
import com.google.api.gax.rpc.testing.FakeCallContext;
import com.google.api.gax.tracing.ApiTracer;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.threeten.bp.Duration;

@RunWith(MockitoJUnitRunner.class)
@RunWith(JUnit4.class)
public abstract class AbstractRetryingExecutorTest {
@Mock protected RetryingContext retryingContext;
@Rule public final MockitoRule mockitoRule = MockitoJUnit.rule();

@Mock protected ApiTracer tracer;
protected RetryingContext retryingContext;

protected abstract RetryingExecutorWithContext<String> getExecutor(
RetryAlgorithm<String> retryAlgorithm);

protected abstract RetryAlgorithm<String> getAlgorithm(
RetrySettings retrySettings, int apocalypseCountDown, RuntimeException apocalypseException);

@Before
public void setUp() {
retryingContext = FakeCallContext.createDefault().withTracer(tracer);
}

@Test
public void testSuccess() throws Exception {
FailingCallable callable = new FailingCallable(0, "SUCCESS");
FailingCallable callable = new FailingCallable(0, "SUCCESS", tracer);
RetryingExecutorWithContext<String> executor =
getExecutor(getAlgorithm(FAST_RETRY_SETTINGS, 0, null));
RetryingFuture<String> future = executor.createFuture(callable, retryingContext);
future.setAttemptFuture(executor.submit(future));

assertFutureSuccess(future);
assertEquals(0, future.getAttemptSettings().getAttemptCount());

verify(tracer, times(1)).attemptStarted(0);
verify(tracer, times(1)).attemptSucceeded();
verifyNoMoreInteractions(tracer);
}

@Test
public void testSuccessWithFailures() throws Exception {
FailingCallable callable = new FailingCallable(5, "SUCCESS");
FailingCallable callable = new FailingCallable(5, "SUCCESS", tracer);
RetryingExecutorWithContext<String> executor =
getExecutor(getAlgorithm(FAST_RETRY_SETTINGS, 0, null));
RetryingFuture<String> future = executor.createFuture(callable, retryingContext);
future.setAttemptFuture(executor.submit(future));

assertFutureSuccess(future);
assertEquals(5, future.getAttemptSettings().getAttemptCount());

verify(tracer, times(6)).attemptStarted(anyInt());
verify(tracer, times(5)).attemptFailed(any(Throwable.class), any(Duration.class));
verify(tracer, times(1)).attemptSucceeded();
verifyNoMoreInteractions(tracer);
}

@Test
public void testSuccessWithFailuresPeekGetAttempt() throws Exception {
FailingCallable callable = new FailingCallable(5, "SUCCESS");
FailingCallable callable = new FailingCallable(5, "SUCCESS", tracer);
RetryingExecutorWithContext<String> executor =
getExecutor(getAlgorithm(FAST_RETRY_SETTINGS, 0, null));
RetryingFuture<String> future = executor.createFuture(callable, retryingContext);
Expand All @@ -113,14 +141,19 @@ public void testSuccessWithFailuresPeekGetAttempt() throws Exception {

@Test
public void testMaxRetriesExceeded() throws Exception {
FailingCallable callable = new FailingCallable(6, "FAILURE");
FailingCallable callable = new FailingCallable(6, "FAILURE", tracer);
RetryingExecutorWithContext<String> executor =
getExecutor(getAlgorithm(FAST_RETRY_SETTINGS, 0, null));
RetryingFuture<String> future = executor.createFuture(callable, retryingContext);
future.setAttemptFuture(executor.submit(future));

assertFutureFail(future, CustomException.class);
assertEquals(5, future.getAttemptSettings().getAttemptCount());

verify(tracer, times(6)).attemptStarted(anyInt());
verify(tracer, times(5)).attemptFailed(any(Throwable.class), any(Duration.class));
verify(tracer, times(1)).attemptFailedRetriesExhausted(any(Throwable.class));
verifyNoMoreInteractions(tracer);
}

@Test
Expand All @@ -133,17 +166,21 @@ public void testTotalTimeoutExceeded() throws Exception {
.build();
RetryingExecutorWithContext<String> executor =
getExecutor(getAlgorithm(retrySettings, 0, null));
FailingCallable callable = new FailingCallable(6, "FAILURE");
FailingCallable callable = new FailingCallable(6, "FAILURE", tracer);
RetryingFuture<String> future = executor.createFuture(callable, retryingContext);
future.setAttemptFuture(executor.submit(future));

assertFutureFail(future, CustomException.class);
assertTrue(future.getAttemptSettings().getAttemptCount() < 4);

verify(tracer, times(1)).attemptStarted(anyInt());
verify(tracer, times(1)).attemptFailedRetriesExhausted(any(Throwable.class));
verifyNoMoreInteractions(tracer);
}

@Test
public void testCancelOuterFutureBeforeStart() throws Exception {
FailingCallable callable = new FailingCallable(4, "SUCCESS");
FailingCallable callable = new FailingCallable(4, "SUCCESS", tracer);

RetrySettings retrySettings =
FAST_RETRY_SETTINGS
Expand All @@ -163,30 +200,46 @@ public void testCancelOuterFutureBeforeStart() throws Exception {

assertFutureCancel(future);
assertEquals(0, future.getAttemptSettings().getAttemptCount());

verifyNoMoreInteractions(tracer);
}

@Test
public void testCancelByRetryingAlgorithm() throws Exception {
FailingCallable callable = new FailingCallable(6, "FAILURE");
FailingCallable callable = new FailingCallable(6, "FAILURE", tracer);
RetryingExecutorWithContext<String> executor =
getExecutor(getAlgorithm(FAST_RETRY_SETTINGS, 5, new CancellationException()));
RetryingFuture<String> future = executor.createFuture(callable, retryingContext);
future.setAttemptFuture(executor.submit(future));

assertFutureCancel(future);
assertEquals(4, future.getAttemptSettings().getAttemptCount());

verify(tracer, times(5)).attemptStarted(anyInt());
// Pre-apocalypse failures
verify(tracer, times(4)).attemptFailed(any(Throwable.class), any(Duration.class));
// Apocalypse failure
verify(tracer, times(1)).attemptFailedRetriesExhausted(any(CancellationException.class));
verifyNoMoreInteractions(tracer);
}

@Test
public void testUnexpectedExceptionFromRetryAlgorithm() throws Exception {
FailingCallable callable = new FailingCallable(6, "FAILURE");
FailingCallable callable = new FailingCallable(6, "FAILURE", tracer);
RetryingExecutorWithContext<String> executor =
getExecutor(getAlgorithm(FAST_RETRY_SETTINGS, 5, new RuntimeException()));
RetryingFuture<String> future = executor.createFuture(callable, retryingContext);
future.setAttemptFuture(executor.submit(future));

assertFutureFail(future, RuntimeException.class);
assertEquals(4, future.getAttemptSettings().getAttemptCount());

verify(tracer, times(5)).attemptStarted(anyInt());
// Pre-apocalypse failures
verify(tracer, times(4)).attemptFailed(any(Throwable.class), any(Duration.class));
// Apocalypse failure
verify(tracer, times(1)).attemptPermanentFailure(any(RuntimeException.class));
verifyNoMoreInteractions(tracer);
}

@Test
Expand All @@ -204,12 +257,16 @@ public void testPollExceptionByPollAlgorithm() throws Exception {
new ExponentialPollAlgorithm(retrySettings, NanoClock.getDefaultClock()));

RetryingExecutorWithContext<String> executor = getExecutor(retryAlgorithm);
FailingCallable callable = new FailingCallable(6, "FAILURE");
FailingCallable callable = new FailingCallable(6, "FAILURE", tracer);
RetryingFuture<String> future = executor.createFuture(callable, retryingContext);
future.setAttemptFuture(executor.submit(future));

assertFutureFail(future, PollException.class);
assertTrue(future.getAttemptSettings().getAttemptCount() < 4);

verify(tracer, times(1)).attemptStarted(anyInt());
verify(tracer, times(1)).attemptPermanentFailure(any(PollException.class));
verifyNoMoreInteractions(tracer);
}

protected static class TestResultRetryAlgorithm<ResponseT>
Expand Down
12 changes: 10 additions & 2 deletions gax/src/test/java/com/google/api/gax/retrying/FailingCallable.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
*/
package com.google.api.gax.retrying;

import com.google.api.gax.tracing.ApiTracer;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import org.threeten.bp.Duration;
Expand All @@ -48,19 +49,26 @@ class FailingCallable implements Callable<String> {
.build();

private AtomicInteger attemptsCount = new AtomicInteger(0);
private final ApiTracer tracer;
private final int expectedFailuresCount;
private final String result;

FailingCallable(int expectedFailuresCount, String result) {
FailingCallable(int expectedFailuresCount, String result, ApiTracer tracer) {
this.tracer = tracer;
this.expectedFailuresCount = expectedFailuresCount;
this.result = result;
}

@Override
public String call() throws Exception {
if (attemptsCount.getAndIncrement() < expectedFailuresCount) {
int attemptNumber = attemptsCount.getAndIncrement();

tracer.attemptStarted(attemptNumber);

if (attemptNumber < expectedFailuresCount) {
throw new CustomException();
}

return result;
}

Expand Down
Loading