Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
private final EventLoopGroup eventLoopGroup;
private Channel channel;
private CountDownLatch firstConnect;
private ChannelFuture connectFuture;
private volatile ChannelFuture connectFuture;
private final Lock connectLock = new ReentrantLock();
private final AtomicBoolean disconnected = new AtomicBoolean();
private final AtomicBoolean needSasl = new AtomicBoolean();
Expand Down Expand Up @@ -118,7 +118,7 @@ boolean isConnected() {
}
}

private Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) {
Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) {
ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
if (testAllocator != null) {
return bootstrap.option(ChannelOption.ALLOCATOR, testAllocator);
Expand Down Expand Up @@ -146,19 +146,27 @@ void connect(InetSocketAddress addr) throws IOException {
connectFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (!channelFuture.isSuccess()) {
LOG.info("future isn't success, cause:", channelFuture.cause());
return;
}

// this lock guarantees that channel won't be assigned after cleanup().
boolean connected = false;
connectLock.lock();
try {
if (!channelFuture.isSuccess()) {
LOG.info("future isn't success, cause:", channelFuture.cause());

while (!connectLock.tryLock(50, TimeUnit.MILLISECONDS)) {
/*
* Lock can not be acquired. Check if cleanup() has cancelled
* the connection and return if so.
*/
if (cancelled(channelFuture)) {
LOG.debug("Connect attempt cancelled by `cleanup` before listener could acquire lock");
return;
} else if (connectFuture == null) {
LOG.info("connect attempt cancelled");
// If the connect attempt was cancelled but succeeded
// anyway, make sure to close the channel, otherwise
// we may leak a file descriptor.
channelFuture.channel().close();
}
}

try {
if (cancelled(channelFuture)) {
return;
}
// setup channel, variables, connection, etc.
Expand Down Expand Up @@ -200,13 +208,26 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception {
}
}

boolean cancelled(ChannelFuture channelFuture) {
if (connectFuture == null) {
LOG.info("connect attempt cancelled");
// If the connect attempt was cancelled but succeeded
// anyway, make sure to close the channel, otherwise
// we may leak a file descriptor.
channelFuture.channel().close();
return true;
}
return false;
}

@Override
void cleanup() {
connectLock.lock();
try {
if (connectFuture != null) {
connectFuture.cancel(false);
connectFuture = null;
afterConnectFutureCancel();
}
if (channel != null) {
channel.close().syncUninterruptibly();
Expand All @@ -224,6 +245,10 @@ void cleanup() {
}
}

void afterConnectFutureCancel() {
// NO-OP ; this method exists only to allow test case overrides to exercise thread sync scenarios.
}

@Override
void close() {
eventLoopGroup.shutdownGracefully();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package org.apache.zookeeper;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.zookeeper.client.ZKClientConfig;
import org.junit.Before;
import org.junit.Test;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ClientCnxnSocketNettyTest {

ClientCnxn.SendThread mockSendThread;
ChannelFuture connectFutureMock;
Bootstrap bootstrapMock;
ChannelFutureListener channelFutureListener;

@Before
public void setUp() throws Exception {
mockSendThread = mock(ClientCnxn.SendThread.class);
when(mockSendThread.tunnelAuthInProgress()).thenReturn(Boolean.FALSE);

connectFutureMock = mock(ChannelFuture.class);

bootstrapMock = mock(Bootstrap.class);
when(bootstrapMock.connect(any(InetSocketAddress.class))).thenReturn(connectFutureMock);

when(connectFutureMock.addListener(any()))
.thenAnswer(args -> {
channelFutureListener = args.getArgument(0);
return connectFutureMock;
});
}

@Test
public void testCleanupDoesNotBlockUnsuccessfulConnectionHandling() throws Exception {
CountDownLatch cancelInvoked = new CountDownLatch(1);
CountDownLatch operationCompleteInvoked = new CountDownLatch(1);

when(connectFutureMock.cancel(false))
.thenAnswer(args -> {
cancelInvoked.countDown();
operationCompleteInvoked.await();
return false;
});

ClientCnxnSocketNetty target = new ClientCnxnSocketNetty(new ZKClientConfig()) {
@Override
Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) {
return bootstrapMock;
}
};

target.introduce(mockSendThread, 0, new LinkedBlockingDeque<>());
target.connect(InetSocketAddress.createUnresolved("127.0.0.1", 6000));

CompletableFuture<Long> pendingCleanup = CompletableFuture
.runAsync(() -> target.cleanup())
.thenApply(nothing -> System.currentTimeMillis());
cancelInvoked.await();

channelFutureListener.operationComplete(mockChannelFuture(false));
operationCompleteInvoked.countDown();

long endOperationComplete = System.currentTimeMillis();
long endCleanup = pendingCleanup.get();

// `operationComplete` (exits without lock) completes before `cleanup` (holds lock until `operationComplete` exits)
assertTrue(endOperationComplete <= endCleanup);
assertFalse(target.isConnected());
}

@Test
public void testCancelledOperationComplete() throws Exception {
testOperationCompleteLockAcquisition(3, 3);
}

@Test
public void testCancelledOperationCompleteRetriesLockAcquisition() throws Exception {
testOperationCompleteLockAcquisition(3, 4);
}

void testOperationCompleteLockAcquisition(int cancelledInvocations, int unlockedCancelledInvocations) throws Exception {
CountDownLatch cancelledInvoked = new CountDownLatch(cancelledInvocations);
CountDownLatch cancelledInvokedUnlocked = new CountDownLatch(unlockedCancelledInvocations);

when(connectFutureMock.cancel(false))
.thenAnswer(args -> {
cancelledInvoked.await(); // proceed after 3 invocations of `#cancelled`
return false;
});

ClientCnxnSocketNetty target = new ClientCnxnSocketNetty(new ZKClientConfig()) {
@Override
Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) {
return bootstrapMock;
}
@Override
boolean cancelled(ChannelFuture channelFuture) {
cancelledInvoked.countDown();
cancelledInvokedUnlocked.countDown();
return super.cancelled(channelFuture);
}
@Override
void afterConnectFutureCancel() {
try {
cancelledInvokedUnlocked.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};

target.introduce(mockSendThread, 0, new LinkedBlockingDeque<>());
target.connect(InetSocketAddress.createUnresolved("127.0.0.1", 6000));

CompletableFuture<Long> pendingCleanup = CompletableFuture
.runAsync(() -> target.cleanup())
.thenApply(nothing -> System.currentTimeMillis());
channelFutureListener.operationComplete(mockChannelFuture(true));

long endOperationComplete = System.currentTimeMillis();
long endCleanup = pendingCleanup.get();

// `cleanup` completes before `operationComplete` (exits when `#cancelled` is true)
assertTrue(endCleanup <= endOperationComplete);
assertFalse(target.isConnected());
}

ChannelFuture mockChannelFuture(boolean successful) {
ChannelFuture channelFuture = mock(ChannelFuture.class);
when(channelFuture.isSuccess()).thenReturn(successful);
if (successful) {
when(channelFuture.channel()).thenReturn(mock(Channel.class));
}
return channelFuture;
}
}