From 74dbd02089ccc6b03b8afe405e50032f307f70d5 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 23 Jun 2021 18:53:30 +0000 Subject: [PATCH 1/7] feat: add batch throttled time to metrics --- .../google/api/gax/batching/BatcherImpl.java | 25 +++- .../api/gax/batching/BatchingCallContext.java | 74 ++++++++++++ .../com/google/api/gax/tracing/ApiTracer.java | 7 ++ .../google/api/gax/tracing/BaseApiTracer.java | 5 + .../TracedBatchingContextCallable.java | 112 ++++++++++++++++++ .../TracedBatchingContextCallableTest.java | 110 +++++++++++++++++ 6 files changed, 329 insertions(+), 4 deletions(-) create mode 100644 gax/src/main/java/com/google/api/gax/batching/BatchingCallContext.java create mode 100644 gax/src/main/java/com/google/api/gax/tracing/TracedBatchingContextCallable.java create mode 100644 gax/src/test/java/com/google/api/gax/tracing/TracedBatchingContextCallableTest.java diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 8d3bbac26..478d329c9 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -41,8 +41,10 @@ import com.google.api.gax.batching.FlowController.FlowControlRuntimeException; import com.google.api.gax.batching.FlowController.LimitExceededBehavior; import com.google.api.gax.rpc.UnaryCallable; +import com.google.api.gax.tracing.TracedBatchingContextCallable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.Futures; import java.lang.ref.Reference; import java.lang.ref.ReferenceQueue; @@ -188,16 +190,18 @@ public ApiFuture add(ElementT element) { // class, which made it seem unnecessary to have blocking and non-blocking semaphore // implementations. Some refactoring may be needed for the optimized implementation. So we'll // defer it till we decide on if refactoring FlowController is necessary. + Stopwatch stopwatch = Stopwatch.createStarted(); try { flowController.reserve(1, batchingDescriptor.countBytes(element)); } catch (FlowControlException e) { // This exception will only be thrown if the FlowController is set to ThrowException behavior throw FlowControlRuntimeException.fromFlowControlException(e); } + long throttledTime = stopwatch.elapsed(TimeUnit.MILLISECONDS); SettableApiFuture result = SettableApiFuture.create(); synchronized (elementLock) { - currentOpenBatch.add(element, result); + currentOpenBatch.add(element, result, throttledTime); } if (currentOpenBatch.hasAnyThresholdReached()) { @@ -226,8 +230,19 @@ public void sendOutstanding() { currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats); } - final ApiFuture batchResponse = - unaryCallable.futureCall(accumulatedBatch.builder.build()); + final ApiFuture batchResponse; + if (unaryCallable instanceof TracedBatchingContextCallable) { + BatchingCallContext batchingCallContext = + BatchingCallContext.create( + accumulatedBatch.elementCounter, + accumulatedBatch.byteCounter, + accumulatedBatch.totalThrottledTimeMs); + batchResponse = + ((TracedBatchingContextCallable) unaryCallable) + .futureCall(accumulatedBatch.builder.build(), batchingCallContext); + } else { + batchResponse = unaryCallable.futureCall(accumulatedBatch.builder.build()); + } numOfOutstandingBatches.incrementAndGet(); ApiFutures.addCallback( @@ -312,6 +327,7 @@ private static class Batch { private long elementCounter = 0; private long byteCounter = 0; + private long totalThrottledTimeMs = 0; private Batch( RequestT prototype, @@ -328,11 +344,12 @@ private Batch( this.batcherStats = batcherStats; } - void add(ElementT element, SettableApiFuture result) { + void add(ElementT element, SettableApiFuture result, long throttledTime) { builder.add(element); entries.add(BatchEntry.create(element, result)); elementCounter++; byteCounter += descriptor.countBytes(element); + totalThrottledTimeMs += throttledTime; } void onBatchSuccess(ResponseT response) { diff --git a/gax/src/main/java/com/google/api/gax/batching/BatchingCallContext.java b/gax/src/main/java/com/google/api/gax/batching/BatchingCallContext.java new file mode 100644 index 000000000..675af590f --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/batching/BatchingCallContext.java @@ -0,0 +1,74 @@ +/* + * Copyright 2019 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching; + +import com.google.api.core.InternalApi; + +/** + * BatchingCallContext encapsulates context data in a batching call. + * + *

For internal use only. + */ +@InternalApi +public class BatchingCallContext { + + private long elementCount; + private long byteCount; + private long totalThrottledTimeMs; + + /** + * Creates a batching call context with this batch's element count, byte count and total throttled + * time. + */ + static BatchingCallContext create(long elementCount, long byteCount, long totalThrottledTimeMs) { + return new BatchingCallContext(elementCount, byteCount, totalThrottledTimeMs); + } + + private BatchingCallContext(long elementCount, long byteCount, long totalThrottledTimeMs) { + this.elementCount = elementCount; + this.byteCount = byteCount; + this.totalThrottledTimeMs = totalThrottledTimeMs; + } + + /** Gets element count of the current batch. */ + public long getElementCount() { + return elementCount; + } + + /** Gets byte count of the current batch. */ + public long getByteCount() { + return byteCount; + } + + /** Gets total throttled time of the current batch. */ + public long getTotalThrottledTimeMs() { + return totalThrottledTimeMs; + } +} diff --git a/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java b/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java index bc329630e..25d8a58e7 100644 --- a/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java +++ b/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java @@ -143,6 +143,13 @@ public interface ApiTracer { */ void batchRequestSent(long elementCount, long requestSize); + /** + * Adds an annotation of total throttled time of a batch. + * + * @param throttledTimeMs total throttled time of this batch. + */ + void batchRequestThrottled(long throttledTimeMs); + /** * A context class to be used with {@link #inScope()} and a try-with-resources block. Closing a * {@link Scope} removes any context that the underlying implementation might've set in {@link diff --git a/gax/src/main/java/com/google/api/gax/tracing/BaseApiTracer.java b/gax/src/main/java/com/google/api/gax/tracing/BaseApiTracer.java index 4ff8e901f..dbbedce58 100644 --- a/gax/src/main/java/com/google/api/gax/tracing/BaseApiTracer.java +++ b/gax/src/main/java/com/google/api/gax/tracing/BaseApiTracer.java @@ -135,4 +135,9 @@ public void requestSent() { public void batchRequestSent(long elementCount, long requestSize) { // noop } + + @Override + public void batchRequestThrottled(long throttledTimeMs) { + // noop + } } diff --git a/gax/src/main/java/com/google/api/gax/tracing/TracedBatchingContextCallable.java b/gax/src/main/java/com/google/api/gax/tracing/TracedBatchingContextCallable.java new file mode 100644 index 000000000..a3d503189 --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/tracing/TracedBatchingContextCallable.java @@ -0,0 +1,112 @@ +/* + * Copyright 2019 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.tracing; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.core.InternalApi; +import com.google.api.gax.batching.BatchingCallContext; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.api.gax.tracing.ApiTracerFactory.OperationType; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; + +/** + * This callable wraps a batching callable chain in an {@link ApiTracer} and annotates {@link + * BatchingCallContext} batching context data. + * + *

For internal use only. + */ +@InternalApi("For internal use by google-cloud-java clients only") +public class TracedBatchingContextCallable + extends UnaryCallable { + + private final ApiTracerFactory tracerFactory; + private ApiCallContext baseCallContext; + private final SpanName spanName; + private final UnaryCallable innerCallable; + + public TracedBatchingContextCallable( + UnaryCallable innerCallable, + ApiCallContext callContext, + ApiTracerFactory tracerFactory, + SpanName spanName) { + this.baseCallContext = Preconditions.checkNotNull(callContext); + this.tracerFactory = Preconditions.checkNotNull(tracerFactory); + this.spanName = Preconditions.checkNotNull(spanName); + this.innerCallable = Preconditions.checkNotNull(innerCallable); + } + + /** + * Creates an {@link ApiTracer} and annotates batching context data. And perform a call + * asynchronously. + */ + public ApiFuture futureCall( + RequestT request, BatchingCallContext batchingCallContext) { + ApiTracer tracer = + tracerFactory.newTracer(baseCallContext.getTracer(), spanName, OperationType.Batching); + TraceFinisher finisher = new TraceFinisher<>(tracer); + + try { + tracer.batchRequestThrottled(batchingCallContext.getTotalThrottledTimeMs()); + tracer.batchRequestSent( + batchingCallContext.getElementCount(), batchingCallContext.getByteCount()); + baseCallContext = baseCallContext.withTracer(tracer); + ApiFuture future = innerCallable.futureCall(request, baseCallContext); + ApiFutures.addCallback(future, finisher, MoreExecutors.directExecutor()); + + return future; + } catch (RuntimeException e) { + finisher.onFailure(e); + throw e; + } + } + + @Override + public ApiFuture futureCall(RequestT request, ApiCallContext context) { + ApiCallContext mergedContext = baseCallContext.merge(context); + + ApiTracer tracer = + tracerFactory.newTracer(mergedContext.getTracer(), spanName, OperationType.Batching); + TraceFinisher finisher = new TraceFinisher<>(tracer); + + try { + mergedContext = mergedContext.withTracer(tracer); + ApiFuture future = innerCallable.futureCall(request, mergedContext); + ApiFutures.addCallback(future, finisher, MoreExecutors.directExecutor()); + + return future; + } catch (RuntimeException e) { + finisher.onFailure(e); + throw e; + } + } +} diff --git a/gax/src/test/java/com/google/api/gax/tracing/TracedBatchingContextCallableTest.java b/gax/src/test/java/com/google/api/gax/tracing/TracedBatchingContextCallableTest.java new file mode 100644 index 000000000..f0841fa09 --- /dev/null +++ b/gax/src/test/java/com/google/api/gax/tracing/TracedBatchingContextCallableTest.java @@ -0,0 +1,110 @@ +/* + * Copyright 2021 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.google.api.gax.tracing; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.batching.BatchingCallContext; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.api.gax.rpc.testing.FakeCallContext; +import com.google.api.gax.tracing.ApiTracerFactory.OperationType; +import java.util.Random; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +public class TracedBatchingContextCallableTest { + private static final SpanName SPAN_NAME = SpanName.of("FakeClient", "FakeRpc"); + + @Rule public final MockitoRule rule = MockitoJUnit.rule(); + + @Mock private ApiTracerFactory tracerFactory; + @Mock private ApiTracer tracer; + @Mock private UnaryCallable innerCallable; + @Mock private BatchingCallContext batchingCallContext; + private SettableApiFuture innerResult; + + private FakeCallContext callContext; + private TracedBatchingContextCallable callable; + + @Before + public void setUp() { + when(tracerFactory.newTracer( + any(ApiTracer.class), any(SpanName.class), eq(OperationType.Batching))) + .thenReturn(tracer); + innerResult = SettableApiFuture.create(); + when(innerCallable.futureCall(anyString(), any(ApiCallContext.class))).thenReturn(innerResult); + + callContext = FakeCallContext.createDefault(); + callable = + new TracedBatchingContextCallable(innerCallable, callContext, tracerFactory, SPAN_NAME); + } + + @Test + public void testRootTracerCreated() { + callable.futureCall("test", batchingCallContext); + verify(tracerFactory, times(1)) + .newTracer(callContext.getTracer(), SPAN_NAME, OperationType.Batching); + } + + @Test + public void testThrottledTimeRecorded() { + long throttledTime = new Random().nextLong(); + when(batchingCallContext.getTotalThrottledTimeMs()).thenReturn(throttledTime); + callable.futureCall("test", batchingCallContext); + verify(tracer).batchRequestThrottled(throttledTime); + } + + @Test + public void testOperationFinish() { + innerResult.set("success"); + callable.futureCall("test", batchingCallContext); + verify(tracer, times(1)).operationSucceeded(); + } + + @Test + public void testOperationFailed() { + RuntimeException fakeException = new RuntimeException("Exception"); + innerResult.setException(fakeException); + callable.futureCall("test", batchingCallContext); + verify(tracer, times(1)).operationFailed(fakeException); + } +} From 7f65395023335e9b6efb5c6b4f998aa41e4ddb82 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 23 Jun 2021 21:27:28 +0000 Subject: [PATCH 2/7] override withDefaultContext --- .../gax/tracing/TracedBatchingContextCallable.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/gax/src/main/java/com/google/api/gax/tracing/TracedBatchingContextCallable.java b/gax/src/main/java/com/google/api/gax/tracing/TracedBatchingContextCallable.java index a3d503189..13d0ddeab 100644 --- a/gax/src/main/java/com/google/api/gax/tracing/TracedBatchingContextCallable.java +++ b/gax/src/main/java/com/google/api/gax/tracing/TracedBatchingContextCallable.java @@ -109,4 +109,15 @@ public ApiFuture futureCall(RequestT request, ApiCallContext context) { throw e; } } + + public UnaryCallable withDefaultCallContext( + final ApiCallContext defaultCallContext) { + return new UnaryCallable() { + @Override + public ApiFuture futureCall(RequestT request, ApiCallContext thisCallContext) { + return TracedBatchingContextCallable.this.futureCall( + request, defaultCallContext.merge(thisCallContext).merge(baseCallContext)); + } + }; + } } From 1a9d01ccbd632170243e3c5596f03adce7aa1c35 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Thu, 24 Jun 2021 21:11:06 +0000 Subject: [PATCH 3/7] refactor --- ...llContext.java => BatchedCallContext.java} | 35 ++++-------- .../google/api/gax/batching/BatcherImpl.java | 12 ++--- .../gax/tracing/BatchedContextCallable.java | 53 +++++++++++++++++++ ...java => TracedBatchedContextCallable.java} | 27 +++++----- ... => TracedBatchedContextCallableTest.java} | 20 +++---- 5 files changed, 91 insertions(+), 56 deletions(-) rename gax/src/main/java/com/google/api/gax/batching/{BatchingCallContext.java => BatchedCallContext.java} (66%) create mode 100644 gax/src/main/java/com/google/api/gax/tracing/BatchedContextCallable.java rename gax/src/main/java/com/google/api/gax/tracing/{TracedBatchingContextCallable.java => TracedBatchedContextCallable.java} (84%) rename gax/src/test/java/com/google/api/gax/tracing/{TracedBatchingContextCallableTest.java => TracedBatchedContextCallableTest.java} (85%) diff --git a/gax/src/main/java/com/google/api/gax/batching/BatchingCallContext.java b/gax/src/main/java/com/google/api/gax/batching/BatchedCallContext.java similarity index 66% rename from gax/src/main/java/com/google/api/gax/batching/BatchingCallContext.java rename to gax/src/main/java/com/google/api/gax/batching/BatchedCallContext.java index 675af590f..4a6aef27c 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatchingCallContext.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatchedCallContext.java @@ -30,45 +30,30 @@ package com.google.api.gax.batching; import com.google.api.core.InternalApi; +import com.google.auto.value.AutoValue; /** - * BatchingCallContext encapsulates context data in a batching call. + * BatchedCallContext encapsulates context data in a batch call. * *

For internal use only. */ @InternalApi -public class BatchingCallContext { - - private long elementCount; - private long byteCount; - private long totalThrottledTimeMs; +@AutoValue +public abstract class BatchedCallContext { /** - * Creates a batching call context with this batch's element count, byte count and total throttled - * time. + * Creates a call context for a batch with its element count, byte count and total throttled time. */ - static BatchingCallContext create(long elementCount, long byteCount, long totalThrottledTimeMs) { - return new BatchingCallContext(elementCount, byteCount, totalThrottledTimeMs); - } - - private BatchingCallContext(long elementCount, long byteCount, long totalThrottledTimeMs) { - this.elementCount = elementCount; - this.byteCount = byteCount; - this.totalThrottledTimeMs = totalThrottledTimeMs; + static BatchedCallContext create(long elementCount, long byteCount, long totalThrottledTimeMs) { + return new AutoValue_BatchedCallContext(elementCount, byteCount, totalThrottledTimeMs); } /** Gets element count of the current batch. */ - public long getElementCount() { - return elementCount; - } + public abstract long getElementCount(); /** Gets byte count of the current batch. */ - public long getByteCount() { - return byteCount; - } + public abstract long getByteCount(); /** Gets total throttled time of the current batch. */ - public long getTotalThrottledTimeMs() { - return totalThrottledTimeMs; - } + public abstract long getTotalThrottledTimeMs(); } diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 478d329c9..21649c5c1 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -41,7 +41,7 @@ import com.google.api.gax.batching.FlowController.FlowControlRuntimeException; import com.google.api.gax.batching.FlowController.LimitExceededBehavior; import com.google.api.gax.rpc.UnaryCallable; -import com.google.api.gax.tracing.TracedBatchingContextCallable; +import com.google.api.gax.tracing.TracedBatchedContextCallable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; @@ -231,15 +231,15 @@ public void sendOutstanding() { } final ApiFuture batchResponse; - if (unaryCallable instanceof TracedBatchingContextCallable) { - BatchingCallContext batchingCallContext = - BatchingCallContext.create( + if (unaryCallable instanceof TracedBatchedContextCallable) { + BatchedCallContext batchedCallContext = + BatchedCallContext.create( accumulatedBatch.elementCounter, accumulatedBatch.byteCounter, accumulatedBatch.totalThrottledTimeMs); batchResponse = - ((TracedBatchingContextCallable) unaryCallable) - .futureCall(accumulatedBatch.builder.build(), batchingCallContext); + ((TracedBatchedContextCallable) unaryCallable) + .futureCall(accumulatedBatch.builder.build(), batchedCallContext); } else { batchResponse = unaryCallable.futureCall(accumulatedBatch.builder.build()); } diff --git a/gax/src/main/java/com/google/api/gax/tracing/BatchedContextCallable.java b/gax/src/main/java/com/google/api/gax/tracing/BatchedContextCallable.java new file mode 100644 index 000000000..e5abeabc4 --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/tracing/BatchedContextCallable.java @@ -0,0 +1,53 @@ +/* + * Copyright 2019 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.tracing; + +import com.google.api.core.ApiFuture; +import com.google.api.core.InternalApi; +import com.google.api.gax.batching.BatchedCallContext; +import com.google.api.gax.rpc.UnaryCallable; + +/** + * A {@link UnaryCallable} which sends batched requests with context of the current batch. + * + *

This is public only for technical reasons. + */ +@InternalApi +public abstract class BatchedContextCallable + extends UnaryCallable { + + /** + * Performs a call asynchronously with context data of the current batch. + * + * @param batchedCallContext {@link BatchedCallContext} to make the call with + */ + public abstract ApiFuture futureCall( + RequestT request, BatchedCallContext batchedCallContext); +} diff --git a/gax/src/main/java/com/google/api/gax/tracing/TracedBatchingContextCallable.java b/gax/src/main/java/com/google/api/gax/tracing/TracedBatchedContextCallable.java similarity index 84% rename from gax/src/main/java/com/google/api/gax/tracing/TracedBatchingContextCallable.java rename to gax/src/main/java/com/google/api/gax/tracing/TracedBatchedContextCallable.java index 13d0ddeab..52c832847 100644 --- a/gax/src/main/java/com/google/api/gax/tracing/TracedBatchingContextCallable.java +++ b/gax/src/main/java/com/google/api/gax/tracing/TracedBatchedContextCallable.java @@ -32,7 +32,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.core.InternalApi; -import com.google.api.gax.batching.BatchingCallContext; +import com.google.api.gax.batching.BatchedCallContext; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.UnaryCallable; import com.google.api.gax.tracing.ApiTracerFactory.OperationType; @@ -41,12 +41,12 @@ /** * This callable wraps a batching callable chain in an {@link ApiTracer} and annotates {@link - * BatchingCallContext} batching context data. + * BatchedCallContext} batching context data. * *

For internal use only. */ @InternalApi("For internal use by google-cloud-java clients only") -public class TracedBatchingContextCallable +public class TracedBatchedContextCallable extends UnaryCallable { private final ApiTracerFactory tracerFactory; @@ -54,7 +54,7 @@ public class TracedBatchingContextCallable private final SpanName spanName; private final UnaryCallable innerCallable; - public TracedBatchingContextCallable( + public TracedBatchedContextCallable( UnaryCallable innerCallable, ApiCallContext callContext, ApiTracerFactory tracerFactory, @@ -69,16 +69,15 @@ public TracedBatchingContextCallable( * Creates an {@link ApiTracer} and annotates batching context data. And perform a call * asynchronously. */ - public ApiFuture futureCall( - RequestT request, BatchingCallContext batchingCallContext) { + public ApiFuture futureCall(RequestT request, BatchedCallContext batchedCallContext) { ApiTracer tracer = tracerFactory.newTracer(baseCallContext.getTracer(), spanName, OperationType.Batching); TraceFinisher finisher = new TraceFinisher<>(tracer); try { - tracer.batchRequestThrottled(batchingCallContext.getTotalThrottledTimeMs()); + tracer.batchRequestThrottled(batchedCallContext.getTotalThrottledTimeMs()); tracer.batchRequestSent( - batchingCallContext.getElementCount(), batchingCallContext.getByteCount()); + batchedCallContext.getElementCount(), batchedCallContext.getByteCount()); baseCallContext = baseCallContext.withTracer(tracer); ApiFuture future = innerCallable.futureCall(request, baseCallContext); ApiFutures.addCallback(future, finisher, MoreExecutors.directExecutor()); @@ -90,6 +89,9 @@ public ApiFuture futureCall( } } + /** + * Calls the wrapped {@link UnaryCallable} within the context of a new trace. + */ @Override public ApiFuture futureCall(RequestT request, ApiCallContext context) { ApiCallContext mergedContext = baseCallContext.merge(context); @@ -112,12 +114,7 @@ public ApiFuture futureCall(RequestT request, ApiCallContext context) { public UnaryCallable withDefaultCallContext( final ApiCallContext defaultCallContext) { - return new UnaryCallable() { - @Override - public ApiFuture futureCall(RequestT request, ApiCallContext thisCallContext) { - return TracedBatchingContextCallable.this.futureCall( - request, defaultCallContext.merge(thisCallContext).merge(baseCallContext)); - } - }; + return new TracedBatchedContextCallable<>( + innerCallable, baseCallContext.merge(defaultCallContext), tracerFactory, spanName); } } diff --git a/gax/src/test/java/com/google/api/gax/tracing/TracedBatchingContextCallableTest.java b/gax/src/test/java/com/google/api/gax/tracing/TracedBatchedContextCallableTest.java similarity index 85% rename from gax/src/test/java/com/google/api/gax/tracing/TracedBatchingContextCallableTest.java rename to gax/src/test/java/com/google/api/gax/tracing/TracedBatchedContextCallableTest.java index f0841fa09..31d9ec81d 100644 --- a/gax/src/test/java/com/google/api/gax/tracing/TracedBatchingContextCallableTest.java +++ b/gax/src/test/java/com/google/api/gax/tracing/TracedBatchedContextCallableTest.java @@ -38,7 +38,7 @@ import static org.mockito.Mockito.when; import com.google.api.core.SettableApiFuture; -import com.google.api.gax.batching.BatchingCallContext; +import com.google.api.gax.batching.BatchedCallContext; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.UnaryCallable; import com.google.api.gax.rpc.testing.FakeCallContext; @@ -51,7 +51,7 @@ import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; -public class TracedBatchingContextCallableTest { +public class TracedBatchedContextCallableTest { private static final SpanName SPAN_NAME = SpanName.of("FakeClient", "FakeRpc"); @Rule public final MockitoRule rule = MockitoJUnit.rule(); @@ -59,11 +59,11 @@ public class TracedBatchingContextCallableTest { @Mock private ApiTracerFactory tracerFactory; @Mock private ApiTracer tracer; @Mock private UnaryCallable innerCallable; - @Mock private BatchingCallContext batchingCallContext; + @Mock private BatchedCallContext batchedCallContext; private SettableApiFuture innerResult; private FakeCallContext callContext; - private TracedBatchingContextCallable callable; + private TracedBatchedContextCallable callable; @Before public void setUp() { @@ -75,12 +75,12 @@ public void setUp() { callContext = FakeCallContext.createDefault(); callable = - new TracedBatchingContextCallable(innerCallable, callContext, tracerFactory, SPAN_NAME); + new TracedBatchedContextCallable(innerCallable, callContext, tracerFactory, SPAN_NAME); } @Test public void testRootTracerCreated() { - callable.futureCall("test", batchingCallContext); + callable.futureCall("test", batchedCallContext); verify(tracerFactory, times(1)) .newTracer(callContext.getTracer(), SPAN_NAME, OperationType.Batching); } @@ -88,15 +88,15 @@ public void testRootTracerCreated() { @Test public void testThrottledTimeRecorded() { long throttledTime = new Random().nextLong(); - when(batchingCallContext.getTotalThrottledTimeMs()).thenReturn(throttledTime); - callable.futureCall("test", batchingCallContext); + when(batchedCallContext.getTotalThrottledTimeMs()).thenReturn(throttledTime); + callable.futureCall("test", batchedCallContext); verify(tracer).batchRequestThrottled(throttledTime); } @Test public void testOperationFinish() { innerResult.set("success"); - callable.futureCall("test", batchingCallContext); + callable.futureCall("test", batchedCallContext); verify(tracer, times(1)).operationSucceeded(); } @@ -104,7 +104,7 @@ public void testOperationFinish() { public void testOperationFailed() { RuntimeException fakeException = new RuntimeException("Exception"); innerResult.setException(fakeException); - callable.futureCall("test", batchingCallContext); + callable.futureCall("test", batchedCallContext); verify(tracer, times(1)).operationFailed(fakeException); } } From 4ffaee946a3783d7fcebd4c7c4fda7df8641d88e Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Mon, 28 Jun 2021 14:24:25 +0000 Subject: [PATCH 4/7] format --- .../api/gax/batching/BatchedCallContext.java | 26 ++++++++++++++----- .../google/api/gax/batching/BatcherImpl.java | 10 ++++--- .../tracing/TracedBatchedContextCallable.java | 6 ++--- 3 files changed, 27 insertions(+), 15 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/BatchedCallContext.java b/gax/src/main/java/com/google/api/gax/batching/BatchedCallContext.java index 4a6aef27c..d70aa6408 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatchedCallContext.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatchedCallContext.java @@ -41,13 +41,6 @@ @AutoValue public abstract class BatchedCallContext { - /** - * Creates a call context for a batch with its element count, byte count and total throttled time. - */ - static BatchedCallContext create(long elementCount, long byteCount, long totalThrottledTimeMs) { - return new AutoValue_BatchedCallContext(elementCount, byteCount, totalThrottledTimeMs); - } - /** Gets element count of the current batch. */ public abstract long getElementCount(); @@ -56,4 +49,23 @@ static BatchedCallContext create(long elementCount, long byteCount, long totalTh /** Gets total throttled time of the current batch. */ public abstract long getTotalThrottledTimeMs(); + + @AutoValue.Builder + public abstract static class Builder { + + public static Builder newBuilder() { + return new AutoValue_BatchedCallContext.Builder(); + } + + /** Gets element count of the current batch. */ + public abstract Builder setElementCount(long elementCount); + + /** Gets byte count of the current batch. */ + public abstract Builder setByteCount(long byteCount); + + /** Gets total throttled time of the current batch. */ + public abstract Builder setTotalThrottledTimeMs(long throttledTimeMs); + + public abstract BatchedCallContext build(); + } } diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 21649c5c1..a69c2d596 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -233,10 +233,12 @@ public void sendOutstanding() { final ApiFuture batchResponse; if (unaryCallable instanceof TracedBatchedContextCallable) { BatchedCallContext batchedCallContext = - BatchedCallContext.create( - accumulatedBatch.elementCounter, - accumulatedBatch.byteCounter, - accumulatedBatch.totalThrottledTimeMs); + BatchedCallContext.Builder.newBuilder() + .setElementCount(accumulatedBatch.elementCounter) + .setByteCount(accumulatedBatch.byteCounter) + .setTotalThrottledTimeMs(accumulatedBatch.totalThrottledTimeMs) + .build(); + batchResponse = ((TracedBatchedContextCallable) unaryCallable) .futureCall(accumulatedBatch.builder.build(), batchedCallContext); diff --git a/gax/src/main/java/com/google/api/gax/tracing/TracedBatchedContextCallable.java b/gax/src/main/java/com/google/api/gax/tracing/TracedBatchedContextCallable.java index 52c832847..e616bfb81 100644 --- a/gax/src/main/java/com/google/api/gax/tracing/TracedBatchedContextCallable.java +++ b/gax/src/main/java/com/google/api/gax/tracing/TracedBatchedContextCallable.java @@ -66,7 +66,7 @@ public TracedBatchedContextCallable( } /** - * Creates an {@link ApiTracer} and annotates batching context data. And perform a call + * Creates an {@link ApiTracer} and annotates batching context data. Performs a call * asynchronously. */ public ApiFuture futureCall(RequestT request, BatchedCallContext batchedCallContext) { @@ -89,9 +89,7 @@ public ApiFuture futureCall(RequestT request, BatchedCallContext batc } } - /** - * Calls the wrapped {@link UnaryCallable} within the context of a new trace. - */ + /** Calls the wrapped {@link UnaryCallable} within the context of a new trace. */ @Override public ApiFuture futureCall(RequestT request, ApiCallContext context) { ApiCallContext mergedContext = baseCallContext.merge(context); From 353668ff56072d1a3abc9003e00d012200fc2255 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Mon, 12 Jul 2021 15:21:14 +0000 Subject: [PATCH 5/7] fix comments --- .../google/api/gax/tracing/BatchedContextCallable.java | 9 ++++++++- .../api/gax/tracing/TracedBatchedContextCallable.java | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/tracing/BatchedContextCallable.java b/gax/src/main/java/com/google/api/gax/tracing/BatchedContextCallable.java index e5abeabc4..e5375a9d6 100644 --- a/gax/src/main/java/com/google/api/gax/tracing/BatchedContextCallable.java +++ b/gax/src/main/java/com/google/api/gax/tracing/BatchedContextCallable.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Google LLC + * Copyright 2021 Google LLC * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are @@ -32,6 +32,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.InternalApi; import com.google.api.gax.batching.BatchedCallContext; +import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.UnaryCallable; /** @@ -50,4 +51,10 @@ public abstract class BatchedContextCallable */ public abstract ApiFuture futureCall( RequestT request, BatchedCallContext batchedCallContext); + + @Override + public UnaryCallable withDefaultCallContext( + final ApiCallContext defaultCallContext) { + throw new UnsupportedOperationException("withDefaultCallContext() not implemented"); + } } diff --git a/gax/src/main/java/com/google/api/gax/tracing/TracedBatchedContextCallable.java b/gax/src/main/java/com/google/api/gax/tracing/TracedBatchedContextCallable.java index e616bfb81..2038b28bf 100644 --- a/gax/src/main/java/com/google/api/gax/tracing/TracedBatchedContextCallable.java +++ b/gax/src/main/java/com/google/api/gax/tracing/TracedBatchedContextCallable.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Google LLC + * Copyright 2021 Google LLC * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are From 89867441ad0a136fc18ecd985ddd682728ed6c69 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Mon, 12 Jul 2021 15:22:50 +0000 Subject: [PATCH 6/7] fix copyright --- .../java/com/google/api/gax/batching/BatchedCallContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/BatchedCallContext.java b/gax/src/main/java/com/google/api/gax/batching/BatchedCallContext.java index d70aa6408..3a4466bc4 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatchedCallContext.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatchedCallContext.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Google LLC + * Copyright 2021 Google LLC * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are From 12c8f000975409e147fd5d871c2abe08e647953c Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Thu, 15 Jul 2021 18:51:11 +0000 Subject: [PATCH 7/7] refactor TracedBatchedContextCallable --- .../tracing/TracedBatchedContextCallable.java | 45 ++++++++++--------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/tracing/TracedBatchedContextCallable.java b/gax/src/main/java/com/google/api/gax/tracing/TracedBatchedContextCallable.java index 2038b28bf..9d0e740c1 100644 --- a/gax/src/main/java/com/google/api/gax/tracing/TracedBatchedContextCallable.java +++ b/gax/src/main/java/com/google/api/gax/tracing/TracedBatchedContextCallable.java @@ -70,23 +70,12 @@ public TracedBatchedContextCallable( * asynchronously. */ public ApiFuture futureCall(RequestT request, BatchedCallContext batchedCallContext) { - ApiTracer tracer = - tracerFactory.newTracer(baseCallContext.getTracer(), spanName, OperationType.Batching); - TraceFinisher finisher = new TraceFinisher<>(tracer); - - try { - tracer.batchRequestThrottled(batchedCallContext.getTotalThrottledTimeMs()); - tracer.batchRequestSent( - batchedCallContext.getElementCount(), batchedCallContext.getByteCount()); - baseCallContext = baseCallContext.withTracer(tracer); - ApiFuture future = innerCallable.futureCall(request, baseCallContext); - ApiFutures.addCallback(future, finisher, MoreExecutors.directExecutor()); - - return future; - } catch (RuntimeException e) { - finisher.onFailure(e); - throw e; - } + return futureCall( + request, + baseCallContext, + batchedCallContext.getTotalThrottledTimeMs(), + batchedCallContext.getElementCount(), + batchedCallContext.getByteCount()); } /** Calls the wrapped {@link UnaryCallable} within the context of a new trace. */ @@ -94,15 +83,29 @@ public ApiFuture futureCall(RequestT request, BatchedCallContext batc public ApiFuture futureCall(RequestT request, ApiCallContext context) { ApiCallContext mergedContext = baseCallContext.merge(context); + return futureCall(request, mergedContext, null, null, null); + } + + private ApiFuture futureCall( + RequestT request, + ApiCallContext callContext, + Long throttledTimeMs, + Long elementCount, + Long byteCount) { ApiTracer tracer = - tracerFactory.newTracer(mergedContext.getTracer(), spanName, OperationType.Batching); + tracerFactory.newTracer(callContext.getTracer(), spanName, OperationType.Batching); TraceFinisher finisher = new TraceFinisher<>(tracer); try { - mergedContext = mergedContext.withTracer(tracer); - ApiFuture future = innerCallable.futureCall(request, mergedContext); + if (throttledTimeMs != null) { + tracer.batchRequestThrottled(throttledTimeMs); + } + if (elementCount != null && byteCount != null) { + tracer.batchRequestSent(elementCount, byteCount); + } + callContext = callContext.withTracer(tracer); + ApiFuture future = innerCallable.futureCall(request, callContext); ApiFutures.addCallback(future, finisher, MoreExecutors.directExecutor()); - return future; } catch (RuntimeException e) { finisher.onFailure(e);