From eaa7078e58e478a12f387bdeb2df4a9fd4b03146 Mon Sep 17 00:00:00 2001 From: Hanzhen Yi Date: Thu, 17 May 2018 13:41:18 -0700 Subject: [PATCH] Make watchdog work --- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 123 +++++++++++------- 1 file changed, 77 insertions(+), 46 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 366189e1039a..370a1dd0c001 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -19,6 +19,7 @@ import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; import com.google.api.core.ApiFunction; +import com.google.api.core.NanoClock; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.GaxProperties; import com.google.api.gax.grpc.GaxGrpcProperties; @@ -28,10 +29,12 @@ import com.google.api.gax.rpc.ApiClientHeaderProvider; import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.HeaderProvider; +import com.google.api.gax.rpc.InstantiatingWatchdogProvider; import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; +import com.google.api.gax.rpc.WatchdogProvider; import com.google.api.pathtemplate.PathTemplate; import com.google.cloud.ServiceOptions; import com.google.cloud.grpc.GrpcTransportOptions; @@ -50,7 +53,9 @@ import com.google.cloud.spanner.v1.stub.SpannerStubSettings; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.longrunning.GetOperationRequest; +import com.google.longrunning.Operation; import com.google.protobuf.Empty; import com.google.protobuf.FieldMask; import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; @@ -82,10 +87,10 @@ import com.google.spanner.v1.CreateSessionRequest; import com.google.spanner.v1.DeleteSessionRequest; import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.PartitionQueryRequest; import com.google.spanner.v1.PartitionReadRequest; import com.google.spanner.v1.PartitionResponse; -import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.ReadRequest; import com.google.spanner.v1.RollbackRequest; import com.google.spanner.v1.Session; @@ -96,16 +101,22 @@ import java.util.Map; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import javax.annotation.Nullable; - -import com.google.longrunning.Operation; +import org.threeten.bp.Duration; /** Implementation of Cloud Spanner remote calls using Gapic libraries. */ public class GapicSpannerRpc implements SpannerRpc { private static final PathTemplate PROJECT_NAME_TEMPLATE = PathTemplate.create("projects/{project}"); + private static final String PROPERTY_TIMEOUT_SECONDS = + "com.google.cloud.spanner.watchdogTimeoutSeconds"; + private static final String PROPERTY_PERIOD_SECONDS = + "com.google.cloud.spanner.watchdogPeriodSeconds"; + private static final int DEFAULT_TIMEOUT_SECONDS = 30 * 60; + private static final int DEFAULT_PERIOD_SECONDS = 10; private final SpannerStub stub; private final InstanceAdminStub instanceStub; @@ -157,55 +168,70 @@ public GapicSpannerRpc(SpannerOptions options) throws IOException { CredentialsProvider credentialsProvider = GrpcTransportOptions.setUpCredentialsProvider(options); + Duration checkInterval = Duration.ofSeconds(systemProperty(PROPERTY_PERIOD_SECONDS, DEFAULT_TIMEOUT_SECONDS)); + Duration idleTimeout = Duration.ofSeconds(systemProperty(PROPERTY_PERIOD_SECONDS, DEFAULT_PERIOD_SECONDS)); + + WatchdogProvider watchdogProvider = + InstantiatingWatchdogProvider.create() + .withClock(NanoClock.getDefaultClock()) + .withCheckInterval(checkInterval) + .withExecutor(Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Cloud-Spanner-WatchdogInterceptor-%d") + .build())); + // Disabling retry for now because spanner handles retry in SpannerImpl. // We will finally want to improve gax but for smooth transitioning we // preserve the retry in SpannerImpl try { // TODO: bump the version of gax and remove this try-catch block // applyToAllUnaryMethods does not throw exception in the latest version - this.stub = - GrpcSpannerStub.create( - SpannerStubSettings.newBuilder() - .setTransportChannelProvider(channelProvider) - .setCredentialsProvider(credentialsProvider) - .applyToAllUnaryMethods( - new ApiFunction, Void>() { - @Override - public Void apply(UnaryCallSettings.Builder builder) { - builder.setRetryableCodes(ImmutableSet.of()); - return null; - } - }) - .build()); - - this.instanceStub = - GrpcInstanceAdminStub.create( - InstanceAdminStubSettings.newBuilder() - .setTransportChannelProvider(channelProvider) - .setCredentialsProvider(credentialsProvider) - .applyToAllUnaryMethods( - new ApiFunction, Void>() { - @Override - public Void apply(UnaryCallSettings.Builder builder) { - builder.setRetryableCodes(ImmutableSet.of()); - return null; - } - }) - .build()); - this.databaseStub = - GrpcDatabaseAdminStub.create( - DatabaseAdminStubSettings.newBuilder() - .setTransportChannelProvider(channelProvider) - .setCredentialsProvider(credentialsProvider) - .applyToAllUnaryMethods( - new ApiFunction, Void>() { - @Override - public Void apply(UnaryCallSettings.Builder builder) { - builder.setRetryableCodes(ImmutableSet.of()); - return null; - } - }) - .build()); + SpannerStubSettings.Builder spannerStubSettingBuilder = + SpannerStubSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .applyToAllUnaryMethods( + new ApiFunction, Void>() { + @Override + public Void apply(UnaryCallSettings.Builder builder) { + builder.setRetryableCodes(ImmutableSet.of()); + return null; + } + }) + .setStreamWatchdogProvider(watchdogProvider); + spannerStubSettingBuilder.streamingReadSettings().setIdleTimeout(idleTimeout); + spannerStubSettingBuilder.executeStreamingSqlSettings().setIdleTimeout(idleTimeout); + this.stub = GrpcSpannerStub.create(spannerStubSettingBuilder.build()); + + this.instanceStub = + GrpcInstanceAdminStub.create( + InstanceAdminStubSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .applyToAllUnaryMethods( + new ApiFunction, Void>() { + @Override + public Void apply(UnaryCallSettings.Builder builder) { + builder.setRetryableCodes(ImmutableSet.of()); + return null; + } + }) + .build()); + this.databaseStub = + GrpcDatabaseAdminStub.create( + DatabaseAdminStubSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .applyToAllUnaryMethods( + new ApiFunction, Void>() { + @Override + public Void apply(UnaryCallSettings.Builder builder) { + builder.setRetryableCodes(ImmutableSet.of()); + return null; + } + }) + .build()); } catch (Exception e) { throw SpannerExceptionFactory.newSpannerException(e); } @@ -470,4 +496,9 @@ private GrpcCallContext newCallContext(@Nullable Map options, String metadataProvider.newExtraHeaders(resource, projectName)); return context; } + + private static int systemProperty(String name, int defaultValue) { + String stringValue = System.getProperty(name, ""); + return stringValue.isEmpty() ? defaultValue : Integer.parseInt(stringValue); + } }