diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/spanner/snippets/DatabaseClientSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/spanner/snippets/DatabaseClientSnippets.java index e349fa5fbde4..f584abf285b1 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/spanner/snippets/DatabaseClientSnippets.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/spanner/snippets/DatabaseClientSnippets.java @@ -23,6 +23,7 @@ package com.google.cloud.examples.spanner.snippets; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.AbortedException; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.Key; import com.google.cloud.spanner.Mutation; @@ -30,6 +31,7 @@ import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.TimestampBound; import com.google.cloud.spanner.TransactionContext; +import com.google.cloud.spanner.TransactionManager; import com.google.cloud.spanner.TransactionRunner; import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import java.util.Collections; @@ -222,4 +224,31 @@ public Void run(TransactionContext transaction) throws Exception { }); // [END readWriteTransaction] } + + /** + * Example of using {@link TransactionManager}. + */ + // [TARGET transactionManager()] + // [VARIABLE my_singer_id] + public void transactionManager(final long singerId) throws InterruptedException { + // [START transactionManager] + try (TransactionManager manager = dbClient.transactionManager()) { + TransactionContext txn = manager.begin(); + while (true) { + String column = "FirstName"; + Struct row = txn.readRow("Singers", Key.of(singerId), Collections.singleton(column)); + String name = row.getString(column); + txn.buffer( + Mutation.newUpdateBuilder("Singers").set(column).to(name.toUpperCase()).build()); + try { + manager.commit(); + break; + } catch (AbortedException e) { + Thread.sleep(e.getRetryDelayInMillis() / 1000); + txn = manager.resetForRetry(); + } + } + } + // [END transactionManager] + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java index cbcb586e52bf..42be5f606589 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java @@ -237,6 +237,56 @@ public interface DatabaseClient { * }); * * + *

