From 54df9916bf35de493812c250cb4639decf8fc55c Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Mon, 1 Oct 2018 17:44:40 -0400 Subject: [PATCH 1/2] RFC: rough sketch of retry tracing --- .../google/api/gax/grpc/GrpcCallContext.java | 15 ++++ .../google/api/gax/opencensus/NoopTracer.java | 74 +++++++++++++++ .../api/gax/opencensus/OpenCensusTracer.java | 89 +++++++++++++++++++ .../gax/opencensus/TracedUnaryCallable.java | 49 ++++++++++ .../com/google/api/gax/opencensus/Tracer.java | 26 ++++++ .../api/gax/retrying/BasicRetryingFuture.java | 14 ++- .../retrying/CallbackChainRetryingFuture.java | 6 +- .../gax/retrying/DirectRetryingExecutor.java | 2 +- .../retrying/ScheduledRetryingExecutor.java | 14 ++- .../google/api/gax/rpc/ApiCallContext.java | 6 ++ .../google/api/gax/rpc/AttemptCallable.java | 7 +- .../com/google/api/gax/rpc/Callables.java | 13 ++- .../google/api/gax/rpc/RetryingCallable.java | 40 ++++++--- 13 files changed, 328 insertions(+), 27 deletions(-) create mode 100644 gax/src/main/java/com/google/api/gax/opencensus/NoopTracer.java create mode 100644 gax/src/main/java/com/google/api/gax/opencensus/OpenCensusTracer.java create mode 100644 gax/src/main/java/com/google/api/gax/opencensus/TracedUnaryCallable.java create mode 100644 gax/src/main/java/com/google/api/gax/opencensus/Tracer.java diff --git a/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcCallContext.java b/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcCallContext.java index 11c768b73..13645ee77 100644 --- a/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcCallContext.java +++ b/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcCallContext.java @@ -32,6 +32,8 @@ import com.google.api.core.BetaApi; import com.google.api.core.InternalApi; import com.google.api.core.InternalExtensionOnly; +import com.google.api.gax.opencensus.NoopTracer; +import com.google.api.gax.opencensus.Tracer; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.DeadlineExceededException; import com.google.api.gax.rpc.TransportChannel; @@ -41,6 +43,7 @@ import com.google.common.collect.ImmutableMap; import io.grpc.CallCredentials; import io.grpc.CallOptions; +import io.grpc.CallOptions.Key; import io.grpc.Channel; import io.grpc.Deadline; import io.grpc.Metadata; @@ -64,6 +67,8 @@ @BetaApi("Reference ApiCallContext instead - this class is likely to experience breaking changes") @InternalExtensionOnly public final class GrpcCallContext implements ApiCallContext { + private static final CallOptions.Key TRACER_KEY = Key.createWithDefault("tracer", NoopTracer.create()); + private final Channel channel; private final CallOptions callOptions; @Nullable private final Duration streamWaitTimeout; @@ -296,6 +301,16 @@ public Duration getStreamIdleTimeout() { return streamIdleTimeout; } + @Override + public ApiCallContext withTracer(Tracer tracer) { + return withCallOptions(callOptions.withOption(TRACER_KEY, tracer)); + } + + @Override + public Tracer getTracer() { + return callOptions.getOption(TRACER_KEY); + } + /** The channel affinity for this context. */ @BetaApi("The surface for channel affinity is not stable yet and may change in the future.") @Nullable diff --git a/gax/src/main/java/com/google/api/gax/opencensus/NoopTracer.java b/gax/src/main/java/com/google/api/gax/opencensus/NoopTracer.java new file mode 100644 index 000000000..56429a014 --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/opencensus/NoopTracer.java @@ -0,0 +1,74 @@ +package com.google.api.gax.opencensus; + +import org.threeten.bp.Duration; + +public class NoopTracer implements Tracer { + public static Tracer create() { + return new NoopTracer(); + } + + @Override + public void operationStarted() { + + } + + @Override + public void operationFailed(Throwable throwable) { + + } + + @Override + public void operationSucceeded() { + + } + + @Override + public void connectionSelected(int id) { + + } + + @Override + public void credentialsRefreshed() { + + } + + @Override + public void startAttempt() { + + } + + @Override + public void attemptSucceeded() { + + } + + @Override + public void retryableFailure(Throwable error, Duration delay) { + + } + + @Override + public void retriesExhausted() { + + } + + @Override + public void permanentFailure(Throwable error) { + + } + + @Override + public void receivedResponse() { + + } + + @Override + public void sentRequest() { + + } + + @Override + public void sentBatchRequest(int elementCount, int requestSize) { + + } +} diff --git a/gax/src/main/java/com/google/api/gax/opencensus/OpenCensusTracer.java b/gax/src/main/java/com/google/api/gax/opencensus/OpenCensusTracer.java new file mode 100644 index 000000000..bb234750d --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/opencensus/OpenCensusTracer.java @@ -0,0 +1,89 @@ +package com.google.api.gax.opencensus; + +import java.io.Closeable; +import org.threeten.bp.Duration; + +public class OpenCensusTracer implements Tracer { + + public static OpenCensusTracer create(String spanName) { + + return null; + } + + public TracerContext enter() { + return new TracerContext(); + } + + @Override + public void operationStarted() { + + } + + @Override + public void operationFailed(Throwable throwable) { + + } + + @Override + public void operationSucceeded() { + + } + + @Override + public void connectionSelected(int id) { + + } + + @Override + public void credentialsRefreshed() { + + } + + @Override + public void startAttempt() { + + } + + @Override + public void attemptSucceeded() { + + } + + @Override + public void retryableFailure(Throwable error, Duration delay) { + + } + + @Override + public void retriesExhausted() { + + } + + @Override + public void permanentFailure(Throwable error) { + + } + + @Override + public void receivedResponse() { + + } + + @Override + public void sentRequest() { + + } + + @Override + public void sentBatchRequest(int elementCount, int requestSize) { + + } + + static class TracerContext implements AutoCloseable { + + @Override + public void close() { + + } + } +} diff --git a/gax/src/main/java/com/google/api/gax/opencensus/TracedUnaryCallable.java b/gax/src/main/java/com/google/api/gax/opencensus/TracedUnaryCallable.java new file mode 100644 index 000000000..b571cce29 --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/opencensus/TracedUnaryCallable.java @@ -0,0 +1,49 @@ +package com.google.api.gax.opencensus; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.common.util.concurrent.MoreExecutors; + +public class TracedUnaryCallable extends UnaryCallable { + private final UnaryCallable innerCallable; + private final String spanName; + + public TracedUnaryCallable( + UnaryCallable innerCallable, String spanName) { + this.innerCallable = innerCallable; + this.spanName = spanName; + } + + @Override + public ApiFuture futureCall(RequestT request, ApiCallContext context) { + final OpenCensusTracer tracer = OpenCensusTracer.create(spanName); + + ApiFuture innerFuture; + + try (OpenCensusTracer.TracerContext ignored = tracer.enter()) { + tracer.operationStarted(); + context = context.withTracer(tracer); + + innerFuture = innerCallable.futureCall(request, context); + } + + ApiFutures.addCallback(innerFuture, new ApiFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + tracer.operationFailed(throwable); + } + + @Override + public void onSuccess(ResponseT responseT) { + tracer.operationSucceeded(); + } + }, MoreExecutors.directExecutor()); + + return innerFuture; + } + + +} diff --git a/gax/src/main/java/com/google/api/gax/opencensus/Tracer.java b/gax/src/main/java/com/google/api/gax/opencensus/Tracer.java new file mode 100644 index 000000000..6b4e210ed --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/opencensus/Tracer.java @@ -0,0 +1,26 @@ +package com.google.api.gax.opencensus; + +import com.google.api.core.InternalExtensionOnly; +import com.google.api.gax.retrying.RetryingFuture; +import org.threeten.bp.Duration; + +@InternalExtensionOnly +public interface Tracer { + void operationStarted(); + void operationFailed(Throwable throwable); + void operationSucceeded(); + + void connectionSelected(int id); + void credentialsRefreshed(); + + void startAttempt(); + void attemptSucceeded(); + + void retryableFailure(Throwable error, Duration delay); + void retriesExhausted(); + void permanentFailure(Throwable error); + + void receivedResponse(); + void sentRequest(); + void sentBatchRequest(int elementCount, int requestSize); +} diff --git a/gax/src/main/java/com/google/api/gax/retrying/BasicRetryingFuture.java b/gax/src/main/java/com/google/api/gax/retrying/BasicRetryingFuture.java index ce360e3b8..e694efba8 100644 --- a/gax/src/main/java/com/google/api/gax/retrying/BasicRetryingFuture.java +++ b/gax/src/main/java/com/google/api/gax/retrying/BasicRetryingFuture.java @@ -34,6 +34,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.gax.opencensus.Tracer; import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.Callable; @@ -58,14 +59,18 @@ class BasicRetryingFuture extends AbstractFuture private final RetryAlgorithm retryAlgorithm; + private final Tracer tracer; + private volatile TimedAttemptSettings attemptSettings; private volatile ApiFuture latestCompletedAttemptResult; private volatile ApiFuture attemptResult; - BasicRetryingFuture(Callable callable, RetryAlgorithm retryAlgorithm) { + BasicRetryingFuture(Callable callable, RetryAlgorithm retryAlgorithm, + Tracer tracer) { this.callable = checkNotNull(callable); this.retryAlgorithm = checkNotNull(retryAlgorithm); + this.tracer = tracer; this.attemptSettings = retryAlgorithm.createFirstAttempt(); @@ -136,9 +141,11 @@ void handleAttempt(Throwable throwable, ResponseT response) { try { clearAttemptServiceData(); if (throwable instanceof CancellationException) { + tracer.permanentFailure(throwable); // An attempt triggered cancellation. super.cancel(false); } else if (throwable instanceof RejectedExecutionException) { + tracer.permanentFailure(throwable); // external executor cannot continue retrying super.setException(throwable); } @@ -150,18 +157,23 @@ void handleAttempt(Throwable throwable, ResponseT response) { retryAlgorithm.createNextAttempt(throwable, response, attemptSettings); boolean shouldRetry = retryAlgorithm.shouldRetry(throwable, response, nextAttemptSettings); if (shouldRetry) { + tracer.retryableFailure(throwable, nextAttemptSettings.getRandomizedRetryDelay()); attemptSettings = nextAttemptSettings; setAttemptResult(throwable, response, true); // a new attempt will be (must be) scheduled by an external executor } else if (throwable != null) { + tracer.permanentFailure(throwable); super.setException(throwable); } else { + tracer.attemptSucceeded(); super.set(response); } } catch (CancellationException e) { + tracer.permanentFailure(e); // A retry algorithm triggered cancellation. super.cancel(false); } catch (Exception e) { + tracer.permanentFailure(e); // Should never happen, but still possible in case of buggy retry algorithm implementation. // Any bugs/exceptions (except CancellationException) in retry algorithms immediately // terminate retrying future and set the result to the thrown exception. diff --git a/gax/src/main/java/com/google/api/gax/retrying/CallbackChainRetryingFuture.java b/gax/src/main/java/com/google/api/gax/retrying/CallbackChainRetryingFuture.java index b65f4d296..b8332065e 100644 --- a/gax/src/main/java/com/google/api/gax/retrying/CallbackChainRetryingFuture.java +++ b/gax/src/main/java/com/google/api/gax/retrying/CallbackChainRetryingFuture.java @@ -32,6 +32,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.api.core.ApiFuture; +import com.google.api.gax.opencensus.Tracer; import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -58,8 +59,9 @@ class CallbackChainRetryingFuture extends BasicRetryingFuture callable, RetryAlgorithm retryAlgorithm, - RetryingExecutor retryingExecutor) { - super(callable, retryAlgorithm); + RetryingExecutor retryingExecutor, + Tracer tracer) { + super(callable, retryAlgorithm, tracer); this.retryingExecutor = checkNotNull(retryingExecutor); } diff --git a/gax/src/main/java/com/google/api/gax/retrying/DirectRetryingExecutor.java b/gax/src/main/java/com/google/api/gax/retrying/DirectRetryingExecutor.java index 361ad1462..5c72e51ad 100644 --- a/gax/src/main/java/com/google/api/gax/retrying/DirectRetryingExecutor.java +++ b/gax/src/main/java/com/google/api/gax/retrying/DirectRetryingExecutor.java @@ -70,7 +70,7 @@ public DirectRetryingExecutor(RetryAlgorithm retryAlgorithm) { */ @Override public RetryingFuture createFuture(Callable callable) { - return new BasicRetryingFuture<>(callable, retryAlgorithm); + return new BasicRetryingFuture<>(callable, retryAlgorithm, tracer); } /** diff --git a/gax/src/main/java/com/google/api/gax/retrying/ScheduledRetryingExecutor.java b/gax/src/main/java/com/google/api/gax/retrying/ScheduledRetryingExecutor.java index c6bb8d855..22cc76b25 100644 --- a/gax/src/main/java/com/google/api/gax/retrying/ScheduledRetryingExecutor.java +++ b/gax/src/main/java/com/google/api/gax/retrying/ScheduledRetryingExecutor.java @@ -32,6 +32,8 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.core.ListenableFutureToApiFuture; +import com.google.api.gax.opencensus.NoopTracer; +import com.google.api.gax.opencensus.Tracer; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -57,7 +59,14 @@ public class ScheduledRetryingExecutor implements RetryingExecutor retryAlgorithm; private final ListeningScheduledExecutorService scheduler; + private final Tracer tracer; + + @Deprecated + public ScheduledRetryingExecutor( + RetryAlgorithm retryAlgorithm, ScheduledExecutorService scheduler) { + this(retryAlgorithm, scheduler, new NoopTracer()); + } /** * Creates a new scheduled retry executor, which will be using {@code scheduler} for actual * attempts scheduling and {@code retryAlgorithm} for retrying strategy. @@ -66,9 +75,10 @@ public class ScheduledRetryingExecutor implements RetryingExecutor retryAlgorithm, ScheduledExecutorService scheduler) { + RetryAlgorithm retryAlgorithm, ScheduledExecutorService scheduler, Tracer tracer) { this.retryAlgorithm = retryAlgorithm; this.scheduler = MoreExecutors.listeningDecorator(scheduler); + this.tracer = tracer; } /** @@ -81,7 +91,7 @@ public ScheduledRetryingExecutor( */ @Override public RetryingFuture createFuture(Callable callable) { - return new CallbackChainRetryingFuture<>(callable, retryAlgorithm, this); + return new CallbackChainRetryingFuture<>(callable, retryAlgorithm, this, tracer); } /** diff --git a/gax/src/main/java/com/google/api/gax/rpc/ApiCallContext.java b/gax/src/main/java/com/google/api/gax/rpc/ApiCallContext.java index 3dff571f8..54ab85096 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/ApiCallContext.java +++ b/gax/src/main/java/com/google/api/gax/rpc/ApiCallContext.java @@ -31,7 +31,9 @@ import com.google.api.core.BetaApi; import com.google.api.core.InternalExtensionOnly; +import com.google.api.gax.opencensus.Tracer; import com.google.auth.Credentials; +import com.sun.deploy.trace.Trace; import java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -120,6 +122,10 @@ public interface ApiCallContext { @Nullable Duration getStreamIdleTimeout(); + ApiCallContext withTracer(Tracer tracer); + + Tracer getTracer(); + /** If inputContext is not null, returns it; if it is null, returns the present instance. */ ApiCallContext nullToSelf(ApiCallContext inputContext); diff --git a/gax/src/main/java/com/google/api/gax/rpc/AttemptCallable.java b/gax/src/main/java/com/google/api/gax/rpc/AttemptCallable.java index 2290b9bec..068814344 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/AttemptCallable.java +++ b/gax/src/main/java/com/google/api/gax/rpc/AttemptCallable.java @@ -31,6 +31,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.gax.opencensus.Tracer; import com.google.api.gax.retrying.NonCancellableFuture; import com.google.api.gax.retrying.RetryingFuture; import java.util.concurrent.Callable; @@ -47,15 +48,17 @@ class AttemptCallable implements Callable { private final UnaryCallable callable; private final RequestT request; + private final Tracer tracer; private volatile RetryingFuture externalFuture; private volatile ApiCallContext callContext; AttemptCallable( - UnaryCallable callable, RequestT request, ApiCallContext callContext) { + UnaryCallable callable, RequestT request, ApiCallContext callContext, Tracer tracer) { this.callable = callable; this.request = request; this.callContext = callContext; + this.tracer = tracer; } public void setExternalFuture(RetryingFuture externalFuture) { @@ -64,6 +67,8 @@ public void setExternalFuture(RetryingFuture externalFuture) { @Override public ResponseT call() { + tracer.startAttempt(); + try { if (callContext != null) { callContext = callContext.withTimeout(externalFuture.getAttemptSettings().getRpcTimeout()); diff --git a/gax/src/main/java/com/google/api/gax/rpc/Callables.java b/gax/src/main/java/com/google/api/gax/rpc/Callables.java index e684c6dad..d655b80a4 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/Callables.java +++ b/gax/src/main/java/com/google/api/gax/rpc/Callables.java @@ -59,15 +59,12 @@ public static UnaryCallable retrying( return innerCallable; } - RetryAlgorithm retryAlgorithm = - new RetryAlgorithm<>( - new ApiResultRetryAlgorithm(), - new ExponentialRetryAlgorithm( - callSettings.getRetrySettings(), clientContext.getClock())); - RetryingExecutor retryingExecutor = - new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor()); return new RetryingCallable<>( - clientContext.getDefaultCallContext(), innerCallable, retryingExecutor); + innerCallable, + callSettings.getRetrySettings(), + clientContext.getClock(), + clientContext.getExecutor() + ); } @BetaApi("The surface for streaming is not stable yet and may change in the future.") diff --git a/gax/src/main/java/com/google/api/gax/rpc/RetryingCallable.java b/gax/src/main/java/com/google/api/gax/rpc/RetryingCallable.java index ba2307932..a8f2ff997 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/RetryingCallable.java +++ b/gax/src/main/java/com/google/api/gax/rpc/RetryingCallable.java @@ -29,9 +29,15 @@ */ package com.google.api.gax.rpc; +import com.google.api.core.ApiClock; +import com.google.api.gax.retrying.ExponentialRetryAlgorithm; +import com.google.api.gax.retrying.RetryAlgorithm; +import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.retrying.RetryingExecutor; import com.google.api.gax.retrying.RetryingFuture; +import com.google.api.gax.retrying.ScheduledRetryingExecutor; import com.google.common.base.Preconditions; +import java.util.concurrent.ScheduledExecutorService; /** * A UnaryCallable that will keep issuing calls to an inner callable until it succeeds or times out. @@ -39,26 +45,36 @@ *

Package-private for internal use. */ class RetryingCallable extends UnaryCallable { - private final ApiCallContext callContextPrototype; private final UnaryCallable callable; - private final RetryingExecutor executor; + private final ScheduledExecutorService executor; + private final RetrySettings retrySettings; + private final ApiClock clock; - RetryingCallable( - ApiCallContext callContextPrototype, - UnaryCallable callable, - RetryingExecutor executor) { - this.callContextPrototype = Preconditions.checkNotNull(callContextPrototype); - this.callable = Preconditions.checkNotNull(callable); - this.executor = Preconditions.checkNotNull(executor); + RetryingCallable(UnaryCallable innerCallable, + RetrySettings retrySettings, ApiClock clock, + ScheduledExecutorService executor) { + + this.callable = innerCallable; + this.retrySettings = retrySettings; + this.clock = clock; + this.executor = executor; } @Override - public RetryingFuture futureCall(RequestT request, ApiCallContext inputContext) { - ApiCallContext context = callContextPrototype.nullToSelf(inputContext); + public RetryingFuture futureCall(RequestT request, ApiCallContext context) { + RetryAlgorithm retryAlgorithm = + new RetryAlgorithm<>( + new ApiResultRetryAlgorithm(), + new ExponentialRetryAlgorithm( + retrySettings, clock)); + + RetryingExecutor retryingExecutor = + new ScheduledRetryingExecutor<>(retryAlgorithm, executor, context.getTracer()); + AttemptCallable retryCallable = new AttemptCallable<>(callable, request, context); - RetryingFuture retryingFuture = executor.createFuture(retryCallable); + RetryingFuture retryingFuture = retryingExecutor.createFuture(retryCallable); retryCallable.setExternalFuture(retryingFuture); retryCallable.call(); From 67e9d22105fbba3796b55c6c5b38dd87be33880d Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Mon, 1 Oct 2018 18:01:01 -0400 Subject: [PATCH 2/2] operation must start before entering --- .../com/google/api/gax/opencensus/TracedUnaryCallable.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gax/src/main/java/com/google/api/gax/opencensus/TracedUnaryCallable.java b/gax/src/main/java/com/google/api/gax/opencensus/TracedUnaryCallable.java index b571cce29..de72d511c 100644 --- a/gax/src/main/java/com/google/api/gax/opencensus/TracedUnaryCallable.java +++ b/gax/src/main/java/com/google/api/gax/opencensus/TracedUnaryCallable.java @@ -23,8 +23,9 @@ public ApiFuture futureCall(RequestT request, ApiCallContext context) ApiFuture innerFuture; + tracer.operationStarted(); + try (OpenCensusTracer.TracerContext ignored = tracer.enter()) { - tracer.operationStarted(); context = context.withTracer(tracer); innerFuture = innerCallable.futureCall(request, context);