Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2021 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import com.google.api.core.InternalApi;
import com.google.auto.value.AutoValue;

/**
* BatchedCallContext encapsulates context data in a batch call.
*
* <p>For internal use only.
*/
@InternalApi
@AutoValue
public abstract class BatchedCallContext {

/** Gets element count of the current batch. */
public abstract long getElementCount();

/** Gets byte count of the current batch. */
public abstract long getByteCount();

/** Gets total throttled time of the current batch. */
public abstract long getTotalThrottledTimeMs();

@AutoValue.Builder
public abstract static class Builder {

public static Builder newBuilder() {
return new AutoValue_BatchedCallContext.Builder();
}

/** Gets element count of the current batch. */
public abstract Builder setElementCount(long elementCount);

/** Gets byte count of the current batch. */
public abstract Builder setByteCount(long byteCount);

/** Gets total throttled time of the current batch. */
public abstract Builder setTotalThrottledTimeMs(long throttledTimeMs);

public abstract BatchedCallContext build();
}
}
27 changes: 23 additions & 4 deletions gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@
import com.google.api.gax.batching.FlowController.FlowControlRuntimeException;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.TracedBatchedContextCallable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Futures;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
Expand Down Expand Up @@ -188,16 +190,18 @@ public ApiFuture<ElementResultT> add(ElementT element) {
// class, which made it seem unnecessary to have blocking and non-blocking semaphore
// implementations. Some refactoring may be needed for the optimized implementation. So we'll
// defer it till we decide on if refactoring FlowController is necessary.
Stopwatch stopwatch = Stopwatch.createStarted();
try {
flowController.reserve(1, batchingDescriptor.countBytes(element));
} catch (FlowControlException e) {
// This exception will only be thrown if the FlowController is set to ThrowException behavior
throw FlowControlRuntimeException.fromFlowControlException(e);
}
long throttledTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);

SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
synchronized (elementLock) {
currentOpenBatch.add(element, result);
currentOpenBatch.add(element, result, throttledTime);
}

if (currentOpenBatch.hasAnyThresholdReached()) {
Expand Down Expand Up @@ -226,8 +230,21 @@ public void sendOutstanding() {
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);
}

final ApiFuture<ResponseT> batchResponse =
unaryCallable.futureCall(accumulatedBatch.builder.build());
final ApiFuture<ResponseT> batchResponse;
if (unaryCallable instanceof TracedBatchedContextCallable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this instanceof? Instanceofs are areally not something we would want to use unless absolutely necessary, since it is usually an indication of mistakes in OOP design of the application.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a workaround.

The general idea here is that we want to export state out a batcher so that we can send it to opencensus and eventually to dataflow counters. We dont want to couple the implementation of the tracing to the batcher. So we want to pass the state as a call context. The only 2 approaches I can think of are:

  1. try to insert it in the existing ApiCallContext and have the next callable probe for it
  2. have a special callable and have Batcher probe it

The second approach ended up creating a lot less breakage and least amount of overhead

Copy link
Contributor

@vam-google vam-google Jul 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check the other comment about ApiCallContext vs BatcherCcallContext, but overall, the more I understand about this change the more I lean towards merging BatcherCallContext thing into ApiCallContext (it still may remain as an aggregated context ojbect with this exact BatcherCallContext name if you want)

The second approach ended up creating a lot less breakage and least amount of

Can you please clarify what is the extra overhead of the second approach over the first one?

Main issue with the first approach (currently implemented in this PR) is that it breaks chain of responsibility pattern, and if someone adds another wrapper on top of TracedBatchedContextCallable (which should be completely safe and typical thing to do in the chain of responsibility paradigm) it will break this code, because it makes an assumption that TracedBatchedCallContextCallable is always the root callable supplied to this BatcherImpl class (which it may be now, but somebody in the future may add some sort of AwesomeTracedBatchedCallContextCallable wrapping the TracedBatchedCallContextCallable).

BatchedCallContext batchedCallContext =
BatchedCallContext.Builder.newBuilder()
.setElementCount(accumulatedBatch.elementCounter)
.setByteCount(accumulatedBatch.byteCounter)
.setTotalThrottledTimeMs(accumulatedBatch.totalThrottledTimeMs)
.build();

batchResponse =
((TracedBatchedContextCallable) unaryCallable)
.futureCall(accumulatedBatch.builder.build(), batchedCallContext);
} else {
batchResponse = unaryCallable.futureCall(accumulatedBatch.builder.build());
}

