diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index e8187cecb4..553e50703e 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -540,10 +540,17 @@ public String getLastPathIsLeader() @VisibleForTesting volatile CountDownLatch debugResetWaitLatch = null; + @VisibleForTesting + volatile CountDownLatch debugResetWaitBeforeNodeDeleteLatch = null; + @VisibleForTesting void reset() throws Exception { setLeadership(false); + if ( debugResetWaitBeforeNodeDeleteLatch != null ) + { + debugResetWaitBeforeNodeDeleteLatch.await(); + } setNode(null); BackgroundCallback callback = new BackgroundCallback() @@ -623,6 +630,7 @@ else if ( ourIndex == 0 ) } else { + setLeadership(false); String watchPath = sortedChildren.get(ourIndex - 1); Watcher watcher = new Watcher() { @@ -726,7 +734,6 @@ protected void handleStateChange(ConnectionState newState) private synchronized void setLeadership(boolean newValue) { boolean oldValue = hasLeadership.getAndSet(newValue); - if ( oldValue && !newValue ) { // Lost leadership, was true, now false listeners.forEach(LeaderLatchListener::notLeader); diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 671e3c4b03..69deb20750 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -29,7 +29,11 @@ import com.google.common.collect.Lists; import com.google.common.collect.Queues; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.Closeable; import java.time.Duration; +import java.util.ArrayList; +import java.util.Objects; +import java.util.concurrent.ForkJoinPool; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.TestCleanState; @@ -220,6 +224,153 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception } } + @Test + public void testResettingOfLeadershipAfterConcurrentLeadershipChange() throws Exception + { + final String latchPath = "/test"; + final Timing2 timing = new Timing2(); + final BlockingQueue events = Queues.newLinkedBlockingQueue(); + + final List closeableResources = new ArrayList<>(); + try + { + final String id0 = "id0"; + final CuratorFramework client0 = createAndStartClient(server.getConnectString(), timing, id0, events); + closeableResources.add(client0); + final LeaderLatch latch0 = createAndStartLeaderLatch(client0, latchPath, id0, events); + closeableResources.add(latch0); + + assertEquals(new TestEvent(id0, TestEventType.GAINED_CONNECTION), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); + assertEquals(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); + + final String id1 = "id1"; + final CuratorFramework client1 = createAndStartClient(server.getConnectString(), timing, id1, events); + closeableResources.add(client1); + final LeaderLatch latch1 = createAndStartLeaderLatch(client1, latchPath, id1, events); + closeableResources.add(latch1); + + assertEquals(new TestEvent(id1, TestEventType.GAINED_CONNECTION), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); + + // wait for the non-leading LeaderLatch (i.e. latch1) instance to be done with its creation + // this call is time-consuming but necessary because we don't have a handle to detect the end of the reset call + timing.forWaiting().sleepABit(); + + assertTrue(latch0.hasLeadership()); + assertFalse(latch1.hasLeadership()); + + latch1.debugResetWaitBeforeNodeDeleteLatch = new CountDownLatch(1); + latch1.debugResetWaitLatch = new CountDownLatch(1); + latch0.debugResetWaitLatch = new CountDownLatch(1); + + // force latch0 and latch1 reset to trigger the actual test + latch0.reset(); + // latch1 needs to be called within a separate thread since it's going to be blocked by the CountDownLatch outside an async call + ForkJoinPool.commonPool().submit(() -> { + latch1.reset(); + return null; + }); + + // latch0.reset() will result in it losing its leadership, deleting its old child node and creating a new child node before being blocked by its debugResetWaitLatch + assertEquals(new TestEvent(id0, TestEventType.LOST_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); + // latch1.reset() is blocked but latch1 will gain leadership due its node watching latch0's node to be deleted + assertEquals(new TestEvent(id1, TestEventType.GAINED_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); + + assertFalse(latch0.hasLeadership()); + assertTrue(latch1.hasLeadership()); + + // latch0.reset() continues with the getChildren call, finds itself not being the leader and starts listening to the node created by latch1 + latch0.debugResetWaitLatch.countDown(); + timing.sleepABit(); + + // latch1.reset() continues, deletes its old child node and creates a new child node before being blocked by its debugResetWaitLatch + latch1.debugResetWaitBeforeNodeDeleteLatch.countDown(); + + // latch0 receives NodeDeleteEvent and then finds itself to be the leader + assertEquals(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); + assertTrue(latch0.hasLeadership()); + + // latch1.reset() continues and finds itself not being the leader + latch1.debugResetWaitLatch.countDown(); + // this call is time-consuming but necessary because we don't have a handle to detect the end of the reset call + timing.forWaiting().sleepABit(); + + assertTrue(latch0.hasLeadership()); + assertFalse(latch1.hasLeadership()); + } + finally + { + // reverse is necessary for closing the LeaderLatch instances before closing the corresponding client + Collections.reverse(closeableResources); + closeableResources.forEach(CloseableUtils::closeQuietly); + } + } + + private static CuratorFramework createAndStartClient(String zkConnectString, Timing2 timing, String id, Collection events) { + final CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(zkConnectString) + .connectionTimeoutMs(timing.connection()) + .sessionTimeoutMs(timing.session()) + .retryPolicy(new RetryOneTime(1)) + .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy()) + .build(); + + client.getConnectionStateListenable().addListener((client1, newState) -> { + if ( newState == ConnectionState.CONNECTED ) + { + events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION)); + } + }); + + client.start(); + + return client; + } + + private static LeaderLatch createAndStartLeaderLatch(CuratorFramework client, String latchPath, String id, Collection events) throws Exception + { + final LeaderLatch latch = new LeaderLatch(client, latchPath, id); + latch.addListener(new LeaderLatchListener() { + @Override + public void isLeader() { + events.add(new TestEvent(latch.getId(), TestEventType.GAINED_LEADERSHIP)); + } + + @Override + public void notLeader() { + events.add(new TestEvent(latch.getId(), TestEventType.LOST_LEADERSHIP)); + } + }); + latch.start(); + + return latch; + } + + private enum TestEventType + { + GAINED_LEADERSHIP, + LOST_LEADERSHIP, + GAINED_CONNECTION; + } + + private static class TestEvent { + private final String id; + private final TestEventType eventType; + + public TestEvent(String id, TestEventType eventType) + { + this.id = id; + this.eventType = eventType; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TestEvent testEvent = (TestEvent) o; + return Objects.equals(id, testEvent.id) && eventType == testEvent.eventType; + } + } + @Test public void testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved() throws Exception {