From 351aae9e999f03b43c31b30490a3ed44991aaad9 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Thu, 1 Mar 2018 23:10:02 -0500 Subject: [PATCH 1/9] BIgtable: 11. Implement ReadRows retries --- google-cloud-bigtable/pom.xml | 4 + .../data/v2/stub/EnhancedBigtableStub.java | 29 +- .../readrows/ReadRowsResumptionStrategy.java | 151 ++++++++++ .../data/v2/stub/readrows/package-info.java | 40 +++ .../v2/stub/readrows/ReadRowsRetryTest.java | 273 ++++++++++++++++++ 5 files changed, 496 insertions(+), 1 deletion(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/package-info.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java diff --git a/google-cloud-bigtable/pom.xml b/google-cloud-bigtable/pom.xml index a1b58577b547..9a7cc9c9a475 100644 --- a/google-cloud-bigtable/pom.xml +++ b/google-cloud-bigtable/pom.xml @@ -91,6 +91,10 @@ testlib test + + io.grpc + grpc-testing + diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index a10f12e4a2eb..d7abc7630418 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -17,9 +17,11 @@ import com.google.api.core.ApiFuture; import com.google.api.core.InternalApi; +import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.Callables; import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.ServerStreamingCallSettings; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.UnaryCallable; import com.google.bigtable.v2.ReadRowsRequest; @@ -35,10 +37,12 @@ import com.google.cloud.bigtable.data.v2.models.RowAdapter; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable; +import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable; import java.io.IOException; import java.util.List; +import org.threeten.bp.Duration; /** * The core client that converts method calls to RPCs. @@ -75,6 +79,15 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings) .setEndpoint(settings.getEndpoint()) .setCredentialsProvider(settings.getCredentialsProvider()); + // ReadRow retries are handled in the overlay: disable retries in the base layer (but make + // sure to preserve the exception callable settings. + baseSettingsBuilder + .readRowsSettings() + .setSimpleTimeoutNoRetries(Duration.ofHours(2)) + .setRetryableCodes(settings.readRowsSettings().getRetryableCodes()) + .setTimeoutCheckInterval(Duration.ZERO) + .setIdleTimeout(Duration.ZERO); + // SampleRowKeys retries are handled in the overlay: disable retries in the base layer (but make // sure to preserve the exception callable settings. baseSettingsBuilder @@ -147,7 +160,21 @@ public ServerStreamingCallable createReadRowsCallable( ServerStreamingCallable merging = new RowMergingCallable<>(stub.readRowsCallable(), rowAdapter); - FilterMarkerRowsCallable filtering = new FilterMarkerRowsCallable<>(merging, rowAdapter); + // Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the outer + // Query -> RowT callable or the inner ReadRowsRequest -> ReadRowsResponse callable). + ServerStreamingCallSettings innerSettings = + ServerStreamingCallSettings.newBuilder() + .setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter)) + .setRetryableCodes(settings.readRowsSettings().getRetryableCodes()) + .setRetrySettings(settings.readRowsSettings().getRetrySettings()) + .setTimeoutCheckInterval(settings.readRowsSettings().getTimeoutCheckInterval()) + .setIdleTimeout(settings.readRowsSettings().getIdleTimeout()) + .build(); + + ServerStreamingCallable retrying = + Callables.retrying(merging, innerSettings, clientContext); + + FilterMarkerRowsCallable filtering = new FilterMarkerRowsCallable<>(retrying, rowAdapter); ServerStreamingCallable withContext = filtering.withDefaultCallContext(clientContext.getDefaultCallContext()); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java new file mode 100644 index 000000000000..f799e82237f2 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java @@ -0,0 +1,151 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.readrows; + +import com.google.api.core.InternalApi; +import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsRequest.Builder; +import com.google.bigtable.v2.RowRange; +import com.google.bigtable.v2.RowSet; +import com.google.cloud.bigtable.data.v2.internal.ByteStringComparator; +import com.google.cloud.bigtable.data.v2.models.RowAdapter; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; + +/** + * An implementation of a {@link StreamResumptionStrategy} for merged rows. This class tracks the + * last complete row seen and upon retry can build a request to resume the stream from were it left + * off. + * + *

This class is considered an internal implementation detail and not meant to be used by + * applications. + */ +@InternalApi +public class ReadRowsResumptionStrategy + implements StreamResumptionStrategy { + private final RowAdapter rowAdapter; + private ByteString lastKey = ByteString.EMPTY; + private long numProcessed = 0; + + public ReadRowsResumptionStrategy(RowAdapter rowAdapter) { + this.rowAdapter = rowAdapter; + } + + @Override + public boolean canResume() { + return true; + } + + @Override + public StreamResumptionStrategy createNew() { + return new ReadRowsResumptionStrategy<>(rowAdapter); + } + + @Override + public void onProgress(RowT response) { + // Last key can come from both the last processed row key and a synthetic row marker. The + // synthetic row marker is emitted when the server has read a lot of data that was filtered out. + // So it can trim the start of the scan, but does not contribute to the row limit. + lastKey = rowAdapter.getKey(response); + if (!rowAdapter.isScanMarkerRow(response)) { + // Only real rows count towards the rows limit. + numProcessed++; + } + } + + @Override + public ReadRowsRequest getResumeRequest(ReadRowsRequest request) { + if (lastKey.isEmpty()) { + return request; + } + Builder builder = request.toBuilder(); + + if (request.getRowsLimit() > 0) { + Preconditions.checkState( + request.getRowsLimit() >= numProcessed, + "Detected too many responses for the current row limit during a retry."); + builder.setRowsLimit(request.getRowsLimit() - numProcessed); + } + + // Reset rows to scan. + builder.clearRows(); + + RowSet.Builder newRowSet = RowSet.newBuilder(); + + // Special case: empty query implies full table scan + if (request.getRows().getRowKeysList().isEmpty() + && request.getRows().getRowRangesList().isEmpty()) { + newRowSet.addRowRanges(RowRange.newBuilder().setStartKeyOpen(lastKey).build()); + } else { + for (ByteString key : request.getRows().getRowKeysList()) { + if (ByteStringComparator.INSTANCE.compare(key, lastKey) > 0) { + newRowSet.addRowKeys(key); + } + } + + for (RowRange rowRange : request.getRows().getRowRangesList()) { + RowRange.Builder rangeBuilder = RowRange.newBuilder(); + + switch (rowRange.getEndKeyCase()) { + case END_KEY_CLOSED: + if (ByteStringComparator.INSTANCE.compare(rowRange.getEndKeyClosed(), lastKey) <= 0) { + continue; + } else { + rangeBuilder.setEndKeyClosed(rowRange.getEndKeyClosed()); + } + break; + case END_KEY_OPEN: + if (ByteStringComparator.INSTANCE.compare(rowRange.getEndKeyOpen(), lastKey) <= 0) { + continue; + } else { + rangeBuilder.setEndKeyOpen(rowRange.getEndKeyOpen()); + } + break; + case ENDKEY_NOT_SET: + rangeBuilder.clearEndKey(); + } + + switch (rowRange.getStartKeyCase()) { + case STARTKEY_NOT_SET: + rangeBuilder.setStartKeyOpen(lastKey); + break; + case START_KEY_OPEN: + if (ByteStringComparator.INSTANCE.compare(rowRange.getStartKeyOpen(), lastKey) < 0) { + rangeBuilder.setStartKeyOpen(lastKey); + } else { + rangeBuilder.setStartKeyOpen(rowRange.getStartKeyOpen()); + } + break; + case START_KEY_CLOSED: + if (ByteStringComparator.INSTANCE.compare(rowRange.getStartKeyClosed(), lastKey) <= 0) { + rangeBuilder.setStartKeyOpen(lastKey); + } else { + rangeBuilder.setStartKeyClosed(rowRange.getStartKeyClosed()); + } + break; + default: + throw new IllegalArgumentException( + "Unknown startKeyCase: " + rowRange.getStartKeyCase()); + } + newRowSet.addRowRanges(rangeBuilder.build()); + } + } + + builder.setRows(newRowSet.build()); + return builder.build(); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/package-info.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/package-info.java new file mode 100644 index 000000000000..dd763441a6b9 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/package-info.java @@ -0,0 +1,40 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Implementation details for {@link + * com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub#readRowsCallable()}. + * + *

The ReadRows protocol is optimized for transmission and cannot be consumed directly. This + * package implements significant customizations on top of the raw GAPIC generated stub to handle + * row merging and retries. + * + *

    + *
  • ReadRowsUserCallable: Creates protobuf {@link com.google.bigtable.v2.ReadRowsRequest}s from + * user facing wrappers. + *
  • RowMergingCallable (+ helpers): Implements a state machine that merges chunks into logical + * rows. The logical row representation is configurable via a RowAdapter. Please note that + * this will also emit special marker rows that help with retries in the next stage, but need + * to be filtered out. + *
  • ReadRowsTracker (+ helpers): Implements resuming retries for gax's Callables#retrying + * middleware. + *
  • FilterMarkerRowsCallable: Filters out marker rows that are used for efficient retry + * resumes. This is necessary because + *
+ * + *

This package is considered an internal implementation detail and is not meant to be used by + * applications directly. + */ +package com.google.cloud.bigtable.data.v2.stub.readrows; diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java new file mode 100644 index 000000000000..2321388d3137 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java @@ -0,0 +1,273 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.readrows; + +import com.google.api.client.util.Lists; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.ServerStream; +import com.google.bigtable.admin.v2.InstanceName; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.bigtable.v2.ReadRowsResponse.CellChunk; +import com.google.bigtable.v2.RowRange; +import com.google.bigtable.v2.TableName; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.common.collect.Queues; +import com.google.common.collect.Range; +import com.google.common.truth.Truth; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.StringValue; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcServerRule; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Queue; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ReadRowsRetryTest { + private static final InstanceName instanceName = InstanceName.of("fake-project", "fake-instance"); + + private static final TableName tableName = + TableName.of(instanceName.getProject(), instanceName.getInstance(), "fake-table"); + + @Rule public GrpcServerRule serverRule = new GrpcServerRule(); + private TestBigtableService service; + private BigtableDataClient client; + + @Before + public void setUp() throws IOException { + service = new TestBigtableService(); + serverRule.getServiceRegistry().addService(service); + + BigtableDataSettings settings = + BigtableDataSettings.newBuilder() + .setInstanceName(instanceName) + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider( + FixedTransportChannelProvider.create( + GrpcTransportChannel.create(serverRule.getChannel()))) + .build(); + + client = BigtableDataClient.create(settings); + } + + @After + public void tearDown() throws Exception { + if (client != null) { + client.close(); + } + } + + @Test + public void happyPathTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest("k1") + .expectRequest(Range.closedOpen("r1", "r3")) + .respondWith("k1", "r1", "r2")); + + List actualResults = + getResults(Query.create(tableName.getTable()).rowKey("k1").range("r1", "r3")); + Truth.assertThat(actualResults).containsExactly("k1", "r1", "r2").inOrder(); + } + + @Test + public void immediateRetryTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest("k1") + .expectRequest(Range.closedOpen("r1", "r3")) + .respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add( + RpcExpectation.create() + .expectRequest("k1") + .expectRequest(Range.closedOpen("r1", "r3")) + .respondWith("k1", "r1", "r2")); + + List actualResults = + getResults(Query.create(tableName.getTable()).rowKey("k1").range("r1", "r3")); + Truth.assertThat(actualResults).containsExactly("k1", "r1", "r2").inOrder(); + } + + @Test + public void multipleRetryTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.closedOpen("r1", "r9")) + .respondWith("r1", "r2", "r3", "r4") + .respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.open("r4", "r9")) + .respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.open("r4", "r9")) + .respondWith("r5", "r6", "r7") + .respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add( + RpcExpectation.create().expectRequest(Range.open("r7", "r9")).respondWith("r8")); + + List actualResults = getResults(Query.create(tableName.getTable()).range("r1", "r9")); + Truth.assertThat(actualResults) + .containsExactly("r1", "r2", "r3", "r4", "r5", "r6", "r7", "r8") + .inOrder(); + } + + @Test + public void rowLimitTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.closedOpen("r1", "r3")) + .expectRowLimit(2) + .respondWith("r1") + .respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.open("r1", "r3")) + .expectRowLimit(1) + .respondWith("r2")); + + List actualResults = + getResults(Query.create(tableName.getTable()).range("r1", "r3").limit(2)); + Truth.assertThat(actualResults).containsExactly("r1", "r2").inOrder(); + } + + private List getResults(Query query) { + ServerStream actualRows = client.readRows(query); + List actualValues = Lists.newArrayList(); + for (Row row : actualRows) { + actualValues.add(row.getKey().toStringUtf8()); + } + return actualValues; + } + + private static class TestBigtableService extends BigtableGrpc.BigtableImplBase { + Queue expectations = Queues.newArrayDeque(); + + @Override + public void readRows( + ReadRowsRequest request, StreamObserver responseObserver) { + + RpcExpectation expectedRpc = expectations.poll(); + + Truth.assertWithMessage("Unexpected request: " + request.toString()) + .that(expectedRpc) + .isNotNull(); + Truth.assertThat(request).isEqualTo(expectedRpc.getExpectedRequest()); + + for (String key : expectedRpc.responses) { + ReadRowsResponse response = + ReadRowsResponse.newBuilder() + .addChunks( + CellChunk.newBuilder() + .setRowKey(ByteString.copyFromUtf8(key)) + .setFamilyName(StringValue.newBuilder().setValue("family")) + .setQualifier(BytesValue.newBuilder().setValue(ByteString.EMPTY)) + .setTimestampMicros(1_000) + .setValue(ByteString.copyFromUtf8("value")) + .setCommitRow(true)) + .build(); + + responseObserver.onNext(response); + } + if (expectedRpc.statusCode.toStatus().isOk()) { + responseObserver.onCompleted(); + } else { + responseObserver.onError(expectedRpc.statusCode.toStatus().asRuntimeException()); + } + } + } + + private static class RpcExpectation { + ReadRowsRequest.Builder requestBuilder; + Status.Code statusCode; + List responses; + + private RpcExpectation() { + this.requestBuilder = ReadRowsRequest.newBuilder().setTableName(tableName.toString()); + this.statusCode = Status.Code.OK; + this.responses = Lists.newArrayList(); + } + + static RpcExpectation create() { + return new RpcExpectation(); + } + + RpcExpectation expectRequest(String key) { + requestBuilder.getRowsBuilder().addRowKeys(ByteString.copyFromUtf8(key)); + return this; + } + + RpcExpectation expectRequest(Range range) { + RowRange.Builder rowRange = requestBuilder.getRowsBuilder().addRowRangesBuilder(); + + switch (range.lowerBoundType()) { + case CLOSED: + rowRange.setStartKeyClosed(ByteString.copyFromUtf8(range.lowerEndpoint())); + break; + case OPEN: + rowRange.setStartKeyOpen(ByteString.copyFromUtf8(range.lowerEndpoint())); + break; + } + switch (range.upperBoundType()) { + case CLOSED: + rowRange.setEndKeyClosed(ByteString.copyFromUtf8(range.upperEndpoint())); + break; + case OPEN: + rowRange.setEndKeyOpen(ByteString.copyFromUtf8(range.upperEndpoint())); + break; + } + + return this; + } + + RpcExpectation expectRowLimit(int limit) { + requestBuilder.setRowsLimit(limit); + return this; + } + + RpcExpectation respondWithStatus(Status.Code code) { + this.statusCode = code; + return this; + } + + RpcExpectation respondWith(String... responses) { + this.responses.addAll(Arrays.asList(responses)); + return this; + } + + ReadRowsRequest getExpectedRequest() { + return requestBuilder.build(); + } + } +} From fd8ced6db99281232e89d9edd3ba84c7e909841f Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Thu, 1 Mar 2018 23:48:27 -0500 Subject: [PATCH 2/9] add default case --- .../data/v2/stub/readrows/ReadRowsResumptionStrategy.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java index f799e82237f2..77f1899908ef 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java @@ -117,6 +117,8 @@ public ReadRowsRequest getResumeRequest(ReadRowsRequest request) { break; case ENDKEY_NOT_SET: rangeBuilder.clearEndKey(); + default: + throw new IllegalArgumentException("Unknown endKeyCase: " + rowRange.getEndKeyCase()); } switch (rowRange.getStartKeyCase()) { From d1b8d8a04c43edf59716b7e2fa31952353858a0e Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Fri, 2 Mar 2018 08:33:50 -0500 Subject: [PATCH 3/9] fix scope for testing dep --- google-cloud-bigtable/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/google-cloud-bigtable/pom.xml b/google-cloud-bigtable/pom.xml index 9a7cc9c9a475..d4cd857245c6 100644 --- a/google-cloud-bigtable/pom.xml +++ b/google-cloud-bigtable/pom.xml @@ -94,6 +94,7 @@ io.grpc grpc-testing + test From afe93219bbc7746762d035337c00f22278ecc6a1 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Fri, 2 Mar 2018 08:40:16 -0500 Subject: [PATCH 4/9] codacy fixes --- .../data/v2/stub/readrows/ReadRowsResumptionStrategy.java | 2 +- .../bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java index 77f1899908ef..a24cd37e305e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java @@ -39,7 +39,7 @@ public class ReadRowsResumptionStrategy implements StreamResumptionStrategy { private final RowAdapter rowAdapter; private ByteString lastKey = ByteString.EMPTY; - private long numProcessed = 0; + private long numProcessed; public ReadRowsResumptionStrategy(RowAdapter rowAdapter) { this.rowAdapter = rowAdapter; diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java index 2321388d3137..494ff6813488 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java @@ -238,6 +238,9 @@ RpcExpectation expectRequest(Range range) { case OPEN: rowRange.setStartKeyOpen(ByteString.copyFromUtf8(range.lowerEndpoint())); break; + default: + throw new IllegalArgumentException( + "Unexpected lowerBoundType: " + range.lowerBoundType()); } switch (range.upperBoundType()) { case CLOSED: @@ -246,6 +249,9 @@ RpcExpectation expectRequest(Range range) { case OPEN: rowRange.setEndKeyOpen(ByteString.copyFromUtf8(range.upperEndpoint())); break; + default: + throw new IllegalArgumentException( + "Unexpected upperBoundType: " + range.upperBoundType()); } return this; From d8887c4b9d0a1368a7dba34468312d6acea093d1 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Mon, 5 Mar 2018 09:11:36 -0500 Subject: [PATCH 5/9] address feedback --- .../cloud/bigtable/data/v2/models/Range.java | 4 +- .../readrows/ReadRowsResumptionStrategy.java | 133 ++++++------ .../data/v2/stub/readrows/package-info.java | 3 +- .../v2/stub/readrows/ReadRowsRetryTest.java | 195 ++++++++++++++---- 4 files changed, 231 insertions(+), 104 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Range.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Range.java index fb0a7cc09f45..226eaa73318a 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Range.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Range.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigtable.data.v2.models; +import com.google.api.core.InternalExtensionOnly; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import javax.annotation.Nonnull; @@ -40,7 +41,8 @@ * ByteStringRange r2 = r1.clone().endUnbounded(); * } */ -abstract class Range> { +@InternalExtensionOnly +public abstract class Range> { public enum BoundType { OPEN, CLOSED, diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java index a24cd37e305e..7ade3c863c22 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java @@ -28,7 +28,7 @@ /** * An implementation of a {@link StreamResumptionStrategy} for merged rows. This class tracks the - * last complete row seen and upon retry can build a request to resume the stream from were it left + * last complete row seen and upon retry can build a request to resume the stream from where it left * off. * *

This class is considered an internal implementation detail and not meant to be used by @@ -39,6 +39,7 @@ public class ReadRowsResumptionStrategy implements StreamResumptionStrategy { private final RowAdapter rowAdapter; private ByteString lastKey = ByteString.EMPTY; + // Number of rows processed excluding Marker row. private long numProcessed; public ReadRowsResumptionStrategy(RowAdapter rowAdapter) { @@ -67,87 +68,103 @@ public void onProgress(RowT response) { } } + /** + * {@inheritDoc} + * + *

Given a request, this implementation will narrow that request to exclude all row keys and + * ranges that would produce rows that come before {@link #lastKey}. Furthermore this + * implementation takes care to update the row limit of the request to account for all of the + * received rows. + */ @Override public ReadRowsRequest getResumeRequest(ReadRowsRequest request) { + // An empty lastKey means that we have not successfully read the first row, + // resume with the original request object. if (lastKey.isEmpty()) { return request; } Builder builder = request.toBuilder(); + // NOTE: It is considered an an unrecoverable error for the server to send all of the rows, + // followed by an error status. if (request.getRowsLimit() > 0) { Preconditions.checkState( - request.getRowsLimit() >= numProcessed, - "Detected too many responses for the current row limit during a retry."); + request.getRowsLimit() > numProcessed, + "Detected too many rows for the current row limit during a retry."); builder.setRowsLimit(request.getRowsLimit() - numProcessed); } // Reset rows to scan. builder.clearRows(); - RowSet.Builder newRowSet = RowSet.newBuilder(); + RowSet.Builder rowSetBuilder = RowSet.newBuilder(); // Special case: empty query implies full table scan if (request.getRows().getRowKeysList().isEmpty() && request.getRows().getRowRangesList().isEmpty()) { - newRowSet.addRowRanges(RowRange.newBuilder().setStartKeyOpen(lastKey).build()); - } else { - for (ByteString key : request.getRows().getRowKeysList()) { - if (ByteStringComparator.INSTANCE.compare(key, lastKey) > 0) { - newRowSet.addRowKeys(key); - } + rowSetBuilder.addRowRanges(RowRange.newBuilder().setStartKeyOpen(lastKey).build()); + + builder.setRows(rowSetBuilder.build()); + return builder.build(); + } + + // Normal flow: narrow the request keys & ranges + for (ByteString key : request.getRows().getRowKeysList()) { + if (ByteStringComparator.INSTANCE.compare(key, lastKey) > 0) { + rowSetBuilder.addRowKeys(key); + } + } + + for (RowRange rowRange : request.getRows().getRowRangesList()) { + RowRange.Builder rowRangeBuilder = RowRange.newBuilder(); + + switch (rowRange.getEndKeyCase()) { + case END_KEY_CLOSED: + if (ByteStringComparator.INSTANCE.compare(rowRange.getEndKeyClosed(), lastKey) <= 0) { + continue; + } else { + rowRangeBuilder.setEndKeyClosed(rowRange.getEndKeyClosed()); + } + break; + case END_KEY_OPEN: + if (ByteStringComparator.INSTANCE.compare(rowRange.getEndKeyOpen(), lastKey) <= 0) { + continue; + } else { + rowRangeBuilder.setEndKeyOpen(rowRange.getEndKeyOpen()); + } + break; + case ENDKEY_NOT_SET: + rowRangeBuilder.clearEndKey(); + break; + default: + throw new IllegalArgumentException("Unknown endKeyCase: " + rowRange.getEndKeyCase()); } - for (RowRange rowRange : request.getRows().getRowRangesList()) { - RowRange.Builder rangeBuilder = RowRange.newBuilder(); - - switch (rowRange.getEndKeyCase()) { - case END_KEY_CLOSED: - if (ByteStringComparator.INSTANCE.compare(rowRange.getEndKeyClosed(), lastKey) <= 0) { - continue; - } else { - rangeBuilder.setEndKeyClosed(rowRange.getEndKeyClosed()); - } - break; - case END_KEY_OPEN: - if (ByteStringComparator.INSTANCE.compare(rowRange.getEndKeyOpen(), lastKey) <= 0) { - continue; - } else { - rangeBuilder.setEndKeyOpen(rowRange.getEndKeyOpen()); - } - break; - case ENDKEY_NOT_SET: - rangeBuilder.clearEndKey(); - default: - throw new IllegalArgumentException("Unknown endKeyCase: " + rowRange.getEndKeyCase()); - } - - switch (rowRange.getStartKeyCase()) { - case STARTKEY_NOT_SET: - rangeBuilder.setStartKeyOpen(lastKey); - break; - case START_KEY_OPEN: - if (ByteStringComparator.INSTANCE.compare(rowRange.getStartKeyOpen(), lastKey) < 0) { - rangeBuilder.setStartKeyOpen(lastKey); - } else { - rangeBuilder.setStartKeyOpen(rowRange.getStartKeyOpen()); - } - break; - case START_KEY_CLOSED: - if (ByteStringComparator.INSTANCE.compare(rowRange.getStartKeyClosed(), lastKey) <= 0) { - rangeBuilder.setStartKeyOpen(lastKey); - } else { - rangeBuilder.setStartKeyClosed(rowRange.getStartKeyClosed()); - } - break; - default: - throw new IllegalArgumentException( - "Unknown startKeyCase: " + rowRange.getStartKeyCase()); - } - newRowSet.addRowRanges(rangeBuilder.build()); + switch (rowRange.getStartKeyCase()) { + case STARTKEY_NOT_SET: + rowRangeBuilder.setStartKeyOpen(lastKey); + break; + case START_KEY_OPEN: + if (ByteStringComparator.INSTANCE.compare(rowRange.getStartKeyOpen(), lastKey) < 0) { + rowRangeBuilder.setStartKeyOpen(lastKey); + } else { + rowRangeBuilder.setStartKeyOpen(rowRange.getStartKeyOpen()); + } + break; + case START_KEY_CLOSED: + if (ByteStringComparator.INSTANCE.compare(rowRange.getStartKeyClosed(), lastKey) <= 0) { + rowRangeBuilder.setStartKeyOpen(lastKey); + } else { + rowRangeBuilder.setStartKeyClosed(rowRange.getStartKeyClosed()); + } + break; + default: + throw new IllegalArgumentException("Unknown startKeyCase: " + rowRange.getStartKeyCase()); } + rowSetBuilder.addRowRanges(rowRangeBuilder.build()); } - builder.setRows(newRowSet.build()); + builder.setRows(rowSetBuilder.build()); return builder.build(); } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/package-info.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/package-info.java index dd763441a6b9..4502800762fa 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/package-info.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/package-info.java @@ -28,8 +28,7 @@ * rows. The logical row representation is configurable via a RowAdapter. Please note that * this will also emit special marker rows that help with retries in the next stage, but need * to be filtered out. - *

  • ReadRowsTracker (+ helpers): Implements resuming retries for gax's Callables#retrying - * middleware. + *
  • RowMerger (+ helpers): Implements resuming retries for gax's Callables#retrying middleware. *
  • FilterMarkerRowsCallable: Filters out marker rows that are used for efficient retry * resumes. This is necessary because * diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java index 494ff6813488..9a3827a28988 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java @@ -30,6 +30,7 @@ import com.google.cloud.bigtable.data.v2.BigtableDataClient; import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.data.v2.models.Row; import com.google.common.collect.Queues; import com.google.common.collect.Range; @@ -42,7 +43,6 @@ import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcServerRule; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Queue; import org.junit.After; @@ -82,9 +82,7 @@ public void setUp() throws IOException { @After public void tearDown() throws Exception { - if (client != null) { - client.close(); - } + client.close(); } @Test @@ -162,6 +160,96 @@ public void rowLimitTest() { Truth.assertThat(actualResults).containsExactly("r1", "r2").inOrder(); } + @Test + public void errorAfterRowLimitMetTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.closedOpen("r1", "r3")) + .expectRowLimit(2) + .respondWith("r1", "r2") + .respondWithStatus(Code.UNAVAILABLE)); + + Throwable error = null; + try { + getResults(Query.create(tableName.getTable()).range("r1", "r3").limit(2)); + } catch (Throwable t) { + error = t; + } + + Truth.assertThat(error.getCause()).isInstanceOf(IllegalStateException.class); + } + + @Test + public void pointTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest("r1", "r2") + .respondWith("r1") + .respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add(RpcExpectation.create().expectRequest("r2").respondWith("r2")); + + List actualResults = + getResults(Query.create(tableName.getTable()).rowKey("r1").rowKey("r2")); + Truth.assertThat(actualResults).containsExactly("r1", "r2").inOrder(); + } + + @Test + public void fullTableScanTest() { + service.expectations.add( + RpcExpectation.create().respondWith("r1").respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add( + RpcExpectation.create().expectRequest(Range.greaterThan("r1")).respondWith("r2")); + List actualResults = getResults(Query.create(tableName.getTable())); + Truth.assertThat(actualResults).containsExactly("r1", "r2").inOrder(); + } + + @Test + public void retryUnboundedStartTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.lessThan("r9")) + .respondWith("r1") + .respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add( + RpcExpectation.create().expectRequest(Range.open("r1", "r9")).respondWith("r2")); + + List actualResults = + getResults( + Query.create(tableName.getTable()).range(ByteStringRange.unbounded().endOpen("r9"))); + Truth.assertThat(actualResults).containsExactly("r1", "r2").inOrder(); + } + + @Test + public void retryUnboundedEndTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.atLeast("r1")) + .respondWith("r1") + .respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add( + RpcExpectation.create().expectRequest(Range.greaterThan("r1")).respondWith("r2")); + + List actualResults = + getResults( + Query.create(tableName.getTable()) + .range(ByteStringRange.unbounded().startClosed("r1"))); + Truth.assertThat(actualResults).containsExactly("r1", "r2").inOrder(); + } + + @Test + public void retryWithLastScannedKeyTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.closedOpen("r1", "r9")) + .respondWithLastScannedKey("r5") + .respondWithStatus(Code.UNAVAILABLE)); + service.expectations.add( + RpcExpectation.create().expectRequest(Range.open("r5", "r9")).respondWith("r7")); + List actualResults = + getResults(Query.create(tableName.getTable()).range(ByteStringRange.create("r1", "r9"))); + Truth.assertThat(actualResults).containsExactly("r7").inOrder(); + } + private List getResults(Query query) { ServerStream actualRows = client.readRows(query); List actualValues = Lists.newArrayList(); @@ -173,31 +261,23 @@ private List getResults(Query query) { private static class TestBigtableService extends BigtableGrpc.BigtableImplBase { Queue expectations = Queues.newArrayDeque(); + int i = -1; @Override public void readRows( ReadRowsRequest request, StreamObserver responseObserver) { RpcExpectation expectedRpc = expectations.poll(); + i++; - Truth.assertWithMessage("Unexpected request: " + request.toString()) + Truth.assertWithMessage("Unexpected request#" + i + ":" + request.toString()) .that(expectedRpc) .isNotNull(); - Truth.assertThat(request).isEqualTo(expectedRpc.getExpectedRequest()); - - for (String key : expectedRpc.responses) { - ReadRowsResponse response = - ReadRowsResponse.newBuilder() - .addChunks( - CellChunk.newBuilder() - .setRowKey(ByteString.copyFromUtf8(key)) - .setFamilyName(StringValue.newBuilder().setValue("family")) - .setQualifier(BytesValue.newBuilder().setValue(ByteString.EMPTY)) - .setTimestampMicros(1_000) - .setValue(ByteString.copyFromUtf8("value")) - .setCommitRow(true)) - .build(); + Truth.assertWithMessage("Unexpected request#" + i) + .that(request) + .isEqualTo(expectedRpc.getExpectedRequest()); + for (ReadRowsResponse response : expectedRpc.responses) { responseObserver.onNext(response); } if (expectedRpc.statusCode.toStatus().isOk()) { @@ -211,7 +291,7 @@ public void readRows( private static class RpcExpectation { ReadRowsRequest.Builder requestBuilder; Status.Code statusCode; - List responses; + List responses; private RpcExpectation() { this.requestBuilder = ReadRowsRequest.newBuilder().setTableName(tableName.toString()); @@ -223,35 +303,46 @@ static RpcExpectation create() { return new RpcExpectation(); } - RpcExpectation expectRequest(String key) { - requestBuilder.getRowsBuilder().addRowKeys(ByteString.copyFromUtf8(key)); + RpcExpectation expectRequest(String... keys) { + for (String key : keys) { + requestBuilder.getRowsBuilder().addRowKeys(ByteString.copyFromUtf8(key)); + } return this; } RpcExpectation expectRequest(Range range) { RowRange.Builder rowRange = requestBuilder.getRowsBuilder().addRowRangesBuilder(); - switch (range.lowerBoundType()) { - case CLOSED: - rowRange.setStartKeyClosed(ByteString.copyFromUtf8(range.lowerEndpoint())); - break; - case OPEN: - rowRange.setStartKeyOpen(ByteString.copyFromUtf8(range.lowerEndpoint())); - break; - default: - throw new IllegalArgumentException( - "Unexpected lowerBoundType: " + range.lowerBoundType()); + if (range.hasLowerBound()) { + switch (range.lowerBoundType()) { + case CLOSED: + rowRange.setStartKeyClosed(ByteString.copyFromUtf8(range.lowerEndpoint())); + break; + case OPEN: + rowRange.setStartKeyOpen(ByteString.copyFromUtf8(range.lowerEndpoint())); + break; + default: + throw new IllegalArgumentException( + "Unexpected lowerBoundType: " + range.lowerBoundType()); + } + } else { + rowRange.clearStartKey(); } - switch (range.upperBoundType()) { - case CLOSED: - rowRange.setEndKeyClosed(ByteString.copyFromUtf8(range.upperEndpoint())); - break; - case OPEN: - rowRange.setEndKeyOpen(ByteString.copyFromUtf8(range.upperEndpoint())); - break; - default: - throw new IllegalArgumentException( - "Unexpected upperBoundType: " + range.upperBoundType()); + + if (range.hasUpperBound()) { + switch (range.upperBoundType()) { + case CLOSED: + rowRange.setEndKeyClosed(ByteString.copyFromUtf8(range.upperEndpoint())); + break; + case OPEN: + rowRange.setEndKeyOpen(ByteString.copyFromUtf8(range.upperEndpoint())); + break; + default: + throw new IllegalArgumentException( + "Unexpected upperBoundType: " + range.upperBoundType()); + } + } else { + rowRange.clearEndKey(); } return this; @@ -268,7 +359,25 @@ RpcExpectation respondWithStatus(Status.Code code) { } RpcExpectation respondWith(String... responses) { - this.responses.addAll(Arrays.asList(responses)); + for (String response : responses) { + this.responses.add( + ReadRowsResponse.newBuilder() + .addChunks( + CellChunk.newBuilder() + .setRowKey(ByteString.copyFromUtf8(response)) + .setFamilyName(StringValue.newBuilder().setValue("family").build()) + .setQualifier(BytesValue.newBuilder().setValue(ByteString.EMPTY).build()) + .setTimestampMicros(10_000) + .setValue(ByteString.copyFromUtf8("value")) + .setCommitRow(true)) + .build()); + } + return this; + } + + RpcExpectation respondWithLastScannedKey(String key) { + this.responses.add( + ReadRowsResponse.newBuilder().setLastScannedRowKey(ByteString.copyFromUtf8(key)).build()); return this; } From d0aa23a1080ab8f9dd8ee339287bb04b1711a39d Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Mon, 5 Mar 2018 10:53:51 -0500 Subject: [PATCH 6/9] handle retries of fulfilled requests --- .../data/v2/stub/EnhancedBigtableStub.java | 16 +++-- .../readrows/ReadRowsResumptionStrategy.java | 24 +++++-- .../ReadRowsRetryCompletedCallable.java | 71 +++++++++++++++++++ .../v2/stub/readrows/ReadRowsRetryTest.java | 29 ++++++-- 4 files changed, 121 insertions(+), 19 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryCompletedCallable.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index d7abc7630418..b34bd2523bb1 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -38,6 +38,7 @@ import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy; +import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable; import java.io.IOException; @@ -148,8 +149,7 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings) * dispatch the RPC. *
  • Upon receiving the response stream, it will merge the {@link * com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row - * implementation can be configured in {@link - * com.google.cloud.bigtable.data.v2.BigtableDataSettings}. + * implementation can be configured in by the {@code rowAdapter} parameter. *
  • Retry/resume on failure. *
  • Filter out marker rows. * @@ -171,10 +171,16 @@ public ServerStreamingCallable createReadRowsCallable( .setIdleTimeout(settings.readRowsSettings().getIdleTimeout()) .build(); - ServerStreamingCallable retrying = - Callables.retrying(merging, innerSettings, clientContext); + // Retry logic is split into 2 parts to workaround a rare edge case described in + // ReadRowsRetryCompletedCallable + ServerStreamingCallable retrying1 = + new ReadRowsRetryCompletedCallable<>(merging); - FilterMarkerRowsCallable filtering = new FilterMarkerRowsCallable<>(retrying, rowAdapter); + ServerStreamingCallable retrying2 = + Callables.retrying(retrying1, innerSettings, clientContext); + + FilterMarkerRowsCallable filtering = + new FilterMarkerRowsCallable<>(retrying2, rowAdapter); ServerStreamingCallable withContext = filtering.withDefaultCallContext(clientContext.getDefaultCallContext()); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java index 7ade3c863c22..d07510e75a07 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java @@ -85,11 +85,10 @@ public ReadRowsRequest getResumeRequest(ReadRowsRequest request) { } Builder builder = request.toBuilder(); - // NOTE: It is considered an an unrecoverable error for the server to send all of the rows, - // followed by an error status. if (request.getRowsLimit() > 0) { + // NOTE: the edge case of request.getRowsLimit() == numProcessed is handled below Preconditions.checkState( - request.getRowsLimit() > numProcessed, + request.getRowsLimit() >= numProcessed, "Detected too many rows for the current row limit during a retry."); builder.setRowsLimit(request.getRowsLimit() - numProcessed); } @@ -102,13 +101,14 @@ public ReadRowsRequest getResumeRequest(ReadRowsRequest request) { // Special case: empty query implies full table scan if (request.getRows().getRowKeysList().isEmpty() && request.getRows().getRowRangesList().isEmpty()) { - rowSetBuilder.addRowRanges(RowRange.newBuilder().setStartKeyOpen(lastKey).build()); - builder.setRows(rowSetBuilder.build()); - return builder.build(); + request = + request + .toBuilder() + .setRows(RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance())) + .build(); } - // Normal flow: narrow the request keys & ranges for (ByteString key : request.getRows().getRowKeysList()) { if (ByteStringComparator.INSTANCE.compare(key, lastKey) > 0) { rowSetBuilder.addRowKeys(key); @@ -164,6 +164,16 @@ public ReadRowsRequest getResumeRequest(ReadRowsRequest request) { rowSetBuilder.addRowRanges(rowRangeBuilder.build()); } + // Edge case: retrying a fulfilled request. + // A fulfilled request is one that has had all of its row keys and ranges fulfilled or if it had + // a row limit, has seen enough rows. These requests are replaced with a marker request that + // will be handled by ReadRowsRetryCompletedCallable. See docs in ReadRowsRetryCompletedCallable + // for more details. + if ((rowSetBuilder.getRowRangesCount() == 0 && rowSetBuilder.getRowKeysCount() == 0) + || (request.getRowsLimit() > 0 && request.getRowsLimit() == numProcessed)) { + return ReadRowsRetryCompletedCallable.FULFILLED_REQUEST_MARKER; + } + builder.setRows(rowSetBuilder.build()); return builder.build(); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryCompletedCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryCompletedCallable.java new file mode 100644 index 000000000000..c5c03f32fd50 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryCompletedCallable.java @@ -0,0 +1,71 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.readrows; + +import com.google.api.core.InternalApi; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StreamController; +import com.google.bigtable.v2.ReadRowsRequest; + +/** + * This callable addresses edge case of a ReadRows stream receiving all of the rows, but receiving a + * retryable error status instead of an OK. If a retry attempt is scheduled, then it should return + * an OK response. + * + *

    This callable works in tandem with {@link ReadRowsResumptionStrategy}, which will send a + * {@link #FULFILLED_REQUEST_MARKER} to be processed by this callable. Which will promptly notify + * the {@link ResponseObserver} that the stream has been successfully compeleted. + * + *

    This class is considered an internal implementation detail and not meant to be used by + * applications. + */ +@InternalApi +public final class ReadRowsRetryCompletedCallable + extends ServerStreamingCallable { + static final ReadRowsRequest FULFILLED_REQUEST_MARKER = + ReadRowsRequest.newBuilder().setRowsLimit(-1).build(); + + private final ServerStreamingCallable inner; + + public ReadRowsRetryCompletedCallable(ServerStreamingCallable inner) { + this.inner = inner; + } + + @Override + public void call( + ReadRowsRequest request, ResponseObserver responseObserver, ApiCallContext context) { + + if (request == FULFILLED_REQUEST_MARKER) { + responseObserver.onStart(new DummyController()); + responseObserver.onComplete(); + } else { + inner.call(request, responseObserver, context); + } + } + + private static class DummyController implements StreamController { + @Override + public void cancel() {} + + @Override + public void disableAutoInboundFlowControl() {} + + @Override + public void request(int count) {} + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java index 9a3827a28988..f37975b6c267 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java @@ -169,14 +169,29 @@ public void errorAfterRowLimitMetTest() { .respondWith("r1", "r2") .respondWithStatus(Code.UNAVAILABLE)); - Throwable error = null; - try { - getResults(Query.create(tableName.getTable()).range("r1", "r3").limit(2)); - } catch (Throwable t) { - error = t; - } + // Second retry request is handled locally in ReadRowsRetryCompletedCallable + + List actualResults = + getResults(Query.create(tableName.getTable()).range("r1", "r3").limit(2)); + + Truth.assertThat(actualResults).containsExactly("r1", "r2"); + } + + @Test + public void errorAfterRequestCompleteTest() { + service.expectations.add( + RpcExpectation.create() + .expectRequest(Range.closedOpen("r1", "r3")) + .expectRequest("r4") + .respondWith("r2", "r4") + .respondWithStatus(Code.UNAVAILABLE)); + + // Second retry request is handled locally in ReadRowsRetryCompletedCallable + + List actualResults = + getResults(Query.create(tableName.getTable()).range("r1", "r3").rowKey("r4")); - Truth.assertThat(error.getCause()).isInstanceOf(IllegalStateException.class); + Truth.assertThat(actualResults).containsExactly("r2", "r4"); } @Test From a8c75c330b3f095e8adc85f4161069fcead3d065 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Mon, 5 Mar 2018 14:47:37 -0500 Subject: [PATCH 7/9] re-organize code: move row limit setting after the edge case, use local variable for request munging --- .../readrows/ReadRowsResumptionStrategy.java | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java index d07510e75a07..3b6ece852c23 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java @@ -83,39 +83,34 @@ public ReadRowsRequest getResumeRequest(ReadRowsRequest request) { if (lastKey.isEmpty()) { return request; } - Builder builder = request.toBuilder(); - if (request.getRowsLimit() > 0) { - // NOTE: the edge case of request.getRowsLimit() == numProcessed is handled below - Preconditions.checkState( - request.getRowsLimit() >= numProcessed, - "Detected too many rows for the current row limit during a retry."); - builder.setRowsLimit(request.getRowsLimit() - numProcessed); - } - - // Reset rows to scan. - builder.clearRows(); - - RowSet.Builder rowSetBuilder = RowSet.newBuilder(); + ReadRowsRequest originalRequest = request; - // Special case: empty query implies full table scan + // Special case: empty query implies full table scan, make this explicit by adding an unbounded + // range to the request if (request.getRows().getRowKeysList().isEmpty() && request.getRows().getRowRangesList().isEmpty()) { - request = + originalRequest = request .toBuilder() .setRows(RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance())) .build(); } - for (ByteString key : request.getRows().getRowKeysList()) { + // Start building the resume request. The keys & ranges are cleared and will be recomputed. + Builder builder = originalRequest.toBuilder(); + builder.clearRows(); + + RowSet.Builder rowSetBuilder = RowSet.newBuilder(); + + for (ByteString key : originalRequest.getRows().getRowKeysList()) { if (ByteStringComparator.INSTANCE.compare(key, lastKey) > 0) { rowSetBuilder.addRowKeys(key); } } - for (RowRange rowRange : request.getRows().getRowRangesList()) { + for (RowRange rowRange : originalRequest.getRows().getRowRangesList()) { RowRange.Builder rowRangeBuilder = RowRange.newBuilder(); switch (rowRange.getEndKeyCase()) { @@ -170,10 +165,17 @@ public ReadRowsRequest getResumeRequest(ReadRowsRequest request) { // will be handled by ReadRowsRetryCompletedCallable. See docs in ReadRowsRetryCompletedCallable // for more details. if ((rowSetBuilder.getRowRangesCount() == 0 && rowSetBuilder.getRowKeysCount() == 0) - || (request.getRowsLimit() > 0 && request.getRowsLimit() == numProcessed)) { + || (originalRequest.getRowsLimit() > 0 && originalRequest.getRowsLimit() == numProcessed)) { return ReadRowsRetryCompletedCallable.FULFILLED_REQUEST_MARKER; } + if (originalRequest.getRowsLimit() > 0) { + Preconditions.checkState( + originalRequest.getRowsLimit() > numProcessed, + "Detected too many rows for the current row limit during a retry."); + builder.setRowsLimit(originalRequest.getRowsLimit() - numProcessed); + } + builder.setRows(rowSetBuilder.build()); return builder.build(); } From 92b4a718f0011b0d100c39e827298b5174bcf23b Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Tue, 6 Mar 2018 12:57:51 -0500 Subject: [PATCH 8/9] address feedback --- .../data/v2/stub/EnhancedBigtableStub.java | 2 +- .../readrows/ReadRowsResumptionStrategy.java | 25 ++++++++++--------- .../ReadRowsRetryCompletedCallable.java | 5 ++-- .../data/v2/stub/readrows/package-info.java | 9 ++++--- 4 files changed, 22 insertions(+), 19 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index b34bd2523bb1..fbcc682155c2 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -81,7 +81,7 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings) .setCredentialsProvider(settings.getCredentialsProvider()); // ReadRow retries are handled in the overlay: disable retries in the base layer (but make - // sure to preserve the exception callable settings. + // sure to preserve the exception callable settings). baseSettingsBuilder .readRowsSettings() .setSimpleTimeoutNoRetries(Duration.ofHours(2)) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java index 3b6ece852c23..dfc1ea1ef5d1 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java @@ -60,7 +60,8 @@ public StreamResumptionStrategy createNew() { public void onProgress(RowT response) { // Last key can come from both the last processed row key and a synthetic row marker. The // synthetic row marker is emitted when the server has read a lot of data that was filtered out. - // So it can trim the start of the scan, but does not contribute to the row limit. + // The row marker can be used to trim the start of the scan, but does not contribute to the row + // limit. lastKey = rowAdapter.getKey(response); if (!rowAdapter.isScanMarkerRow(response)) { // Only real rows count towards the rows limit. @@ -79,15 +80,15 @@ public void onProgress(RowT response) { @Override public ReadRowsRequest getResumeRequest(ReadRowsRequest request) { // An empty lastKey means that we have not successfully read the first row, - // resume with the original request object. + // so resume with the original request object. if (lastKey.isEmpty()) { return request; } ReadRowsRequest originalRequest = request; - // Special case: empty query implies full table scan, make this explicit by adding an unbounded - // range to the request + // Special case: empty query implies full table scan, so make this explicit by adding an + // unbounded range to the request if (request.getRows().getRowKeysList().isEmpty() && request.getRows().getRowRangesList().isEmpty()) { @@ -115,17 +116,17 @@ public ReadRowsRequest getResumeRequest(ReadRowsRequest request) { switch (rowRange.getEndKeyCase()) { case END_KEY_CLOSED: - if (ByteStringComparator.INSTANCE.compare(rowRange.getEndKeyClosed(), lastKey) <= 0) { - continue; - } else { + if (ByteStringComparator.INSTANCE.compare(rowRange.getEndKeyClosed(), lastKey) > 0) { rowRangeBuilder.setEndKeyClosed(rowRange.getEndKeyClosed()); + } else { + continue; } break; case END_KEY_OPEN: - if (ByteStringComparator.INSTANCE.compare(rowRange.getEndKeyOpen(), lastKey) <= 0) { - continue; - } else { + if (ByteStringComparator.INSTANCE.compare(rowRange.getEndKeyOpen(), lastKey) > 0) { rowRangeBuilder.setEndKeyOpen(rowRange.getEndKeyOpen()); + } else { + continue; } break; case ENDKEY_NOT_SET: @@ -160,8 +161,8 @@ public ReadRowsRequest getResumeRequest(ReadRowsRequest request) { } // Edge case: retrying a fulfilled request. - // A fulfilled request is one that has had all of its row keys and ranges fulfilled or if it had - // a row limit, has seen enough rows. These requests are replaced with a marker request that + // A fulfilled request is one that has had all of its row keys and ranges fulfilled, or if it + // had a row limit, has seen enough rows. These requests are replaced with a marker request that // will be handled by ReadRowsRetryCompletedCallable. See docs in ReadRowsRetryCompletedCallable // for more details. if ((rowSetBuilder.getRowRangesCount() == 0 && rowSetBuilder.getRowKeysCount() == 0) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryCompletedCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryCompletedCallable.java index c5c03f32fd50..6c698a51ca21 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryCompletedCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryCompletedCallable.java @@ -28,8 +28,9 @@ * an OK response. * *

    This callable works in tandem with {@link ReadRowsResumptionStrategy}, which will send a - * {@link #FULFILLED_REQUEST_MARKER} to be processed by this callable. Which will promptly notify - * the {@link ResponseObserver} that the stream has been successfully compeleted. + * {@link #FULFILLED_REQUEST_MARKER} to be processed by this callable. Upon receiving the {@link + * #FULFILLED_REQUEST_MARKER}, this callable will promptly notify the {@link ResponseObserver} that + * the stream has been successfully compeleted. * *

    This class is considered an internal implementation detail and not meant to be used by * applications. diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/package-info.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/package-info.java index 4502800762fa..21a0186b49d5 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/package-info.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/package-info.java @@ -17,9 +17,9 @@ * Implementation details for {@link * com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub#readRowsCallable()}. * - *

    The ReadRows protocol is optimized for transmission and cannot be consumed directly. This - * package implements significant customizations on top of the raw GAPIC generated stub to handle - * row merging and retries. + *

    The ReadRows protocol is optimized for transmission and is not designed to be consumed + * directly. This package implements significant customizations on top of the raw GAPIC generated + * stub to handle row merging and retries. * *

      *
    • ReadRowsUserCallable: Creates protobuf {@link com.google.bigtable.v2.ReadRowsRequest}s from @@ -30,7 +30,8 @@ * to be filtered out. *
    • RowMerger (+ helpers): Implements resuming retries for gax's Callables#retrying middleware. *
    • FilterMarkerRowsCallable: Filters out marker rows that are used for efficient retry - * resumes. This is necessary because + * resumes. The marker is an internal implementation detail to communicate state to the + * RowMerger and should not be exposed to the caller. *
    * *

    This package is considered an internal implementation detail and is not meant to be used by From 003d88e16e1f2112f33b7ef08f5e081e8fd285fb Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Tue, 6 Mar 2018 13:01:06 -0500 Subject: [PATCH 9/9] fix javadoc link --- .../google/cloud/bigtable/data/v2/stub/readrows/RowMerger.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/RowMerger.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/RowMerger.java index 4ce13eb87daa..dcb7347dda10 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/RowMerger.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/RowMerger.java @@ -25,7 +25,8 @@ * An implementation of a {@link Reframer} that feeds the row merging {@link StateMachine}. * *

    {@link com.google.cloud.bigtable.gaxx.reframing.ReframingResponseObserver} pushes {@link - * ReadRowsResponse.CellChunk}s into this class and pops fully merged logical rows. Example usage: + * com.google.bigtable.v2.ReadRowsResponse.CellChunk}s into this class and pops fully merged logical + * rows. Example usage: * *

    {@code
      * RowMerger rowMerger = new RowMerger<>(myRowBuilder);