diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java deleted file mode 100644 index a938d174476..00000000000 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDMLTransaction.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Copyright 2019 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.spanner; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.api.gax.grpc.GrpcStatusCode; -import com.google.api.gax.rpc.DeadlineExceededException; -import com.google.api.gax.rpc.ServerStream; -import com.google.api.gax.rpc.UnavailableException; -import com.google.cloud.spanner.SessionImpl.SessionTransaction; -import com.google.cloud.spanner.spi.v1.SpannerRpc; -import com.google.common.base.Stopwatch; -import com.google.protobuf.ByteString; -import com.google.spanner.v1.BeginTransactionRequest; -import com.google.spanner.v1.ExecuteSqlRequest; -import com.google.spanner.v1.ExecuteSqlRequest.QueryMode; -import com.google.spanner.v1.PartialResultSet; -import com.google.spanner.v1.Transaction; -import com.google.spanner.v1.TransactionOptions; -import com.google.spanner.v1.TransactionSelector; -import io.grpc.Status.Code; -import io.opencensus.trace.Span; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; -import org.threeten.bp.Duration; -import org.threeten.bp.temporal.ChronoUnit; - -/** Partitioned DML transaction for bulk updates and deletes. */ -class PartitionedDMLTransaction implements SessionTransaction { - private static final Logger log = Logger.getLogger(PartitionedDMLTransaction.class.getName()); - - private final SessionImpl session; - private final SpannerRpc rpc; - private volatile boolean isValid = true; - - PartitionedDMLTransaction(SessionImpl session, SpannerRpc rpc) { - this.session = session; - this.rpc = rpc; - } - - private ByteString initTransaction() { - final BeginTransactionRequest request = - BeginTransactionRequest.newBuilder() - .setSession(session.getName()) - .setOptions( - TransactionOptions.newBuilder() - .setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance())) - .build(); - Transaction txn = rpc.beginTransaction(request, session.getOptions()); - if (txn.getId().isEmpty()) { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.INTERNAL, - "Failed to init transaction, missing transaction id\n" + session.getName()); - } - return txn.getId(); - } - - /** - * Executes the {@link Statement} using a partitioned dml transaction with automatic retry if the - * transaction was aborted. The update method uses the ExecuteStreamingSql RPC to execute the - * statement, and will retry the stream if an {@link UnavailableException} is thrown, using the - * last seen resume token if the server returns any. - */ - long executeStreamingPartitionedUpdate(final Statement statement, final Duration timeout) { - checkState(isValid, "Partitioned DML has been invalidated by a new operation on the session"); - log.log(Level.FINER, "Starting PartitionedUpdate statement"); - boolean foundStats = false; - long updateCount = 0L; - Stopwatch stopWatch = createStopwatchStarted(); - try { - // Loop to catch AbortedExceptions. - while (true) { - ByteString resumeToken = ByteString.EMPTY; - try { - ByteString transactionId = initTransaction(); - final ExecuteSqlRequest.Builder builder = - ExecuteSqlRequest.newBuilder() - .setSql(statement.getSql()) - .setQueryMode(QueryMode.NORMAL) - .setSession(session.getName()) - .setTransaction(TransactionSelector.newBuilder().setId(transactionId).build()); - Map stmtParameters = statement.getParameters(); - if (!stmtParameters.isEmpty()) { - com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder(); - for (Map.Entry param : stmtParameters.entrySet()) { - paramsBuilder.putFields(param.getKey(), param.getValue().toProto()); - builder.putParamTypes(param.getKey(), param.getValue().getType().toProto()); - } - } - while (true) { - Duration remainingTimeout = - timeout.minus(stopWatch.elapsed(TimeUnit.MILLISECONDS), ChronoUnit.MILLIS); - if (remainingTimeout.isNegative() || remainingTimeout.isZero()) { - // The total deadline has been exceeded while retrying. - throw new DeadlineExceededException( - null, GrpcStatusCode.of(Code.DEADLINE_EXCEEDED), false); - } - try { - builder.setResumeToken(resumeToken); - ServerStream stream = - rpc.executeStreamingPartitionedDml( - builder.build(), session.getOptions(), remainingTimeout); - for (PartialResultSet rs : stream) { - if (rs.getResumeToken() != null && !ByteString.EMPTY.equals(rs.getResumeToken())) { - resumeToken = rs.getResumeToken(); - } - if (rs.hasStats()) { - foundStats = true; - updateCount += rs.getStats().getRowCountLowerBound(); - } - } - break; - } catch (UnavailableException e) { - // Retry the stream in the same transaction if the stream breaks with - // UnavailableException and we have a resume token. Otherwise, we just retry the - // entire transaction. - if (!ByteString.EMPTY.equals(resumeToken)) { - log.log( - Level.FINER, - "Retrying PartitionedDml stream using resume token '" - + resumeToken.toStringUtf8() - + "' because of broken stream", - e); - } else { - throw new com.google.api.gax.rpc.AbortedException( - e, GrpcStatusCode.of(Code.ABORTED), true); - } - } - } - break; - } catch (com.google.api.gax.rpc.AbortedException e) { - // Retry using a new transaction but with the same session if the transaction is aborted. - log.log(Level.FINER, "Retrying PartitionedDml transaction after AbortedException", e); - } - } - if (!foundStats) { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.INVALID_ARGUMENT, - "Partitioned DML response missing stats possibly due to non-DML statement as input"); - } - log.log(Level.FINER, "Finished PartitionedUpdate statement"); - return updateCount; - } catch (Exception e) { - throw SpannerExceptionFactory.newSpannerException(e); - } - } - - Stopwatch createStopwatchStarted() { - return Stopwatch.createStarted(); - } - - @Override - public void invalidate() { - isValid = false; - } - - // No-op method needed to implement SessionTransaction interface. - @Override - public void setSpan(Span span) {} -} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java new file mode 100644 index 00000000000..4d2b01146e8 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java @@ -0,0 +1,196 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.rpc.AbortedException; +import com.google.api.gax.rpc.DeadlineExceededException; +import com.google.api.gax.rpc.ServerStream; +import com.google.api.gax.rpc.UnavailableException; +import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.common.base.Stopwatch; +import com.google.common.base.Ticker; +import com.google.protobuf.ByteString; +import com.google.spanner.v1.BeginTransactionRequest; +import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.PartialResultSet; +import com.google.spanner.v1.Transaction; +import com.google.spanner.v1.TransactionOptions; +import com.google.spanner.v1.TransactionSelector; +import io.grpc.Status; +import io.opencensus.trace.Span; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.threeten.bp.Duration; +import org.threeten.bp.temporal.ChronoUnit; + +public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction { + private static final Logger LOGGER = Logger.getLogger(PartitionedDmlTransaction.class.getName()); + + private final SessionImpl session; + private final SpannerRpc rpc; + private final Ticker ticker; + private volatile boolean isValid = true; + + PartitionedDmlTransaction(SessionImpl session, SpannerRpc rpc, Ticker ticker) { + this.session = session; + this.rpc = rpc; + this.ticker = ticker; + } + + /** + * Executes the {@link Statement} using a partitioned dml transaction with automatic retry if the + * transaction was aborted. The update method uses the ExecuteStreamingSql RPC to execute the + * statement, and will retry the stream if an {@link UnavailableException} is thrown, using the + * last seen resume token if the server returns any. + */ + long executeStreamingPartitionedUpdate(final Statement statement, final Duration timeout) { + checkState(isValid, "Partitioned DML has been invalidated by a new operation on the session"); + LOGGER.log(Level.FINER, "Starting PartitionedUpdate statement"); + + ByteString resumeToken = ByteString.EMPTY; + boolean foundStats = false; + long updateCount = 0L; + Stopwatch stopwatch = Stopwatch.createStarted(ticker); + + try { + ExecuteSqlRequest request = newTransactionRequestFrom(statement); + + while (true) { + final Duration remainingTimeout = tryUpdateTimeout(timeout, stopwatch); + + try { + ServerStream stream = + rpc.executeStreamingPartitionedDml(request, session.getOptions(), remainingTimeout); + + for (PartialResultSet rs : stream) { + if (rs.getResumeToken() != null && !rs.getResumeToken().isEmpty()) { + resumeToken = rs.getResumeToken(); + } + if (rs.hasStats()) { + foundStats = true; + updateCount += rs.getStats().getRowCountLowerBound(); + } + } + break; + } catch (UnavailableException e) { + LOGGER.log( + Level.FINER, "Retrying PartitionedDml transaction after UnavailableException", e); + request = resumeOrRestartRequest(resumeToken, statement, request); + } catch (AbortedException e) { + LOGGER.log(Level.FINER, "Retrying PartitionedDml transaction after AbortedException", e); + resumeToken = ByteString.EMPTY; + foundStats = false; + updateCount = 0L; + request = newTransactionRequestFrom(statement); + } + } + if (!foundStats) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.INVALID_ARGUMENT, + "Partitioned DML response missing stats possibly due to non-DML statement as input"); + } + LOGGER.log(Level.FINER, "Finished PartitionedUpdate statement"); + return updateCount; + } catch (Exception e) { + throw SpannerExceptionFactory.newSpannerException(e); + } + } + + @Override + public void invalidate() { + isValid = false; + } + + // No-op method needed to implement SessionTransaction interface. + @Override + public void setSpan(Span span) {} + + private Duration tryUpdateTimeout(final Duration timeout, final Stopwatch stopwatch) { + final Duration remainingTimeout = + timeout.minus(stopwatch.elapsed(TimeUnit.MILLISECONDS), ChronoUnit.MILLIS); + if (remainingTimeout.isNegative() || remainingTimeout.isZero()) { + // The total deadline has been exceeded while retrying. + throw new DeadlineExceededException( + null, GrpcStatusCode.of(Status.Code.DEADLINE_EXCEEDED), false); + } + return remainingTimeout; + } + + private ExecuteSqlRequest resumeOrRestartRequest( + final ByteString resumeToken, + final Statement statement, + final ExecuteSqlRequest originalRequest) { + if (resumeToken.isEmpty()) { + return newTransactionRequestFrom(statement); + } else { + return ExecuteSqlRequest.newBuilder(originalRequest).setResumeToken(resumeToken).build(); + } + } + + private ExecuteSqlRequest newTransactionRequestFrom(final Statement statement) { + ByteString transactionId = initTransaction(); + + final TransactionSelector transactionSelector = + TransactionSelector.newBuilder().setId(transactionId).build(); + final ExecuteSqlRequest.Builder builder = + ExecuteSqlRequest.newBuilder() + .setSql(statement.getSql()) + .setQueryMode(ExecuteSqlRequest.QueryMode.NORMAL) + .setSession(session.getName()) + .setTransaction(transactionSelector); + + setParameters(builder, statement.getParameters()); + + builder.setResumeToken(ByteString.EMPTY); + + return builder.build(); + } + + private ByteString initTransaction() { + final BeginTransactionRequest request = + BeginTransactionRequest.newBuilder() + .setSession(session.getName()) + .setOptions( + TransactionOptions.newBuilder() + .setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance())) + .build(); + Transaction tx = rpc.beginTransaction(request, session.getOptions()); + if (tx.getId().isEmpty()) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.INTERNAL, + "Failed to init transaction, missing transaction id\n" + session.getName()); + } + return tx.getId(); + } + + private void setParameters( + final ExecuteSqlRequest.Builder requestBuilder, + final Map statementParameters) { + if (!statementParameters.isEmpty()) { + com.google.protobuf.Struct.Builder paramsBuilder = requestBuilder.getParamsBuilder(); + for (Map.Entry param : statementParameters.entrySet()) { + paramsBuilder.putFields(param.getKey(), param.getValue().toProto()); + requestBuilder.putParamTypes(param.getKey(), param.getValue().getType().toProto()); + } + } + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index ce4d27e94ea..6a91d85fef4 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -28,6 +28,7 @@ import com.google.cloud.spanner.SessionClient.SessionId; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.common.base.Ticker; import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; @@ -114,7 +115,8 @@ void setCurrentSpan(Span span) { @Override public long executePartitionedUpdate(Statement stmt) { setActive(null); - PartitionedDMLTransaction txn = new PartitionedDMLTransaction(this, spanner.getRpc()); + PartitionedDmlTransaction txn = + new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker()); return txn.executeStreamingPartitionedUpdate( stmt, spanner.getOptions().getPartitionedDmlTimeout()); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java index 8a158eb12e6..5315114aac9 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java @@ -30,7 +30,6 @@ import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.UnavailableException; import com.google.cloud.spanner.spi.v1.SpannerRpc; -import com.google.common.base.Stopwatch; import com.google.common.base.Ticker; import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; @@ -64,6 +63,10 @@ public class PartitionedDmlTransactionTest { @Mock private SessionImpl session; + @Mock private Ticker ticker; + + private PartitionedDmlTransaction tx; + private final String sessionId = "projects/p/instances/i/databases/d/sessions/s"; private final ByteString txId = ByteString.copyFromUtf8("tx"); private final ByteString resumeToken = ByteString.copyFromUtf8("resume"); @@ -85,6 +88,8 @@ public void setup() { when(session.getOptions()).thenReturn(Collections.EMPTY_MAP); when(rpc.beginTransaction(any(BeginTransactionRequest.class), anyMap())) .thenReturn(Transaction.newBuilder().setId(txId).build()); + + tx = new PartitionedDmlTransaction(session, rpc, ticker); } @Test @@ -98,8 +103,8 @@ public void testExecuteStreamingPartitionedUpdate() { Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class))) .thenReturn(stream); - PartitionedDMLTransaction tx = new PartitionedDMLTransaction(session, rpc); long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); + assertThat(count).isEqualTo(1000L); verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); verify(rpc) @@ -127,8 +132,8 @@ public void testExecuteStreamingPartitionedUpdateAborted() { any(ExecuteSqlRequest.class), anyMap(), any(Duration.class))) .thenReturn(stream1, stream2); - PartitionedDMLTransaction tx = new PartitionedDMLTransaction(session, rpc); long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); + assertThat(count).isEqualTo(1000L); verify(rpc, times(2)).beginTransaction(any(BeginTransactionRequest.class), anyMap()); verify(rpc, times(2)) @@ -159,8 +164,8 @@ public void testExecuteStreamingPartitionedUpdateUnavailable() { Mockito.eq(executeRequestWithResumeToken), anyMap(), any(Duration.class))) .thenReturn(stream2); - PartitionedDMLTransaction tx = new PartitionedDMLTransaction(session, rpc); long count = tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); + assertThat(count).isEqualTo(1000L); verify(rpc).beginTransaction(any(BeginTransactionRequest.class), anyMap()); verify(rpc) @@ -186,17 +191,8 @@ public void testExecuteStreamingPartitionedUpdateUnavailableAndThenDeadlineExcee when(rpc.executeStreamingPartitionedDml( Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class))) .thenReturn(stream1); + when(ticker.read()).thenReturn(0L, 1L, TimeUnit.NANOSECONDS.convert(10L, TimeUnit.MINUTES)); - PartitionedDMLTransaction tx = - new PartitionedDMLTransaction(session, rpc) { - @Override - Stopwatch createStopwatchStarted() { - Ticker ticker = mock(Ticker.class); - when(ticker.read()) - .thenReturn(0L, 1L, TimeUnit.NANOSECONDS.convert(10L, TimeUnit.MINUTES)); - return Stopwatch.createStarted(ticker); - } - }; try { tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); fail("missing expected DEADLINE_EXCEEDED exception"); @@ -224,17 +220,8 @@ public void testExecuteStreamingPartitionedUpdateAbortedAndThenDeadlineExceeded( when(rpc.executeStreamingPartitionedDml( Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class))) .thenReturn(stream1); + when(ticker.read()).thenReturn(0L, 1L, TimeUnit.NANOSECONDS.convert(10L, TimeUnit.MINUTES)); - PartitionedDMLTransaction tx = - new PartitionedDMLTransaction(session, rpc) { - @Override - Stopwatch createStopwatchStarted() { - Ticker ticker = mock(Ticker.class); - when(ticker.read()) - .thenReturn(0L, 1L, TimeUnit.NANOSECONDS.convert(10L, TimeUnit.MINUTES)); - return Stopwatch.createStarted(ticker); - } - }; try { tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); fail("missing expected DEADLINE_EXCEEDED exception"); @@ -262,25 +249,17 @@ public void testExecuteStreamingPartitionedUpdateMultipleAbortsUntilDeadlineExce when(rpc.executeStreamingPartitionedDml( Mockito.eq(executeRequestWithoutResumeToken), anyMap(), any(Duration.class))) .thenReturn(stream1); + when(ticker.read()) + .thenAnswer( + new Answer() { + long ticks = 0L; - PartitionedDMLTransaction tx = - new PartitionedDMLTransaction(session, rpc) { - long ticks = 0L; + @Override + public Long answer(InvocationOnMock invocation) throws Throwable { + return TimeUnit.NANOSECONDS.convert(++ticks, TimeUnit.MINUTES); + } + }); - @Override - Stopwatch createStopwatchStarted() { - Ticker ticker = mock(Ticker.class); - when(ticker.read()) - .thenAnswer( - new Answer() { - @Override - public Long answer(InvocationOnMock invocation) throws Throwable { - return TimeUnit.NANOSECONDS.convert(++ticks, TimeUnit.MINUTES); - } - }); - return Stopwatch.createStarted(ticker); - } - }; try { tx.executeStreamingPartitionedUpdate(Statement.of(sql), Duration.ofMinutes(10)); fail("missing expected DEADLINE_EXCEEDED exception");