diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 4744edd035d6..c4d910cba6ca 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -32,6 +32,7 @@ import com.google.cloud.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.SessionPool.Clock; import com.google.cloud.spanner.SessionPool.PooledSession; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; import java.util.Arrays; @@ -114,6 +115,123 @@ public void poolClosure() throws Exception { pool.closeAsync().get(); } + @Test + public void poolClosureFailsPendingReadWaiters() throws Exception { + final CountDownLatch insideCreation = new CountDownLatch(1); + final CountDownLatch releaseCreation = new CountDownLatch(1); + when(client.createSession(db)) + .thenReturn(mock(Session.class)) + .thenAnswer( + new Answer() { + @Override + public Session answer(InvocationOnMock invocation) throws Throwable { + insideCreation.countDown(); + releaseCreation.await(); + return mock(Session.class); + } + }); + pool = createPool(); + pool.getReadSession(); + AtomicBoolean failed = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + getSessionAsync(latch, failed); + insideCreation.await(); + pool.closeAsync(); + releaseCreation.countDown(); + latch.await(); + assertThat(failed.get()).isTrue(); + } + + @Test + public void poolClosureFailsPendingWriteWaiters() throws Exception { + final CountDownLatch insideCreation = new CountDownLatch(1); + final CountDownLatch releaseCreation = new CountDownLatch(1); + when(client.createSession(db)) + .thenReturn(mock(Session.class)) + .thenAnswer( + new Answer() { + @Override + public Session answer(InvocationOnMock invocation) throws Throwable { + insideCreation.countDown(); + releaseCreation.await(); + return mock(Session.class); + } + }); + pool = createPool(); + pool.getReadSession(); + AtomicBoolean failed = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + getReadWriteSessionAsync(latch, failed); + insideCreation.await(); + pool.closeAsync(); + releaseCreation.countDown(); + latch.await(); + assertThat(failed.get()).isTrue(); + } + + @Test + public void poolClosesEvenIfCreationFails() throws Exception { + final CountDownLatch insideCreation = new CountDownLatch(1); + final CountDownLatch releaseCreation = new CountDownLatch(1); + when(client.createSession(db)) + .thenAnswer( + new Answer() { + @Override + public Session answer(InvocationOnMock invocation) throws Throwable { + insideCreation.countDown(); + releaseCreation.await(); + throw SpannerExceptionFactory.newSpannerException(new RuntimeException()); + } + }); + pool = createPool(); + AtomicBoolean failed = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + getSessionAsync(latch, failed); + insideCreation.await(); + ListenableFuture f = pool.closeAsync(); + releaseCreation.countDown(); + f.get(); + assertThat(f.isDone()).isTrue(); + } + + @Test + public void poolClosesEvenIfPreparationFails() throws Exception { + Session session = mock(Session.class); + when(client.createSession(db)).thenReturn(session); + final CountDownLatch insidePrepare = new CountDownLatch(1); + final CountDownLatch releasePrepare = new CountDownLatch(1); + doAnswer( + new Answer() { + @Override + public Session answer(InvocationOnMock invocation) throws Throwable { + insidePrepare.countDown(); + releasePrepare.await(); + throw SpannerExceptionFactory.newSpannerException(new RuntimeException()); + } + }) + .when(session) + .prepareReadWriteTransaction(); + pool = createPool(); + AtomicBoolean failed = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + getReadWriteSessionAsync(latch, failed); + insidePrepare.await(); + ListenableFuture f = pool.closeAsync(); + releasePrepare.countDown(); + f.get(); + assertThat(f.isDone()).isTrue(); + } + + @Test + public void poolClosureFailsNewRequests() throws Exception { + when(client.createSession(db)).thenReturn(mock(Session.class)); + pool = createPool(); + pool.getReadSession(); + pool.closeAsync(); + expectedException.expect(IllegalStateException.class); + pool.getReadSession(); + } + @Test public void atMostMaxSessionsCreated() { AtomicBoolean failed = new AtomicBoolean(false); @@ -243,6 +361,35 @@ public Void answer(InvocationOnMock arg0) throws Throwable { writeSession.close(); } + @Test + public void getReadSessionFallsBackToWritePreparedSession() throws Exception { + Session mockSession1 = mock(Session.class); + final CountDownLatch prepareLatch = new CountDownLatch(2); + doAnswer( + new Answer() { + @Override + public Void answer(InvocationOnMock arg0) throws Throwable { + prepareLatch.countDown(); + return null; + } + }) + .when(mockSession1) + .prepareReadWriteTransaction(); + when(client.createSession(db)).thenReturn(mockSession1); + options = + SessionPoolOptions.newBuilder() + .setMinSessions(minSessions) + .setMaxSessions(1) + .setWriteSessionsFraction(1.0f) + .build(); + pool = createPool(); + pool.getReadWriteSession().close(); + prepareLatch.await(); + // This session should also be write prepared. + PooledSession readSession = (PooledSession) pool.getReadSession(); + verify(readSession.delegate, times(2)).prepareReadWriteTransaction(); + } + @Test public void failOnPoolExhaustion() { options = @@ -262,6 +409,18 @@ public void failOnPoolExhaustion() { session1.close(); } + @Test + public void poolWorksWhenSessionNotFound() { + Session mockSession1 = mock(Session.class); + Session mockSession2 = mock(Session.class); + doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.NOT_FOUND, "Session not found")) + .when(mockSession1) + .prepareReadWriteTransaction(); + when(client.createSession(db)).thenReturn(mockSession1).thenReturn(mockSession2); + pool = createPool(); + assertThat(((PooledSession) pool.getReadWriteSession()).delegate).isEqualTo(mockSession2); + } + @Test public void idleSessionCleanup() throws Exception { options = @@ -366,6 +525,9 @@ public void run() { try (Session session = pool.getReadSession()) { failed.compareAndSet(false, session == null); Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS); + } catch (SpannerException e) { + failed.compareAndSet(false, true); + } finally { latch.countDown(); } } @@ -381,6 +543,9 @@ public void run() { try (Session session = pool.getReadWriteSession()) { failed.compareAndSet(false, session == null); Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS); + } catch (SpannerException e) { + failed.compareAndSet(false, true); + } finally { latch.countDown(); } }