numOfOutstandingBatches.incrementAndGet();
ApiFutures.addCallback(
Expand Down Expand Up @@ -312,6 +329,7 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {

private long elementCounter = 0;
private long byteCounter = 0;
private long totalThrottledTimeMs = 0;

private Batch(
RequestT prototype,
Expand All @@ -328,11 +346,12 @@ private Batch(
this.batcherStats = batcherStats;
}

void add(ElementT element, SettableApiFuture<ElementResultT> result) {
void add(ElementT element, SettableApiFuture<ElementResultT> result, long throttledTime) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an InternalApi, but it says that it is for use by google-cloud-java clients, so it is a breaking chagne for them. Can we make it in anon-breaking way (adding a method overload instead of directly changing the method signature).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is on a private class, so it doesnt leak to clients

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yes, sorry, I did not realize it belonged to private Batch class, and thought it was part of BatcherImpl itself.

builder.add(element);
entries.add(BatchEntry.create(element, result));
elementCounter++;
byteCounter += descriptor.countBytes(element);
totalThrottledTimeMs += throttledTime;
}

void onBatchSuccess(ResponseT response) {
Expand Down
7 changes: 7 additions & 0 deletions gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ public interface ApiTracer {
*/
void batchRequestSent(long elementCount, long requestSize);

/**
* Adds an annotation of total throttled time of a batch.
*
* @param throttledTimeMs total throttled time of this batch.
*/
void batchRequestThrottled(long throttledTimeMs);

/**
* A context class to be used with {@link #inScope()} and a try-with-resources block. Closing a
* {@link Scope} removes any context that the underlying implementation might've set in {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,9 @@ public void requestSent() {
public void batchRequestSent(long elementCount, long requestSize) {
// noop
}

@Override
public void batchRequestThrottled(long throttledTimeMs) {
// noop
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2021 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.tracing;

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.BatchedCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;

/**
* A {@link UnaryCallable} which sends batched requests with context of the current batch.
*
* <p>This is public only for technical reasons.
*/
@InternalApi
public abstract class BatchedContextCallable<RequestT, ResponseT>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering, can we make some of those InternalApi classes package-private?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this class crosses the boundary between tracing and batching so its shared by 2 packages

extends UnaryCallable<RequestT, ResponseT> {

/**
* Performs a call asynchronously with context data of the current batch.
*
* @param batchedCallContext {@link BatchedCallContext} to make the call with
*/
public abstract ApiFuture<ResponseT> futureCall(
RequestT request, BatchedCallContext batchedCallContext);

@Override
public UnaryCallable<RequestT, ResponseT> withDefaultCallContext(
final ApiCallContext defaultCallContext) {
throw new UnsupportedOperationException("withDefaultCallContext() not implemented");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since callables are wrapped in other callables, forming a long "chain of responsibility" pattern it is basically very hard to predic tin which context this method will be called. I.e. if it throws UnsupportedOperatoinException, it is likely that it will actualy get thrown in some valid execution workflow. Can we actually implement this method?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, no. This is here to make sure that this class is not misused. We can't implement it generically because the Batcher wont be able to pass it the context

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright 2021 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.tracing;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.BatchedCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.ApiTracerFactory.OperationType;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;

/**
* This callable wraps a batching callable chain in an {@link ApiTracer} and annotates {@link
* BatchedCallContext} batching context data.
*
* <p>For internal use only.
*/
@InternalApi("For internal use by google-cloud-java clients only")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

who else would even use GAX? I would prefer to reserve internal api annotations for things internal to Gax.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a dataflow adapter would use this information to publish a counter to let the dataflow service know that the job is being throttled

public class TracedBatchedContextCallable<RequestT, ResponseT>
extends UnaryCallable<RequestT, ResponseT> {

private final ApiTracerFactory tracerFactory;
private ApiCallContext baseCallContext;
private final SpanName spanName;
private final UnaryCallable<RequestT, ResponseT> innerCallable;

public TracedBatchedContextCallable(
UnaryCallable<RequestT, ResponseT> innerCallable,
ApiCallContext callContext,
ApiTracerFactory tracerFactory,
SpanName spanName) {
this.baseCallContext = Preconditions.checkNotNull(callContext);
this.tracerFactory = Preconditions.checkNotNull(tracerFactory);
this.spanName = Preconditions.checkNotNull(spanName);
this.innerCallable = Preconditions.checkNotNull(innerCallable);
}

/**
* Creates an {@link ApiTracer} and annotates batching context data. Performs a call
* asynchronously.
*/
public ApiFuture<ResponseT> futureCall(RequestT request, BatchedCallContext batchedCallContext) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can batchedCallContext just become a part of ApiCallContext? So we will not have to have two methods here, one of which (this one) does not even fit in the overall callables hierarchy (the idea is that callables have their implementation of futureCall(request, apiCallContext), but don't have their own special contexts.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving this to ApiCallContext is an option. There are 2 approaches:

  1. allowing ApiCallContext to generically store random data (similar to CallOptions). This will have a bit of overhead and makes this a lot more dynamically typed and I think harder to debug
  2. adding a specific getter/setter for batching context - this will make a generic construct like ApiContext have to know about specific things like batching that not relevant most of the time.

Localizing the interaction between a batcher and the next callable seems to be the least of the 3 evils

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having another futureCall method for a callable to actually do the sole thing why this callable exists does not really make it fit in the chain of responsibility pattern. Instead of seamlessly doing its custom thing, the callable must be called explicitly using the other method (the one with BatchedCallContext), which is not present in other callables in the same chain. This is basically the exact reason, why there is the instanceof statement (the one I commented about) in the BatcherImpl class.

In other words, the current implementation does make the job done, but it does not use the existing architecture in its idiomatic way. Since using callable must be done via calling futureCall(RequestT request, ApiCallContext context), if the custom implementaiton requires some extra information, it must be supplied as part of the generic ApiCallContext. This is basically why ApiCallContext is called in such generic way "context" meaning it has whatever may be needed in the call chain.

Or think about it the other way: if somebody decides to add another wrapper on top of TracedBatchedContextCallable and pass that in BatherImpl it will simply break the logic because instanceof will not work anymore. Note, an ability to add arbitrary number of wrappers (each with its features, which it adds to the chain of other features provided by other callables) is a must feature in this whole chain of wrappers, otherwise it just falls apart.

In other words, if you know how to make BatchedCallContext a part of ApiCallContext, please go ahead and do it.

return futureCall(
request,
baseCallContext,
batchedCallContext.getTotalThrottledTimeMs(),
batchedCallContext.getElementCount(),
batchedCallContext.getByteCount());
}

/** Calls the wrapped {@link UnaryCallable} within the context of a new trace. */
@Override
public ApiFuture futureCall(RequestT request, ApiCallContext context) {
ApiCallContext mergedContext = baseCallContext.merge(context);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like there is a lot of duplication between these two futureCall methods. Can you please rewrite them such that the code is reused, instead of being duplicated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored the methods.


return futureCall(request, mergedContext, null, null, null);
}

private ApiFuture futureCall(
RequestT request,
ApiCallContext callContext,
Long throttledTimeMs,
Long elementCount,
Long byteCount) {
ApiTracer tracer =
tracerFactory.newTracer(callContext.getTracer(), spanName, OperationType.Batching);
TraceFinisher<ResponseT> finisher = new TraceFinisher<>(tracer);

try {
if (throttledTimeMs != null) {
tracer.batchRequestThrottled(throttledTimeMs);
}
if (elementCount != null && byteCount != null) {
tracer.batchRequestSent(elementCount, byteCount);
}
callContext = callContext.withTracer(tracer);
ApiFuture<ResponseT> future = innerCallable.futureCall(request, callContext);
ApiFutures.addCallback(future, finisher, MoreExecutors.directExecutor());
return future;
} catch (RuntimeException e) {
finisher.onFailure(e);
throw e;
}
}

public UnaryCallable<RequestT, ResponseT> withDefaultCallContext(
final ApiCallContext defaultCallContext) {
return new TracedBatchedContextCallable<>(
innerCallable, baseCallContext.merge(defaultCallContext), tracerFactory, spanName);
}
}
Loading