From 99686090687d36f09804b13f7b246ae86b52c422 Mon Sep 17 00:00:00 2001 From: Michael Edgar Date: Fri, 11 Jun 2021 14:25:07 -0400 Subject: [PATCH 1/2] ZOOKEEPER-4293: Lock Contention in ClientCnxnSocketNetty Check for failed/cancelled ChannelFutures before acquiring `connectLock` Signed-off-by: Michael Edgar --- .../zookeeper/ClientCnxnSocketNetty.java | 42 ++++++++++++++----- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java index 55c5a9125f6..1c4e243b5fa 100755 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java @@ -73,7 +73,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket { private final EventLoopGroup eventLoopGroup; private Channel channel; private CountDownLatch firstConnect; - private ChannelFuture connectFuture; + private volatile ChannelFuture connectFuture; private final Lock connectLock = new ReentrantLock(); private final AtomicBoolean disconnected = new AtomicBoolean(); private final AtomicBoolean needSasl = new AtomicBoolean(); @@ -146,19 +146,27 @@ void connect(InetSocketAddress addr) throws IOException { connectFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { + if (!channelFuture.isSuccess()) { + LOG.info("future isn't success, cause:", channelFuture.cause()); + return; + } + // this lock guarantees that channel won't be assigned after cleanup(). boolean connected = false; - connectLock.lock(); - try { - if (!channelFuture.isSuccess()) { - LOG.info("future isn't success, cause:", channelFuture.cause()); + + while (!connectLock.tryLock(50, TimeUnit.MILLISECONDS)) { + /* + * Lock can not be acquired. Check if cleanup() has cancelled + * the connection and return if so. + */ + if (cancelled(channelFuture)) { + LOG.debug("Connect attempt cancelled by `cleanup` before listener could acquire lock"); return; - } else if (connectFuture == null) { - LOG.info("connect attempt cancelled"); - // If the connect attempt was cancelled but succeeded - // anyway, make sure to close the channel, otherwise - // we may leak a file descriptor. - channelFuture.channel().close(); + } + } + + try { + if (cancelled(channelFuture)) { return; } // setup channel, variables, connection, etc. @@ -200,6 +208,18 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception { } } + boolean cancelled(ChannelFuture channelFuture) { + if (connectFuture == null) { + LOG.info("connect attempt cancelled"); + // If the connect attempt was cancelled but succeeded + // anyway, make sure to close the channel, otherwise + // we may leak a file descriptor. + channelFuture.channel().close(); + return true; + } + return false; + } + @Override void cleanup() { connectLock.lock(); From 99b7b00a8e141e1dc563bdd1f9d3692c8fd2cd15 Mon Sep 17 00:00:00 2001 From: Michael Edgar Date: Wed, 13 Jul 2022 06:40:46 -0400 Subject: [PATCH 2/2] ZOOKEEPER-4293: Add test cases for ClientCnxnSocketNetty lock sync --- .../zookeeper/ClientCnxnSocketNetty.java | 7 +- .../zookeeper/ClientCnxnSocketNettyTest.java | 149 ++++++++++++++++++ 2 files changed, 155 insertions(+), 1 deletion(-) create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketNettyTest.java diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java index 1c4e243b5fa..5e6089a0b8f 100755 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java @@ -118,7 +118,7 @@ boolean isConnected() { } } - private Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) { + Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) { ByteBufAllocator testAllocator = TEST_ALLOCATOR.get(); if (testAllocator != null) { return bootstrap.option(ChannelOption.ALLOCATOR, testAllocator); @@ -227,6 +227,7 @@ void cleanup() { if (connectFuture != null) { connectFuture.cancel(false); connectFuture = null; + afterConnectFutureCancel(); } if (channel != null) { channel.close().syncUninterruptibly(); @@ -244,6 +245,10 @@ void cleanup() { } } + void afterConnectFutureCancel() { + // NO-OP ; this method exists only to allow test case overrides to exercise thread sync scenarios. + } + @Override void close() { eventLoopGroup.shutdownGracefully(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketNettyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketNettyTest.java new file mode 100644 index 00000000000..cbb934f2269 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketNettyTest.java @@ -0,0 +1,149 @@ +package org.apache.zookeeper; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import org.apache.zookeeper.client.ZKClientConfig; +import org.junit.Before; +import org.junit.Test; + +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ClientCnxnSocketNettyTest { + + ClientCnxn.SendThread mockSendThread; + ChannelFuture connectFutureMock; + Bootstrap bootstrapMock; + ChannelFutureListener channelFutureListener; + + @Before + public void setUp() throws Exception { + mockSendThread = mock(ClientCnxn.SendThread.class); + when(mockSendThread.tunnelAuthInProgress()).thenReturn(Boolean.FALSE); + + connectFutureMock = mock(ChannelFuture.class); + + bootstrapMock = mock(Bootstrap.class); + when(bootstrapMock.connect(any(InetSocketAddress.class))).thenReturn(connectFutureMock); + + when(connectFutureMock.addListener(any())) + .thenAnswer(args -> { + channelFutureListener = args.getArgument(0); + return connectFutureMock; + }); + } + + @Test + public void testCleanupDoesNotBlockUnsuccessfulConnectionHandling() throws Exception { + CountDownLatch cancelInvoked = new CountDownLatch(1); + CountDownLatch operationCompleteInvoked = new CountDownLatch(1); + + when(connectFutureMock.cancel(false)) + .thenAnswer(args -> { + cancelInvoked.countDown(); + operationCompleteInvoked.await(); + return false; + }); + + ClientCnxnSocketNetty target = new ClientCnxnSocketNetty(new ZKClientConfig()) { + @Override + Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) { + return bootstrapMock; + } + }; + + target.introduce(mockSendThread, 0, new LinkedBlockingDeque<>()); + target.connect(InetSocketAddress.createUnresolved("127.0.0.1", 6000)); + + CompletableFuture pendingCleanup = CompletableFuture + .runAsync(() -> target.cleanup()) + .thenApply(nothing -> System.currentTimeMillis()); + cancelInvoked.await(); + + channelFutureListener.operationComplete(mockChannelFuture(false)); + operationCompleteInvoked.countDown(); + + long endOperationComplete = System.currentTimeMillis(); + long endCleanup = pendingCleanup.get(); + + // `operationComplete` (exits without lock) completes before `cleanup` (holds lock until `operationComplete` exits) + assertTrue(endOperationComplete <= endCleanup); + assertFalse(target.isConnected()); + } + + @Test + public void testCancelledOperationComplete() throws Exception { + testOperationCompleteLockAcquisition(3, 3); + } + + @Test + public void testCancelledOperationCompleteRetriesLockAcquisition() throws Exception { + testOperationCompleteLockAcquisition(3, 4); + } + + void testOperationCompleteLockAcquisition(int cancelledInvocations, int unlockedCancelledInvocations) throws Exception { + CountDownLatch cancelledInvoked = new CountDownLatch(cancelledInvocations); + CountDownLatch cancelledInvokedUnlocked = new CountDownLatch(unlockedCancelledInvocations); + + when(connectFutureMock.cancel(false)) + .thenAnswer(args -> { + cancelledInvoked.await(); // proceed after 3 invocations of `#cancelled` + return false; + }); + + ClientCnxnSocketNetty target = new ClientCnxnSocketNetty(new ZKClientConfig()) { + @Override + Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) { + return bootstrapMock; + } + @Override + boolean cancelled(ChannelFuture channelFuture) { + cancelledInvoked.countDown(); + cancelledInvokedUnlocked.countDown(); + return super.cancelled(channelFuture); + } + @Override + void afterConnectFutureCancel() { + try { + cancelledInvokedUnlocked.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }; + + target.introduce(mockSendThread, 0, new LinkedBlockingDeque<>()); + target.connect(InetSocketAddress.createUnresolved("127.0.0.1", 6000)); + + CompletableFuture pendingCleanup = CompletableFuture + .runAsync(() -> target.cleanup()) + .thenApply(nothing -> System.currentTimeMillis()); + channelFutureListener.operationComplete(mockChannelFuture(true)); + + long endOperationComplete = System.currentTimeMillis(); + long endCleanup = pendingCleanup.get(); + + // `cleanup` completes before `operationComplete` (exits when `#cancelled` is true) + assertTrue(endCleanup <= endOperationComplete); + assertFalse(target.isConnected()); + } + + ChannelFuture mockChannelFuture(boolean successful) { + ChannelFuture channelFuture = mock(ChannelFuture.class); + when(channelFuture.isSuccess()).thenReturn(successful); + if (successful) { + when(channelFuture.channel()).thenReturn(mock(Channel.class)); + } + return channelFuture; + } +}