Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.CompletionStage;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

/** An interface for real or fake implementations of Cloud Bigtable. */
interface BigtableService extends Serializable {
Expand Down Expand Up @@ -75,6 +76,12 @@ interface Reader {
* current row because the last such call was unsuccessful.
*/
Row getCurrentRow() throws NoSuchElementException;

// Workaround for ReadRows requests which requires to pass the timeouts in
// ApiContext. Can be removed later once it's fixed in Veneer.
Duration getAttemptTimeout();

Duration getOperationTimeout();
}

/** Returns a {@link Reader} that will read from the specified source. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.batching.BatchingException;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
Expand All @@ -47,6 +48,8 @@
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.protobuf.ByteString;
import io.grpc.CallOptions;
import io.grpc.Deadline;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.ArrayDeque;
Expand Down Expand Up @@ -105,6 +108,9 @@ class BigtableServiceImpl implements BigtableService {
BigtableServiceImpl(BigtableDataSettings settings) throws IOException {
this.projectId = settings.getProjectId();
this.instanceId = settings.getInstanceId();
RetrySettings retry = settings.getStubSettings().readRowsSettings().getRetrySettings();
this.readAttemptTimeout = Duration.millis(retry.getInitialRpcTimeout().toMillis());
this.readOperationTimeout = Duration.millis(retry.getTotalTimeout().toMillis());
this.client = BigtableDataClient.create(settings);
LOG.info("Started Bigtable service with settings {}", settings);
}
Expand All @@ -113,6 +119,10 @@ class BigtableServiceImpl implements BigtableService {
private final String projectId;
private final String instanceId;

private final Duration readAttemptTimeout;

private final Duration readOperationTimeout;

@Override
public BigtableWriterImpl openForWriting(BigtableWriteOptions writeOptions) {
return new BigtableWriterImpl(
Expand All @@ -135,6 +145,9 @@ static class BigtableReaderImpl implements Reader {
private final RowFilter rowFilter;
private Iterator<Row> results;

private final Duration attemptTimeout;
private final Duration operationTimeout;

private Row currentRow;

@VisibleForTesting
Expand All @@ -144,13 +157,18 @@ static class BigtableReaderImpl implements Reader {
String instanceId,
String tableId,
List<ByteKeyRange> ranges,
@Nullable RowFilter rowFilter) {
@Nullable RowFilter rowFilter,
Duration attemptTimeout,
Duration operationTimeout) {
this.client = client;
this.projectId = projectId;
this.instanceId = instanceId;
this.tableId = tableId;
this.ranges = ranges;
this.rowFilter = rowFilter;

this.attemptTimeout = attemptTimeout;
this.operationTimeout = operationTimeout;
}

@Override
Expand All @@ -171,7 +189,7 @@ public boolean start() throws IOException {
results =
client
.readRowsCallable(new BigtableRowProtoAdapter())
.call(query, GrpcCallContext.createDefault())
.call(query, createScanCallContext(attemptTimeout, operationTimeout))
.iterator();
serviceCallMetric.call("ok");
} catch (StatusRuntimeException e) {
Expand All @@ -197,6 +215,16 @@ public Row getCurrentRow() throws NoSuchElementException {
}
return currentRow;
}

@Override
public Duration getAttemptTimeout() {
return attemptTimeout;
}

@Override
public Duration getOperationTimeout() {
return operationTimeout;
}
}

@VisibleForTesting
Expand All @@ -210,6 +238,8 @@ static class BigtableSegmentReaderImpl implements Reader {
private final int refillSegmentWaterMark;
private final long maxSegmentByteSize;
private ServiceCallMetric serviceCallMetric;
private final Duration attemptTimeout;
private final Duration operationTimeout;

private static class UpstreamResults {
private final List<Row> rows;
Expand All @@ -228,7 +258,9 @@ static BigtableSegmentReaderImpl create(
String tableId,
List<ByteKeyRange> ranges,
@Nullable RowFilter rowFilter,
int maxBufferedElementCount) {
int maxBufferedElementCount,
Duration attemptTimeout,
Duration operationTimeout) {

RowSet.Builder rowSetBuilder = RowSet.newBuilder();
if (ranges.isEmpty()) {
Expand Down Expand Up @@ -260,6 +292,8 @@ static BigtableSegmentReaderImpl create(
filter,
maxBufferedElementCount,
maxSegmentByteSize,
attemptTimeout,
operationTimeout,
createCallMetric(projectId, instanceId, tableId));
}

Expand All @@ -273,6 +307,8 @@ static BigtableSegmentReaderImpl create(
@Nullable RowFilter filter,
int maxRowsInBuffer,
long maxSegmentByteSize,
Duration attemptTimeout,
Duration operationTimeout,
ServiceCallMetric serviceCallMetric) {
if (rowSet.equals(rowSet.getDefaultInstanceForType())) {
rowSet = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
Expand All @@ -293,6 +329,8 @@ static BigtableSegmentReaderImpl create(
// Asynchronously refill buffer when there is 10% of the elements are left
this.refillSegmentWaterMark =
Math.max(1, (int) (request.getRowsLimit() * WATERMARK_PERCENTAGE));
this.attemptTimeout = attemptTimeout;
this.operationTimeout = operationTimeout;
}

@Override
Expand Down Expand Up @@ -388,7 +426,7 @@ public void onComplete() {
future.set(new UpstreamResults(rows, nextNextRequest));
}
},
GrpcCallContext.createDefault());
createScanCallContext(attemptTimeout, operationTimeout));
return future;
}

Expand Down Expand Up @@ -443,6 +481,16 @@ public Row getCurrentRow() throws NoSuchElementException {
}
return currentRow;
}

@Override
public Duration getAttemptTimeout() {
return attemptTimeout;
}

@Override
public Duration getOperationTimeout() {
return operationTimeout;
}
}

@VisibleForTesting
Expand Down Expand Up @@ -612,18 +660,36 @@ public Reader createReader(BigtableSource source) throws IOException {
source.getTableId().get(),
source.getRanges(),
source.getRowFilter(),
source.getMaxBufferElementCount());
source.getMaxBufferElementCount(),
readAttemptTimeout,
readOperationTimeout);
} else {
return new BigtableReaderImpl(
client,
projectId,
instanceId,
source.getTableId().get(),
source.getRanges(),
source.getRowFilter());
source.getRowFilter(),
readAttemptTimeout,
readOperationTimeout);
}
}

// - per attempt deadlines - veneer doesn't implement deadlines for attempts. To workaround this,
// the timeouts are set per call in the ApiCallContext. However this creates a separate issue of
// over running the operation deadline, so gRPC deadline is also set.
private static GrpcCallContext createScanCallContext(
Duration attemptTimeout, Duration operationTimeout) {
GrpcCallContext ctx = GrpcCallContext.createDefault();

ctx.withCallOptions(
CallOptions.DEFAULT.withDeadline(
Deadline.after(operationTimeout.getMillis(), TimeUnit.MILLISECONDS)));
ctx.withTimeout(org.threeten.bp.Duration.ofMillis(attemptTimeout.getMillis()));
return ctx;
}

@Override
public List<KeyOffset> getSampleRowKeys(BigtableSource source) {
return client.sampleRowKeys(source.getTableId().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1846,6 +1846,16 @@ public Row getCurrentRow() {
}
return currentRow;
}

@Override
public Duration getAttemptTimeout() {
return Duration.millis(100);
}

@Override
public Duration getOperationTimeout() {
return Duration.millis(1000);
}
}

/** A {@link FakeBigtableReader} implementation that throw exceptions at given stage. */
Expand Down
Loading