From 9c6d1a5d78b4e298ae135e98b69188ee54817741 Mon Sep 17 00:00:00 2001 From: Alex Moore Date: Fri, 19 Aug 2016 02:27:25 -0400 Subject: [PATCH 1/4] Fix for netty throwing BlockingOperationExceptions during compound commands --- .../com/basho/riak/client/core/RiakNode.java | 19 +- .../basho/riak/client/core/RiakNodeTest.java | 360 +++++++++++++++--- 2 files changed, 321 insertions(+), 58 deletions(-) diff --git a/src/main/java/com/basho/riak/client/core/RiakNode.java b/src/main/java/com/basho/riak/client/core/RiakNode.java index 9d3288793..d76f520ee 100644 --- a/src/main/java/com/basho/riak/client/core/RiakNode.java +++ b/src/main/java/com/basho/riak/client/core/RiakNode.java @@ -25,6 +25,7 @@ import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.BlockingOperationException; import io.netty.util.concurrent.DefaultPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -644,6 +645,12 @@ private Channel getConnection() { // no-op, don't care } + catch (BlockingOperationException ex) + { + logger.error("Netty interrupted waiting for connection permit to be available; {}", + remoteAddress); + + } } else { @@ -706,6 +713,12 @@ private Channel doGetConnection(boolean forceAddressRefresh) throws ConnectionFa Thread.currentThread().interrupt(); throw new ConnectionFailedException(ex); } + catch (BlockingOperationException ex) + { + logger.error("Netty interrupted waiting for new connection to be made; {}", + remoteAddress); + throw new ConnectionFailedException(ex); + } if (!f.isSuccess()) { @@ -1183,11 +1196,7 @@ private void checkHealth() closeConnection(c); } } - catch (ConnectionFailedException ex) - { - healthCheckFailed(ex); - } - catch (UnknownHostException ex) + catch (ConnectionFailedException | UnknownHostException ex) { healthCheckFailed(ex); } diff --git a/src/test/java/com/basho/riak/client/core/RiakNodeTest.java b/src/test/java/com/basho/riak/client/core/RiakNodeTest.java index 771daad58..5ada6325c 100644 --- a/src/test/java/com/basho/riak/client/core/RiakNodeTest.java +++ b/src/test/java/com/basho/riak/client/core/RiakNodeTest.java @@ -15,6 +15,8 @@ */ package com.basho.riak.client.core; +import com.basho.riak.client.api.RiakCommand; +import com.basho.riak.client.api.commands.ListenableFuture; import com.basho.riak.client.core.RiakNode.State; import com.google.protobuf.Message; import io.netty.bootstrap.Bootstrap; @@ -22,9 +24,12 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPipeline; +import io.netty.util.concurrent.BlockingOperationException; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @@ -35,9 +40,8 @@ import java.util.Deque; import java.util.List; import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; + import static com.jayway.awaitility.Awaitility.await; import static com.jayway.awaitility.Awaitility.fieldIn; import static org.hamcrest.Matchers.equalTo; @@ -83,16 +87,15 @@ public void builderProducesCorrectNode() doReturn(BOOTSTRAP).when(BOOTSTRAP).clone(); - RiakNode node = new RiakNode.Builder() - .withIdleTimeout(IDLE_TIMEOUT) - .withConnectionTimeout(CONNECTION_TIMEOUT) - .withMinConnections(MIN_CONNECTIONS) - .withMaxConnections(MAX_CONNECTIONS) - .withRemotePort(PORT) - .withRemoteAddress(REMOTE_ADDRESS) - .withExecutor(EXECUTOR) - .withBootstrap(BOOTSTRAP) - .build(); + RiakNode node = new RiakNode.Builder().withIdleTimeout(IDLE_TIMEOUT) + .withConnectionTimeout(CONNECTION_TIMEOUT) + .withMinConnections(MIN_CONNECTIONS) + .withMaxConnections(MAX_CONNECTIONS) + .withRemotePort(PORT) + .withRemoteAddress(REMOTE_ADDRESS) + .withExecutor(EXECUTOR) + .withBootstrap(BOOTSTRAP) + .build(); assertEquals(node.getRemoteAddress(), REMOTE_ADDRESS); @@ -145,10 +148,7 @@ public void nodeStartsMinConnections() throws InterruptedException, UnknownHostE doReturn(future).when(bootstrap).connect(); doReturn(bootstrap).when(bootstrap).clone(); - RiakNode node = new RiakNode.Builder() - .withBootstrap(bootstrap) - .withMinConnections(MIN_CONNECTIONS) - .build(); + RiakNode node = new RiakNode.Builder().withBootstrap(bootstrap).withMinConnections(MIN_CONNECTIONS).build(); node.start(); Deque available = Whitebox.getInternalState(node, "available"); assertEquals(MIN_CONNECTIONS, available.size()); @@ -172,10 +172,7 @@ public void NodeRespectsMax() throws Exception doReturn(future).when(bootstrap).connect(); doReturn(bootstrap).when(bootstrap).clone(); - RiakNode node = new RiakNode.Builder() - .withBootstrap(bootstrap) - .withMaxConnections(MAX_CONNECTIONS) - .build(); + RiakNode node = new RiakNode.Builder().withBootstrap(bootstrap).withMaxConnections(MAX_CONNECTIONS).build(); node.start(); for (int i = 0; i < MAX_CONNECTIONS; i++) @@ -195,9 +192,7 @@ public void NodeRespectsMax() throws Exception public void NodeMaxCanBeExplicitlySetToUnlimited() throws Exception { final int UNLIMITED = 0; - new RiakNode.Builder() - .withMaxConnections(UNLIMITED) - .build(); + new RiakNode.Builder().withMaxConnections(UNLIMITED).build(); } @Test @@ -217,10 +212,7 @@ public void channelsReturnedCorrectly() throws Exception doReturn(future).when(bootstrap).connect(); doReturn(bootstrap).when(bootstrap).clone(); - RiakNode node = new RiakNode.Builder() - .withBootstrap(bootstrap) - .withMaxConnections(MAX_CONNECTIONS) - .build(); + RiakNode node = new RiakNode.Builder().withBootstrap(bootstrap).withMaxConnections(MAX_CONNECTIONS).build(); node.start(); assertNotNull(Whitebox.invokeMethod(node, "getConnection")); @@ -247,13 +239,12 @@ public void healthCheckChangesState() throws Exception doReturn(future).when(bootstrap).connect(); doReturn(bootstrap).when(bootstrap).clone(); - RiakNode node = new RiakNode.Builder() - .withBootstrap(bootstrap) - .build(); + RiakNode node = new RiakNode.Builder().withBootstrap(bootstrap).build(); for (int i = 0; i < 5; i++) { - ChannelFutureListener listener = Whitebox.getInternalState(node, "inAvailableCloseListener", RiakNode.class); + ChannelFutureListener listener = Whitebox.getInternalState(node, "inAvailableCloseListener", RiakNode + .class); listener.operationComplete(future); } @@ -281,11 +272,7 @@ public void idleReaperTest() throws Exception doReturn(future).when(bootstrap).connect(); doReturn(bootstrap).when(bootstrap).clone(); - RiakNode node = new RiakNode.Builder() - .withBootstrap(bootstrap) - .withMinConnections(1) - .withIdleTimeout(1) - .build(); + RiakNode node = new RiakNode.Builder().withBootstrap(bootstrap).withMinConnections(1).withIdleTimeout(1).build(); node.start(); Channel[] channelArray = new Channel[6]; @@ -323,11 +310,7 @@ public void closedConnectionsOnReturnTest() throws Exception doReturn(future).when(bootstrap).connect(); doReturn(bootstrap).when(bootstrap).clone(); - RiakNode node = new RiakNode.Builder() - .withBootstrap(bootstrap) - .withMinConnections(1) - .withMaxConnections(6) - .build(); + RiakNode node = new RiakNode.Builder().withBootstrap(bootstrap).withMinConnections(1).withMaxConnections(6).build(); node.start(); Channel[] channelArray = new Channel[6]; @@ -387,11 +370,7 @@ public void deadConnectionsOnGetConnection() throws Exception doReturn(future).when(bootstrap).connect(); doReturn(bootstrap).when(bootstrap).clone(); - RiakNode node = new RiakNode.Builder() - .withBootstrap(bootstrap) - .withMinConnections(1) - .withMaxConnections(1) - .build(); + RiakNode node = new RiakNode.Builder().withBootstrap(bootstrap).withMinConnections(1).withMaxConnections(1).build(); node.start(); @@ -433,11 +412,7 @@ public void nodeRefreshesInetSocketAddressWhenConnectionsDie() throws Exception // Capture arguments passed to InetSocketAddress ctors. ArgumentCaptor addressCaptor = ArgumentCaptor.forClass(InetSocketAddress.class); - RiakNode node = new RiakNode.Builder() - .withBootstrap(bootstrap) - .withMinConnections(1) - .withMaxConnections(1) - .build(); + RiakNode node = new RiakNode.Builder().withBootstrap(bootstrap).withMinConnections(1).withMaxConnections(1).build(); node.start(); @@ -530,13 +505,108 @@ public void nodeFailsOperation() throws InterruptedException, UnknownHostExcepti await().atMost(500, TimeUnit.MILLISECONDS).until(fieldIn(operation).ofType(Throwable.class).andWithName("exception"), equalTo(t)); } - @Test(expected = UnknownHostException.class ) + @Test(expected = UnknownHostException.class) public void failsResolvingHostname() throws UnknownHostException { RiakNode node = new RiakNode.Builder().withRemoteAddress("invalid-host-name").build(); node.start(); } + @Test(expected = ExecutionException.class) + public void BlockingExceptionIsCaught() throws UnknownHostException, InterruptedException, ExecutionException + { + CompoundCommand cc = new CompoundCommand(); + + final Channel channel = mock(Channel.class); + ChannelPipeline channelPipeline = mock(ChannelPipeline.class); + final ChannelFuture future = mock(ChannelFuture.class); + + Bootstrap bootstrap = PowerMockito.spy(new Bootstrap()); + + doReturn(future).when(channel).closeFuture(); + doReturn(true).when(channel).isOpen(); + doReturn(channelPipeline).when(channel).pipeline(); + doReturn(future).when(channel).writeAndFlush(cc.firstCommand.operation); + doReturn(future).when(future).await(); + doReturn(true).when(future).isSuccess(); + doReturn(channel).when(future).channel(); + doReturn(future).when(bootstrap).connect(); + doReturn(bootstrap).when(bootstrap).clone(); + + final RiakNode node = new RiakNode.Builder().build(); + RiakCluster cluster = RiakCluster.builder(node).withExecutionAttempts(1).withBootstrap(bootstrap).build(); + + Runnable secondCmdSetup = new Runnable() + { + @Override + public void run() + { + try + { + Deque available = Whitebox.getInternalState(node, "available"); + + // throw away connection, imitate that they are all used. + available.poll(); + + // throw a BlockingOperationException when a new connection opened. + doThrow(BlockingOperationException.class).when(future).await(); // + } + catch (InterruptedException ignored) + { + + } + } + }; + + // Throw away all connections before second command is run, and throw a BlockingOperationException when a new one is opened. + cc.secondCommand.injectSetup(secondCmdSetup); + + cluster.start(); + + doAnswer(createYesChannelListenerAnswer(node, channel, future)) + .when(future).addListener(any(ChannelFutureListener.class)); + + final RiakFuture ccFuture = cc.executeAsync(cluster); + ccFuture.await(); + try + { + ccFuture.get(); + } + catch (ExecutionException ex) + { + assertEquals(NoNodesAvailableException.class, ex.getCause().getClass()); + throw ex; + } + } + + private Answer createYesChannelListenerAnswer(final RiakNode node, final Channel channel, final ChannelFuture future) + { + return new Answer() + { + final ChannelFutureListener writeListener = Whitebox.getInternalState(node, "writeListener"); + final ChannelFutureListener inProgressCloseListener = + Whitebox.getInternalState(node, "inProgressCloseListener"); + + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable + { + final ChannelFutureListener listener = (ChannelFutureListener) invocationOnMock.getArguments()[0]; + if (listener.equals(inProgressCloseListener)) + { + return null; + } + + listener.operationComplete(future); + + if (listener.equals(writeListener)) + { + node.onSuccess(channel, new RiakMessage((byte) 0, new byte[]{})); + } + return null; + } + }; + } + private class FutureOperationImpl extends FutureOperation { @@ -563,7 +633,191 @@ public Void getQueryInfo() { return null; } + } + + private class NopCommand extends RiakCommand + { + public FutureOperationImpl operation = new FutureOperationImpl(); + private Runnable testSetup; + + @Override + protected RiakFuture executeAsync(RiakCluster cluster) + { + if(testSetup != null) + { + testSetup.run(); + } + + return cluster.execute(operation); + } + public void injectSetup(Runnable testSetup) {this.testSetup = testSetup;} } + /** + * Imitates a compound command like UpdateValue, where we run 1 command, then run a second in the callback of the first. + */ + private class CompoundCommand extends RiakCommand + { + public NopCommand firstCommand = new NopCommand(); + public NopCommand secondCommand = new NopCommand(); + + @Override + protected RiakFuture executeAsync(final RiakCluster cluster) + { + final CompoundFuture cf = new CompoundFuture(); + + final RiakFuture firstFuture = firstCommand.executeAsync(cluster); + + RiakFutureListener listener = + new RiakFutureListener() + { + @Override + public void handle(RiakFuture f) + { + final RiakFuture secondFuture = + secondCommand.executeAsync(cluster); + secondFuture.addListener(cf); + } + }; + + firstFuture.addListener(listener); + + return cf; + } + + private class CompoundFuture extends ListenableFuture + implements RiakFutureListener + { + private String finalValue; + private Throwable exception; + private final CountDownLatch latch = new CountDownLatch(1); + + @Override + public void handle(RiakFuture f) + { + try + { + setResponse(f.get()); + } + catch (InterruptedException | ExecutionException ex) + { + setException(ex); + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) + { + return false; + } + + @Override + public String get() throws InterruptedException, ExecutionException + { + latch.await(); + + if (exception != null) + { + if(exception.getClass() == ExecutionException.class) + { + throw (ExecutionException) exception; + } + + throw new ExecutionException(exception); + } + else + { + return finalValue; + } + } + + @Override + public String get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException + { + boolean succeed = latch.await(timeout, unit); + + if (!succeed) + { + throw new TimeoutException(); + } + else if (exception != null) + { + if(exception.getClass() == ExecutionException.class) + { + throw (ExecutionException) exception; + } + + throw new ExecutionException(exception); + } + else + { + return finalValue; + } + } + + @Override + public boolean isCancelled() + { + return false; + } + + @Override + public boolean isDone() + { + return latch.getCount() != 1; + } + + @Override + public void await() throws InterruptedException + { + latch.await(); + } + + @Override + public boolean await(long timeout, TimeUnit unit) throws InterruptedException + { + return latch.await(timeout, unit); + } + + @Override + public String getNow() + { + return finalValue; + } + + @Override + public boolean isSuccess() + { + return isDone() && exception == null; + } + + @Override + public Throwable cause() + { + return this.exception; + } + + @Override + public Void getQueryInfo() + { + return null; + } + + private void setResponse(String response) + { + this.finalValue = response; + latch.countDown(); + notifyListeners(); + } + + private void setException(Throwable t) + { + this.exception = t; + latch.countDown(); + notifyListeners(); + } + } + } } From d8508d1e13b5b56a8b94e17018f421d7b22be127 Mon Sep 17 00:00:00 2001 From: Alex Moore Date: Fri, 19 Aug 2016 02:53:48 -0400 Subject: [PATCH 2/4] Add test for BlockingOperationException + blockOnMaxConnections --- .../basho/riak/client/core/RiakNodeTest.java | 118 ++++++++++++++++++ 1 file changed, 118 insertions(+) diff --git a/src/test/java/com/basho/riak/client/core/RiakNodeTest.java b/src/test/java/com/basho/riak/client/core/RiakNodeTest.java index 5ada6325c..04f017fb8 100644 --- a/src/test/java/com/basho/riak/client/core/RiakNodeTest.java +++ b/src/test/java/com/basho/riak/client/core/RiakNodeTest.java @@ -35,6 +35,7 @@ import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.reflect.Whitebox; +import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Deque; @@ -579,6 +580,123 @@ public void run() } } + @Test(expected = ExecutionException.class) + public void BlockingExceptionIsCaughtBlockOnMaxConns() + throws UnknownHostException, InterruptedException, ExecutionException, NoSuchFieldException, + IllegalAccessException + { + CompoundCommand cc = new CompoundCommand(); + + final Channel channel = mock(Channel.class); + ChannelPipeline channelPipeline = mock(ChannelPipeline.class); + final ChannelFuture future = mock(ChannelFuture.class); + + Bootstrap bootstrap = PowerMockito.spy(new Bootstrap()); + + doReturn(future).when(channel).closeFuture(); + doReturn(true).when(channel).isOpen(); + doReturn(channelPipeline).when(channel).pipeline(); + doReturn(future).when(channel).writeAndFlush(cc.firstCommand.operation); + doReturn(future).when(future).await(); + doReturn(true).when(future).isSuccess(); + doReturn(channel).when(future).channel(); + doReturn(future).when(bootstrap).connect(); + doReturn(bootstrap).when(bootstrap).clone(); + + final RiakNode node = new RiakNode.Builder().withBlockOnMaxConnections(true).withMaxConnections(1).build(); + + Field field = node.getClass().getDeclaredField("permits"); + field.setAccessible(true); + field.set(node, new BadSync(1)); + + RiakCluster cluster = RiakCluster.builder(node).withExecutionAttempts(1).withBootstrap(bootstrap).build(); + + Runnable secondCmdSetup = new Runnable() + { + @Override + public void run() + { + try + { + RiakNode.Sync permits = Whitebox.getInternalState(node, "permits"); + + // Use up permit, imitate that they are all used. + permits.acquire(); + } + catch (InterruptedException ignored) + { + + } + } + }; + + // Throw away all connections before second command is run, and throw a BlockingOperationException when a new one is opened. + cc.secondCommand.injectSetup(secondCmdSetup); + + cluster.start(); + + final Answer holdOntoConnectionAnswer = new Answer() + { + final ChannelFutureListener writeListener = Whitebox.getInternalState(node, "writeListener"); + final ChannelFutureListener inProgressCloseListener = Whitebox.getInternalState(node, + "inProgressCloseListener"); + + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable + { + final ChannelFutureListener listener = (ChannelFutureListener) invocationOnMock.getArguments()[0]; + if (listener.equals(inProgressCloseListener)) + { + return null; + } + + if (listener.equals(writeListener)) + { + node.onSuccess(channel, new RiakMessage((byte) 0, new byte[]{})); + } + else + { + listener.operationComplete(future); + } + return null; + } + }; + + doAnswer(holdOntoConnectionAnswer).when(future).addListener(any(ChannelFutureListener.class)); + + final RiakFuture ccFuture = cc.executeAsync(cluster); + ccFuture.await(); + try + { + ccFuture.get(); + } + catch (ExecutionException ex) + { + assertEquals(NoNodesAvailableException.class, ex.getCause().getClass()); + throw ex; + } + } + + private class BadSync extends RiakNode.Sync + { + public BadSync(int numPermits) + { + super(numPermits); + } + + @Override + public void acquire() throws InterruptedException + { + if(this.availablePermits() == 0) + { + throw new BlockingOperationException(); + } + super.acquire(); + } + + + } + private Answer createYesChannelListenerAnswer(final RiakNode node, final Channel channel, final ChannelFuture future) { return new Answer() From 37aa5ae62be7117aadf96fafe67e85d2f3b31fd9 Mon Sep 17 00:00:00 2001 From: Alex Moore Date: Fri, 19 Aug 2016 03:08:14 -0400 Subject: [PATCH 3/4] Reduce, Reuse, Recycle your tests --- .../basho/riak/client/core/RiakNodeTest.java | 181 ++++++++---------- 1 file changed, 84 insertions(+), 97 deletions(-) diff --git a/src/test/java/com/basho/riak/client/core/RiakNodeTest.java b/src/test/java/com/basho/riak/client/core/RiakNodeTest.java index 04f017fb8..9ac8030ed 100644 --- a/src/test/java/com/basho/riak/client/core/RiakNodeTest.java +++ b/src/test/java/com/basho/riak/client/core/RiakNodeTest.java @@ -514,28 +514,14 @@ public void failsResolvingHostname() throws UnknownHostException } @Test(expected = ExecutionException.class) - public void BlockingExceptionIsCaught() throws UnknownHostException, InterruptedException, ExecutionException + public void BlockingExceptionIsCaughtWhileWaitingForChannelToOpen() throws UnknownHostException, InterruptedException, ExecutionException { - CompoundCommand cc = new CompoundCommand(); - - final Channel channel = mock(Channel.class); - ChannelPipeline channelPipeline = mock(ChannelPipeline.class); - final ChannelFuture future = mock(ChannelFuture.class); - - Bootstrap bootstrap = PowerMockito.spy(new Bootstrap()); - - doReturn(future).when(channel).closeFuture(); - doReturn(true).when(channel).isOpen(); - doReturn(channelPipeline).when(channel).pipeline(); - doReturn(future).when(channel).writeAndFlush(cc.firstCommand.operation); - doReturn(future).when(future).await(); - doReturn(true).when(future).isSuccess(); - doReturn(channel).when(future).channel(); - doReturn(future).when(bootstrap).connect(); - doReturn(bootstrap).when(bootstrap).clone(); + final BlockingExceptionTestSetup setup = new BlockingExceptionTestSetup(); + final CompoundCommand command = setup.getCommand(); final RiakNode node = new RiakNode.Builder().build(); - RiakCluster cluster = RiakCluster.builder(node).withExecutionAttempts(1).withBootstrap(bootstrap).build(); + RiakCluster cluster = RiakCluster.builder(node).withExecutionAttempts(1) + .withBootstrap(setup.getBootstrap()).build(); Runnable secondCmdSetup = new Runnable() { @@ -550,58 +536,25 @@ public void run() available.poll(); // throw a BlockingOperationException when a new connection opened. - doThrow(BlockingOperationException.class).when(future).await(); // - } - catch (InterruptedException ignored) - { - + doThrow(BlockingOperationException.class).when(setup.getChannelFuture()).await(); // } + catch (InterruptedException ignored) {} } }; // Throw away all connections before second command is run, and throw a BlockingOperationException when a new one is opened. - cc.secondCommand.injectSetup(secondCmdSetup); - - cluster.start(); - - doAnswer(createYesChannelListenerAnswer(node, channel, future)) - .when(future).addListener(any(ChannelFutureListener.class)); + command.secondCommand.injectSetup(secondCmdSetup); - final RiakFuture ccFuture = cc.executeAsync(cluster); - ccFuture.await(); - try - { - ccFuture.get(); - } - catch (ExecutionException ex) - { - assertEquals(NoNodesAvailableException.class, ex.getCause().getClass()); - throw ex; - } + startAndRunBlockingExceptionTest(command, setup.getChannel(), setup.getChannelFuture(), node, cluster); } @Test(expected = ExecutionException.class) - public void BlockingExceptionIsCaughtBlockOnMaxConns() + public void BlockingExceptionIsCaughtWhileBlockedOnMaxConnections() throws UnknownHostException, InterruptedException, ExecutionException, NoSuchFieldException, IllegalAccessException { - CompoundCommand cc = new CompoundCommand(); - - final Channel channel = mock(Channel.class); - ChannelPipeline channelPipeline = mock(ChannelPipeline.class); - final ChannelFuture future = mock(ChannelFuture.class); - - Bootstrap bootstrap = PowerMockito.spy(new Bootstrap()); - - doReturn(future).when(channel).closeFuture(); - doReturn(true).when(channel).isOpen(); - doReturn(channelPipeline).when(channel).pipeline(); - doReturn(future).when(channel).writeAndFlush(cc.firstCommand.operation); - doReturn(future).when(future).await(); - doReturn(true).when(future).isSuccess(); - doReturn(channel).when(future).channel(); - doReturn(future).when(bootstrap).connect(); - doReturn(bootstrap).when(bootstrap).clone(); + BlockingExceptionTestSetup setup = new BlockingExceptionTestSetup(); + final CompoundCommand command = setup.getCommand(); final RiakNode node = new RiakNode.Builder().withBlockOnMaxConnections(true).withMaxConnections(1).build(); @@ -609,7 +562,9 @@ public void BlockingExceptionIsCaughtBlockOnMaxConns() field.setAccessible(true); field.set(node, new BadSync(1)); - RiakCluster cluster = RiakCluster.builder(node).withExecutionAttempts(1).withBootstrap(bootstrap).build(); + RiakCluster cluster = RiakCluster.builder(node) + .withExecutionAttempts(1) + .withBootstrap(setup.getBootstrap()).build(); Runnable secondCmdSetup = new Runnable() { @@ -623,48 +578,28 @@ public void run() // Use up permit, imitate that they are all used. permits.acquire(); } - catch (InterruptedException ignored) - { - - } + catch (InterruptedException ignored) {} } }; // Throw away all connections before second command is run, and throw a BlockingOperationException when a new one is opened. - cc.secondCommand.injectSetup(secondCmdSetup); - - cluster.start(); - - final Answer holdOntoConnectionAnswer = new Answer() - { - final ChannelFutureListener writeListener = Whitebox.getInternalState(node, "writeListener"); - final ChannelFutureListener inProgressCloseListener = Whitebox.getInternalState(node, - "inProgressCloseListener"); + command.secondCommand.injectSetup(secondCmdSetup); - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable - { - final ChannelFutureListener listener = (ChannelFutureListener) invocationOnMock.getArguments()[0]; - if (listener.equals(inProgressCloseListener)) - { - return null; - } + startAndRunBlockingExceptionTest(command, setup.getChannel(), setup.getChannelFuture(), node, cluster); + } - if (listener.equals(writeListener)) - { - node.onSuccess(channel, new RiakMessage((byte) 0, new byte[]{})); - } - else - { - listener.operationComplete(future); - } - return null; - } - }; + private void startAndRunBlockingExceptionTest(CompoundCommand compoundCommand, + Channel channel, + ChannelFuture future, + RiakNode node, + RiakCluster cluster) throws InterruptedException, ExecutionException + { + cluster.start(); - doAnswer(holdOntoConnectionAnswer).when(future).addListener(any(ChannelFutureListener.class)); + doAnswer(createYesChannelListenerAnswer(node,channel,future)) + .when(future).addListener(any(ChannelFutureListener.class)); - final RiakFuture ccFuture = cc.executeAsync(cluster); + final RiakFuture ccFuture = compoundCommand.executeAsync(cluster); ccFuture.await(); try { @@ -677,6 +612,9 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable } } + /** + * Don't wait when we run out of permits, throw an exception + */ private class BadSync extends RiakNode.Sync { public BadSync(int numPermits) @@ -693,10 +631,11 @@ public void acquire() throws InterruptedException } super.acquire(); } - - } + /** + * Say yes/onSuccess to all listeners that are added, except inProgressCloseListener. Completes operations. + */ private Answer createYesChannelListenerAnswer(final RiakNode node, final Channel channel, final ChannelFuture future) { return new Answer() @@ -727,7 +666,6 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable private class FutureOperationImpl extends FutureOperation { - @Override protected String convert(List rawResponse) { @@ -938,4 +876,53 @@ private void setException(Throwable t) } } } + + private class BlockingExceptionTestSetup + { + private CompoundCommand compoundCommand; + private Channel channel; + private ChannelFuture channelFuture; + private Bootstrap bootstrap; + + public CompoundCommand getCommand() + { + return compoundCommand; + } + + public Channel getChannel() + { + return channel; + } + + public ChannelFuture getChannelFuture() + { + return channelFuture; + } + + public Bootstrap getBootstrap() + { + return bootstrap; + } + + public BlockingExceptionTestSetup() throws InterruptedException + { + compoundCommand = new CompoundCommand(); + + channel = mock(Channel.class); + ChannelPipeline channelPipeline = mock(ChannelPipeline.class); + channelFuture = mock(ChannelFuture.class); + + bootstrap = PowerMockito.spy(new Bootstrap()); + + doReturn(channelFuture).when(channel).closeFuture(); + doReturn(true).when(channel).isOpen(); + doReturn(channelPipeline).when(channel).pipeline(); + doReturn(channelFuture).when(channel).writeAndFlush(compoundCommand.firstCommand.operation); + doReturn(channelFuture).when(channelFuture).await(); + doReturn(true).when(channelFuture).isSuccess(); + doReturn(channel).when(channelFuture).channel(); + doReturn(channelFuture).when(bootstrap).connect(); + doReturn(bootstrap).when(bootstrap).clone(); + } + } } From f470ba2f8ec15e3170ee523983162cdcc75fa2b1 Mon Sep 17 00:00:00 2001 From: Alex Moore Date: Fri, 19 Aug 2016 15:35:10 -0400 Subject: [PATCH 4/4] Undo some automatic code reformatting --- .../basho/riak/client/core/RiakNodeTest.java | 69 +++++++++++++------ 1 file changed, 49 insertions(+), 20 deletions(-) diff --git a/src/test/java/com/basho/riak/client/core/RiakNodeTest.java b/src/test/java/com/basho/riak/client/core/RiakNodeTest.java index 9ac8030ed..922898b6d 100644 --- a/src/test/java/com/basho/riak/client/core/RiakNodeTest.java +++ b/src/test/java/com/basho/riak/client/core/RiakNodeTest.java @@ -88,15 +88,16 @@ public void builderProducesCorrectNode() doReturn(BOOTSTRAP).when(BOOTSTRAP).clone(); - RiakNode node = new RiakNode.Builder().withIdleTimeout(IDLE_TIMEOUT) - .withConnectionTimeout(CONNECTION_TIMEOUT) - .withMinConnections(MIN_CONNECTIONS) - .withMaxConnections(MAX_CONNECTIONS) - .withRemotePort(PORT) - .withRemoteAddress(REMOTE_ADDRESS) - .withExecutor(EXECUTOR) - .withBootstrap(BOOTSTRAP) - .build(); + RiakNode node = new RiakNode.Builder() + .withIdleTimeout(IDLE_TIMEOUT) + .withConnectionTimeout(CONNECTION_TIMEOUT) + .withMinConnections(MIN_CONNECTIONS) + .withMaxConnections(MAX_CONNECTIONS) + .withRemotePort(PORT) + .withRemoteAddress(REMOTE_ADDRESS) + .withExecutor(EXECUTOR) + .withBootstrap(BOOTSTRAP) + .build(); assertEquals(node.getRemoteAddress(), REMOTE_ADDRESS); @@ -149,7 +150,10 @@ public void nodeStartsMinConnections() throws InterruptedException, UnknownHostE doReturn(future).when(bootstrap).connect(); doReturn(bootstrap).when(bootstrap).clone(); - RiakNode node = new RiakNode.Builder().withBootstrap(bootstrap).withMinConnections(MIN_CONNECTIONS).build(); + RiakNode node = new RiakNode.Builder() + .withBootstrap(bootstrap) + .withMinConnections(MIN_CONNECTIONS) + .build(); node.start(); Deque available = Whitebox.getInternalState(node, "available"); assertEquals(MIN_CONNECTIONS, available.size()); @@ -173,7 +177,10 @@ public void NodeRespectsMax() throws Exception doReturn(future).when(bootstrap).connect(); doReturn(bootstrap).when(bootstrap).clone(); - RiakNode node = new RiakNode.Builder().withBootstrap(bootstrap).withMaxConnections(MAX_CONNECTIONS).build(); + RiakNode node = new RiakNode.Builder() + .withBootstrap(bootstrap) + .withMaxConnections(MAX_CONNECTIONS) + .build(); node.start(); for (int i = 0; i < MAX_CONNECTIONS; i++) @@ -193,7 +200,9 @@ public void NodeRespectsMax() throws Exception public void NodeMaxCanBeExplicitlySetToUnlimited() throws Exception { final int UNLIMITED = 0; - new RiakNode.Builder().withMaxConnections(UNLIMITED).build(); + new RiakNode.Builder() + .withMaxConnections(UNLIMITED) + .build(); } @Test @@ -213,7 +222,10 @@ public void channelsReturnedCorrectly() throws Exception doReturn(future).when(bootstrap).connect(); doReturn(bootstrap).when(bootstrap).clone(); - RiakNode node = new RiakNode.Builder().withBootstrap(bootstrap).withMaxConnections(MAX_CONNECTIONS).build(); + RiakNode node = new RiakNode.Builder() + .withBootstrap(bootstrap) + .withMaxConnections(MAX_CONNECTIONS) + .build(); node.start(); assertNotNull(Whitebox.invokeMethod(node, "getConnection")); @@ -240,12 +252,13 @@ public void healthCheckChangesState() throws Exception doReturn(future).when(bootstrap).connect(); doReturn(bootstrap).when(bootstrap).clone(); - RiakNode node = new RiakNode.Builder().withBootstrap(bootstrap).build(); + RiakNode node = new RiakNode.Builder() + .withBootstrap(bootstrap) + .build(); for (int i = 0; i < 5; i++) { - ChannelFutureListener listener = Whitebox.getInternalState(node, "inAvailableCloseListener", RiakNode - .class); + ChannelFutureListener listener = Whitebox.getInternalState(node, "inAvailableCloseListener", RiakNode.class); listener.operationComplete(future); } @@ -273,7 +286,11 @@ public void idleReaperTest() throws Exception doReturn(future).when(bootstrap).connect(); doReturn(bootstrap).when(bootstrap).clone(); - RiakNode node = new RiakNode.Builder().withBootstrap(bootstrap).withMinConnections(1).withIdleTimeout(1).build(); + RiakNode node = new RiakNode.Builder() + .withBootstrap(bootstrap) + .withMinConnections(1) + .withIdleTimeout(1) + .build(); node.start(); Channel[] channelArray = new Channel[6]; @@ -311,7 +328,11 @@ public void closedConnectionsOnReturnTest() throws Exception doReturn(future).when(bootstrap).connect(); doReturn(bootstrap).when(bootstrap).clone(); - RiakNode node = new RiakNode.Builder().withBootstrap(bootstrap).withMinConnections(1).withMaxConnections(6).build(); + RiakNode node = new RiakNode.Builder() + .withBootstrap(bootstrap) + .withMinConnections(1) + .withMaxConnections(6) + .build(); node.start(); Channel[] channelArray = new Channel[6]; @@ -371,7 +392,11 @@ public void deadConnectionsOnGetConnection() throws Exception doReturn(future).when(bootstrap).connect(); doReturn(bootstrap).when(bootstrap).clone(); - RiakNode node = new RiakNode.Builder().withBootstrap(bootstrap).withMinConnections(1).withMaxConnections(1).build(); + RiakNode node = new RiakNode.Builder() + .withBootstrap(bootstrap) + .withMinConnections(1) + .withMaxConnections(1) + .build(); node.start(); @@ -413,7 +438,11 @@ public void nodeRefreshesInetSocketAddressWhenConnectionsDie() throws Exception // Capture arguments passed to InetSocketAddress ctors. ArgumentCaptor addressCaptor = ArgumentCaptor.forClass(InetSocketAddress.class); - RiakNode node = new RiakNode.Builder().withBootstrap(bootstrap).withMinConnections(1).withMaxConnections(1).build(); + RiakNode node = new RiakNode.Builder() + .withBootstrap(bootstrap) + .withMinConnections(1) + .withMaxConnections(1) + .build(); node.start();