Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,16 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
BigtableStubSettings.newBuilder()
.setTransportChannelProvider(settings.getTransportChannelProvider())
.setEndpoint(settings.getEndpoint())
.setCredentialsProvider(settings.getCredentialsProvider());
.setCredentialsProvider(settings.getCredentialsProvider())
.setStreamWatchdogProvider(settings.getStreamWatchdogProvider())
.setStreamWatchdogCheckInterval(settings.getStreamWatchdogCheckInterval());

// ReadRow retries are handled in the overlay: disable retries in the base layer (but make
// sure to preserve the exception callable settings).
baseSettingsBuilder
.readRowsSettings()
.setSimpleTimeoutNoRetries(Duration.ofHours(2))
.setRetryableCodes(settings.readRowsSettings().getRetryableCodes())
.setTimeoutCheckInterval(Duration.ZERO)
.setIdleTimeout(Duration.ZERO);

// SampleRowKeys retries are handled in the overlay: disable retries in the base layer (but make
Expand All @@ -115,7 +116,6 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
.mutateRowsSettings()
.setSimpleTimeoutNoRetries(Duration.ofHours(2))
.setRetryableCodes(settings.mutateRowsSettings().getRetryableCodes())
.setTimeoutCheckInterval(Duration.ZERO)
.setIdleTimeout(Duration.ZERO);

// CheckAndMutateRow is a simple passthrough
Expand Down Expand Up @@ -182,7 +182,6 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
.setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter))
.setRetryableCodes(settings.readRowsSettings().getRetryableCodes())
.setRetrySettings(settings.readRowsSettings().getRetrySettings())
.setTimeoutCheckInterval(settings.readRowsSettings().getTimeoutCheckInterval())
.setIdleTimeout(settings.readRowsSettings().getIdleTimeout())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,16 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private static final Set<Code> DEFAULT_RETRY_CODES =
ImmutableSet.of(Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE, Code.ABORTED);

// Copy of default retrying settings in the yaml
private static final RetrySettings DEFAULT_RETRY_SETTINGS =
RetrySettings.newBuilder()
.setMaxAttempts(10)
.setTotalTimeout(Duration.ofHours(1))
.setInitialRetryDelay(Duration.ofMillis(100))
.setInitialRetryDelay(Duration.ofMillis(100L))
.setRetryDelayMultiplier(1.3)
.setMaxRetryDelay(Duration.ofMinutes(1))
.setInitialRpcTimeout(Duration.ofSeconds(20))
.setRpcTimeoutMultiplier(1)
.setMaxRpcTimeout(Duration.ofSeconds(20))
.setMaxRetryDelay(Duration.ofMillis(60000L))
.setInitialRpcTimeout(Duration.ofMillis(20000L))
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeout(Duration.ofMillis(20000L))
.setTotalTimeout(Duration.ofMillis(600000L))
.build();

private final InstanceName instanceName;
Expand Down Expand Up @@ -201,14 +201,16 @@ private Builder() {
.setChannelsPerCpu(2)
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.build());
setStreamWatchdogCheckInterval(baseDefaults.getStreamWatchdogCheckInterval());
setStreamWatchdogProvider(baseDefaults.getStreamWatchdogProvider());

// Per-method settings using baseSettings for defaults.
readRowsSettings = ServerStreamingCallSettings.newBuilder();
/* TODO: copy timeouts, retryCodes & retrySettings from baseSettings.readRows once it exists in GAPIC */
readRowsSettings
.setRetryableCodes(DEFAULT_RETRY_CODES)
.setRetrySettings(DEFAULT_RETRY_SETTINGS)
.setTimeoutCheckInterval(Duration.ofSeconds(10))
.setRetrySettings(
DEFAULT_RETRY_SETTINGS.toBuilder().setTotalTimeout(Duration.ofHours(1)).build())
.setIdleTimeout(Duration.ofMinutes(5));

sampleRowKeysSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
Expand All @@ -217,14 +219,8 @@ private Builder() {
.setRetryableCodes(Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE, Code.ABORTED)
.setRetrySettings(DEFAULT_RETRY_SETTINGS);

// NOTE: This client enforces client side timestamps, which makes all mutations retryable.
// However, since the base GAPIC client allows for server side timestamps, it is not
// configured to enable retries. So the retry settings have to be defined here instead of
// being copied from the BigtableStubSettings.
mutateRowSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
mutateRowSettings
.setRetryableCodes(DEFAULT_RETRY_CODES)
.setRetrySettings(DEFAULT_RETRY_SETTINGS);
copyRetrySettings(baseDefaults.mutateRowSettings(), mutateRowSettings);

/* TODO: copy retryCodes & retrySettings from baseSettings.mutateRows once it exists in GAPIC */
mutateRowsSettings =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public StreamResumptionStrategy<ReadRowsRequest, RowT> createNew() {
}

@Override
public void onProgress(RowT response) {
public RowT processResponse(RowT response) {
// Last key can come from both the last processed row key and a synthetic row marker. The
// synthetic row marker is emitted when the server has read a lot of data that was filtered out.
// The row marker can be used to trim the start of the scan, but does not contribute to the row
Expand All @@ -67,6 +67,7 @@ public void onProgress(RowT response) {
// Only real rows count towards the rows limit.
numProcessed++;
}
return response;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.WatchdogProvider;
import com.google.bigtable.admin.v2.InstanceName;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
Expand Down Expand Up @@ -62,42 +63,74 @@ public void settingsAreNotLostTest() {
String appProfileId = "my-app-profile-id";
String endpoint = "some.other.host:123";
CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class);
WatchdogProvider watchdogProvider = Mockito.mock(WatchdogProvider.class);
Duration watchdogInterval = Duration.ofSeconds(12);

EnhancedBigtableStubSettings.Builder builder =
EnhancedBigtableStubSettings.newBuilder()
.setInstanceName(instanceName)
.setAppProfileId(appProfileId)
.setEndpoint(endpoint)
.setCredentialsProvider(credentialsProvider);
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.setStreamWatchdogCheckInterval(watchdogInterval);

verifyBuilder(builder, instanceName, appProfileId, endpoint, credentialsProvider);
verifySettings(builder.build(), instanceName, appProfileId, endpoint, credentialsProvider);
verifyBuilder(
builder.build().toBuilder(), instanceName, appProfileId, endpoint, credentialsProvider);
builder,
instanceName,
appProfileId,
endpoint,
credentialsProvider,
watchdogProvider,
watchdogInterval);
verifySettings(
builder.build(),
instanceName,
appProfileId,
endpoint,
credentialsProvider,
watchdogProvider,
watchdogInterval);
verifyBuilder(
builder.build().toBuilder(),
instanceName,
appProfileId,
endpoint,
credentialsProvider,
watchdogProvider,
watchdogInterval);
}

private void verifyBuilder(
EnhancedBigtableStubSettings.Builder builder,
InstanceName instanceName,
String appProfileId,
String endpoint,
CredentialsProvider credentialsProvider) {
CredentialsProvider credentialsProvider,
WatchdogProvider watchdogProvider,
Duration watchdogInterval) {
assertThat(builder.getInstanceName()).isEqualTo(instanceName);
assertThat(builder.getAppProfileId()).isEqualTo(appProfileId);
assertThat(builder.getEndpoint()).isEqualTo(endpoint);
assertThat(builder.getCredentialsProvider()).isEqualTo(credentialsProvider);
assertThat(builder.getStreamWatchdogProvider()).isSameAs(watchdogProvider);
assertThat(builder.getStreamWatchdogCheckInterval()).isEqualTo(watchdogInterval);
}

private void verifySettings(
EnhancedBigtableStubSettings settings,
InstanceName instanceName,
String appProfileId,
String endpoint,
CredentialsProvider credentialsProvider) {
CredentialsProvider credentialsProvider,
WatchdogProvider watchdogProvider,
Duration watchdogInterval) {
assertThat(settings.getInstanceName()).isEqualTo(instanceName);
assertThat(settings.getAppProfileId()).isEqualTo(appProfileId);
assertThat(settings.getEndpoint()).isEqualTo(endpoint);
assertThat(settings.getCredentialsProvider()).isEqualTo(credentialsProvider);
assertThat(settings.getStreamWatchdogProvider()).isSameAs(watchdogProvider);
assertThat(settings.getStreamWatchdogCheckInterval()).isEqualTo(watchdogInterval);
}

@Test
Expand Down Expand Up @@ -132,29 +165,22 @@ public void readRowsIsNotLostTest() {

builder
.readRowsSettings()
.setTimeoutCheckInterval(Duration.ofSeconds(10))
.setIdleTimeout(Duration.ofMinutes(5))
.setRetryableCodes(Code.ABORTED, Code.DEADLINE_EXCEEDED)
.setRetrySettings(retrySettings)
.build();

assertThat(builder.readRowsSettings().getTimeoutCheckInterval())
.isEqualTo(Duration.ofSeconds(10));
assertThat(builder.readRowsSettings().getIdleTimeout()).isEqualTo(Duration.ofMinutes(5));
assertThat(builder.readRowsSettings().getRetryableCodes())
.containsAllOf(Code.ABORTED, Code.DEADLINE_EXCEEDED);
assertThat(builder.readRowsSettings().getRetrySettings()).isEqualTo(retrySettings);

assertThat(builder.build().readRowsSettings().getTimeoutCheckInterval())
.isEqualTo(Duration.ofSeconds(10));
assertThat(builder.build().readRowsSettings().getIdleTimeout())
.isEqualTo(Duration.ofMinutes(5));
assertThat(builder.build().readRowsSettings().getRetryableCodes())
.containsAllOf(Code.ABORTED, Code.DEADLINE_EXCEEDED);
assertThat(builder.build().readRowsSettings().getRetrySettings()).isEqualTo(retrySettings);

assertThat(builder.build().toBuilder().readRowsSettings().getTimeoutCheckInterval())
.isEqualTo(Duration.ofSeconds(10));
assertThat(builder.build().toBuilder().readRowsSettings().getIdleTimeout())
.isEqualTo(Duration.ofMinutes(5));
assertThat(builder.build().toBuilder().readRowsSettings().getRetryableCodes())
Expand Down Expand Up @@ -367,9 +393,8 @@ public void checkAndMutateRowSettingsAreSane() {
}

private void verifyRetrySettingAreSane(Set<Code> retryCodes, RetrySettings retrySettings) {
assertThat(retryCodes).containsAllOf(Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE, Code.ABORTED);
assertThat(retryCodes).containsAllOf(Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE);

assertThat(retrySettings.getMaxAttempts()).isGreaterThan(1);
assertThat(retrySettings.getTotalTimeout()).isGreaterThan(Duration.ZERO);

assertThat(retrySettings.getInitialRetryDelay()).isGreaterThan(Duration.ZERO);
Expand Down
8 changes: 4 additions & 4 deletions google-cloud-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,10 @@
<managedtest.version>0.39.1-alpha-SNAPSHOT</managedtest.version><!-- {x-version-update:google-cloud-managedtest:current} -->
<testing.version>0.39.1-alpha-SNAPSHOT</testing.version><!-- {x-version-update:google-cloud-testing:current} -->

<api-common.version>1.4.0</api-common.version>
<gax.version>1.19.0</gax.version>
<gax-grpc.version>1.19.0</gax-grpc.version>
<gax-httpjson.version>0.36.0</gax-httpjson.version>
<api-common.version>1.5.0</api-common.version>
<gax.version>1.20.0</gax.version>
<gax-grpc.version>1.20.0</gax-grpc.version>
<gax-httpjson.version>0.37.0</gax-httpjson.version>
<generated-proto-beta.version>0.4.0</generated-proto-beta.version>
<generated-proto-ga.version>1.3.0</generated-proto-ga.version>
</properties>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@
<site.installationModule>google-cloud</site.installationModule>
<bom.version>0.39.1-alpha-SNAPSHOT</bom.version><!-- {x-version-update:google-cloud-pom:current} -->
<api-client.version>1.23.0</api-client.version>
<gax.version>1.19.0</gax.version>
<gax.version>1.20.0</gax.version>
<google.auth.version>0.9.0</google.auth.version>
<grpc.version>1.9.0</grpc.version>
<nettyssl.version>2.0.7.Final</nettyssl.version>
Expand Down