diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java index fac3e27d88b1..ded74ce85a17 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java @@ -29,6 +29,7 @@ import com.google.spanner.v1.TransactionSelector; import java.util.Map; import java.util.concurrent.Callable; +import org.threeten.bp.Duration; /** Partitioned DML transaction for bulk updates and deletes. */ class PartitionedDMLTransaction implements SessionTransaction { @@ -62,7 +63,7 @@ private ByteString initTransaction() { * Executes the {@link Statement} using a partitioned dml transaction with automatic retry if the * transaction was aborted. */ - long executePartitionedUpdate(final Statement statement) { + long executePartitionedUpdate(final Statement statement, final Duration timeout) { checkState(isValid, "Partitioned DML has been invalidated by a new operation on the session"); Callable callable = new Callable() { @@ -83,7 +84,7 @@ public com.google.spanner.v1.ResultSet call() throws Exception { builder.putParamTypes(param.getKey(), param.getValue().getType().toProto()); } } - return rpc.executeQuery(builder.build(), session.getOptions()); + return rpc.executePartitionedDml(builder.build(), session.getOptions(), timeout); } }; com.google.spanner.v1.ResultSet resultSet = diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index ef514fba1a9c..5df62a9baee7 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -100,7 +100,7 @@ public String getName() { public long executePartitionedUpdate(Statement stmt) { setActive(null); PartitionedDMLTransaction txn = new PartitionedDMLTransaction(this, spanner.getRpc()); - return txn.executePartitionedUpdate(stmt); + return txn.executePartitionedUpdate(stmt, spanner.getOptions().getPartitionedDmlTimeout()); } @Override diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index a1f306372331..7f4ca2407f91 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -43,6 +43,7 @@ import java.net.URL; import java.util.Map; import java.util.Set; +import org.threeten.bp.Duration; /** Options for the Cloud Spanner service. */ public class SpannerOptions extends ServiceOptions { @@ -68,6 +69,7 @@ public class SpannerOptions extends ServiceOptions { private final SpannerStubSettings spannerStubSettings; private final InstanceAdminStubSettings instanceAdminStubSettings; private final DatabaseAdminStubSettings databaseAdminStubSettings; + private final Duration partitionedDmlTimeout; /** Default implementation of {@code SpannerFactory}. */ private static class DefaultSpannerFactory implements SpannerFactory { @@ -114,6 +116,7 @@ private SpannerOptions(Builder builder) { } catch (IOException e) { throw SpannerExceptionFactory.newSpannerException(e); } + partitionedDmlTimeout = builder.partitionedDmlTimeout; } /** Builder for {@link SpannerOptions} instances. */ @@ -139,6 +142,7 @@ public static class Builder InstanceAdminStubSettings.newBuilder(); private DatabaseAdminStubSettings.Builder databaseAdminStubSettingsBuilder = DatabaseAdminStubSettings.newBuilder(); + private Duration partitionedDmlTimeout = Duration.ofHours(2L); private Builder() {} @@ -151,6 +155,7 @@ private Builder() {} this.spannerStubSettingsBuilder = options.spannerStubSettings.toBuilder(); this.instanceAdminStubSettingsBuilder = options.instanceAdminStubSettings.toBuilder(); this.databaseAdminStubSettingsBuilder = options.databaseAdminStubSettings.toBuilder(); + this.partitionedDmlTimeout = options.partitionedDmlTimeout; this.channelProvider = options.channelProvider; this.channelConfigurator = options.channelConfigurator; this.interceptorProvider = options.interceptorProvider; @@ -328,6 +333,15 @@ public DatabaseAdminStubSettings.Builder getDatabaseAdminStubSettingsBuilder() { return databaseAdminStubSettingsBuilder; } + /** + * Sets a timeout specifically for Partitioned DML statements executed through {@link + * DatabaseClient#executePartitionedUpdate(Statement)}. The default is 2 hours. + */ + public Builder setPartitionedDmlTimeout(Duration timeout) { + this.partitionedDmlTimeout = timeout; + return this; + } + /** * Specifying this will allow the client to prefetch up to {@code prefetchChunks} {@code * PartialResultSet} chunks for each read and query. The data size of each chunk depends on the @@ -396,6 +410,10 @@ public DatabaseAdminStubSettings getDatabaseAdminStubSettings() { return databaseAdminStubSettings; } + public Duration getPartitionedDmlTimeout() { + return partitionedDmlTimeout; + } + public int getPrefetchChunks() { return prefetchChunks; } diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index e309f60ab8a4..e9739501ca25 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -511,6 +511,13 @@ public ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map options, Duration timeout) { + GrpcCallContext context = newCallContext(options, request.getSession(), timeout); + return get(spannerStub.executeSqlCallable().futureCall(request, context)); + } + @Override public StreamingCall executeQuery( ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map options) { @@ -591,11 +598,19 @@ private static T get(final Future future) throws SpannerException { } private GrpcCallContext newCallContext(@Nullable Map options, String resource) { + return newCallContext(options, resource, null); + } + + private GrpcCallContext newCallContext( + @Nullable Map options, String resource, Duration timeout) { GrpcCallContext context = GrpcCallContext.createDefault(); if (options != null) { context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); } context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName)); + if (timeout != null) { + context = context.withTimeout(timeout); + } return context.withStreamWaitTimeout(waitTimeout).withStreamIdleTimeout(idleTimeout); } diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java index 593ba3c5ec06..632ded0aee71 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; +import org.threeten.bp.Duration; /** * Abstracts remote calls to the Cloud Spanner service. Typically end-consumer code will never use @@ -213,6 +214,9 @@ StreamingCall read( ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map options); + ResultSet executePartitionedDml( + ExecuteSqlRequest request, @Nullable Map options, Duration timeout); + StreamingCall executeQuery( ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map options); diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 8bed4e561971..0a90e5bdcd5f 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -19,10 +19,14 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; +import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.protobuf.ListValue; import com.google.spanner.v1.ResultSetMetadata; import com.google.spanner.v1.StructType; @@ -32,6 +36,7 @@ import io.grpc.Status; import io.grpc.inprocess.InProcessServerBuilder; import java.io.IOException; +import java.util.concurrent.ScheduledThreadPoolExecutor; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -39,6 +44,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.threeten.bp.Duration; @RunWith(JUnit4.class) public class DatabaseClientImplTest { @@ -89,7 +95,8 @@ public static void startStaticServer() throws IOException { String uniqueName = InProcessServerBuilder.generateName(); server = InProcessServerBuilder.forName(uniqueName) - .directExecutor() + // We need to use a real executor for timeouts to occur. + .scheduledExecutorService(new ScheduledThreadPoolExecutor(1)) .addService(mockSpanner) .build() .start(); @@ -97,13 +104,15 @@ public static void startStaticServer() throws IOException { } @AfterClass - public static void stopServer() { + public static void stopServer() throws InterruptedException { server.shutdown(); + server.awaitTermination(); } @Before public void setUp() throws IOException { mockSpanner.reset(); + mockSpanner.removeAllExecutionTimes(); spanner = SpannerOptions.newBuilder() .setProjectId("[PROJECT]") @@ -158,4 +167,89 @@ public void testExecutePartitionedDmlWithException() { spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); client.executePartitionedUpdate(INVALID_UPDATE_STATEMENT); } + + @Test + public void testPartitionedDmlDoesNotTimeout() throws Exception { + mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(10, 0)); + final RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRpcTimeout(Duration.ofMillis(1L)) + .setMaxRpcTimeout(Duration.ofMillis(1L)) + .setMaxAttempts(1) + .setTotalTimeout(Duration.ofMillis(1L)) + .build(); + SpannerOptions.Builder builder = + SpannerOptions.newBuilder() + .setProjectId("[PROJECT]") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()); + // Set normal DML timeout value. + builder.getSpannerStubSettingsBuilder().executeSqlSettings().setRetrySettings(retrySettings); + try (Spanner spanner = builder.build().getService()) { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + + // PDML should not timeout with these settings. + long updateCount = client.executePartitionedUpdate(UPDATE_STATEMENT); + assertThat(updateCount, is(equalTo(UPDATE_COUNT))); + + // Normal DML should timeout. + try { + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + transaction.executeUpdate(UPDATE_STATEMENT); + return null; + } + }); + fail("expected DEADLINE_EXCEEDED"); + } catch (SpannerException e) { + if (e.getErrorCode() != ErrorCode.DEADLINE_EXCEEDED) { + fail("expected DEADLINE_EXCEEDED"); + } + } + } + } + + @Test + public void testPartitionedDmlWithTimeout() throws Exception { + mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(10, 0)); + SpannerOptions.Builder builder = + SpannerOptions.newBuilder() + .setProjectId("[PROJECT]") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()); + // Set PDML timeout value. + builder.setPartitionedDmlTimeout(Duration.ofMillis(1L)); + try (Spanner spanner = builder.build().getService()) { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + + // PDML should timeout with these settings. + try { + client.executePartitionedUpdate(UPDATE_STATEMENT); + fail("expected DEADLINE_EXCEEDED"); + } catch (SpannerException e) { + if (e.getErrorCode() != ErrorCode.DEADLINE_EXCEEDED) { + fail("expected DEADLINE_EXCEEDED"); + } + } + + // Normal DML should not timeout. + long updateCount = + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) throws Exception { + return transaction.executeUpdate(UPDATE_STATEMENT); + } + }); + assertThat(updateCount, is(equalTo(UPDATE_COUNT))); + } + } }