Example of a read write transaction. + *

 {@code
+   * long singerId = my_singer_id;
+   * TransactionRunner runner = dbClient.readWriteTransaction();
+   * runner.run(
+   *     new TransactionCallable() {
+   * 
+   *       @Override
+   *       public Void run(TransactionContext transaction) throws Exception {
+   *         String column = "FirstName";
+   *         Struct row =
+   *             transaction.readRow("Singers", Key.of(singerId), Collections.singleton(column));
+   *         String name = row.getString(column);
+   *         transaction.buffer(
+   *             Mutation.newUpdateBuilder("Singers").set(column).to(name.toUpperCase()).build());
+   *         return null;
+   *       }
+   *     });
+   * }
+ * */ TransactionRunner readWriteTransaction(); + + /** + * Returns a transaction manager which allows manual management of transaction lifecycle. This + * API is meant for advanced users. Most users should instead use the + * {@link #readWriteTransaction()} API instead. + * + *

Example of using {@link TransactionManager}. + *

 {@code
+   * long singerId = my_singer_id;
+   * try (TransactionManager manager = dbClient.transactionManager()) {
+   *   TransactionContext txn = manager.begin();
+   *   while (true) {
+   *     String column = "FirstName";
+   *     Struct row = txn.readRow("Singers", Key.of(singerId), Collections.singleton(column));
+   *     String name = row.getString(column);
+   *     txn.buffer(
+   *         Mutation.newUpdateBuilder("Singers").set(column).to(name.toUpperCase()).build());
+   *     try {
+   *       manager.commit();
+   *       break;
+   *     } catch (AbortedException e) {
+   *       Thread.sleep(e.getRetryDelayInMillis() / 1000);
+   *       txn = manager.resetForRetry();
+   *     }
+   *   }
+   * }
+   * }
+ * + */ + TransactionManager transactionManager(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index af7c113ca42a..b8c51849d5a6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -144,6 +144,17 @@ public TransactionRunner readWriteTransaction() { } } + @Override + public TransactionManager transactionManager() { + Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); + try (Scope s = tracer.withSpan(span)) { + return pool.getReadWriteSession().transactionManager(); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; + } + } + ListenableFuture closeAsync() { return pool.closeAsync(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 4dbad3043492..8c60e6a416bd 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -195,6 +195,65 @@ public Timestamp getReadTimestamp() { return txn.getReadTimestamp(); } } + + private static class AutoClosingTransactionManager implements TransactionManager { + final TransactionManager delegate; + final PooledSession session; + + AutoClosingTransactionManager(TransactionManager delegate, PooledSession session) { + this.delegate = delegate; + this.session = session; + } + + @Override + public TransactionContext begin() { + return delegate.begin(); + } + + @Override + public void commit() { + try { + delegate.commit(); + } finally { + if (getState() != TransactionState.ABORTED) { + close(); + } + } + } + + @Override + public void rollback() { + try { + delegate.rollback(); + } finally { + close(); + } + } + + @Override + public TransactionContext resetForRetry() { + return delegate.resetForRetry(); + } + + @Override + public Timestamp getCommitTimestamp() { + return delegate.getCommitTimestamp(); + } + + @Override + public void close() { + try { + delegate.close(); + } finally { + session.close(); + } + } + + @Override + public TransactionState getState() { + return delegate.getState(); + } + } // Exception class used just to track the stack trace at the point when a session was handed out // from the pool. @@ -386,6 +445,12 @@ private void keepAlive() { private void markUsed() { lastUseTime = clock.instant(); } + + @Override + public TransactionManager transactionManager() { + markUsed(); + return new AutoClosingTransactionManager(delegate.transactionManager(), this); + } } private static final class SessionOrError { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 225926a10e25..9960c2218da5 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -898,7 +898,13 @@ public Transaction call() throws Exception { } } - private T setActive(@Nullable T ctx) { + TransactionContextImpl newTransaction() { + TransactionContextImpl txn = new TransactionContextImpl(this, readyTransactionId, rpc, + defaultPrefetchChunks); + return txn; + } + + T setActive(@Nullable T ctx) { if (activeTransaction != null) { activeTransaction.invalidate(); } @@ -906,6 +912,11 @@ private T setActive(@Nullable T ctx) { readyTransactionId = null; return ctx; } + + @Override + public TransactionManager transactionManager() { + return new TransactionManagerImpl(this); + } } /** @@ -914,7 +925,7 @@ private T setActive(@Nullable T ctx) { * transactions, and read-write transactions. The defining characteristic is that a session may * only have one such transaction active at a time. */ - private interface SessionTransaction { + static interface SessionTransaction { /** Invalidates the transaction, generally because a new one has been started on the session. */ void invalidate(); } @@ -1018,7 +1029,7 @@ ResultSet executeQueryInternalWithOptions( ExecuteSqlRequest.newBuilder() .setSql(statement.getSql()) .setQueryMode(queryMode) - .setSession(session.name); + .setSession(session.getName()); Map stmtParameters = statement.getParameters(); if (!stmtParameters.isEmpty()) { com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder(); @@ -1217,10 +1228,7 @@ void backoffSleep(Context context, long backoffMillis) { this.session = session; this.sleeper = sleeper; this.span = Tracing.getTracer().getCurrentSpan(); - ByteString transactionId = session.readyTransactionId; - session.readyTransactionId = null; - this.txn = new TransactionContextImpl(session, transactionId, rpc, defaultPrefetchChunks, - span); + this.txn = session.newTransaction(); } TransactionRunnerImpl(SessionImpl session, SpannerRpc rpc, int defaultPrefetchChunks) { @@ -1230,7 +1238,7 @@ void backoffSleep(Context context, long backoffMillis) { @Nullable @Override public T run(TransactionCallable callable) { - try { + try (Scope s = tracer.withSpan(span)) { return runInternal(callable); } catch (RuntimeException e) { TraceUtil.endSpanWithFailure(span, e); @@ -1244,6 +1252,7 @@ private T runInternal(TransactionCallable callable) { BackOff backoff = newBackOff(); final Context context = Context.current(); int attempt = 0; + // TODO: Change this to use TransactionManager. while (true) { checkState( isValid, "TransactionRunner has been invalidated by a new operation on the session"); @@ -1318,7 +1327,7 @@ public void invalidate() { private void backoff(Context context, BackOff backoff) { long delay = txn.getRetryDelayInMillis(backoff); - txn = new TransactionContextImpl(session, null, txn.rpc, txn.defaultPrefetchChunks, span); + txn = session.newTransaction(); span.addAnnotation("Backing off", ImmutableMap.of("Delay", AttributeValue.longAttributeValue(delay))); sleeper.backoffSleep(context, delay); @@ -1344,9 +1353,8 @@ static class TransactionContextImpl extends AbstractReadContext implements Trans SessionImpl session, @Nullable ByteString transactionId, SpannerRpc rpc, - int defaultPrefetchChunks, - Span span) { - super(session, rpc, defaultPrefetchChunks, span); + int defaultPrefetchChunks) { + super(session, rpc, defaultPrefetchChunks); this.transactionId = transactionId; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java new file mode 100644 index 000000000000..bc0b87066a8e --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java @@ -0,0 +1,104 @@ +/* + * Copyright 2017 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import com.google.cloud.Timestamp; + +/** + * An interface for managing the life cycle of a read write transaction including all its retries. + * See {@link TransactionContext} for a description of transaction semantics. + * + *

At any point in time there can be at most one active transaction in this manager. When that + * transaction is committed, if it fails with an {@code ABORTED} error, calling + * {@link #resetForRetry()} would create a new {@link TransactionContext}. The newly created + * transaction would use the same session thus increasing its lock priority. If the transaction is + * committed successfully, or is rolled back or commit fails with any error other than + * {@code ABORTED}, the manager is considered complete and no further transactions are allowed to be + * created in it. + * + *

Every {@code TransactionManager} should either be committed or rolled back. Failure to do so + * can cause resources to be leaked and deadlocks. Easiest way to guarantee this is by calling + * {@link #close()} in a finally block. + * + * @see DatabaseClient#transactionManager() + */ +public interface TransactionManager extends AutoCloseable { + + /** + * State of the transaction manager. + */ + public enum TransactionState { + // Transaction has been started either by calling {@link #begin()} or via + // {@link resetForRetry()} but has not been commited or rolled back yet. + STARTED, + // Transaction was sucessfully committed. This is a terminal state. + COMMITTED, + // Transaction failed during commit with an error other than ABORTED. Transaction cannot be + // retried in this state. This is a terminal state. + COMMIT_FAILED, + // Transaction failed during commit with ABORTED and can be retried. + ABORTED, + // Transaction was rolled back. This is a terminal state. + ROLLED_BACK + } + + /** + * Creates a new read write transaction. This must be called before doing any other operation and + * can only be called once. To create a new transaction for subsequent retries, see + * {@link #resetForRetry()}. + */ + TransactionContext begin(); + + /** + * Commits the currently active transaction. If the transaction was already aborted, then this + * would throw an {@link AbortedException}. + */ + void commit(); + + /** + * Rolls back the currently active transaction. In most cases there should be no need to call this + * explicitly since {@link #close()} would automatically roll back any active transaction. + */ + void rollback(); + + /** + * Creates a new transaction for retry. This should only be called if the previous transaction + * failed with {@code ABORTED}. In all other cases, this will throw an + * {@link IllegalStateException}. Users should backoff before calling this method. Backoff delay + * is specified by {@link SpannerException#getRetryDelayInMillis()} on the + * {@code SpannerException} throw by the previous commit call. + */ + TransactionContext resetForRetry(); + + /** + * Returns the commit timestamp if the transaction committed successfully otherwise it will throw + * {@code IllegalStateException}. + */ + Timestamp getCommitTimestamp(); + + /** + * Returns the state of the transaction. + */ + TransactionState getState(); + + /** + * Closes the manager. If there is an active transaction, it will be rolled back. Underlying + * session will be released back to the session pool. + */ + @Override + void close(); +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java new file mode 100644 index 000000000000..7a68b8b09c64 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java @@ -0,0 +1,133 @@ +/* + * Copyright 2017 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.SpannerImpl.SessionImpl; +import com.google.cloud.spanner.SpannerImpl.SessionTransaction; +import com.google.common.base.Preconditions; + +import io.opencensus.common.Scope; +import io.opencensus.trace.Span; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; + +/** + * Implementation of {@link TransactionManager}. + */ +final class TransactionManagerImpl implements TransactionManager, SessionTransaction { + private static final Tracer tracer = Tracing.getTracer(); + + private final SessionImpl session; + private final Span span; + + private SpannerImpl.TransactionContextImpl txn; + private TransactionState txnState; + + TransactionManagerImpl(SessionImpl session) { + this.session = session; + this.span = Tracing.getTracer().getCurrentSpan(); + } + + @Override + public TransactionContext begin() { + Preconditions.checkState(txn == null, "begin can only be called once"); + try (Scope s = tracer.withSpan(span)) { + txn = session.newTransaction(); + session.setActive(this); + txn.ensureTxn(); + txnState = TransactionState.STARTED; + return txn; + } + } + + @Override + public void commit() { + Preconditions.checkState(txnState == TransactionState.STARTED, "commit can only be invoked if" + + " the transaction is in progress"); + if (txn.isAborted()) { + txnState = TransactionState.ABORTED; + throw SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, + "Transaction already aborted"); + } + try { + txn.commit(); + txnState = TransactionState.COMMITTED; + } catch (AbortedException e1) { + txnState = TransactionState.ABORTED; + throw e1; + } catch (SpannerException e2) { + txnState = TransactionState.COMMIT_FAILED; + throw e2; + } + } + + @Override + public void rollback() { + Preconditions.checkState(txnState == TransactionState.STARTED, + "rollback can only be called if the transaction is in progress"); + try { + txn.rollback(); + } finally { + txnState = TransactionState.ROLLED_BACK; + } + } + + @Override + public TransactionContext resetForRetry() { + if (txn == null + || !txn.isAborted() && txnState != TransactionState.ABORTED) { + throw new IllegalStateException("resetForRetry can only be called if the previous attempt" + + " aborted"); + } + try (Scope s = tracer.withSpan(span)) { + txn = session.newTransaction(); + txn.ensureTxn(); + txnState = TransactionState.STARTED; + return txn; + } + } + + @Override + public Timestamp getCommitTimestamp() { + Preconditions.checkState(txnState == TransactionState.COMMITTED, + "getCommitTimestamp can only be invoked if the transaction committed successfully"); + return txn.commitTimestamp(); + } + + @Override + public void close() { + try { + if (txnState == TransactionState.STARTED && !txn.isAborted()) { + txn.rollback(); + txnState = TransactionState.ROLLED_BACK; + } + } finally { + span.end(); + } + } + + @Override + public TransactionState getState() { + return txnState; + } + + @Override + public void invalidate() { + close(); + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java new file mode 100644 index 000000000000..f3f9ddf2f000 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java @@ -0,0 +1,161 @@ +/* + * Copyright 2017 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import com.google.cloud.spanner.SpannerImpl.SessionImpl; +import com.google.cloud.spanner.TransactionManager.TransactionState; +import com.google.cloud.Timestamp; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; + +@RunWith(JUnit4.class) +public class TransactionManagerImplTest { + + @Rule public ExpectedException exception = ExpectedException.none(); + + @Mock private SessionImpl session; + @Mock SpannerImpl.TransactionContextImpl txn; + private TransactionManagerImpl manager; + + @Before + public void setUp() { + initMocks(this); + manager = new TransactionManagerImpl(session); + } + + @Test + public void beginCalledTwiceFails() { + when(session.newTransaction()).thenReturn(txn); + assertThat(manager.begin()).isEqualTo(txn); + assertThat(manager.getState()).isEqualTo(TransactionState.STARTED); + exception.expect(IllegalStateException.class); + manager.begin(); + } + + @Test + public void commitBeforeBeginFails() { + exception.expect(IllegalStateException.class); + manager.commit(); + } + + @Test + public void rollbackBeforeBeginFails() { + exception.expect(IllegalStateException.class); + manager.rollback(); + } + + @Test + public void resetBeforeBeginFails() { + exception.expect(IllegalStateException.class); + manager.resetForRetry(); + } + + @Test + public void transactionRolledBackOnClose() { + when(session.newTransaction()).thenReturn(txn); + when(txn.isAborted()).thenReturn(false); + manager.begin(); + manager.close(); + verify(txn).rollback(); + } + + @Test + public void commitSucceeds() { + when(session.newTransaction()).thenReturn(txn); + Timestamp commitTimestamp = Timestamp.ofTimeMicroseconds(1); + when(txn.commitTimestamp()).thenReturn(commitTimestamp); + manager.begin(); + manager.commit(); + assertThat(manager.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(manager.getCommitTimestamp()).isEqualTo(commitTimestamp); + } + + @Test + public void resetAfterSuccessfulCommitFails() { + when(session.newTransaction()).thenReturn(txn); + manager.begin(); + manager.commit(); + exception.expect(IllegalStateException.class); + manager.resetForRetry(); + } + + @Test + public void resetAfterAbortSucceeds() { + when(session.newTransaction()).thenReturn(txn); + manager.begin(); + doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, "")) + .when(txn).commit(); + try { + manager.commit(); + fail("Expected AbortedException"); + } catch (AbortedException e) { + assertThat(manager.getState()).isEqualTo(TransactionState.ABORTED); + } + txn = Mockito.mock(SpannerImpl.TransactionContextImpl.class); + when(session.newTransaction()).thenReturn(txn); + assertThat(manager.resetForRetry()).isEqualTo(txn); + assertThat(manager.getState()).isEqualTo(TransactionState.STARTED); + } + + @Test + public void resetAfterErrorFails() { + when(session.newTransaction()).thenReturn(txn); + manager.begin(); + doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.UNKNOWN, "")) + .when(txn).commit(); + try { + manager.commit(); + fail("Expected AbortedException"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.UNKNOWN); + } + exception.expect(IllegalStateException.class); + manager.resetForRetry(); + } + + @Test + public void rollbackAfterCommitFails() { + when(session.newTransaction()).thenReturn(txn); + manager.begin(); + manager.commit(); + exception.expect(IllegalStateException.class); + manager.rollback(); + } + + @Test + public void commitAfterRollbackFails() { + when(session.newTransaction()).thenReturn(txn); + manager.begin(); + manager.rollback(); + exception.expect(IllegalStateException.class); + manager.commit(); + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index 3818ad2dfc0d..9362814146ab 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -13,43 +13,42 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.google.cloud.spanner; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.api.client.util.BackOff; import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.cloud.spanner.spi.v1.SpannerRpc; -import com.google.cloud.spanner.spi.v1.SpannerRpc.Option; -import com.google.protobuf.ByteString; -import com.google.protobuf.Duration; -import com.google.protobuf.Timestamp; -import com.google.rpc.RetryInfo; -import com.google.spanner.v1.CommitRequest; -import com.google.spanner.v1.CommitResponse; import io.grpc.Context; -import io.grpc.Metadata; import io.grpc.Status; import io.grpc.StatusRuntimeException; -import io.grpc.protobuf.ProtoUtils; -import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -/** Unit test for {@link com.google.cloud.spanner.SpannerImpl.TransactionRunnerImpl} */ +/** + * Unit test for {@link com.google.cloud.spanner.SpannerImpl.TransactionRunnerImpl} + **/ @RunWith(JUnit4.class) public class TransactionRunnerImplTest { @Mock private SpannerRpc rpc; @Mock private SpannerImpl.SessionImpl session; @Mock private SpannerImpl.TransactionRunnerImpl.Sleeper sleeper; + @Mock private SpannerImpl.TransactionContextImpl txn; private SpannerImpl.TransactionRunnerImpl transactionRunner; private boolean firstRun; @@ -57,89 +56,102 @@ public class TransactionRunnerImplTest { public void setUp() throws Exception { MockitoAnnotations.initMocks(this); firstRun = true; + when(session.newTransaction()).thenReturn(txn); transactionRunner = new SpannerImpl.TransactionRunnerImpl(session, rpc, sleeper, 1); - when(session.beginTransaction()).thenReturn(ByteString.copyFromUtf8("transaction")); - when(session.getName()).thenReturn("fake_session"); } @Test - public void runAbort() { - runTransaction(createRetryException(Status.Code.ABORTED)); - ArgumentCaptor backoffMillis = ArgumentCaptor.forClass(Long.class); - verify(sleeper, times(1)).backoffSleep(Mockito.any(), backoffMillis.capture()); - assertThat(backoffMillis.getValue()).isEqualTo(1001L); + public void commitSucceeds() { + final AtomicInteger numCalls = new AtomicInteger(0); + transactionRunner.run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + numCalls.incrementAndGet(); + return null; + } + }); + assertThat(numCalls.get()).isEqualTo(1); + verify(txn).ensureTxn(); + verify(txn).commit(); } - + @Test - public void runAbortNoRetryInfo() { - runTransaction(new StatusRuntimeException(Status.fromCodeValue(Status.Code.ABORTED.value()))); - ArgumentCaptor backoffMillis = ArgumentCaptor.forClass(Long.class); - verify(sleeper, times(1)).backoffSleep(Mockito.any(), backoffMillis.capture()); - assertThat(backoffMillis.getValue()).isGreaterThan(0L); + public void runAbort() { + when(txn.isAborted()).thenReturn(true); + long backoffMillis = 100L; + when(txn.getRetryDelayInMillis(any(BackOff.class))).thenReturn(backoffMillis); + runTransaction(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, "")); + verify(sleeper, times(1)).backoffSleep(any(Context.class), eq(backoffMillis)); } @Test public void commitAbort() { final SpannerException error = - SpannerExceptionFactory.newSpannerException(createRetryException(Status.Code.ABORTED)); - when(rpc.commit(Mockito.any(), Mockito.>any())) - .thenThrow(error) - .thenReturn( - CommitResponse.newBuilder() - .setCommitTimestamp(Timestamp.newBuilder().setSeconds(100)) - .build()); - + SpannerExceptionFactory.newSpannerException( + SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, "")); + doThrow(error).doNothing().when(txn).commit(); + long backoffMillis = 100L; + when(txn.getRetryDelayInMillis(any(BackOff.class))).thenReturn(backoffMillis); + final AtomicInteger numCalls = new AtomicInteger(0); transactionRunner.run( new TransactionCallable() { @Override public Void run(TransactionContext transaction) throws Exception { - if (firstRun) { - ((SpannerImpl.TransactionContextImpl) transaction).onError(error); - firstRun = false; - } + numCalls.incrementAndGet(); return null; } }); - - ArgumentCaptor backoffMillis = ArgumentCaptor.forClass(Long.class); - verify(sleeper, times(1)).backoffSleep(Mockito.any(), backoffMillis.capture()); - assertThat(backoffMillis.getValue()).isEqualTo(1001L); + assertThat(numCalls.get()).isEqualTo(2); + verify(sleeper, times(1)).backoffSleep(any(Context.class), eq(backoffMillis)); + verify(txn, times(2)).ensureTxn(); } - - @Test(expected = SpannerException.class) - public void runResourceExhaustedNoRetry() throws Exception { - runTransaction( - new StatusRuntimeException(Status.fromCodeValue(Status.Code.RESOURCE_EXHAUSTED.value()))); + + @Test + public void commitFailsWithNonAbort() { + final SpannerException error = + SpannerExceptionFactory.newSpannerException( + SpannerExceptionFactory.newSpannerException(ErrorCode.UNKNOWN, "")); + doThrow(error).when(txn).commit(); + final AtomicInteger numCalls = new AtomicInteger(0); + try { + transactionRunner.run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + numCalls.incrementAndGet(); + return null; + } + }); + fail("Expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.UNKNOWN); + } + assertThat(numCalls.get()).isEqualTo(1); + verify(txn, times(1)).ensureTxn(); + verify(txn, times(1)).commit(); } - private StatusRuntimeException createRetryException(Status.Code code) { - Metadata.Key key = ProtoUtils.keyForProto(RetryInfo.getDefaultInstance()); - Status status = Status.fromCodeValue(code.value()); - Metadata trailers = new Metadata(); - RetryInfo retryInfo = - RetryInfo.newBuilder() - .setRetryDelay(Duration.newBuilder().setNanos(1000000).setSeconds(1L)) - .build(); - trailers.put(key, retryInfo); - return new StatusRuntimeException(status, trailers); + @Test + public void runResourceExhaustedNoRetry() throws Exception { + try { + runTransaction( + new StatusRuntimeException(Status.fromCodeValue(Status.Code.RESOURCE_EXHAUSTED.value()))); + fail("Expected exception"); + } catch (SpannerException e) { + // expected. + } + verify(txn).rollback(); } private void runTransaction(final Exception exception) { - when(rpc.commit(Mockito.any(), Mockito.>any())) - .thenReturn( - CommitResponse.newBuilder() - .setCommitTimestamp(Timestamp.newBuilder().setSeconds(100)) - .build()); - final SpannerException error = SpannerExceptionFactory.newSpannerException(exception); - transactionRunner.run( new TransactionCallable() { @Override public Void run(TransactionContext transaction) throws Exception { if (firstRun) { - ((SpannerImpl.TransactionContextImpl) transaction).onError(error); firstRun = false; - throw error; + throw SpannerExceptionFactory.newSpannerException(exception); } return null; } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerTest.java new file mode 100644 index 000000000000..b0bc409adf44 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerTest.java @@ -0,0 +1,144 @@ +/* + * Copyright 2017 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.it; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; + +import com.google.cloud.spanner.AbortedException; +import com.google.cloud.spanner.Database; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.IntegrationTest; +import com.google.cloud.spanner.IntegrationTestEnv; +import com.google.cloud.spanner.Key; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.Struct; +import com.google.cloud.spanner.TransactionContext; +import com.google.cloud.spanner.TransactionManager; +import com.google.cloud.spanner.TransactionManager.TransactionState; + +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Arrays; + +@Category(IntegrationTest.class) +@RunWith(JUnit4.class) +public class ITTransactionManagerTest { + + @ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv(); + private static Database db; + @Rule public ExpectedException expectedException = ExpectedException.none(); + private static DatabaseClient client; + + @BeforeClass + public static void setUpDatabase() { + // Empty database. + db = env.getTestHelper().createTestDatabase( + "CREATE TABLE T (" + + " K STRING(MAX) NOT NULL," + + " BoolValue BOOL," + + ") PRIMARY KEY (K)"); + client = env.getTestHelper().getDatabaseClient(db); + } + + @Test + public void simpleInsert() { + TransactionManager manager = client.transactionManager(); + TransactionContext txn = manager.begin(); + assertThat(manager.getState()).isEqualTo(TransactionState.STARTED); + txn.buffer(Mutation.newInsertBuilder("T").set("K").to("Key1") + .set("BoolValue").to(true).build()); + manager.commit(); + assertThat(manager.getState()).isEqualTo(TransactionState.COMMITTED); + Struct row = client.singleUse().readRow("T", Key.of("Key1"), Arrays.asList("K", "BoolValue")); + assertThat(row.getString(0)).isEqualTo("Key1"); + assertThat(row.getBoolean(1)).isTrue(); + } + + @Test + public void invalidInsert() { + TransactionManager manager = client.transactionManager(); + TransactionContext txn = manager.begin(); + txn.buffer(Mutation.newInsertBuilder("InvalidTable").set("K").to("Key1") + .set("BoolValue").to(true).build()); + try { + manager.commit(); + fail("Expected exception"); + } catch (SpannerException e) { + // expected + } + assertThat(manager.getState()).isEqualTo(TransactionState.COMMIT_FAILED); + // We cannot retry for non aborted errors. + expectedException.expect(IllegalStateException.class); + manager.resetForRetry(); + } + + @Test + public void rollback() { + TransactionManager manager = client.transactionManager(); + TransactionContext txn = manager.begin(); + txn.buffer(Mutation.newInsertBuilder("T").set("K").to("Key2") + .set("BoolValue").to(true).build()); + manager.rollback(); + assertThat(manager.getState()).isEqualTo(TransactionState.ROLLED_BACK); + // Row should not have been inserted. + assertThat( + client.singleUse().readRow("T", Key.of("Key2"), Arrays.asList("K", "BoolValue"))).isNull(); + } + + @Test + public void abortAndRetry() { + client.write(Arrays.asList(Mutation.newInsertBuilder("T").set("K").to("Key3") + .set("BoolValue").to(true).build())); + TransactionManager manager1 = client.transactionManager(); + TransactionContext txn1 = manager1.begin(); + txn1.readRow("T", Key.of("Key3"), Arrays.asList("K", "BoolValue")); + TransactionManager manager2 = client.transactionManager(); + TransactionContext txn2 = manager2.begin(); + txn2.readRow("T", Key.of("Key3"), Arrays.asList("K", "BoolValue")); + + txn1.buffer(Mutation.newUpdateBuilder("T").set("K").to("Key3") + .set("BoolValue").to(false).build()); + manager1.commit(); + + // txn2 should have been aborted. + try { + manager2.commit(); + fail("Expected to abort"); + } catch (AbortedException e) { + assertThat(manager2.getState()).isEqualTo(TransactionState.ABORTED); + txn2 = manager2.resetForRetry(); + } + txn2.buffer(Mutation.newUpdateBuilder("T").set("K").to("Key3") + .set("BoolValue").to(true).build()); + manager2.commit(); + Struct row = client.singleUse().readRow("T", Key.of("Key3"), Arrays.asList("K", "BoolValue")); + assertThat(row.getString(0)).isEqualTo("Key3"); + assertThat(row.getBoolean(1)).isTrue(); + } + + + +}