diff --git a/gax/src/main/java/com/google/api/gax/batching/BatchedCallContext.java b/gax/src/main/java/com/google/api/gax/batching/BatchedCallContext.java new file mode 100644 index 000000000..3a4466bc4 --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/batching/BatchedCallContext.java @@ -0,0 +1,71 @@ +/* + * Copyright 2021 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching; + +import com.google.api.core.InternalApi; +import com.google.auto.value.AutoValue; + +/** + * BatchedCallContext encapsulates context data in a batch call. + * + *

For internal use only. + */ +@InternalApi +@AutoValue +public abstract class BatchedCallContext { + + /** Gets element count of the current batch. */ + public abstract long getElementCount(); + + /** Gets byte count of the current batch. */ + public abstract long getByteCount(); + + /** Gets total throttled time of the current batch. */ + public abstract long getTotalThrottledTimeMs(); + + @AutoValue.Builder + public abstract static class Builder { + + public static Builder newBuilder() { + return new AutoValue_BatchedCallContext.Builder(); + } + + /** Gets element count of the current batch. */ + public abstract Builder setElementCount(long elementCount); + + /** Gets byte count of the current batch. */ + public abstract Builder setByteCount(long byteCount); + + /** Gets total throttled time of the current batch. */ + public abstract Builder setTotalThrottledTimeMs(long throttledTimeMs); + + public abstract BatchedCallContext build(); + } +} diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 8d3bbac26..a69c2d596 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -41,8 +41,10 @@ import com.google.api.gax.batching.FlowController.FlowControlRuntimeException; import com.google.api.gax.batching.FlowController.LimitExceededBehavior; import com.google.api.gax.rpc.UnaryCallable; +import com.google.api.gax.tracing.TracedBatchedContextCallable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.Futures; import java.lang.ref.Reference; import java.lang.ref.ReferenceQueue; @@ -188,16 +190,18 @@ public ApiFuture add(ElementT element) { // class, which made it seem unnecessary to have blocking and non-blocking semaphore // implementations. Some refactoring may be needed for the optimized implementation. So we'll // defer it till we decide on if refactoring FlowController is necessary. + Stopwatch stopwatch = Stopwatch.createStarted(); try { flowController.reserve(1, batchingDescriptor.countBytes(element)); } catch (FlowControlException e) { // This exception will only be thrown if the FlowController is set to ThrowException behavior throw FlowControlRuntimeException.fromFlowControlException(e); } + long throttledTime = stopwatch.elapsed(TimeUnit.MILLISECONDS); SettableApiFuture result = SettableApiFuture.create(); synchronized (elementLock) { - currentOpenBatch.add(element, result); + currentOpenBatch.add(element, result, throttledTime); } if (currentOpenBatch.hasAnyThresholdReached()) { @@ -226,8 +230,21 @@ public void sendOutstanding() { currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats); } - final ApiFuture batchResponse = - unaryCallable.futureCall(accumulatedBatch.builder.build()); + final ApiFuture batchResponse; + if (unaryCallable instanceof TracedBatchedContextCallable) { + BatchedCallContext batchedCallContext = + BatchedCallContext.Builder.newBuilder() + .setElementCount(accumulatedBatch.elementCounter) + .setByteCount(accumulatedBatch.byteCounter) + .setTotalThrottledTimeMs(accumulatedBatch.totalThrottledTimeMs) + .build(); + + batchResponse = + ((TracedBatchedContextCallable) unaryCallable) + .futureCall(accumulatedBatch.builder.build(), batchedCallContext); + } else { + batchResponse = unaryCallable.futureCall(accumulatedBatch.builder.build()); + } numOfOutstandingBatches.incrementAndGet(); ApiFutures.addCallback( @@ -312,6 +329,7 @@ private static class Batch { private long elementCounter = 0; private long byteCounter = 0; + private long totalThrottledTimeMs = 0; private Batch( RequestT prototype, @@ -328,11 +346,12 @@ private Batch( this.batcherStats = batcherStats; } - void add(ElementT element, SettableApiFuture result) { + void add(ElementT element, SettableApiFuture result, long throttledTime) { builder.add(element); entries.add(BatchEntry.create(element, result)); elementCounter++; byteCounter += descriptor.countBytes(element); + totalThrottledTimeMs += throttledTime; } void onBatchSuccess(ResponseT response) { diff --git a/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java b/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java index bc329630e..25d8a58e7 100644 --- a/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java +++ b/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java @@ -143,6 +143,13 @@ public interface ApiTracer { */ void batchRequestSent(long elementCount, long requestSize); + /** + * Adds an annotation of total throttled time of a batch. + * + * @param throttledTimeMs total throttled time of this batch. + */ + void batchRequestThrottled(long throttledTimeMs); + /** * A context class to be used with {@link #inScope()} and a try-with-resources block. Closing a * {@link Scope} removes any context that the underlying implementation might've set in {@link diff --git a/gax/src/main/java/com/google/api/gax/tracing/BaseApiTracer.java b/gax/src/main/java/com/google/api/gax/tracing/BaseApiTracer.java index 4ff8e901f..dbbedce58 100644 --- a/gax/src/main/java/com/google/api/gax/tracing/BaseApiTracer.java +++ b/gax/src/main/java/com/google/api/gax/tracing/BaseApiTracer.java @@ -135,4 +135,9 @@ public void requestSent() { public void batchRequestSent(long elementCount, long requestSize) { // noop } + + @Override + public void batchRequestThrottled(long throttledTimeMs) { + // noop + } } diff --git a/gax/src/main/java/com/google/api/gax/tracing/BatchedContextCallable.java b/gax/src/main/java/com/google/api/gax/tracing/BatchedContextCallable.java new file mode 100644 index 000000000..e5375a9d6 --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/tracing/BatchedContextCallable.java @@ -0,0 +1,60 @@ +/* + * Copyright 2021 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.tracing; + +import com.google.api.core.ApiFuture; +import com.google.api.core.InternalApi; +import com.google.api.gax.batching.BatchedCallContext; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.UnaryCallable; + +/** + * A {@link UnaryCallable} which sends batched requests with context of the current batch. + * + *

This is public only for technical reasons. + */ +@InternalApi +public abstract class BatchedContextCallable + extends UnaryCallable { + + /** + * Performs a call asynchronously with context data of the current batch. + * + * @param batchedCallContext {@link BatchedCallContext} to make the call with + */ + public abstract ApiFuture futureCall( + RequestT request, BatchedCallContext batchedCallContext); + + @Override + public UnaryCallable withDefaultCallContext( + final ApiCallContext defaultCallContext) { + throw new UnsupportedOperationException("withDefaultCallContext() not implemented"); + } +} diff --git a/gax/src/main/java/com/google/api/gax/tracing/TracedBatchedContextCallable.java b/gax/src/main/java/com/google/api/gax/tracing/TracedBatchedContextCallable.java new file mode 100644 index 000000000..9d0e740c1 --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/tracing/TracedBatchedContextCallable.java @@ -0,0 +1,121 @@ +/* + * Copyright 2021 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.tracing; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.core.InternalApi; +import com.google.api.gax.batching.BatchedCallContext; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.api.gax.tracing.ApiTracerFactory.OperationType; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; + +/** + * This callable wraps a batching callable chain in an {@link ApiTracer} and annotates {@link + * BatchedCallContext} batching context data. + * + *

For internal use only. + */ +@InternalApi("For internal use by google-cloud-java clients only") +public class TracedBatchedContextCallable + extends UnaryCallable { + + private final ApiTracerFactory tracerFactory; + private ApiCallContext baseCallContext; + private final SpanName spanName; + private final UnaryCallable innerCallable; + + public TracedBatchedContextCallable( + UnaryCallable innerCallable, + ApiCallContext callContext, + ApiTracerFactory tracerFactory, + SpanName spanName) { + this.baseCallContext = Preconditions.checkNotNull(callContext); + this.tracerFactory = Preconditions.checkNotNull(tracerFactory); + this.spanName = Preconditions.checkNotNull(spanName); + this.innerCallable = Preconditions.checkNotNull(innerCallable); + } + + /** + * Creates an {@link ApiTracer} and annotates batching context data. Performs a call + * asynchronously. + */ + public ApiFuture futureCall(RequestT request, BatchedCallContext batchedCallContext) { + return futureCall( + request, + baseCallContext, + batchedCallContext.getTotalThrottledTimeMs(), + batchedCallContext.getElementCount(), + batchedCallContext.getByteCount()); + } + + /** Calls the wrapped {@link UnaryCallable} within the context of a new trace. */ + @Override + public ApiFuture futureCall(RequestT request, ApiCallContext context) { + ApiCallContext mergedContext = baseCallContext.merge(context); + + return futureCall(request, mergedContext, null, null, null); + } + + private ApiFuture futureCall( + RequestT request, + ApiCallContext callContext, + Long throttledTimeMs, + Long elementCount, + Long byteCount) { + ApiTracer tracer = + tracerFactory.newTracer(callContext.getTracer(), spanName, OperationType.Batching); + TraceFinisher finisher = new TraceFinisher<>(tracer); + + try { + if (throttledTimeMs != null) { + tracer.batchRequestThrottled(throttledTimeMs); + } + if (elementCount != null && byteCount != null) { + tracer.batchRequestSent(elementCount, byteCount); + } + callContext = callContext.withTracer(tracer); + ApiFuture future = innerCallable.futureCall(request, callContext); + ApiFutures.addCallback(future, finisher, MoreExecutors.directExecutor()); + return future; + } catch (RuntimeException e) { + finisher.onFailure(e); + throw e; + } + } + + public UnaryCallable withDefaultCallContext( + final ApiCallContext defaultCallContext) { + return new TracedBatchedContextCallable<>( + innerCallable, baseCallContext.merge(defaultCallContext), tracerFactory, spanName); + } +} diff --git a/gax/src/test/java/com/google/api/gax/tracing/TracedBatchedContextCallableTest.java b/gax/src/test/java/com/google/api/gax/tracing/TracedBatchedContextCallableTest.java new file mode 100644 index 000000000..31d9ec81d --- /dev/null +++ b/gax/src/test/java/com/google/api/gax/tracing/TracedBatchedContextCallableTest.java @@ -0,0 +1,110 @@ +/* + * Copyright 2021 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.google.api.gax.tracing; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.batching.BatchedCallContext; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.api.gax.rpc.testing.FakeCallContext; +import com.google.api.gax.tracing.ApiTracerFactory.OperationType; +import java.util.Random; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +public class TracedBatchedContextCallableTest { + private static final SpanName SPAN_NAME = SpanName.of("FakeClient", "FakeRpc"); + + @Rule public final MockitoRule rule = MockitoJUnit.rule(); + + @Mock private ApiTracerFactory tracerFactory; + @Mock private ApiTracer tracer; + @Mock private UnaryCallable innerCallable; + @Mock private BatchedCallContext batchedCallContext; + private SettableApiFuture innerResult; + + private FakeCallContext callContext; + private TracedBatchedContextCallable callable; + + @Before + public void setUp() { + when(tracerFactory.newTracer( + any(ApiTracer.class), any(SpanName.class), eq(OperationType.Batching))) + .thenReturn(tracer); + innerResult = SettableApiFuture.create(); + when(innerCallable.futureCall(anyString(), any(ApiCallContext.class))).thenReturn(innerResult); + + callContext = FakeCallContext.createDefault(); + callable = + new TracedBatchedContextCallable(innerCallable, callContext, tracerFactory, SPAN_NAME); + } + + @Test + public void testRootTracerCreated() { + callable.futureCall("test", batchedCallContext); + verify(tracerFactory, times(1)) + .newTracer(callContext.getTracer(), SPAN_NAME, OperationType.Batching); + } + + @Test + public void testThrottledTimeRecorded() { + long throttledTime = new Random().nextLong(); + when(batchedCallContext.getTotalThrottledTimeMs()).thenReturn(throttledTime); + callable.futureCall("test", batchedCallContext); + verify(tracer).batchRequestThrottled(throttledTime); + } + + @Test + public void testOperationFinish() { + innerResult.set("success"); + callable.futureCall("test", batchedCallContext); + verify(tracer, times(1)).operationSucceeded(); + } + + @Test + public void testOperationFailed() { + RuntimeException fakeException = new RuntimeException("Exception"); + innerResult.setException(fakeException); + callable.futureCall("test", batchedCallContext); + verify(tracer, times(1)).operationFailed(fakeException); + } +}