diff --git a/gax/src/main/java/com/google/api/gax/retrying/BasicRetryingFuture.java b/gax/src/main/java/com/google/api/gax/retrying/BasicRetryingFuture.java index 331a1e304..5608b7449 100644 --- a/gax/src/main/java/com/google/api/gax/retrying/BasicRetryingFuture.java +++ b/gax/src/main/java/com/google/api/gax/retrying/BasicRetryingFuture.java @@ -34,6 +34,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.gax.tracing.ApiTracer; import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.Callable; @@ -137,14 +138,23 @@ void clearAttemptServiceData() { // "super." is used here to avoid infinite loops of callback chains void handleAttempt(Throwable throwable, ResponseT response) { + ApiTracer tracer = retryingContext.getTracer(); + synchronized (lock) { try { clearAttemptServiceData(); if (throwable instanceof CancellationException) { // An attempt triggered cancellation. + // In almost all cases, the operation caller caused the attempt to trigger the + // cancellation by invoking cancel() on the CallbackChainRetryingFuture, which cancelled + // the current attempt. + // In a theoretical scenario, the attempt callable might've thrown the exception on its + // own volition. However it's currently impossible to disambiguate the 2 scenarios. + tracer.attemptCancelled(); super.cancel(false); } else if (throwable instanceof RejectedExecutionException) { // external executor cannot continue retrying + tracer.attemptPermanentFailure(throwable); super.setException(throwable); } if (isDone()) { @@ -155,21 +165,30 @@ void handleAttempt(Throwable throwable, ResponseT response) { retryAlgorithm.createNextAttempt(throwable, response, attemptSettings); boolean shouldRetry = retryAlgorithm.shouldRetry(throwable, response, nextAttemptSettings); if (shouldRetry) { + tracer.attemptFailed(throwable, nextAttemptSettings.getRandomizedRetryDelay()); attemptSettings = nextAttemptSettings; setAttemptResult(throwable, response, true); // a new attempt will be (must be) scheduled by an external executor } else if (throwable != null) { + if (retryAlgorithm.getResultAlgorithm().shouldRetry(throwable, response)) { + tracer.attemptFailedRetriesExhausted(throwable); + } else { + tracer.attemptPermanentFailure(throwable); + } super.setException(throwable); } else { + tracer.attemptSucceeded(); super.set(response); } } catch (CancellationException e) { // A retry algorithm triggered cancellation. + tracer.attemptFailedRetriesExhausted(e); super.cancel(false); } catch (Exception e) { // Should never happen, but still possible in case of buggy retry algorithm implementation. // Any bugs/exceptions (except CancellationException) in retry algorithms immediately // terminate retrying future and set the result to the thrown exception. + tracer.attemptPermanentFailure(e); super.setException(e); } } diff --git a/gax/src/main/java/com/google/api/gax/rpc/AttemptCallable.java b/gax/src/main/java/com/google/api/gax/rpc/AttemptCallable.java index b0ac657e5..dbd9c995c 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/AttemptCallable.java +++ b/gax/src/main/java/com/google/api/gax/rpc/AttemptCallable.java @@ -78,6 +78,11 @@ public ResponseT call() { if (externalFuture.isDone()) { return null; } + + callContext + .getTracer() + .attemptStarted(externalFuture.getAttemptSettings().getOverallAttemptCount()); + ApiFuture internalFuture = callable.futureCall(request, callContext); externalFuture.setAttemptFuture(internalFuture); } catch (Throwable e) { diff --git a/gax/src/main/java/com/google/api/gax/rpc/CheckingAttemptCallable.java b/gax/src/main/java/com/google/api/gax/rpc/CheckingAttemptCallable.java index 85c2976fa..17670a5e7 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/CheckingAttemptCallable.java +++ b/gax/src/main/java/com/google/api/gax/rpc/CheckingAttemptCallable.java @@ -75,6 +75,11 @@ public ResponseT call() { if (externalFuture.isDone()) { return null; } + + callContext + .getTracer() + .attemptStarted(externalFuture.getAttemptSettings().getOverallAttemptCount()); + // NOTE: The callable here is an OperationCheckingCallable, which will compose its own // request using a resolved operation name and ignore anything that we pass here for the // request. diff --git a/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingAttemptCallable.java b/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingAttemptCallable.java index 61dab03f5..d94885b3b 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingAttemptCallable.java +++ b/gax/src/main/java/com/google/api/gax/rpc/ServerStreamingAttemptCallable.java @@ -220,6 +220,10 @@ public Void call() { outerRetryingFuture.getAttemptSettings().getRpcTimeout()); } + attemptContext + .getTracer() + .attemptStarted(outerRetryingFuture.getAttemptSettings().getOverallAttemptCount()); + innerCallable.call( request, new StateCheckingResponseObserver() { diff --git a/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java b/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java index 8a137b10b..92cdf51e5 100644 --- a/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java +++ b/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java @@ -82,6 +82,9 @@ public interface ApiTracer { /** Adds an annotation that the attempt succeeded. */ void attemptSucceeded(); + /** Add an annotation that the attempt was cancelled by the user. */ + void attemptCancelled(); + /** * Adds an annotation that the attempt failed, but another attempt will be made after the delay. * @@ -93,8 +96,10 @@ public interface ApiTracer { /** * Adds an annotation that the attempt failed and that no further attempts will be made because * retry limits have been reached. + * + * @param error the last error received before retries were exhausted. */ - void attemptFailedRetriesExhausted(); + void attemptFailedRetriesExhausted(Throwable error); /** * Adds an annotation that the attempt failed and that no further attempts will be made because diff --git a/gax/src/main/java/com/google/api/gax/tracing/NoopApiTracer.java b/gax/src/main/java/com/google/api/gax/tracing/NoopApiTracer.java index 4b569b7f9..8b8bc2d62 100644 --- a/gax/src/main/java/com/google/api/gax/tracing/NoopApiTracer.java +++ b/gax/src/main/java/com/google/api/gax/tracing/NoopApiTracer.java @@ -85,13 +85,18 @@ public void attemptSucceeded() { // noop } + @Override + public void attemptCancelled() { + // noop + } + @Override public void attemptFailed(Throwable error, Duration delay) { // noop } @Override - public void attemptFailedRetriesExhausted() { + public void attemptFailedRetriesExhausted(Throwable error) { // noop } diff --git a/gax/src/test/java/com/google/api/gax/retrying/AbstractRetryingExecutorTest.java b/gax/src/test/java/com/google/api/gax/retrying/AbstractRetryingExecutorTest.java index 08f6562eb..cb2e6bf9c 100644 --- a/gax/src/test/java/com/google/api/gax/retrying/AbstractRetryingExecutorTest.java +++ b/gax/src/test/java/com/google/api/gax/retrying/AbstractRetryingExecutorTest.java @@ -36,24 +36,38 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import com.google.api.core.ApiFuture; import com.google.api.core.NanoClock; import com.google.api.gax.retrying.FailingCallable.CustomException; +import com.google.api.gax.rpc.testing.FakeCallContext; +import com.google.api.gax.tracing.ApiTracer; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; import org.threeten.bp.Duration; -@RunWith(MockitoJUnitRunner.class) +@RunWith(JUnit4.class) public abstract class AbstractRetryingExecutorTest { - @Mock protected RetryingContext retryingContext; + @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Mock protected ApiTracer tracer; + protected RetryingContext retryingContext; protected abstract RetryingExecutorWithContext getExecutor( RetryAlgorithm retryAlgorithm); @@ -61,9 +75,14 @@ protected abstract RetryingExecutorWithContext getExecutor( protected abstract RetryAlgorithm getAlgorithm( RetrySettings retrySettings, int apocalypseCountDown, RuntimeException apocalypseException); + @Before + public void setUp() { + retryingContext = FakeCallContext.createDefault().withTracer(tracer); + } + @Test public void testSuccess() throws Exception { - FailingCallable callable = new FailingCallable(0, "SUCCESS"); + FailingCallable callable = new FailingCallable(0, "SUCCESS", tracer); RetryingExecutorWithContext executor = getExecutor(getAlgorithm(FAST_RETRY_SETTINGS, 0, null)); RetryingFuture future = executor.createFuture(callable, retryingContext); @@ -71,11 +90,15 @@ public void testSuccess() throws Exception { assertFutureSuccess(future); assertEquals(0, future.getAttemptSettings().getAttemptCount()); + + verify(tracer, times(1)).attemptStarted(0); + verify(tracer, times(1)).attemptSucceeded(); + verifyNoMoreInteractions(tracer); } @Test public void testSuccessWithFailures() throws Exception { - FailingCallable callable = new FailingCallable(5, "SUCCESS"); + FailingCallable callable = new FailingCallable(5, "SUCCESS", tracer); RetryingExecutorWithContext executor = getExecutor(getAlgorithm(FAST_RETRY_SETTINGS, 0, null)); RetryingFuture future = executor.createFuture(callable, retryingContext); @@ -83,11 +106,16 @@ public void testSuccessWithFailures() throws Exception { assertFutureSuccess(future); assertEquals(5, future.getAttemptSettings().getAttemptCount()); + + verify(tracer, times(6)).attemptStarted(anyInt()); + verify(tracer, times(5)).attemptFailed(any(Throwable.class), any(Duration.class)); + verify(tracer, times(1)).attemptSucceeded(); + verifyNoMoreInteractions(tracer); } @Test public void testSuccessWithFailuresPeekGetAttempt() throws Exception { - FailingCallable callable = new FailingCallable(5, "SUCCESS"); + FailingCallable callable = new FailingCallable(5, "SUCCESS", tracer); RetryingExecutorWithContext executor = getExecutor(getAlgorithm(FAST_RETRY_SETTINGS, 0, null)); RetryingFuture future = executor.createFuture(callable, retryingContext); @@ -113,7 +141,7 @@ public void testSuccessWithFailuresPeekGetAttempt() throws Exception { @Test public void testMaxRetriesExceeded() throws Exception { - FailingCallable callable = new FailingCallable(6, "FAILURE"); + FailingCallable callable = new FailingCallable(6, "FAILURE", tracer); RetryingExecutorWithContext executor = getExecutor(getAlgorithm(FAST_RETRY_SETTINGS, 0, null)); RetryingFuture future = executor.createFuture(callable, retryingContext); @@ -121,6 +149,11 @@ public void testMaxRetriesExceeded() throws Exception { assertFutureFail(future, CustomException.class); assertEquals(5, future.getAttemptSettings().getAttemptCount()); + + verify(tracer, times(6)).attemptStarted(anyInt()); + verify(tracer, times(5)).attemptFailed(any(Throwable.class), any(Duration.class)); + verify(tracer, times(1)).attemptFailedRetriesExhausted(any(Throwable.class)); + verifyNoMoreInteractions(tracer); } @Test @@ -133,17 +166,21 @@ public void testTotalTimeoutExceeded() throws Exception { .build(); RetryingExecutorWithContext executor = getExecutor(getAlgorithm(retrySettings, 0, null)); - FailingCallable callable = new FailingCallable(6, "FAILURE"); + FailingCallable callable = new FailingCallable(6, "FAILURE", tracer); RetryingFuture future = executor.createFuture(callable, retryingContext); future.setAttemptFuture(executor.submit(future)); assertFutureFail(future, CustomException.class); assertTrue(future.getAttemptSettings().getAttemptCount() < 4); + + verify(tracer, times(1)).attemptStarted(anyInt()); + verify(tracer, times(1)).attemptFailedRetriesExhausted(any(Throwable.class)); + verifyNoMoreInteractions(tracer); } @Test public void testCancelOuterFutureBeforeStart() throws Exception { - FailingCallable callable = new FailingCallable(4, "SUCCESS"); + FailingCallable callable = new FailingCallable(4, "SUCCESS", tracer); RetrySettings retrySettings = FAST_RETRY_SETTINGS @@ -163,11 +200,13 @@ public void testCancelOuterFutureBeforeStart() throws Exception { assertFutureCancel(future); assertEquals(0, future.getAttemptSettings().getAttemptCount()); + + verifyNoMoreInteractions(tracer); } @Test public void testCancelByRetryingAlgorithm() throws Exception { - FailingCallable callable = new FailingCallable(6, "FAILURE"); + FailingCallable callable = new FailingCallable(6, "FAILURE", tracer); RetryingExecutorWithContext executor = getExecutor(getAlgorithm(FAST_RETRY_SETTINGS, 5, new CancellationException())); RetryingFuture future = executor.createFuture(callable, retryingContext); @@ -175,11 +214,18 @@ public void testCancelByRetryingAlgorithm() throws Exception { assertFutureCancel(future); assertEquals(4, future.getAttemptSettings().getAttemptCount()); + + verify(tracer, times(5)).attemptStarted(anyInt()); + // Pre-apocalypse failures + verify(tracer, times(4)).attemptFailed(any(Throwable.class), any(Duration.class)); + // Apocalypse failure + verify(tracer, times(1)).attemptFailedRetriesExhausted(any(CancellationException.class)); + verifyNoMoreInteractions(tracer); } @Test public void testUnexpectedExceptionFromRetryAlgorithm() throws Exception { - FailingCallable callable = new FailingCallable(6, "FAILURE"); + FailingCallable callable = new FailingCallable(6, "FAILURE", tracer); RetryingExecutorWithContext executor = getExecutor(getAlgorithm(FAST_RETRY_SETTINGS, 5, new RuntimeException())); RetryingFuture future = executor.createFuture(callable, retryingContext); @@ -187,6 +233,13 @@ public void testUnexpectedExceptionFromRetryAlgorithm() throws Exception { assertFutureFail(future, RuntimeException.class); assertEquals(4, future.getAttemptSettings().getAttemptCount()); + + verify(tracer, times(5)).attemptStarted(anyInt()); + // Pre-apocalypse failures + verify(tracer, times(4)).attemptFailed(any(Throwable.class), any(Duration.class)); + // Apocalypse failure + verify(tracer, times(1)).attemptPermanentFailure(any(RuntimeException.class)); + verifyNoMoreInteractions(tracer); } @Test @@ -204,12 +257,16 @@ public void testPollExceptionByPollAlgorithm() throws Exception { new ExponentialPollAlgorithm(retrySettings, NanoClock.getDefaultClock())); RetryingExecutorWithContext executor = getExecutor(retryAlgorithm); - FailingCallable callable = new FailingCallable(6, "FAILURE"); + FailingCallable callable = new FailingCallable(6, "FAILURE", tracer); RetryingFuture future = executor.createFuture(callable, retryingContext); future.setAttemptFuture(executor.submit(future)); assertFutureFail(future, PollException.class); assertTrue(future.getAttemptSettings().getAttemptCount() < 4); + + verify(tracer, times(1)).attemptStarted(anyInt()); + verify(tracer, times(1)).attemptPermanentFailure(any(PollException.class)); + verifyNoMoreInteractions(tracer); } protected static class TestResultRetryAlgorithm diff --git a/gax/src/test/java/com/google/api/gax/retrying/FailingCallable.java b/gax/src/test/java/com/google/api/gax/retrying/FailingCallable.java index 037b15c89..3cc9e9f1d 100644 --- a/gax/src/test/java/com/google/api/gax/retrying/FailingCallable.java +++ b/gax/src/test/java/com/google/api/gax/retrying/FailingCallable.java @@ -29,6 +29,7 @@ */ package com.google.api.gax.retrying; +import com.google.api.gax.tracing.ApiTracer; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; import org.threeten.bp.Duration; @@ -48,19 +49,26 @@ class FailingCallable implements Callable { .build(); private AtomicInteger attemptsCount = new AtomicInteger(0); + private final ApiTracer tracer; private final int expectedFailuresCount; private final String result; - FailingCallable(int expectedFailuresCount, String result) { + FailingCallable(int expectedFailuresCount, String result, ApiTracer tracer) { + this.tracer = tracer; this.expectedFailuresCount = expectedFailuresCount; this.result = result; } @Override public String call() throws Exception { - if (attemptsCount.getAndIncrement() < expectedFailuresCount) { + int attemptNumber = attemptsCount.getAndIncrement(); + + tracer.attemptStarted(attemptNumber); + + if (attemptNumber < expectedFailuresCount) { throw new CustomException(); } + return result; } diff --git a/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java b/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java index 2d4708871..f7281459e 100644 --- a/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java +++ b/gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java @@ -49,6 +49,7 @@ import org.junit.After; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import org.threeten.bp.Duration; @@ -89,7 +90,7 @@ public void testSuccessWithFailuresPeekAttempt() throws Exception { final int maxRetries = 100; ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor(); - FailingCallable callable = new FailingCallable(15, "SUCCESS"); + FailingCallable callable = new FailingCallable(15, "SUCCESS", tracer); RetrySettings retrySettings = FAST_RETRY_SETTINGS @@ -139,7 +140,7 @@ public void testSuccessWithFailuresGetAttempt() throws Exception { final int maxRetries = 100; ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor(); - FailingCallable callable = new FailingCallable(15, "SUCCESS"); + FailingCallable callable = new FailingCallable(15, "SUCCESS", tracer); RetrySettings retrySettings = FAST_RETRY_SETTINGS .toBuilder() @@ -191,7 +192,7 @@ public void testCancelGetAttempt() throws Exception { ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor(); final int maxRetries = 100; - FailingCallable callable = new FailingCallable(maxRetries - 1, "SUCCESS"); + FailingCallable callable = new FailingCallable(maxRetries - 1, "SUCCESS", tracer); RetrySettings retrySettings = FAST_RETRY_SETTINGS .toBuilder() @@ -247,7 +248,7 @@ public void testCancelGetAttempt() throws Exception { public void testCancelOuterFutureAfterStart() throws Exception { for (int executionsCount = 0; executionsCount < EXECUTIONS_COUNT; executionsCount++) { ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor(); - FailingCallable callable = new FailingCallable(4, "SUCCESS"); + FailingCallable callable = new FailingCallable(4, "SUCCESS", tracer); RetrySettings retrySettings = FAST_RETRY_SETTINGS .toBuilder() @@ -270,12 +271,38 @@ public void testCancelOuterFutureAfterStart() throws Exception { } } + @Test + public void testCancelIsTraced() throws Exception { + ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor(); + FailingCallable callable = new FailingCallable(4, "SUCCESS", tracer); + RetrySettings retrySettings = + FAST_RETRY_SETTINGS + .toBuilder() + .setInitialRetryDelay(Duration.ofMillis(1_000L)) + .setMaxRetryDelay(Duration.ofMillis(1_000L)) + .setTotalTimeout(Duration.ofMillis(10_0000L)) + .build(); + RetryingExecutorWithContext executor = + getRetryingExecutor(getAlgorithm(retrySettings, 0, null), localExecutor); + RetryingFuture future = executor.createFuture(callable, retryingContext); + future.setAttemptFuture(executor.submit(future)); + + Thread.sleep(30L); + + boolean res = future.cancel(false); + assertTrue(res); + assertFutureCancel(future); + + Mockito.verify(tracer).attemptCancelled(); + localExecutor.shutdownNow(); + } + @Test public void testCancelProxiedFutureAfterStart() throws Exception { // this is a heavy test, which takes a lot of time, so only few executions. for (int executionsCount = 0; executionsCount < 2; executionsCount++) { ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor(); - FailingCallable callable = new FailingCallable(5, "SUCCESS"); + FailingCallable callable = new FailingCallable(5, "SUCCESS", tracer); RetrySettings retrySettings = FAST_RETRY_SETTINGS .toBuilder() diff --git a/gax/src/test/java/com/google/api/gax/rpc/testing/FakeCallContext.java b/gax/src/test/java/com/google/api/gax/rpc/testing/FakeCallContext.java index aa2a7dcb9..61f668573 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/testing/FakeCallContext.java +++ b/gax/src/test/java/com/google/api/gax/rpc/testing/FakeCallContext.java @@ -35,6 +35,7 @@ import com.google.api.gax.rpc.TransportChannel; import com.google.api.gax.rpc.internal.Headers; import com.google.api.gax.tracing.ApiTracer; +import com.google.api.gax.tracing.NoopApiTracer; import com.google.auth.Credentials; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; @@ -276,6 +277,9 @@ public Map> getExtraHeaders() { @Override @Nonnull public ApiTracer getTracer() { + if (tracer == null) { + return NoopApiTracer.getInstance(); + } return tracer; }