Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.opencensus.NoopTracer;
import com.google.api.gax.opencensus.Tracer;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.api.gax.rpc.TransportChannel;
Expand All @@ -41,6 +43,7 @@
import com.google.common.collect.ImmutableMap;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.CallOptions.Key;
import io.grpc.Channel;
import io.grpc.Deadline;
import io.grpc.Metadata;
Expand All @@ -64,6 +67,8 @@
@BetaApi("Reference ApiCallContext instead - this class is likely to experience breaking changes")
@InternalExtensionOnly
public final class GrpcCallContext implements ApiCallContext {
private static final CallOptions.Key<Tracer> TRACER_KEY = Key.createWithDefault("tracer", NoopTracer.create());

private final Channel channel;
private final CallOptions callOptions;
@Nullable private final Duration streamWaitTimeout;
Expand Down Expand Up @@ -296,6 +301,16 @@ public Duration getStreamIdleTimeout() {
return streamIdleTimeout;
}

@Override
public ApiCallContext withTracer(Tracer tracer) {
return withCallOptions(callOptions.withOption(TRACER_KEY, tracer));
}

@Override
public Tracer getTracer() {
return callOptions.getOption(TRACER_KEY);
}

/** The channel affinity for this context. */
@BetaApi("The surface for channel affinity is not stable yet and may change in the future.")
@Nullable
Expand Down
74 changes: 74 additions & 0 deletions gax/src/main/java/com/google/api/gax/opencensus/NoopTracer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.google.api.gax.opencensus;

import org.threeten.bp.Duration;

public class NoopTracer implements Tracer {
public static Tracer create() {
return new NoopTracer();
}

@Override
public void operationStarted() {

}

@Override
public void operationFailed(Throwable throwable) {

}

@Override
public void operationSucceeded() {

}

@Override
public void connectionSelected(int id) {

}

@Override
public void credentialsRefreshed() {

}

@Override
public void startAttempt() {

}

@Override
public void attemptSucceeded() {

}

@Override
public void retryableFailure(Throwable error, Duration delay) {

}

@Override
public void retriesExhausted() {

}

@Override
public void permanentFailure(Throwable error) {

}

@Override
public void receivedResponse() {

}

@Override
public void sentRequest() {

}

@Override
public void sentBatchRequest(int elementCount, int requestSize) {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.google.api.gax.opencensus;

import java.io.Closeable;
import org.threeten.bp.Duration;

public class OpenCensusTracer implements Tracer {

public static OpenCensusTracer create(String spanName) {

return null;
}

public TracerContext enter() {
return new TracerContext();
}

@Override
public void operationStarted() {

}

@Override
public void operationFailed(Throwable throwable) {

}

@Override
public void operationSucceeded() {

}

@Override
public void connectionSelected(int id) {

}

@Override
public void credentialsRefreshed() {

}

@Override
public void startAttempt() {

}

@Override
public void attemptSucceeded() {

}

@Override
public void retryableFailure(Throwable error, Duration delay) {

}

@Override
public void retriesExhausted() {

}

@Override
public void permanentFailure(Throwable error) {

}

@Override
public void receivedResponse() {

}

@Override
public void sentRequest() {

}

@Override
public void sentBatchRequest(int elementCount, int requestSize) {

}

static class TracerContext implements AutoCloseable {

@Override
public void close() {

}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.google.api.gax.opencensus;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.util.concurrent.MoreExecutors;

public class TracedUnaryCallable<RequestT, ResponseT> extends UnaryCallable<RequestT, ResponseT> {
private final UnaryCallable<RequestT, ResponseT> innerCallable;
private final String spanName;

public TracedUnaryCallable(
UnaryCallable<RequestT, ResponseT> innerCallable, String spanName) {
this.innerCallable = innerCallable;
this.spanName = spanName;
}

@Override
public ApiFuture<ResponseT> futureCall(RequestT request, ApiCallContext context) {
final OpenCensusTracer tracer = OpenCensusTracer.create(spanName);

ApiFuture<ResponseT> innerFuture;

tracer.operationStarted();

try (OpenCensusTracer.TracerContext ignored = tracer.enter()) {
context = context.withTracer(tracer);

innerFuture = innerCallable.futureCall(request, context);
}

ApiFutures.addCallback(innerFuture, new ApiFutureCallback<ResponseT>() {
@Override
public void onFailure(Throwable throwable) {
tracer.operationFailed(throwable);
}

@Override
public void onSuccess(ResponseT responseT) {
tracer.operationSucceeded();
}
}, MoreExecutors.directExecutor());

return innerFuture;
}


}
26 changes: 26 additions & 0 deletions gax/src/main/java/com/google/api/gax/opencensus/Tracer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.google.api.gax.opencensus;

import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.retrying.RetryingFuture;
import org.threeten.bp.Duration;

@InternalExtensionOnly
public interface Tracer {
void operationStarted();
void operationFailed(Throwable throwable);
void operationSucceeded();

void connectionSelected(int id);
void credentialsRefreshed();

void startAttempt();
void attemptSucceeded();

void retryableFailure(Throwable error, Duration delay);
void retriesExhausted();
void permanentFailure(Throwable error);

void receivedResponse();
void sentRequest();
void sentBatchRequest(int elementCount, int requestSize);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.opencensus.Tracer;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
Expand All @@ -58,14 +59,18 @@ class BasicRetryingFuture<ResponseT> extends AbstractFuture<ResponseT>

private final RetryAlgorithm<ResponseT> retryAlgorithm;

private final Tracer tracer;

private volatile TimedAttemptSettings attemptSettings;

private volatile ApiFuture<ResponseT> latestCompletedAttemptResult;
private volatile ApiFuture<ResponseT> attemptResult;

BasicRetryingFuture(Callable<ResponseT> callable, RetryAlgorithm<ResponseT> retryAlgorithm) {
BasicRetryingFuture(Callable<ResponseT> callable, RetryAlgorithm<ResponseT> retryAlgorithm,
Tracer tracer) {
this.callable = checkNotNull(callable);
this.retryAlgorithm = checkNotNull(retryAlgorithm);
this.tracer = tracer;

this.attemptSettings = retryAlgorithm.createFirstAttempt();

Expand Down Expand Up @@ -136,9 +141,11 @@ void handleAttempt(Throwable throwable, ResponseT response) {
try {
clearAttemptServiceData();
if (throwable instanceof CancellationException) {
tracer.permanentFailure(throwable);
// An attempt triggered cancellation.
super.cancel(false);
} else if (throwable instanceof RejectedExecutionException) {
tracer.permanentFailure(throwable);
// external executor cannot continue retrying
super.setException(throwable);
}
Expand All @@ -150,18 +157,23 @@ void handleAttempt(Throwable throwable, ResponseT response) {
retryAlgorithm.createNextAttempt(throwable, response, attemptSettings);
boolean shouldRetry = retryAlgorithm.shouldRetry(throwable, response, nextAttemptSettings);
if (shouldRetry) {
tracer.retryableFailure(throwable, nextAttemptSettings.getRandomizedRetryDelay());
attemptSettings = nextAttemptSettings;
setAttemptResult(throwable, response, true);
// a new attempt will be (must be) scheduled by an external executor
} else if (throwable != null) {
tracer.permanentFailure(throwable);
super.setException(throwable);
} else {
tracer.attemptSucceeded();
super.set(response);
}
} catch (CancellationException e) {
tracer.permanentFailure(e);
// A retry algorithm triggered cancellation.
super.cancel(false);
} catch (Exception e) {
tracer.permanentFailure(e);
// Should never happen, but still possible in case of buggy retry algorithm implementation.
// Any bugs/exceptions (except CancellationException) in retry algorithms immediately
// terminate retrying future and set the result to the thrown exception.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.core.ApiFuture;
import com.google.api.gax.opencensus.Tracer;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand All @@ -58,8 +59,9 @@ class CallbackChainRetryingFuture<ResponseT> extends BasicRetryingFuture<Respons
CallbackChainRetryingFuture(
Callable<ResponseT> callable,
RetryAlgorithm<ResponseT> retryAlgorithm,
RetryingExecutor<ResponseT> retryingExecutor) {
super(callable, retryAlgorithm);
RetryingExecutor<ResponseT> retryingExecutor,
Tracer tracer) {
super(callable, retryAlgorithm, tracer);
this.retryingExecutor = checkNotNull(retryingExecutor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public DirectRetryingExecutor(RetryAlgorithm<ResponseT> retryAlgorithm) {
*/
@Override
public RetryingFuture<ResponseT> createFuture(Callable<ResponseT> callable) {
return new BasicRetryingFuture<>(callable, retryAlgorithm);
return new BasicRetryingFuture<>(callable, retryAlgorithm, tracer);
}

/**
Expand Down
Loading