diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java index 1e3839b5df4f..3a3de5622cd2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java @@ -30,6 +30,7 @@ import java.util.concurrent.CompletionStage; import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource; import org.apache.beam.sdk.values.KV; +import org.joda.time.Duration; /** An interface for real or fake implementations of Cloud Bigtable. */ interface BigtableService extends Serializable { @@ -75,6 +76,12 @@ interface Reader { * current row because the last such call was unsuccessful. */ Row getCurrentRow() throws NoSuchElementException; + + // Workaround for ReadRows requests which requires to pass the timeouts in + // ApiContext. Can be removed later once it's fixed in Veneer. + Duration getAttemptTimeout(); + + Duration getOperationTimeout(); } /** Returns a {@link Reader} that will read from the specified source. */ diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java index dad3370dae60..d6208be1bf94 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java @@ -23,6 +23,7 @@ import com.google.api.gax.batching.Batcher; import com.google.api.gax.batching.BatchingException; import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.StreamController; @@ -47,6 +48,8 @@ import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; import com.google.protobuf.ByteString; +import io.grpc.CallOptions; +import io.grpc.Deadline; import io.grpc.StatusRuntimeException; import java.io.IOException; import java.util.ArrayDeque; @@ -105,6 +108,9 @@ class BigtableServiceImpl implements BigtableService { BigtableServiceImpl(BigtableDataSettings settings) throws IOException { this.projectId = settings.getProjectId(); this.instanceId = settings.getInstanceId(); + RetrySettings retry = settings.getStubSettings().readRowsSettings().getRetrySettings(); + this.readAttemptTimeout = Duration.millis(retry.getInitialRpcTimeout().toMillis()); + this.readOperationTimeout = Duration.millis(retry.getTotalTimeout().toMillis()); this.client = BigtableDataClient.create(settings); LOG.info("Started Bigtable service with settings {}", settings); } @@ -113,6 +119,10 @@ class BigtableServiceImpl implements BigtableService { private final String projectId; private final String instanceId; + private final Duration readAttemptTimeout; + + private final Duration readOperationTimeout; + @Override public BigtableWriterImpl openForWriting(BigtableWriteOptions writeOptions) { return new BigtableWriterImpl( @@ -135,6 +145,9 @@ static class BigtableReaderImpl implements Reader { private final RowFilter rowFilter; private Iterator results; + private final Duration attemptTimeout; + private final Duration operationTimeout; + private Row currentRow; @VisibleForTesting @@ -144,13 +157,18 @@ static class BigtableReaderImpl implements Reader { String instanceId, String tableId, List ranges, - @Nullable RowFilter rowFilter) { + @Nullable RowFilter rowFilter, + Duration attemptTimeout, + Duration operationTimeout) { this.client = client; this.projectId = projectId; this.instanceId = instanceId; this.tableId = tableId; this.ranges = ranges; this.rowFilter = rowFilter; + + this.attemptTimeout = attemptTimeout; + this.operationTimeout = operationTimeout; } @Override @@ -171,7 +189,7 @@ public boolean start() throws IOException { results = client .readRowsCallable(new BigtableRowProtoAdapter()) - .call(query, GrpcCallContext.createDefault()) + .call(query, createScanCallContext(attemptTimeout, operationTimeout)) .iterator(); serviceCallMetric.call("ok"); } catch (StatusRuntimeException e) { @@ -197,6 +215,16 @@ public Row getCurrentRow() throws NoSuchElementException { } return currentRow; } + + @Override + public Duration getAttemptTimeout() { + return attemptTimeout; + } + + @Override + public Duration getOperationTimeout() { + return operationTimeout; + } } @VisibleForTesting @@ -210,6 +238,8 @@ static class BigtableSegmentReaderImpl implements Reader { private final int refillSegmentWaterMark; private final long maxSegmentByteSize; private ServiceCallMetric serviceCallMetric; + private final Duration attemptTimeout; + private final Duration operationTimeout; private static class UpstreamResults { private final List rows; @@ -228,7 +258,9 @@ static BigtableSegmentReaderImpl create( String tableId, List ranges, @Nullable RowFilter rowFilter, - int maxBufferedElementCount) { + int maxBufferedElementCount, + Duration attemptTimeout, + Duration operationTimeout) { RowSet.Builder rowSetBuilder = RowSet.newBuilder(); if (ranges.isEmpty()) { @@ -260,6 +292,8 @@ static BigtableSegmentReaderImpl create( filter, maxBufferedElementCount, maxSegmentByteSize, + attemptTimeout, + operationTimeout, createCallMetric(projectId, instanceId, tableId)); } @@ -273,6 +307,8 @@ static BigtableSegmentReaderImpl create( @Nullable RowFilter filter, int maxRowsInBuffer, long maxSegmentByteSize, + Duration attemptTimeout, + Duration operationTimeout, ServiceCallMetric serviceCallMetric) { if (rowSet.equals(rowSet.getDefaultInstanceForType())) { rowSet = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build(); @@ -293,6 +329,8 @@ static BigtableSegmentReaderImpl create( // Asynchronously refill buffer when there is 10% of the elements are left this.refillSegmentWaterMark = Math.max(1, (int) (request.getRowsLimit() * WATERMARK_PERCENTAGE)); + this.attemptTimeout = attemptTimeout; + this.operationTimeout = operationTimeout; } @Override @@ -388,7 +426,7 @@ public void onComplete() { future.set(new UpstreamResults(rows, nextNextRequest)); } }, - GrpcCallContext.createDefault()); + createScanCallContext(attemptTimeout, operationTimeout)); return future; } @@ -443,6 +481,16 @@ public Row getCurrentRow() throws NoSuchElementException { } return currentRow; } + + @Override + public Duration getAttemptTimeout() { + return attemptTimeout; + } + + @Override + public Duration getOperationTimeout() { + return operationTimeout; + } } @VisibleForTesting @@ -612,7 +660,9 @@ public Reader createReader(BigtableSource source) throws IOException { source.getTableId().get(), source.getRanges(), source.getRowFilter(), - source.getMaxBufferElementCount()); + source.getMaxBufferElementCount(), + readAttemptTimeout, + readOperationTimeout); } else { return new BigtableReaderImpl( client, @@ -620,10 +670,26 @@ public Reader createReader(BigtableSource source) throws IOException { instanceId, source.getTableId().get(), source.getRanges(), - source.getRowFilter()); + source.getRowFilter(), + readAttemptTimeout, + readOperationTimeout); } } + // - per attempt deadlines - veneer doesn't implement deadlines for attempts. To workaround this, + // the timeouts are set per call in the ApiCallContext. However this creates a separate issue of + // over running the operation deadline, so gRPC deadline is also set. + private static GrpcCallContext createScanCallContext( + Duration attemptTimeout, Duration operationTimeout) { + GrpcCallContext ctx = GrpcCallContext.createDefault(); + + ctx.withCallOptions( + CallOptions.DEFAULT.withDeadline( + Deadline.after(operationTimeout.getMillis(), TimeUnit.MILLISECONDS))); + ctx.withTimeout(org.threeten.bp.Duration.ofMillis(attemptTimeout.getMillis())); + return ctx; + } + @Override public List getSampleRowKeys(BigtableSource source) { return client.sampleRowKeys(source.getTableId().get()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index ae960533340d..5163e0e4ff25 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -1846,6 +1846,16 @@ public Row getCurrentRow() { } return currentRow; } + + @Override + public Duration getAttemptTimeout() { + return Duration.millis(100); + } + + @Override + public Duration getOperationTimeout() { + return Duration.millis(1000); + } } /** A {@link FakeBigtableReader} implementation that throw exceptions at given stage. */ diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java index 98f33ffeb9d7..8291c21e7e84 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java @@ -29,6 +29,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.Batcher; +import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.ServerStream; @@ -170,6 +171,8 @@ public void testRead() throws IOException { // Set up client to return callable when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); + RetrySettings retrySettings = + bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableReaderImpl( mockBigtableDataClient, @@ -177,7 +180,9 @@ public void testRead() throws IOException { bigtableDataSettings.getInstanceId(), mockBigtableSource.getTableId().get(), mockBigtableSource.getRanges(), - null); + null, + Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), + Duration.millis(retrySettings.getTotalTimeout().toMillis())); underTest.start(); Assert.assertEquals(expectedRow, underTest.getCurrentRow()); @@ -225,6 +230,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable { when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID)); + RetrySettings retrySettings = + bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableSegmentReaderImpl( mockBigtableDataClient, @@ -235,6 +242,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable { RowFilter.getDefaultInstance(), SEGMENT_SIZE, DEFAULT_BYTE_SEGMENT_SIZE, + Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), + Duration.millis(retrySettings.getTotalTimeout().toMillis()), mockCallMetric); underTest.start(); @@ -278,6 +287,8 @@ public void testReadSingleRangeAboveSegmentLimit() throws IOException { // Set up client to return callable when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); + RetrySettings retrySettings = + bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableSegmentReaderImpl( mockBigtableDataClient, @@ -288,6 +299,8 @@ public void testReadSingleRangeAboveSegmentLimit() throws IOException { RowFilter.getDefaultInstance(), SEGMENT_SIZE, DEFAULT_BYTE_SEGMENT_SIZE, + Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), + Duration.millis(retrySettings.getTotalTimeout().toMillis()), mockCallMetric); List actualResults = new ArrayList<>(); @@ -344,6 +357,8 @@ public void testReadMultipleRanges() throws IOException { // Set up client to return callable when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); + RetrySettings retrySettings = + bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableSegmentReaderImpl( mockBigtableDataClient, @@ -354,6 +369,8 @@ public void testReadMultipleRanges() throws IOException { RowFilter.getDefaultInstance(), SEGMENT_SIZE, DEFAULT_BYTE_SEGMENT_SIZE, + Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), + Duration.millis(retrySettings.getTotalTimeout().toMillis()), mockCallMetric); List actualResults = new ArrayList<>(); @@ -411,6 +428,8 @@ public void testReadMultipleRangesOverlappingKeys() throws IOException { // Set up client to return callable when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); + RetrySettings retrySettings = + bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableSegmentReaderImpl( mockBigtableDataClient, @@ -421,6 +440,8 @@ public void testReadMultipleRangesOverlappingKeys() throws IOException { RowFilter.getDefaultInstance(), SEGMENT_SIZE, DEFAULT_BYTE_SEGMENT_SIZE, + Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), + Duration.millis(retrySettings.getTotalTimeout().toMillis()), mockCallMetric); List actualResults = new ArrayList<>(); @@ -463,6 +484,8 @@ public void testReadFullTableScan() throws IOException { // Set up client to return callable when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); + RetrySettings retrySettings = + bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableSegmentReaderImpl( mockBigtableDataClient, @@ -473,6 +496,8 @@ public void testReadFullTableScan() throws IOException { RowFilter.getDefaultInstance(), SEGMENT_SIZE, DEFAULT_BYTE_SEGMENT_SIZE, + Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), + Duration.millis(retrySettings.getTotalTimeout().toMillis()), mockCallMetric); List actualResults = new ArrayList<>(); @@ -530,6 +555,8 @@ public void testReadFillBuffer() throws IOException { // Set up client to return callable when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); + RetrySettings retrySettings = + bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableSegmentReaderImpl( mockBigtableDataClient, @@ -540,6 +567,8 @@ public void testReadFillBuffer() throws IOException { RowFilter.getDefaultInstance(), SEGMENT_SIZE, DEFAULT_BYTE_SEGMENT_SIZE, + Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), + Duration.millis(retrySettings.getTotalTimeout().toMillis()), mockCallMetric); List actualResults = new ArrayList<>(); @@ -595,6 +624,8 @@ public void testRefillOnLowWatermark() throws IOException { // Set up client to return callable when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); + RetrySettings retrySettings = + bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableSegmentReaderImpl( mockBigtableDataClient, @@ -605,6 +636,8 @@ public void testRefillOnLowWatermark() throws IOException { RowFilter.getDefaultInstance(), segmentSize, DEFAULT_BYTE_SEGMENT_SIZE, + Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), + Duration.millis(retrySettings.getTotalTimeout().toMillis()), mockCallMetric); Assert.assertTrue(underTest.start()); @@ -674,6 +707,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable { // Set up client to return callable when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); + RetrySettings retrySettings = + bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableSegmentReaderImpl( mockBigtableDataClient, @@ -684,6 +719,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable { RowFilter.getDefaultInstance(), SEGMENT_SIZE, segmentByteLimit, + Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), + Duration.millis(retrySettings.getTotalTimeout().toMillis()), mockCallMetric); List actualResults = new ArrayList<>(); @@ -736,6 +773,8 @@ public void run() { mockStub.createReadRowsCallable(new BigtableServiceImpl.BigtableRowProtoAdapter()); // Set up client to return callable when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable); + RetrySettings retrySettings = + bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings(); BigtableService.Reader underTest = new BigtableServiceImpl.BigtableSegmentReaderImpl( mockBigtableDataClient, @@ -746,6 +785,8 @@ public void run() { RowFilter.getDefaultInstance(), SEGMENT_SIZE, DEFAULT_BYTE_SEGMENT_SIZE, + Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()), + Duration.millis(retrySettings.getTotalTimeout().toMillis()), mockCallMetric); IOException returnedError = null;