diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/MutateRowsException.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/MutateRowsException.java new file mode 100644 index 000000000000..d1c0eda844c2 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/MutateRowsException.java @@ -0,0 +1,105 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.models; + +import com.google.api.core.InternalApi; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.StatusCode; +import com.google.auto.value.AutoValue; +import com.google.bigtable.v2.MutateRowsRequest; +import com.google.common.base.Preconditions; +import java.util.List; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Thrown by the MutateRows when at least one Mutation failed. If the last failure was caused by an + * RPC error (as opposed to a single entry failing), then this exception's cause will be set to that + * error and {@link #getFailedMutations()} will contain synthetic errors for all of the entries that + * were part of that RPC. + */ +public final class MutateRowsException extends ApiException { + // Synthetic status to use for this ApiException subclass. + private static final StatusCode LOCAL_STATUS = + new StatusCode() { + @Override + public Code getCode() { + return Code.INTERNAL; + } + + @Override + public Object getTransportCode() { + return null; + } + }; + + private final List failedMutations; + + /** + * This constructor is considered an internal implementation detail and not meant to be used by + * applications. + */ + @InternalApi + public MutateRowsException( + @Nullable Throwable rpcError, + @Nonnull List failedMutations, + boolean retryable) { + super("Some mutations failed to apply", rpcError, LOCAL_STATUS, retryable); + Preconditions.checkNotNull(failedMutations); + Preconditions.checkArgument(!failedMutations.isEmpty(), "failedMutations can't be empty"); + this.failedMutations = failedMutations; + } + + /** + * Retrieve all of the failed mutations. This list will contain failures for all of the mutations + * that have failed across all of the retry attempts so far. + */ + @Nonnull + public List getFailedMutations() { + return failedMutations; + } + + /** + * Identifies which mutation failed and the reason it failed. The mutation is identified by it's + * index in the original request's {@link MutateRowsRequest#getEntriesList()}. + */ + @AutoValue + public abstract static class FailedMutation { + /** + * This method is considered an internal implementation detail and not meant to be used by + * applications. + */ + @InternalApi + @Nonnull + public static FailedMutation create(int index, ApiException error) { + return new AutoValue_MutateRowsException_FailedMutation(index, error); + } + + /** + * The index of the mutation in the original request's {@link + * MutateRowsRequest#getEntriesList()}. + */ + public abstract int getIndex(); + + /** + * The error that prevented this mutation from being applied. Please note, that if the entire + * RPC attempt failed, all mutations that were part of the attempt will have take on the same + * error. + */ + @Nonnull + public abstract ApiException getError(); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 379e751d71f9..f3aef39f6268 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -15,10 +15,11 @@ */ package com.google.cloud.bigtable.data.v2.stub; -import com.google.api.core.ApiFuture; import com.google.api.core.InternalApi; -import com.google.api.gax.retrying.RetrySettings; -import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.retrying.ExponentialRetryAlgorithm; +import com.google.api.gax.retrying.RetryAlgorithm; +import com.google.api.gax.retrying.RetryingExecutor; +import com.google.api.gax.retrying.ScheduledRetryingExecutor; import com.google.api.gax.rpc.BatchingCallSettings; import com.google.api.gax.rpc.Callables; import com.google.api.gax.rpc.ClientContext; @@ -26,7 +27,6 @@ import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.UnaryCallable; import com.google.bigtable.v2.MutateRowsRequest; -import com.google.bigtable.v2.MutateRowsResponse; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.SampleRowKeysRequest; import com.google.bigtable.v2.SampleRowKeysResponse; @@ -40,13 +40,14 @@ import com.google.cloud.bigtable.data.v2.models.RowAdapter; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor; -import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsSpoolingCallable; +import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable; import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsUserFacingCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable; +import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm; import java.io.IOException; import java.util.List; import org.threeten.bp.Duration; @@ -251,28 +252,41 @@ private UnaryCallable createMutateRowCallable() { *
  • Convert a {@link RowMutation} into a {@link MutateRowsRequest} with a single entry. *
  • Using gax's {@link com.google.api.gax.rpc.BatchingCallable} to spool the requests and * aggregate the {@link MutateRowsRequest.Entry}s. - *
  • Spool the streamed responses. + *
  • Process the response and schedule retries. At the end of each attempt, entries that have + * been applied, are filtered from the next attempt. Also, any entries that failed with a + * nontransient error, are filtered from the next attempt. This will continue until there + * are no more entries or there are no more retry attempts left. + *
  • Wrap batch failures in a {@link + * com.google.cloud.bigtable.data.v2.models.MutateRowsException}. *
  • Split the responses using {@link MutateRowsBatchingDescriptor}. - *
  • Apply retries to individual mutations * */ private UnaryCallable createMutateRowsCallable() { - MutateRowsSpoolingCallable spooling = new MutateRowsSpoolingCallable(stub.mutateRowsCallable()); + RetryAlgorithm retryAlgorithm = + new RetryAlgorithm<>( + new ApiResultRetryAlgorithm(), + new ExponentialRetryAlgorithm( + settings.mutateRowsSettings().getRetrySettings(), clientContext.getClock())); + RetryingExecutor retryingExecutor = + new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor()); + + UnaryCallable retrying = + new MutateRowsRetryingCallable( + clientContext.getDefaultCallContext(), + stub.mutateRowsCallable(), + retryingExecutor, + settings.mutateRowsSettings().getRetryableCodes()); // recreate BatchingCallSettings with the correct descriptor - BatchingCallSettings.Builder batchingCallSettings = - BatchingCallSettings.newBuilder( - new MutateRowsBatchingDescriptor(settings.mutateRowsSettings().getRetryableCodes())) + BatchingCallSettings.Builder batchingCallSettings = + BatchingCallSettings.newBuilder(new MutateRowsBatchingDescriptor()) .setBatchingSettings(settings.mutateRowsSettings().getBatchingSettings()); - UnaryCallable batching = - Callables.batching(spooling, batchingCallSettings.build(), clientContext); - - UnaryCallable retrying = - Callables.retrying(batching, settings.mutateRowsSettings(), clientContext); + UnaryCallable batching = + Callables.batching(retrying, batchingCallSettings.build(), clientContext); MutateRowsUserFacingCallable userFacing = - new MutateRowsUserFacingCallable(retrying, requestContext); + new MutateRowsUserFacingCallable(batching, requestContext); return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext()); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java new file mode 100644 index 000000000000..1023e9ad7d3b --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java @@ -0,0 +1,330 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.mutaterows; + +import com.google.api.core.ApiFunction; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.retrying.RetryingFuture; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.ApiExceptionFactory; +import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.bigtable.v2.MutateRowsRequest; +import com.google.bigtable.v2.MutateRowsRequest.Builder; +import com.google.bigtable.v2.MutateRowsResponse; +import com.google.bigtable.v2.MutateRowsResponse.Entry; +import com.google.cloud.bigtable.data.v2.models.MutateRowsException; +import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation; +import com.google.cloud.bigtable.gaxx.retrying.NonCancellableFuture; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.rpc.Code; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +// TODO: Try to generalize this using a BatchingDescriptor +/** + * A stateful {@link Callable} that can be used with gax's {@link + * com.google.api.gax.retrying.RetryingExecutor} and {@link RetryingFuture}. + * + *

    For every {@link #call()}, it will: + * + *

      + *
    • Send the current request to the {@link #innerCallable} + *
    • On RPC completion it will inspect the response. If any entries failed (including the whole + * RPC), it will update its internal state so that the next {@link #call()} will only contain + * entries that have transient failures. + *
    + * + *

    Expected usage: + * + *

    {@code
    + * MutateRowsAttemptCallable attemptCallable = new MutateRowsAttemptCallable(
    + *   stub.mutateRowsCallable().all(),
    + *   request,
    + *   clientContext.getDefaultCallContext(),
    + *   ImmutableSet.of(Code.UNAVAILABLE));
    + *
    + * RetryingExecutor retryingExecutor = ...;
    + * RetryingFuture retryingFuture = retryingExecutor.createFuture(attemptCallable);
    + * retryCallable.setExternalFuture(retryingFuture);
    + * retryCallable.call();
    + *
    + * try {
    + *   retryingFuture.get();
    + * } catch(ExecutionException executionError) {
    + *   MutateRowsException e = (MutateRowsException) executionError.getCause();
    + *
    + *   for (FailedMutation m : e.getFailedMutations() {
    + *     // handle permanent failures
    + *   }
    + * }
    + * }
    + * + *

    Package-private for internal use. + */ +class MutateRowsAttemptCallable implements Callable { + // Synthetic status for Mutations that didn't get a result (because the whole RPC failed). It will + // be exposed in MutateRowsException's FailedMutations. + private static final StatusCode LOCAL_UNKNOWN_STATUS = + new StatusCode() { + @Override + public Code getCode() { + return Code.UNKNOWN; + } + + @Override + public Object getTransportCode() { + return null; + } + }; + + // Everything needed to issue an RPC + private final UnaryCallable> innerCallable; + private final ApiCallContext callContext; + private MutateRowsRequest currentRequest; + + // Everything needed to build a retry request + private List originalIndexes; + private final Set retryableCodes; + private final List permanentFailures; + + // Parent controller + private RetryingFuture externalFuture; + + // Simple wrappers for handling result futures + private final ApiFunction, Void> attemptSuccessfulCallback = + new ApiFunction, Void>() { + @Override + public Void apply(List responses) { + handleAttemptSuccess(responses); + return null; + } + }; + + private final ApiFunction> attemptFailedCallback = + new ApiFunction>() { + @Override + public List apply(Throwable throwable) { + handleAttemptError(throwable); + return null; + } + }; + + MutateRowsAttemptCallable( + @Nonnull UnaryCallable> innerCallable, + @Nonnull MutateRowsRequest originalRequest, + @Nullable ApiCallContext callContext, + @Nonnull Set retryableCodes) { + this.innerCallable = Preconditions.checkNotNull(innerCallable); + this.currentRequest = Preconditions.checkNotNull(originalRequest); + this.callContext = callContext; + this.retryableCodes = Preconditions.checkNotNull(retryableCodes); + + permanentFailures = Lists.newArrayList(); + } + + public void setExternalFuture(RetryingFuture externalFuture) { + this.externalFuture = externalFuture; + } + + /** + * Send the current request and the parent {@link RetryingFuture} with this attempt's future. + * + *

    On RPC completion this method will preprocess all errors (both RPC level and entry level) + * and wrap them in a {@link MutateRowsException}. Please note that the results of the RPC are + * only available in the attempt future that is set on the parent {@link RetryingFuture} and the + * return of this method should just be ignored. + */ + @Override + public Void call() { + try { + Preconditions.checkNotNull( + externalFuture, "External future must be set before starting an attempt"); + + Preconditions.checkState( + currentRequest.getEntriesCount() > 0, "Request doesn't have any mutations to send"); + + // Configure the deadline + ApiCallContext currentCallContext = null; + if (callContext != null) { + currentCallContext = + callContext.withTimeout(externalFuture.getAttemptSettings().getRpcTimeout()); + } + + // Handle concurrent cancellation + externalFuture.setAttemptFuture(new NonCancellableFuture()); + if (externalFuture.isDone()) { + return null; + } + + // Make the actual call + ApiFuture> innerFuture = + innerCallable.futureCall(currentRequest, currentCallContext); + + // Handle RPC level errors by wrapping them in a MutateRowsException + ApiFuture> catching = + ApiFutures.catching(innerFuture, Throwable.class, attemptFailedCallback); + + // Inspect the results and either propagate the success, or prepare to retry the failed + // mutations + ApiFuture transformed = ApiFutures.transform(catching, attemptSuccessfulCallback); + + // Notify the parent of the attempt + externalFuture.setAttemptFuture(transformed); + } catch (Throwable e) { + externalFuture.setAttemptFuture(ApiFutures.immediateFailedFuture(e)); + } + + return null; + } + + /** + * Handle an RPC level failure by generating a {@link FailedMutation} for each expected entry. The + * newly generated {@link FailedMutation}s will be combined with the permanentFailures to give the + * caller the whole picture since the first call. This method will always throw a {@link + * MutateRowsException}. + */ + private void handleAttemptError(Throwable rpcError) { + ApiException entryError = createSyntheticErrorForRpcFailure(rpcError); + ImmutableList.Builder allFailures = ImmutableList.builder(); + MutateRowsRequest lastRequest = currentRequest; + + allFailures.addAll(permanentFailures); + + Builder builder = lastRequest.toBuilder().clearEntries(); + List newOriginalIndexes = Lists.newArrayList(); + + for (int i = 0; i < currentRequest.getEntriesCount(); i++) { + int origIndex = getOriginalIndex(i); + + FailedMutation failedMutation = FailedMutation.create(origIndex, entryError); + allFailures.add(failedMutation); + + if (!failedMutation.getError().isRetryable()) { + permanentFailures.add(failedMutation); + } else { + // Schedule the mutation entry for the next RPC by adding it to the request builder and + // recording its original index + newOriginalIndexes.add(origIndex); + builder.addEntries(lastRequest.getEntries(i)); + } + } + + currentRequest = builder.build(); + originalIndexes = newOriginalIndexes; + + throw new MutateRowsException(rpcError, allFailures.build(), entryError.isRetryable()); + } + + /** + * Handle entry level failures. All new response entries are inspected for failure. If any + * transient failures are found, their corresponding mutations are scheduled for the next RPC. The + * caller is notified of both new found errors and pre-existing permanent errors in the thrown + * {@link MutateRowsException}. If no errors exist, then the attempt future is successfully + * completed. + */ + private void handleAttemptSuccess(List responses) { + List allFailures = Lists.newArrayList(permanentFailures); + MutateRowsRequest lastRequest = currentRequest; + + Builder builder = lastRequest.toBuilder().clearEntries(); + List newOriginalIndexes = Lists.newArrayList(); + + for (MutateRowsResponse response : responses) { + for (Entry entry : response.getEntriesList()) { + if (entry.getStatus().getCode() == Code.OK_VALUE) { + continue; + } + + int origIndex = getOriginalIndex((int) entry.getIndex()); + + FailedMutation failedMutation = + FailedMutation.create(origIndex, createEntryError(entry.getStatus())); + + allFailures.add(failedMutation); + + if (!failedMutation.getError().isRetryable()) { + permanentFailures.add(failedMutation); + } else { + // Schedule the mutation entry for the next RPC by adding it to the request builder and + // recording it's original index + newOriginalIndexes.add(origIndex); + builder.addEntries(lastRequest.getEntries((int) entry.getIndex())); + } + } + } + + currentRequest = builder.build(); + originalIndexes = newOriginalIndexes; + + if (!allFailures.isEmpty()) { + boolean isRetryable = builder.getEntriesCount() > 0; + throw new MutateRowsException(null, allFailures, isRetryable); + } + } + + /** + * Remap the entry index in the current attempt back to its original index in the first request. + */ + private int getOriginalIndex(int index) { + return (originalIndexes != null) ? originalIndexes.get(index) : index; + } + + /** Convert an entry's status from a protobuf to an {@link ApiException}. */ + private ApiException createEntryError(com.google.rpc.Status protoStatus) { + io.grpc.Status grpcStatus = + io.grpc.Status.fromCodeValue(protoStatus.getCode()) + .withDescription(protoStatus.getMessage()); + + StatusCode gaxStatusCode = GrpcStatusCode.of(grpcStatus.getCode()); + + return ApiExceptionFactory.createException( + grpcStatus.asRuntimeException(), + gaxStatusCode, + retryableCodes.contains(gaxStatusCode.getCode())); + } + + /** + * Create a synthetic {@link ApiException} for an individual entry. When the entire RPC fails, it + * implies that all entries failed as well. This helper is used to make that behavior explicit. + * The generated exception will have the overall error as its cause. + */ + private static ApiException createSyntheticErrorForRpcFailure(Throwable overallRequestError) { + if (overallRequestError instanceof ApiException) { + ApiException requestApiException = (ApiException) overallRequestError; + + return ApiExceptionFactory.createException( + "Didn't receive a result for this mutation entry", + overallRequestError, + requestApiException.getStatusCode(), + requestApiException.isRetryable()); + } + + return ApiExceptionFactory.createException( + "Didn't receive a result for this mutation entry", + overallRequestError, + LOCAL_UNKNOWN_STATUS, + false); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsBatchingDescriptor.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsBatchingDescriptor.java index cab22ffa95b2..f9979d84d0e8 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsBatchingDescriptor.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsBatchingDescriptor.java @@ -18,52 +18,28 @@ import com.google.api.core.InternalApi; import com.google.api.gax.batching.PartitionKey; import com.google.api.gax.batching.RequestBuilder; -import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.rpc.ApiException; -import com.google.api.gax.rpc.ApiExceptionFactory; import com.google.api.gax.rpc.BatchedRequestIssuer; import com.google.api.gax.rpc.BatchingDescriptor; -import com.google.api.gax.rpc.StatusCode; import com.google.bigtable.v2.MutateRowsRequest; -import com.google.bigtable.v2.MutateRowsResponse; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; -import com.google.common.primitives.Ints; -import com.google.rpc.Code; -import com.google.rpc.Status; -import io.grpc.StatusException; -import io.grpc.StatusRuntimeException; +import com.google.cloud.bigtable.data.v2.models.MutateRowsException; +import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation; +import com.google.common.base.Function; +import com.google.common.collect.Maps; import java.util.Collection; -import java.util.Set; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; /** - * A custom implementation of a {@link BatchingDescriptor} to split individual results of a bulk - * MutateRowsResponse. Each individual result will be matched with its issuer. Since the embedded - * results bypass gax's result processing chains, this class is responsible for wrapping errors in - * {@link ApiException}s and marking each error as retryable. + * A custom implementation of a {@link BatchingDescriptor} to split individual results in a {@link + * MutateRowsException}. Each individual result will be matched with its issuer. * *

    This class is considered an internal implementation detail and not meant to be used by * applications directly. */ @InternalApi -public class MutateRowsBatchingDescriptor - implements BatchingDescriptor { - - // Shared response to notify individual issuers of a successful mutation. - private static final MutateRowsResponse OK_RESPONSE = - MutateRowsResponse.newBuilder() - .addEntries( - MutateRowsResponse.Entry.newBuilder() - .setIndex(0) - .setStatus(Status.newBuilder().setCode(Code.OK_VALUE))) - .build(); - - private final ImmutableSet retryableCodes; - - public MutateRowsBatchingDescriptor(Set retryableCodes) { - this.retryableCodes = ImmutableSet.copyOf(retryableCodes); - } - +public class MutateRowsBatchingDescriptor implements BatchingDescriptor { /** Return the target table name. This will be used to combine batcheable requests */ @Override public PartitionKey getBatchPartitionKey(MutateRowsRequest request) { @@ -76,44 +52,58 @@ public RequestBuilder getRequestBuilder() { return new MyRequestBuilder(); } - /** {@inheritDoc} */ @Override public void splitResponse( - MutateRowsResponse batchResponse, - Collection> batch) { - - // Sort the result entries by index. - Status[] sortedEntries = new Status[batchResponse.getEntriesCount()]; + Void batchResponse, Collection> batch) { - for (MutateRowsResponse.Entry entry : batchResponse.getEntriesList()) { - int index = Ints.checkedCast(entry.getIndex()); - Preconditions.checkState( - sortedEntries[index] == null, "Got multiple results for the same sub-mutation"); - sortedEntries[index] = entry.getStatus(); + for (BatchedRequestIssuer issuer : batch) { + issuer.setResponse(null); } + } - // Notify all of issuers of the corresponding result. - int i = 0; - for (BatchedRequestIssuer issuer : batch) { - Status entry = sortedEntries[i++]; - Preconditions.checkState(entry != null, "Missing result for entry"); + @Override + public void splitException( + Throwable throwable, Collection> batch) { - if (entry.getCode() == Code.OK_VALUE) { - issuer.setResponse(OK_RESPONSE); - } else { - issuer.setException(createElementException(entry)); + if (!(throwable instanceof MutateRowsException)) { + for (BatchedRequestIssuer issuer : batch) { + issuer.setException(throwable); } + return; } - } - /** {@inheritDoc} */ - @Override - public void splitException( - Throwable throwable, Collection> batch) { - throwable = createElementException(throwable); + List failedMutations = ((MutateRowsException) throwable).getFailedMutations(); + + Map errorsByIndex = + Maps.uniqueIndex( + failedMutations, + new Function() { + @Nullable + @Override + public Integer apply(@Nullable FailedMutation input) { + return input.getIndex(); + } + }); - for (BatchedRequestIssuer responder : batch) { - responder.setException(throwable); + int i = 0; + for (BatchedRequestIssuer issuer : batch) { + // NOTE: the gax batching api doesn't allow for a single issuer to get different exceptions + // for different entries. However this does not affect this client because BulkMutationBatcher + // only allows a single mutation per call. So just use the last error per entry. + ApiException lastError = null; + + for (int j = 0; j < issuer.getMessageCount(); j++) { + FailedMutation failure = errorsByIndex.get(i++); + if (failure != null) { + lastError = failure.getError(); + } + } + + if (lastError == null) { + issuer.setResponse(null); + } else { + issuer.setException(lastError); + } } } @@ -129,38 +119,6 @@ public long countBytes(MutateRowsRequest request) { return request.getSerializedSize(); } - /** Convert an element error Status into an ApiException */ - private ApiException createElementException(Status protoStatus) { - Preconditions.checkArgument(protoStatus.getCode() != Code.OK_VALUE, "OK is not an error"); - - StatusRuntimeException throwable = - io.grpc.Status.fromCodeValue(protoStatus.getCode()) - .withDescription(protoStatus.getMessage()) - .asRuntimeException(); - - return createElementException(throwable); - } - - /** Convert a Throwable into an ApiException, marking it as retryable when appropriate. */ - private ApiException createElementException(Throwable throwable) { - final io.grpc.Status.Code code; - - if (throwable instanceof ApiException) { - return (ApiException) throwable; - } else if (throwable instanceof StatusRuntimeException) { - code = ((StatusRuntimeException) throwable).getStatus().getCode(); - } else if (throwable instanceof StatusException) { - code = ((StatusException) throwable).getStatus().getCode(); - } else { - code = io.grpc.Status.Code.UNKNOWN; - } - - GrpcStatusCode gaxStatusCode = GrpcStatusCode.of(code); - boolean isRetryable = retryableCodes.contains(gaxStatusCode.getCode()); - - return ApiExceptionFactory.createException(throwable, gaxStatusCode, isRetryable); - } - /** A {@link com.google.api.gax.batching.RequestBuilder} that can aggregate MutateRowsRequest */ static class MyRequestBuilder implements RequestBuilder { private MutateRowsRequest.Builder builder; diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryingCallable.java new file mode 100644 index 000000000000..ab5b872af962 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryingCallable.java @@ -0,0 +1,76 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.mutaterows; + +import com.google.api.core.InternalApi; +import com.google.api.gax.retrying.RetryingExecutor; +import com.google.api.gax.retrying.RetryingFuture; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.StatusCode.Code; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.bigtable.v2.MutateRowsRequest; +import com.google.bigtable.v2.MutateRowsResponse; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import java.util.Set; +import javax.annotation.Nonnull; + +/** + * A UnaryCallable wrapper around {@link MutateRowsAttemptCallable}. + * + *

    This class is considered an internal implementation detail and not meant to be used by + * applications directly. + * + * @see MutateRowsAttemptCallable for more details. + */ +@InternalApi +public class MutateRowsRetryingCallable extends UnaryCallable { + private final ApiCallContext callContextPrototype; + private final ServerStreamingCallable callable; + private final RetryingExecutor executor; + private final ImmutableSet retryCodes; + + public MutateRowsRetryingCallable( + @Nonnull ApiCallContext callContextPrototype, + @Nonnull ServerStreamingCallable callable, + @Nonnull RetryingExecutor executor, + @Nonnull Set retryCodes) { + this.callContextPrototype = Preconditions.checkNotNull(callContextPrototype); + this.callable = Preconditions.checkNotNull(callable); + this.executor = Preconditions.checkNotNull(executor); + this.retryCodes = ImmutableSet.copyOf(retryCodes); + } + + @Override + public RetryingFuture futureCall(MutateRowsRequest request, ApiCallContext inputContext) { + ApiCallContext context = callContextPrototype.nullToSelf(inputContext); + MutateRowsAttemptCallable retryCallable = + new MutateRowsAttemptCallable(callable.all(), request, context, retryCodes); + + RetryingFuture retryingFuture = executor.createFuture(retryCallable); + retryCallable.setExternalFuture(retryingFuture); + retryCallable.call(); + + return retryingFuture; + } + + @Override + public String toString() { + return String.format("retrying(%s)", callable); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsSpoolingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsSpoolingCallable.java deleted file mode 100644 index 303d274ada4a..000000000000 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsSpoolingCallable.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2018 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigtable.data.v2.stub.mutaterows; - -import com.google.api.core.ApiFunction; -import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutures; -import com.google.api.core.InternalApi; -import com.google.api.gax.rpc.ApiCallContext; -import com.google.api.gax.rpc.ServerStreamingCallable; -import com.google.api.gax.rpc.UnaryCallable; -import com.google.bigtable.v2.MutateRowsRequest; -import com.google.bigtable.v2.MutateRowsResponse; -import java.util.List; - -/** - * Converts a stream of {@link MutateRowsResponse}s into a unary MutateRowsResponse. This is - * necessary to adapt Cloud Bigtable API to work with gax's batching infrastructure. - * - *

    This class is considered an internal implementation detail and not meant to be used by - * applications. - */ -@InternalApi -public class MutateRowsSpoolingCallable - extends UnaryCallable { - - private final ServerStreamingCallable inner; - - public MutateRowsSpoolingCallable( - ServerStreamingCallable inner) { - this.inner = inner; - } - - @Override - public ApiFuture futureCall( - MutateRowsRequest request, ApiCallContext context) { - ApiFuture> rawResponse = inner.all().futureCall(request, context); - - return ApiFutures.transform( - rawResponse, - new ApiFunction, MutateRowsResponse>() { - @Override - public MutateRowsResponse apply(List input) { - return convertResponse(input); - } - }); - } - - private MutateRowsResponse convertResponse(List responses) { - if (responses.size() == 1) { - return responses.get(0); - } else { - MutateRowsResponse.Builder fullResponseBuilder = MutateRowsResponse.newBuilder(); - for (MutateRowsResponse subResponse : responses) { - fullResponseBuilder.addAllEntries(subResponse.getEntriesList()); - } - return fullResponseBuilder.build(); - } - } -} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsUserFacingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsUserFacingCallable.java index 1d53dd955864..ca9773ac99c8 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsUserFacingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsUserFacingCallable.java @@ -15,14 +15,11 @@ */ package com.google.cloud.bigtable.data.v2.stub.mutaterows; -import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutures; import com.google.api.core.InternalApi; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.UnaryCallable; import com.google.bigtable.v2.MutateRowsRequest; -import com.google.bigtable.v2.MutateRowsResponse; import com.google.cloud.bigtable.data.v2.internal.RequestContext; import com.google.cloud.bigtable.data.v2.models.RowMutation; @@ -34,11 +31,11 @@ */ @InternalApi public class MutateRowsUserFacingCallable extends UnaryCallable { - private final UnaryCallable inner; + private final UnaryCallable inner; private final RequestContext requestContext; public MutateRowsUserFacingCallable( - UnaryCallable inner, RequestContext requestContext) { + UnaryCallable inner, RequestContext requestContext) { this.inner = inner; this.requestContext = requestContext; @@ -46,16 +43,6 @@ public MutateRowsUserFacingCallable( @Override public ApiFuture futureCall(RowMutation request, ApiCallContext context) { - ApiFuture rawResponse = - inner.futureCall(request.toBulkProto(requestContext), context); - - return ApiFutures.transform( - rawResponse, - new ApiFunction() { - @Override - public Void apply(MutateRowsResponse mutateRowsResponse) { - return null; - } - }); + return inner.futureCall(request.toBulkProto(requestContext), context); } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm.java new file mode 100644 index 000000000000..776208d95026 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/ApiResultRetryAlgorithm.java @@ -0,0 +1,48 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.gaxx.retrying; + +import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.retrying.TimedAttemptSettings; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.DeadlineExceededException; +import org.threeten.bp.Duration; + +public class ApiResultRetryAlgorithm implements ResultRetryAlgorithm { + // Duration to sleep on if the error is DEADLINE_EXCEEDED. + public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1); + + @Override + public TimedAttemptSettings createNextAttempt( + Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) { + if (prevThrowable != null && prevThrowable instanceof DeadlineExceededException) { + return TimedAttemptSettings.newBuilder() + .setGlobalSettings(prevSettings.getGlobalSettings()) + .setRetryDelay(prevSettings.getRetryDelay()) + .setRpcTimeout(prevSettings.getRpcTimeout()) + .setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION) + .setAttemptCount(prevSettings.getAttemptCount() + 1) + .setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos()) + .build(); + } + return null; + } + + @Override + public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) { + return (prevThrowable instanceof ApiException) && ((ApiException) prevThrowable).isRetryable(); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/NonCancellableFuture.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/NonCancellableFuture.java new file mode 100644 index 000000000000..539695a1c32c --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/retrying/NonCancellableFuture.java @@ -0,0 +1,48 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.gaxx.retrying; + +import com.google.api.core.AbstractApiFuture; +import com.google.api.core.InternalApi; + +// TODO: Remove this class once the gax version becomes public +/** + * A future which cannot be cancelled from the external package. Its a copy of {@link + * com.google.api.gax.retrying.NonCancellableFuture}, which is marked as {@link InternalApi}. + * + *

    For internal use, public for technical reasons. + * + * @param future response type + */ +@InternalApi +public final class NonCancellableFuture extends AbstractApiFuture { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + void cancelPrivately() { + super.cancel(false); + } + + boolean setPrivately(ResponseT value) { + return super.set(value); + } + + boolean setExceptionPrivately(Throwable throwable) { + return super.setException(throwable); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/BulkMutateRowsRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/BulkMutateRowsRetryTest.java index bbf467c4873d..a5fd4ce810d0 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/BulkMutateRowsRetryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/BulkMutateRowsRetryTest.java @@ -110,17 +110,17 @@ public void setUp() throws Exception { } @Test - public void simpleNoErrorsTest() throws Exception { + public void simpleNoErrorsTest() { service.expectations.add(RpcExpectation.create().addEntry("key1", Code.OK)); ApiFuture result = bulkMutations.add(RowMutation.create(TABLE_ID, "key1")); verifyOk(result); - assertThat(service.expectations).isEmpty(); + service.verifyOk(); } @Test - public void batchingNoErrorsTest() throws Exception { + public void batchingNoErrorsTest() { service.expectations.add( RpcExpectation.create().addEntry("key1", Code.OK).addEntry("key2", Code.OK)); @@ -130,22 +130,22 @@ public void batchingNoErrorsTest() throws Exception { verifyOk(result1); verifyOk(result2); - assertThat(service.expectations).isEmpty(); + service.verifyOk(); } @Test - public void fullRequestRetryTest() throws Exception { + public void fullRequestRetryTest() { service.expectations.add(RpcExpectation.create(Code.DEADLINE_EXCEEDED).addEntry("key1", null)); service.expectations.add(RpcExpectation.create().addEntry("key1", Code.OK)); ApiFuture result = bulkMutations.add(RowMutation.create(TABLE_ID, "key1")); verifyOk(result); - assertThat(service.expectations).isEmpty(); + service.verifyOk(); } @Test - public void partialRetryTest() throws Exception { + public void partialRetryTest() { service.expectations.add( RpcExpectation.create().addEntry("key1", Code.DEADLINE_EXCEEDED).addEntry("key2", Code.OK)); service.expectations.add(RpcExpectation.create().addEntry("key1", Code.OK)); @@ -155,11 +155,11 @@ public void partialRetryTest() throws Exception { verifyOk(result1); verifyOk(result2); - assertThat(service.expectations).isEmpty(); + service.verifyOk(); } @Test - public void partialNoRetriesTest() throws Exception { + public void partialNoRetriesTest() { service.expectations.add( RpcExpectation.create().addEntry("key1", Code.INVALID_ARGUMENT).addEntry("key2", Code.OK)); @@ -169,11 +169,11 @@ public void partialNoRetriesTest() throws Exception { verifyError(result1, StatusCode.Code.INVALID_ARGUMENT); verifyOk(result2); - assertThat(service.expectations).isEmpty(); + service.verifyOk(); } @Test - public void partialRetryFailsEventuallyTest() throws Exception { + public void partialRetryFailsEventuallyTest() { // Create a bunch of failures RpcExpectation rpcExpectation = RpcExpectation.create().addEntry("key1", Code.DEADLINE_EXCEEDED); @@ -184,11 +184,11 @@ public void partialRetryFailsEventuallyTest() throws Exception { ApiFuture result1 = bulkMutations.add(RowMutation.create(TABLE_ID, "key1")); verifyError(result1, StatusCode.Code.DEADLINE_EXCEEDED); - assertThat(service.expectations).isEmpty(); + service.verifyOk(); } @Test - public void elementCountTest() throws Exception { + public void elementCountTest() { // First request RpcExpectation rpcExpectation1 = RpcExpectation.create(); int i = 0; @@ -209,7 +209,7 @@ public void elementCountTest() throws Exception { } verifyOk(ApiFutures.allAsList(results)); - assertThat(service.expectations).isEmpty(); + service.verifyOk(); } private void verifyOk(ApiFuture result) { @@ -267,10 +267,26 @@ RpcExpectation addEntry(String key, Code code) { static class TestBigtableService extends BigtableGrpc.BigtableImplBase { Queue expectations = Queues.newArrayDeque(); + private final List errors = Lists.newArrayList(); + + void verifyOk() { + assertThat(expectations).isEmpty(); + assertThat(errors).isEmpty(); + } @Override public void mutateRows( MutateRowsRequest request, StreamObserver responseObserver) { + try { + mutateRowsUnsafe(request, responseObserver); + } catch (Throwable t) { + errors.add(t); + throw t; + } + } + + private void mutateRowsUnsafe( + MutateRowsRequest request, StreamObserver responseObserver) { RpcExpectation expectedRpc = expectations.poll(); // Make sure that this isn't an extra request. diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallableTest.java new file mode 100644 index 000000000000..ae4397c4db48 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallableTest.java @@ -0,0 +1,387 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.mutaterows; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.core.AbstractApiFuture; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.retrying.RetryingFuture; +import com.google.api.gax.retrying.TimedAttemptSettings; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.StatusCode.Code; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.api.gax.rpc.UnavailableException; +import com.google.bigtable.v2.MutateRowsRequest; +import com.google.bigtable.v2.MutateRowsRequest.Entry; +import com.google.bigtable.v2.MutateRowsResponse; +import com.google.cloud.bigtable.data.v2.models.MutateRowsException; +import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; +import com.google.rpc.Status; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.threeten.bp.Duration; + +@RunWith(JUnit4.class) +public class MutateRowsAttemptCallableTest { + private static final Status OK_STATUS_PROTO = + Status.newBuilder().setCode(com.google.rpc.Code.OK_VALUE).build(); + + private static final Status TRANSIENT_ERROR_STATUS_PROTO = + Status.newBuilder().setCode(com.google.rpc.Code.UNAVAILABLE_VALUE).build(); + + private static final Status PERMENANT_ERROR_STATUS_PROTO = + Status.newBuilder().setCode(com.google.rpc.Code.INVALID_ARGUMENT_VALUE).build(); + + private MockInnerCallable innerCallable; + private Set retryCodes; + private ApiCallContext callContext; + private MockRetryingFuture parentFuture; + + @Before + public void setUp() { + innerCallable = new MockInnerCallable(); + retryCodes = ImmutableSet.of(Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE); + callContext = GrpcCallContext.createDefault(); + parentFuture = new MockRetryingFuture(); + } + + @Test + public void singleEntrySuccessTest() throws Exception { + MutateRowsRequest request = + MutateRowsRequest.newBuilder().addEntries(Entry.getDefaultInstance()).build(); + innerCallable.response.add( + MutateRowsResponse.newBuilder() + .addEntries( + MutateRowsResponse.Entry.newBuilder().setIndex(0).setStatus(OK_STATUS_PROTO)) + .build()); + + MutateRowsAttemptCallable attemptCallable = + new MutateRowsAttemptCallable(innerCallable, request, callContext, retryCodes); + attemptCallable.setExternalFuture(parentFuture); + attemptCallable.call(); + + // Attempt completed successfully and the useless response has been suppressed + assertThat(parentFuture.attemptFuture.get()).isNull(); + // innerCallable received the request + assertThat(innerCallable.lastRequest).isEqualTo(request); + } + + @Test + public void mixedTest() { + // Setup the request & response + MutateRowsRequest request = + MutateRowsRequest.newBuilder() + .addEntries(Entry.getDefaultInstance()) + .addEntries(Entry.getDefaultInstance()) + .addEntries(Entry.getDefaultInstance()) + .build(); + + innerCallable.response.add( + MutateRowsResponse.newBuilder() + .addEntries( + MutateRowsResponse.Entry.newBuilder().setIndex(0).setStatus(OK_STATUS_PROTO)) + .addEntries( + MutateRowsResponse.Entry.newBuilder() + .setIndex(1) + .setStatus(TRANSIENT_ERROR_STATUS_PROTO)) + .addEntries( + MutateRowsResponse.Entry.newBuilder() + .setIndex(2) + .setStatus(PERMENANT_ERROR_STATUS_PROTO)) + .build()); + + MutateRowsAttemptCallable attemptCallable = + new MutateRowsAttemptCallable(innerCallable, request, callContext, retryCodes); + attemptCallable.setExternalFuture(parentFuture); + + // Make the only call + attemptCallable.call(); + + // Overall error expectations + Throwable actualError = null; + try { + parentFuture.attemptFuture.get(); + } catch (Throwable t) { + actualError = t.getCause(); + } + + assertThat(actualError).isInstanceOf(MutateRowsException.class); + assertThat(((MutateRowsException) actualError).isRetryable()).isTrue(); + + // Entry expectations + @SuppressWarnings("ConstantConditions") + List failedMutations = ((MutateRowsException) actualError).getFailedMutations(); + assertThat(failedMutations).hasSize(2); + + assertThat(failedMutations.get(0).getIndex()).isEqualTo(1); + assertThat(failedMutations.get(0).getError().getStatusCode().getCode()) + .isEqualTo(Code.UNAVAILABLE); + assertThat(failedMutations.get(0).getError().isRetryable()).isTrue(); + + assertThat(failedMutations.get(1).getIndex()).isEqualTo(2); + assertThat(failedMutations.get(1).getError().getStatusCode().getCode()) + .isEqualTo(Code.INVALID_ARGUMENT); + assertThat(failedMutations.get(1).getError().isRetryable()).isFalse(); + } + + @Test + public void nextAttemptTest() { + // Setup the request & response for the first call + MutateRowsRequest request = + MutateRowsRequest.newBuilder() + .addEntries(Entry.newBuilder().setRowKey(ByteString.copyFromUtf8("0-ok"))) + .addEntries(Entry.newBuilder().setRowKey(ByteString.copyFromUtf8("1-unavailable"))) + .addEntries(Entry.newBuilder().setRowKey(ByteString.copyFromUtf8("2-invalid"))) + .build(); + + innerCallable.response.add( + MutateRowsResponse.newBuilder() + .addEntries( + MutateRowsResponse.Entry.newBuilder().setIndex(0).setStatus(OK_STATUS_PROTO)) + .addEntries( + MutateRowsResponse.Entry.newBuilder() + .setIndex(1) + .setStatus(TRANSIENT_ERROR_STATUS_PROTO)) + .addEntries( + MutateRowsResponse.Entry.newBuilder() + .setIndex(2) + .setStatus(PERMENANT_ERROR_STATUS_PROTO)) + .build()); + + MutateRowsAttemptCallable attemptCallable = + new MutateRowsAttemptCallable(innerCallable, request, callContext, retryCodes); + attemptCallable.setExternalFuture(parentFuture); + + // Make the first call + attemptCallable.call(); + + // Setup the request & response for the next call + innerCallable.response = + Lists.newArrayList( + MutateRowsResponse.newBuilder() + .addEntries( + MutateRowsResponse.Entry.newBuilder().setIndex(0).setStatus(OK_STATUS_PROTO)) + .build()); + attemptCallable.call(); + + // Make sure that only the entry with the transient error is resubmitted + assertThat(innerCallable.lastRequest.getEntriesCount()).isEqualTo(1); + assertThat(innerCallable.lastRequest.getEntries(0).getRowKey()) + .isEqualTo(ByteString.copyFromUtf8("1-unavailable")); + + // Overall error expectations + Throwable actualError = null; + try { + parentFuture.attemptFuture.get(); + } catch (Throwable t) { + actualError = t.getCause(); + } + assertThat(actualError).isInstanceOf(MutateRowsException.class); + assertThat(((MutateRowsException) actualError).isRetryable()).isFalse(); + + // Entry expectations + @SuppressWarnings("ConstantConditions") + List failedMutations = ((MutateRowsException) actualError).getFailedMutations(); + assertThat(failedMutations).hasSize(1); + + assertThat(failedMutations.get(0).getIndex()).isEqualTo(2); + assertThat(failedMutations.get(0).getError().getStatusCode().getCode()) + .isEqualTo(Code.INVALID_ARGUMENT); + assertThat(failedMutations.get(0).getError().isRetryable()).isFalse(); + } + + @Test + public void rpcRetryableError() { + // Setup the request & response + MutateRowsRequest request = + MutateRowsRequest.newBuilder() + .addEntries(Entry.getDefaultInstance()) + .addEntries(Entry.getDefaultInstance()) + .build(); + + final UnavailableException rpcError = + new UnavailableException( + "fake error", null, GrpcStatusCode.of(io.grpc.Status.Code.UNAVAILABLE), true); + + UnaryCallable> innerCallable = + new UnaryCallable>() { + @Override + public ApiFuture> futureCall( + MutateRowsRequest request, ApiCallContext context) { + return ApiFutures.immediateFailedFuture(rpcError); + } + }; + + // Make the call + MutateRowsAttemptCallable attemptCallable = + new MutateRowsAttemptCallable(innerCallable, request, callContext, retryCodes); + attemptCallable.setExternalFuture(parentFuture); + attemptCallable.call(); + + // Overall expectations: retryable error + Throwable actualError = null; + try { + parentFuture.attemptFuture.get(); + } catch (Throwable t) { + actualError = t.getCause(); + } + assertThat(actualError).isInstanceOf(MutateRowsException.class); + assertThat(((MutateRowsException) actualError).isRetryable()).isTrue(); + + // Entry expectations: both entries failed with an error whose cause is the rpc error + @SuppressWarnings("ConstantConditions") + List failedMutations = ((MutateRowsException) actualError).getFailedMutations(); + assertThat(failedMutations).hasSize(2); + + assertThat(failedMutations.get(0).getIndex()).isEqualTo(0); + assertThat(failedMutations.get(0).getError().isRetryable()).isTrue(); + assertThat(failedMutations.get(0).getError().getCause()).isEqualTo(rpcError); + + assertThat(failedMutations.get(1).getIndex()).isEqualTo(1); + assertThat(failedMutations.get(1).getError().isRetryable()).isTrue(); + assertThat(failedMutations.get(1).getError().getCause()).isEqualTo(rpcError); + } + + @Test + public void rpcPermanentError() { + // Setup the request & response + MutateRowsRequest request = + MutateRowsRequest.newBuilder() + .addEntries(Entry.getDefaultInstance()) + .addEntries(Entry.getDefaultInstance()) + .build(); + + final UnavailableException rpcError = + new UnavailableException( + "fake error", null, GrpcStatusCode.of(io.grpc.Status.Code.INVALID_ARGUMENT), false); + + UnaryCallable> innerCallable = + new UnaryCallable>() { + @Override + public ApiFuture> futureCall( + MutateRowsRequest request, ApiCallContext context) { + return ApiFutures.immediateFailedFuture(rpcError); + } + }; + + // Make the call + MutateRowsAttemptCallable attemptCallable = + new MutateRowsAttemptCallable(innerCallable, request, callContext, retryCodes); + attemptCallable.setExternalFuture(parentFuture); + attemptCallable.call(); + + // Overall expectations: retryable error + Throwable actualError = null; + try { + parentFuture.attemptFuture.get(); + } catch (Throwable t) { + actualError = t.getCause(); + } + assertThat(actualError).isInstanceOf(MutateRowsException.class); + assertThat(((MutateRowsException) actualError).isRetryable()).isFalse(); + + // Entry expectations: both entries failed with an error whose cause is the rpc error + @SuppressWarnings("ConstantConditions") + List failedMutations = ((MutateRowsException) actualError).getFailedMutations(); + assertThat(failedMutations).hasSize(2); + + assertThat(failedMutations.get(0).getIndex()).isEqualTo(0); + assertThat(failedMutations.get(0).getError().isRetryable()).isFalse(); + assertThat(failedMutations.get(0).getError().getCause()).isEqualTo(rpcError); + + assertThat(failedMutations.get(1).getIndex()).isEqualTo(1); + assertThat(failedMutations.get(1).getError().isRetryable()).isFalse(); + assertThat(failedMutations.get(1).getError().getCause()).isEqualTo(rpcError); + } + + // + static class MockInnerCallable + extends UnaryCallable> { + List response = Lists.newArrayList(); + MutateRowsRequest lastRequest; + ApiCallContext lastContext; + + @Override + public ApiFuture> futureCall( + MutateRowsRequest request, ApiCallContext context) { + lastRequest = request; + lastContext = context; + + return ApiFutures.immediateFuture(response); + } + } + + static class MockRetryingFuture extends AbstractApiFuture implements RetryingFuture { + ApiFuture attemptFuture; + + final TimedAttemptSettings timedAttemptSettings; + + MockRetryingFuture() { + this(Duration.ofSeconds(5)); + } + + MockRetryingFuture(Duration totalTimeout) { + this.timedAttemptSettings = + TimedAttemptSettings.newBuilder() + .setRpcTimeout(Duration.ofSeconds(1)) + .setRetryDelay(Duration.ZERO) + .setRandomizedRetryDelay(Duration.ZERO) + .setAttemptCount(0) + .setFirstAttemptStartTimeNanos(0) + .setGlobalSettings(RetrySettings.newBuilder().setTotalTimeout(totalTimeout).build()) + .build(); + } + + @Override + public void setAttemptFuture(ApiFuture attemptFuture) { + this.attemptFuture = attemptFuture; + } + + @Override + public TimedAttemptSettings getAttemptSettings() { + return timedAttemptSettings; + } + + @Override + public Callable getCallable() { + throw new UnsupportedOperationException("not used"); + } + + @Override + public ApiFuture peekAttemptResult() { + throw new UnsupportedOperationException("not used"); + } + + @Override + public ApiFuture getAttemptResult() { + throw new UnsupportedOperationException("not used"); + } + } + // +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsBatchingDescriptorTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsBatchingDescriptorTest.java index 52820eb19e8a..67d4ebf0a0a6 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsBatchingDescriptorTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsBatchingDescriptorTest.java @@ -23,21 +23,21 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.BatchedFuture; import com.google.api.gax.rpc.BatchedRequestIssuer; -import com.google.api.gax.rpc.StatusCode.Code; +import com.google.api.gax.rpc.DeadlineExceededException; +import com.google.api.gax.rpc.UnavailableException; import com.google.bigtable.v2.MutateRowsRequest; import com.google.bigtable.v2.MutateRowsRequest.Entry; import com.google.bigtable.v2.MutateRowsRequest.Entry.Builder; -import com.google.bigtable.v2.MutateRowsResponse; import com.google.bigtable.v2.Mutation; import com.google.bigtable.v2.Mutation.SetCell; import com.google.bigtable.v2.TableName; +import com.google.cloud.bigtable.data.v2.models.MutateRowsException; +import com.google.cloud.bigtable.data.v2.models.MutateRowsException.FailedMutation; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import io.grpc.Status; -import io.grpc.StatusRuntimeException; import java.util.List; -import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -50,14 +50,12 @@ public class MutateRowsBatchingDescriptorTest { private static final TableName TABLE_NAME = TableName.of("fake-project", "fake-instance", "fake-table"); - private static final Set RETRYABLE_CODES = - ImmutableSet.of(Code.UNAVAILABLE, Code.DEADLINE_EXCEEDED, Code.RESOURCE_EXHAUSTED); private MutateRowsBatchingDescriptor descriptor; @Before - public void setUp() throws Exception { - descriptor = new MutateRowsBatchingDescriptor(RETRYABLE_CODES); + public void setUp() { + descriptor = new MutateRowsBatchingDescriptor(); } @Test @@ -106,17 +104,14 @@ public void requestBuilderTest() { } @Test - public void splitExceptionTest() throws TimeoutException, InterruptedException { - BatchedFuture result1 = BatchedFuture.create(); - BatchedRequestIssuer issuer1 = new BatchedRequestIssuer<>(result1, 1); - BatchedFuture result2 = BatchedFuture.create(); - BatchedRequestIssuer issuer2 = new BatchedRequestIssuer<>(result2, 1); - - ImmutableList> issuers = - new ImmutableList.Builder>() - .add(issuer1) - .add(issuer2) - .build(); + public void splitRpcExceptionTest() throws Exception { + BatchedFuture result1 = BatchedFuture.create(); + BatchedRequestIssuer issuer1 = new BatchedRequestIssuer<>(result1, 1); + BatchedFuture result2 = BatchedFuture.create(); + BatchedRequestIssuer issuer2 = new BatchedRequestIssuer<>(result2, 1); + + ImmutableList> issuers = + new ImmutableList.Builder>().add(issuer1).add(issuer2).build(); ApiException serverError = new ApiException(null, GrpcStatusCode.of(Status.Code.UNAVAILABLE), true); @@ -142,74 +137,30 @@ public void splitExceptionTest() throws TimeoutException, InterruptedException { } @Test - public void splitResponseOkTest() - throws InterruptedException, ExecutionException, TimeoutException { - BatchedFuture result1 = BatchedFuture.create(); - BatchedRequestIssuer issuer1 = new BatchedRequestIssuer<>(result1, 1); - BatchedFuture result2 = BatchedFuture.create(); - BatchedRequestIssuer issuer2 = new BatchedRequestIssuer<>(result2, 1); - - List> issuers = - new ImmutableList.Builder>() - .add(issuer1) - .add(issuer2) - .build(); - - MutateRowsResponse.Entry.Builder okEntryBuilder = - MutateRowsResponse.Entry.newBuilder() - .setStatus(com.google.rpc.Status.newBuilder().setCode(com.google.rpc.Code.OK_VALUE)); - - MutateRowsResponse response = - MutateRowsResponse.newBuilder() - .addEntries(okEntryBuilder.setIndex(0).build()) - .addEntries(okEntryBuilder.setIndex(1).build()) - .build(); - - descriptor.splitResponse(response, issuers); - issuer1.sendResult(); - issuer2.sendResult(); - - assertThat(result1.get(1, TimeUnit.SECONDS)) - .isEqualTo( - MutateRowsResponse.newBuilder().addEntries(okEntryBuilder.setIndex(0).build()).build()); + public void splitEntryErrorTest() throws Exception { + BatchedFuture result1 = BatchedFuture.create(); + BatchedRequestIssuer issuer1 = new BatchedRequestIssuer<>(result1, 1); + BatchedFuture result2 = BatchedFuture.create(); + BatchedRequestIssuer issuer2 = new BatchedRequestIssuer<>(result2, 1); + + List> issuers = + new ImmutableList.Builder>().add(issuer1).add(issuer2).build(); + + MutateRowsException serverError = + new MutateRowsException( + null, + Lists.newArrayList( + FailedMutation.create( + 0, + new UnavailableException( + null, GrpcStatusCode.of(Status.Code.UNAVAILABLE), true)), + FailedMutation.create( + 1, + new DeadlineExceededException( + null, GrpcStatusCode.of(Status.Code.DEADLINE_EXCEEDED), true))), + true); - assertThat(result2.get(1, TimeUnit.SECONDS)) - .isEqualTo( - MutateRowsResponse.newBuilder().addEntries(okEntryBuilder.setIndex(0).build()).build()); - } - - @Test - public void splitResponseErrorWrapTest() - throws InterruptedException, ExecutionException, TimeoutException { - MutateRowsResponse batchResponse = - MutateRowsResponse.newBuilder() - .addEntries( - MutateRowsResponse.Entry.newBuilder() - .setIndex(0) - .setStatus( - com.google.rpc.Status.newBuilder() - .setCode(com.google.rpc.Code.DEADLINE_EXCEEDED_VALUE))) - .addEntries( - MutateRowsResponse.Entry.newBuilder() - .setIndex(1) - .setStatus( - com.google.rpc.Status.newBuilder() - .setCode(com.google.rpc.Code.INTERNAL_VALUE))) - .build(); - - BatchedFuture result1 = BatchedFuture.create(); - BatchedRequestIssuer issuer1 = new BatchedRequestIssuer<>(result1, 1); - - BatchedFuture result2 = BatchedFuture.create(); - BatchedRequestIssuer issuer2 = new BatchedRequestIssuer<>(result2, 1); - - List> issuers = - new ImmutableList.Builder>() - .add(issuer1) - .add(issuer2) - .build(); - - descriptor.splitResponse(batchResponse, issuers); + descriptor.splitException(serverError, issuers); issuer1.sendResult(); issuer2.sendResult(); @@ -220,13 +171,7 @@ public void splitResponseErrorWrapTest() } catch (ExecutionException e) { actualError1 = e.getCause(); } - - assertThat(actualError1).isInstanceOf(ApiException.class); - assertThat(((ApiException) actualError1).isRetryable()).isTrue(); - - assertThat(actualError1.getCause()).isInstanceOf(StatusRuntimeException.class); - assertThat(((StatusRuntimeException) actualError1.getCause()).getStatus().getCode()) - .isEqualTo(Status.Code.DEADLINE_EXCEEDED); + assertThat(actualError1).isEqualTo(serverError.getFailedMutations().get(0).getError()); Throwable actualError2 = null; @@ -235,13 +180,27 @@ public void splitResponseErrorWrapTest() } catch (ExecutionException e) { actualError2 = e.getCause(); } + assertThat(actualError2).isEqualTo(serverError.getFailedMutations().get(1).getError()); + } + + @Test + public void splitResponseOkTest() + throws InterruptedException, ExecutionException, TimeoutException { + BatchedFuture result1 = BatchedFuture.create(); + BatchedRequestIssuer issuer1 = new BatchedRequestIssuer<>(result1, 1); + BatchedFuture result2 = BatchedFuture.create(); + BatchedRequestIssuer issuer2 = new BatchedRequestIssuer<>(result2, 1); + + List> issuers = + new ImmutableList.Builder>().add(issuer1).add(issuer2).build(); + + descriptor.splitResponse(null, issuers); + issuer1.sendResult(); + issuer2.sendResult(); - assertThat(actualError2).isInstanceOf(ApiException.class); - assertThat(((ApiException) actualError2).isRetryable()).isFalse(); + assertThat(result1.get(1, TimeUnit.SECONDS)).isEqualTo(null); - assertThat(actualError2.getCause()).isInstanceOf(StatusRuntimeException.class); - assertThat(((StatusRuntimeException) actualError2.getCause()).getStatus().getCode()) - .isEqualTo(Status.Code.INTERNAL); + assertThat(result2.get(1, TimeUnit.SECONDS)).isEqualTo(null); } private static MutateRowsRequest createRequest(int count) {