Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.common.base.Preconditions;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* Thrown by the MutateRows when at least one Mutation failed. If the last failure was caused by an
* RPC error (as opposed to a single entry failing), then this exception's cause will be set to that
* error and {@link #getFailedMutations()} will contain synthetic errors for all of the entries that
* were part of that RPC.
*/
public final class MutateRowsException extends ApiException {
// Synthetic status to use for this ApiException subclass.
private static final StatusCode LOCAL_STATUS =
new StatusCode() {
@Override
public Code getCode() {
return Code.INTERNAL;
}

@Override
public Object getTransportCode() {
return null;

This comment was marked as spam.

This comment was marked as spam.

}
};

private final List<FailedMutation> failedMutations;

/**
* This constructor is considered an internal implementation detail and not meant to be used by
* applications.
*/
@InternalApi
public MutateRowsException(

This comment was marked as spam.

This comment was marked as spam.

@Nullable Throwable rpcError,
@Nonnull List<FailedMutation> failedMutations,
boolean retryable) {
super("Some mutations failed to apply", rpcError, LOCAL_STATUS, retryable);
Preconditions.checkNotNull(failedMutations);
Preconditions.checkArgument(!failedMutations.isEmpty(), "failedMutations can't be empty");
this.failedMutations = failedMutations;
}

/**
* Retrieve all of the failed mutations. This list will contain failures for all of the mutations

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

* that have failed across all of the retry attempts so far.
*/
@Nonnull
public List<FailedMutation> getFailedMutations() {
return failedMutations;
}

/**
* Identifies which mutation failed and the reason it failed. The mutation is identified by it's
* index in the original request's {@link MutateRowsRequest#getEntriesList()}.
*/
@AutoValue
public abstract static class FailedMutation {
/**
* This method is considered an internal implementation detail and not meant to be used by
* applications.
*/
@InternalApi
@Nonnull
public static FailedMutation create(int index, ApiException error) {
return new AutoValue_MutateRowsException_FailedMutation(index, error);
}

/**
* The index of the mutation in the original request's {@link
* MutateRowsRequest#getEntriesList()}.
*/
public abstract int getIndex();

/**
* The error that prevented this mutation from being applied. Please note, that if the entire
* RPC attempt failed, all mutations that were part of the attempt will have take on the same

This comment was marked as spam.

This comment was marked as spam.

* error.
*/
@Nonnull
public abstract ApiException getError();

This comment was marked as spam.

This comment was marked as spam.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutor;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.rpc.BatchingCallSettings;
import com.google.api.gax.rpc.Callables;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.SampleRowKeysRequest;
import com.google.bigtable.v2.SampleRowKeysResponse;
Expand All @@ -40,13 +40,14 @@
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsSpoolingCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsUserFacingCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import java.io.IOException;
import java.util.List;
import org.threeten.bp.Duration;
Expand Down Expand Up @@ -251,28 +252,41 @@ private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
* <li>Convert a {@link RowMutation} into a {@link MutateRowsRequest} with a single entry.
* <li>Using gax's {@link com.google.api.gax.rpc.BatchingCallable} to spool the requests and
* aggregate the {@link MutateRowsRequest.Entry}s.
* <li>Spool the streamed responses.
* <li>Process the response and schedule retries. At the end of each attempt, entries that have
* been applied, are filtered from the next attempt. Also, any entries that failed with a
* nontransient error, are filtered from the next attempt. This will continue until there
* are no more entries or there are no more retry attempts left.
* <li>Wrap batch failures in a {@link
* com.google.cloud.bigtable.data.v2.models.MutateRowsException}.
* <li>Split the responses using {@link MutateRowsBatchingDescriptor}.
* <li>Apply retries to individual mutations
* </ul>
*/
private UnaryCallable<RowMutation, Void> createMutateRowsCallable() {
MutateRowsSpoolingCallable spooling = new MutateRowsSpoolingCallable(stub.mutateRowsCallable());
RetryAlgorithm<Void> retryAlgorithm =
new RetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
new ExponentialRetryAlgorithm(
settings.mutateRowsSettings().getRetrySettings(), clientContext.getClock()));
RetryingExecutor<Void> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());

UnaryCallable<MutateRowsRequest, Void> retrying =
new MutateRowsRetryingCallable(
clientContext.getDefaultCallContext(),
stub.mutateRowsCallable(),
retryingExecutor,
settings.mutateRowsSettings().getRetryableCodes());

// recreate BatchingCallSettings with the correct descriptor
BatchingCallSettings.Builder<MutateRowsRequest, MutateRowsResponse> batchingCallSettings =
BatchingCallSettings.newBuilder(
new MutateRowsBatchingDescriptor(settings.mutateRowsSettings().getRetryableCodes()))
BatchingCallSettings.Builder<MutateRowsRequest, Void> batchingCallSettings =
BatchingCallSettings.newBuilder(new MutateRowsBatchingDescriptor())
.setBatchingSettings(settings.mutateRowsSettings().getBatchingSettings());

UnaryCallable<MutateRowsRequest, MutateRowsResponse> batching =
Callables.batching(spooling, batchingCallSettings.build(), clientContext);

UnaryCallable<MutateRowsRequest, MutateRowsResponse> retrying =
Callables.retrying(batching, settings.mutateRowsSettings(), clientContext);
UnaryCallable<MutateRowsRequest, Void> batching =
Callables.batching(retrying, batchingCallSettings.build(), clientContext);

MutateRowsUserFacingCallable userFacing =
new MutateRowsUserFacingCallable(retrying, requestContext);
new MutateRowsUserFacingCallable(batching, requestContext);

return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand Down
Loading