diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index e8187cecb4..dc51082ed3 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -540,10 +540,17 @@ public String getLastPathIsLeader() @VisibleForTesting volatile CountDownLatch debugResetWaitLatch = null; + @VisibleForTesting + volatile CountDownLatch debugRestWaitBeforeNodeDelete = null; + @VisibleForTesting void reset() throws Exception { setLeadership(false); + if ( debugRestWaitBeforeNodeDelete != null ) + { + debugRestWaitBeforeNodeDelete.await(); + } setNode(null); BackgroundCallback callback = new BackgroundCallback() @@ -623,6 +630,7 @@ else if ( ourIndex == 0 ) } else { + setLeadership(false); String watchPath = sortedChildren.get(ourIndex - 1); Watcher watcher = new Watcher() { @@ -726,7 +734,6 @@ protected void handleStateChange(ConnectionState newState) private synchronized void setLeadership(boolean newValue) { boolean oldValue = hasLeadership.getAndSet(newValue); - if ( oldValue && !newValue ) { // Lost leadership, was true, now false listeners.forEach(LeaderLatchListener::notLeader); diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 671e3c4b03..367dd99446 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -30,6 +30,8 @@ import com.google.common.collect.Queues; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.TestCleanState; @@ -48,6 +50,7 @@ import org.apache.curator.test.compatibility.Timing2; import org.apache.curator.utils.CloseableUtils; import org.awaitility.Awaitility; +import org.awaitility.core.ThrowingRunnable; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception } } + @Test + public void testCheckLeaderShipTiming() throws Exception + { + final String latchPath = "/test"; + Timing timing = new Timing(); + List latches = Lists.newArrayList(); + List clients = Lists.newArrayList(); + final BlockingQueue states = Queues.newLinkedBlockingQueue(); + ExecutorService executorService = Executors.newFixedThreadPool(2); + for ( int i = 0; i < 2; ++i ) { + try { + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(10000) + .sessionTimeoutMs(60000) + .retryPolicy(new RetryOneTime(1)) + .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy()) + .build(); + ConnectionStateListener stateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if (newState == ConnectionState.CONNECTED) { + states.add(newState.name()); + } + } + }; + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + clients.add(client); + LeaderLatch latch = new LeaderLatch(client, latchPath, String.valueOf(i)); + LeaderLatchListener listener = new LeaderLatchListener() { + @Override + public void isLeader() { + states.add("true"); + } + + @Override + public void notLeader() { + states.add("false"); + } + }; + latch.addListener(listener); + latch.start(); + latches.add(latch); + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + if (i == 0) { + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + } + } + catch (Exception e){ + return; + } + } + timing.forWaiting().sleepABit(); + // now latch1 is leader, latch2 is not leader. latch2 listens to the ephemeral node created by latch1 + LeaderLatch latch1 = latches.get(0); + LeaderLatch latch2 = latches.get(1); + assertTrue(latch1.hasLeadership()); + assertFalse(latch2.hasLeadership()); + try { + latch2.debugRestWaitBeforeNodeDelete = new CountDownLatch(1); + latch2.debugResetWaitLatch = new CountDownLatch(1); + latch1.debugResetWaitLatch = new CountDownLatch(1); + + // force latch1 and latch2 reset + latch1.reset(); + ForkJoinPool.commonPool().submit(() -> { + latch2.reset(); + return null; + }); + + // latch1 set itself is not the leader state and will delete old path and create new path then wait before getChildren + // latch2 wait before delete its old path and receive nodeDeleteEvent and then getChildren find itself is leader + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "false"); //latch1 is not leader + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); //latch2 is leader + assertTrue(latch2.hasLeadership()); + assertFalse(latch1.hasLeadership()); + // latch1 continue and getChildren and find itself is not the leader and listen to the node created by latch2 + latch1.debugResetWaitLatch.countDown(); + timing.sleepABit(); + // latch2 continue and delete old path and create new path then wait before getChildren + latch2.debugRestWaitBeforeNodeDelete.countDown(); + // latch1 receive nodeDeleteEvent and then getChildren find itself is leader + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + assertTrue(latch1.hasLeadership()); + latch2.debugResetWaitLatch.countDown(); // latch2 continue and getChildren find itself is not leader + timing.forWaiting().sleepABit(); + + assertTrue(latch1.hasLeadership()); + assertFalse(latch2.hasLeadership()); + } + finally { + for(int i = 0; i < clients.size(); ++i) { + CloseableUtils.closeQuietly(latches.get(i)); + CloseableUtils.closeQuietly(clients.get(i)); + } + executorService.shutdown(); + } + } + @Test public void testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved() throws Exception {