diff --git a/src/main/java/com/basho/riak/client/core/RiakNode.java b/src/main/java/com/basho/riak/client/core/RiakNode.java index 9d3288793..d76f520ee 100644 --- a/src/main/java/com/basho/riak/client/core/RiakNode.java +++ b/src/main/java/com/basho/riak/client/core/RiakNode.java @@ -25,6 +25,7 @@ import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.BlockingOperationException; import io.netty.util.concurrent.DefaultPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -644,6 +645,12 @@ private Channel getConnection() { // no-op, don't care } + catch (BlockingOperationException ex) + { + logger.error("Netty interrupted waiting for connection permit to be available; {}", + remoteAddress); + + } } else { @@ -706,6 +713,12 @@ private Channel doGetConnection(boolean forceAddressRefresh) throws ConnectionFa Thread.currentThread().interrupt(); throw new ConnectionFailedException(ex); } + catch (BlockingOperationException ex) + { + logger.error("Netty interrupted waiting for new connection to be made; {}", + remoteAddress); + throw new ConnectionFailedException(ex); + } if (!f.isSuccess()) { @@ -1183,11 +1196,7 @@ private void checkHealth() closeConnection(c); } } - catch (ConnectionFailedException ex) - { - healthCheckFailed(ex); - } - catch (UnknownHostException ex) + catch (ConnectionFailedException | UnknownHostException ex) { healthCheckFailed(ex); } diff --git a/src/test/java/com/basho/riak/client/core/RiakNodeTest.java b/src/test/java/com/basho/riak/client/core/RiakNodeTest.java index 771daad58..922898b6d 100644 --- a/src/test/java/com/basho/riak/client/core/RiakNodeTest.java +++ b/src/test/java/com/basho/riak/client/core/RiakNodeTest.java @@ -15,6 +15,8 @@ */ package com.basho.riak.client.core; +import com.basho.riak.client.api.RiakCommand; +import com.basho.riak.client.api.commands.ListenableFuture; import com.basho.riak.client.core.RiakNode.State; import com.google.protobuf.Message; import io.netty.bootstrap.Bootstrap; @@ -22,22 +24,25 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPipeline; +import io.netty.util.concurrent.BlockingOperationException; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.reflect.Whitebox; +import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Deque; import java.util.List; import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; + import static com.jayway.awaitility.Awaitility.await; import static com.jayway.awaitility.Awaitility.fieldIn; import static org.hamcrest.Matchers.equalTo; @@ -530,16 +535,166 @@ public void nodeFailsOperation() throws InterruptedException, UnknownHostExcepti await().atMost(500, TimeUnit.MILLISECONDS).until(fieldIn(operation).ofType(Throwable.class).andWithName("exception"), equalTo(t)); } - @Test(expected = UnknownHostException.class ) + @Test(expected = UnknownHostException.class) public void failsResolvingHostname() throws UnknownHostException { RiakNode node = new RiakNode.Builder().withRemoteAddress("invalid-host-name").build(); node.start(); } - private class FutureOperationImpl extends FutureOperation + @Test(expected = ExecutionException.class) + public void BlockingExceptionIsCaughtWhileWaitingForChannelToOpen() throws UnknownHostException, InterruptedException, ExecutionException + { + final BlockingExceptionTestSetup setup = new BlockingExceptionTestSetup(); + final CompoundCommand command = setup.getCommand(); + + final RiakNode node = new RiakNode.Builder().build(); + RiakCluster cluster = RiakCluster.builder(node).withExecutionAttempts(1) + .withBootstrap(setup.getBootstrap()).build(); + + Runnable secondCmdSetup = new Runnable() + { + @Override + public void run() + { + try + { + Deque available = Whitebox.getInternalState(node, "available"); + + // throw away connection, imitate that they are all used. + available.poll(); + + // throw a BlockingOperationException when a new connection opened. + doThrow(BlockingOperationException.class).when(setup.getChannelFuture()).await(); // + } + catch (InterruptedException ignored) {} + } + }; + + // Throw away all connections before second command is run, and throw a BlockingOperationException when a new one is opened. + command.secondCommand.injectSetup(secondCmdSetup); + + startAndRunBlockingExceptionTest(command, setup.getChannel(), setup.getChannelFuture(), node, cluster); + } + + @Test(expected = ExecutionException.class) + public void BlockingExceptionIsCaughtWhileBlockedOnMaxConnections() + throws UnknownHostException, InterruptedException, ExecutionException, NoSuchFieldException, + IllegalAccessException + { + BlockingExceptionTestSetup setup = new BlockingExceptionTestSetup(); + final CompoundCommand command = setup.getCommand(); + + final RiakNode node = new RiakNode.Builder().withBlockOnMaxConnections(true).withMaxConnections(1).build(); + + Field field = node.getClass().getDeclaredField("permits"); + field.setAccessible(true); + field.set(node, new BadSync(1)); + + RiakCluster cluster = RiakCluster.builder(node) + .withExecutionAttempts(1) + .withBootstrap(setup.getBootstrap()).build(); + + Runnable secondCmdSetup = new Runnable() + { + @Override + public void run() + { + try + { + RiakNode.Sync permits = Whitebox.getInternalState(node, "permits"); + + // Use up permit, imitate that they are all used. + permits.acquire(); + } + catch (InterruptedException ignored) {} + } + }; + + // Throw away all connections before second command is run, and throw a BlockingOperationException when a new one is opened. + command.secondCommand.injectSetup(secondCmdSetup); + + startAndRunBlockingExceptionTest(command, setup.getChannel(), setup.getChannelFuture(), node, cluster); + } + + private void startAndRunBlockingExceptionTest(CompoundCommand compoundCommand, + Channel channel, + ChannelFuture future, + RiakNode node, + RiakCluster cluster) throws InterruptedException, ExecutionException { + cluster.start(); + + doAnswer(createYesChannelListenerAnswer(node,channel,future)) + .when(future).addListener(any(ChannelFutureListener.class)); + + final RiakFuture ccFuture = compoundCommand.executeAsync(cluster); + ccFuture.await(); + try + { + ccFuture.get(); + } + catch (ExecutionException ex) + { + assertEquals(NoNodesAvailableException.class, ex.getCause().getClass()); + throw ex; + } + } + + /** + * Don't wait when we run out of permits, throw an exception + */ + private class BadSync extends RiakNode.Sync + { + public BadSync(int numPermits) + { + super(numPermits); + } + @Override + public void acquire() throws InterruptedException + { + if(this.availablePermits() == 0) + { + throw new BlockingOperationException(); + } + super.acquire(); + } + } + + /** + * Say yes/onSuccess to all listeners that are added, except inProgressCloseListener. Completes operations. + */ + private Answer createYesChannelListenerAnswer(final RiakNode node, final Channel channel, final ChannelFuture future) + { + return new Answer() + { + final ChannelFutureListener writeListener = Whitebox.getInternalState(node, "writeListener"); + final ChannelFutureListener inProgressCloseListener = + Whitebox.getInternalState(node, "inProgressCloseListener"); + + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable + { + final ChannelFutureListener listener = (ChannelFutureListener) invocationOnMock.getArguments()[0]; + if (listener.equals(inProgressCloseListener)) + { + return null; + } + + listener.operationComplete(future); + + if (listener.equals(writeListener)) + { + node.onSuccess(channel, new RiakMessage((byte) 0, new byte[]{})); + } + return null; + } + }; + } + + private class FutureOperationImpl extends FutureOperation + { @Override protected String convert(List rawResponse) { @@ -563,7 +718,240 @@ public Void getQueryInfo() { return null; } + } + private class NopCommand extends RiakCommand + { + public FutureOperationImpl operation = new FutureOperationImpl(); + private Runnable testSetup; + + @Override + protected RiakFuture executeAsync(RiakCluster cluster) + { + if(testSetup != null) + { + testSetup.run(); + } + + return cluster.execute(operation); + } + + public void injectSetup(Runnable testSetup) {this.testSetup = testSetup;} + } + + /** + * Imitates a compound command like UpdateValue, where we run 1 command, then run a second in the callback of the first. + */ + private class CompoundCommand extends RiakCommand + { + public NopCommand firstCommand = new NopCommand(); + public NopCommand secondCommand = new NopCommand(); + + @Override + protected RiakFuture executeAsync(final RiakCluster cluster) + { + final CompoundFuture cf = new CompoundFuture(); + + final RiakFuture firstFuture = firstCommand.executeAsync(cluster); + + RiakFutureListener listener = + new RiakFutureListener() + { + @Override + public void handle(RiakFuture f) + { + final RiakFuture secondFuture = + secondCommand.executeAsync(cluster); + secondFuture.addListener(cf); + } + }; + + firstFuture.addListener(listener); + + return cf; + } + + private class CompoundFuture extends ListenableFuture + implements RiakFutureListener + { + private String finalValue; + private Throwable exception; + private final CountDownLatch latch = new CountDownLatch(1); + + @Override + public void handle(RiakFuture f) + { + try + { + setResponse(f.get()); + } + catch (InterruptedException | ExecutionException ex) + { + setException(ex); + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) + { + return false; + } + + @Override + public String get() throws InterruptedException, ExecutionException + { + latch.await(); + + if (exception != null) + { + if(exception.getClass() == ExecutionException.class) + { + throw (ExecutionException) exception; + } + + throw new ExecutionException(exception); + } + else + { + return finalValue; + } + } + + @Override + public String get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException + { + boolean succeed = latch.await(timeout, unit); + + if (!succeed) + { + throw new TimeoutException(); + } + else if (exception != null) + { + if(exception.getClass() == ExecutionException.class) + { + throw (ExecutionException) exception; + } + + throw new ExecutionException(exception); + } + else + { + return finalValue; + } + } + + @Override + public boolean isCancelled() + { + return false; + } + + @Override + public boolean isDone() + { + return latch.getCount() != 1; + } + + @Override + public void await() throws InterruptedException + { + latch.await(); + } + + @Override + public boolean await(long timeout, TimeUnit unit) throws InterruptedException + { + return latch.await(timeout, unit); + } + + @Override + public String getNow() + { + return finalValue; + } + + @Override + public boolean isSuccess() + { + return isDone() && exception == null; + } + + @Override + public Throwable cause() + { + return this.exception; + } + + @Override + public Void getQueryInfo() + { + return null; + } + + private void setResponse(String response) + { + this.finalValue = response; + latch.countDown(); + notifyListeners(); + } + + private void setException(Throwable t) + { + this.exception = t; + latch.countDown(); + notifyListeners(); + } + } } + private class BlockingExceptionTestSetup + { + private CompoundCommand compoundCommand; + private Channel channel; + private ChannelFuture channelFuture; + private Bootstrap bootstrap; + + public CompoundCommand getCommand() + { + return compoundCommand; + } + + public Channel getChannel() + { + return channel; + } + + public ChannelFuture getChannelFuture() + { + return channelFuture; + } + + public Bootstrap getBootstrap() + { + return bootstrap; + } + + public BlockingExceptionTestSetup() throws InterruptedException + { + compoundCommand = new CompoundCommand(); + + channel = mock(Channel.class); + ChannelPipeline channelPipeline = mock(ChannelPipeline.class); + channelFuture = mock(ChannelFuture.class); + + bootstrap = PowerMockito.spy(new Bootstrap()); + + doReturn(channelFuture).when(channel).closeFuture(); + doReturn(true).when(channel).isOpen(); + doReturn(channelPipeline).when(channel).pipeline(); + doReturn(channelFuture).when(channel).writeAndFlush(compoundCommand.firstCommand.operation); + doReturn(channelFuture).when(channelFuture).await(); + doReturn(true).when(channelFuture).isSuccess(); + doReturn(channel).when(channelFuture).channel(); + doReturn(channelFuture).when(bootstrap).connect(); + doReturn(bootstrap).when(bootstrap).clone(); + } + } }