Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFunction;
import com.google.api.core.NanoClock;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.grpc.GaxGrpcProperties;
Expand All @@ -28,10 +29,12 @@
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.InstantiatingWatchdogProvider;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.WatchdogProvider;
import com.google.api.pathtemplate.PathTemplate;
import com.google.cloud.ServiceOptions;
import com.google.cloud.grpc.GrpcTransportOptions;
Expand All @@ -50,7 +53,9 @@
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.longrunning.GetOperationRequest;
import com.google.longrunning.Operation;
import com.google.protobuf.Empty;
import com.google.protobuf.FieldMask;
import com.google.spanner.admin.database.v1.CreateDatabaseMetadata;
Expand Down Expand Up @@ -82,10 +87,10 @@
import com.google.spanner.v1.CreateSessionRequest;
import com.google.spanner.v1.DeleteSessionRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.PartitionQueryRequest;
import com.google.spanner.v1.PartitionReadRequest;
import com.google.spanner.v1.PartitionResponse;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.Session;
Expand All @@ -96,16 +101,22 @@
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.Nullable;

import com.google.longrunning.Operation;
import org.threeten.bp.Duration;

/** Implementation of Cloud Spanner remote calls using Gapic libraries. */
public class GapicSpannerRpc implements SpannerRpc {

private static final PathTemplate PROJECT_NAME_TEMPLATE =
PathTemplate.create("projects/{project}");
private static final String PROPERTY_TIMEOUT_SECONDS =
"com.google.cloud.spanner.watchdogTimeoutSeconds";
private static final String PROPERTY_PERIOD_SECONDS =
"com.google.cloud.spanner.watchdogPeriodSeconds";
private static final int DEFAULT_TIMEOUT_SECONDS = 30 * 60;
private static final int DEFAULT_PERIOD_SECONDS = 10;

private final SpannerStub stub;
private final InstanceAdminStub instanceStub;
Expand Down Expand Up @@ -157,55 +168,70 @@ public GapicSpannerRpc(SpannerOptions options) throws IOException {
CredentialsProvider credentialsProvider =
GrpcTransportOptions.setUpCredentialsProvider(options);

Duration checkInterval = Duration.ofSeconds(systemProperty(PROPERTY_PERIOD_SECONDS, DEFAULT_TIMEOUT_SECONDS));
Duration idleTimeout = Duration.ofSeconds(systemProperty(PROPERTY_PERIOD_SECONDS, DEFAULT_PERIOD_SECONDS));

WatchdogProvider watchdogProvider =
InstantiatingWatchdogProvider.create()
.withClock(NanoClock.getDefaultClock())
.withCheckInterval(checkInterval)
.withExecutor(Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Cloud-Spanner-WatchdogInterceptor-%d")
.build()));

// Disabling retry for now because spanner handles retry in SpannerImpl.
// We will finally want to improve gax but for smooth transitioning we
// preserve the retry in SpannerImpl
try {
// TODO: bump the version of gax and remove this try-catch block
// applyToAllUnaryMethods does not throw exception in the latest version
this.stub =
GrpcSpannerStub.create(
SpannerStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?,?> builder) {
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
return null;
}
})
.build());

this.instanceStub =
GrpcInstanceAdminStub.create(
InstanceAdminStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?,?> builder) {
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
return null;
}
})
.build());
this.databaseStub =
GrpcDatabaseAdminStub.create(
DatabaseAdminStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?,?> builder) {
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
return null;
}
})
.build());
SpannerStubSettings.Builder spannerStubSettingBuilder =
SpannerStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
return null;
}
})
.setStreamWatchdogProvider(watchdogProvider);
spannerStubSettingBuilder.streamingReadSettings().setIdleTimeout(idleTimeout);
spannerStubSettingBuilder.executeStreamingSqlSettings().setIdleTimeout(idleTimeout);
this.stub = GrpcSpannerStub.create(spannerStubSettingBuilder.build());

this.instanceStub =
GrpcInstanceAdminStub.create(
InstanceAdminStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
return null;
}
})
.build());
this.databaseStub =
GrpcDatabaseAdminStub.create(
DatabaseAdminStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
return null;
}
})
.build());
} catch (Exception e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
Expand Down Expand Up @@ -470,4 +496,9 @@ private GrpcCallContext newCallContext(@Nullable Map<Option, ?> options, String
metadataProvider.newExtraHeaders(resource, projectName));
return context;
}

private static int systemProperty(String name, int defaultValue) {
String stringValue = System.getProperty(name, "");
return stringValue.isEmpty() ? defaultValue : Integer.parseInt(stringValue);
}
}