diff --git a/curator-recipes/pom.xml b/curator-recipes/pom.xml
index d27d7fa3cb..b84a94896b 100644
--- a/curator-recipes/pom.xml
+++ b/curator-recipes/pom.xml
@@ -86,6 +86,12 @@
commons-math
test
+
+
+ org.awaitility
+ awaitility
+ test
+
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 5d1c249b08..e8187cecb4 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -190,6 +190,12 @@ public void close() throws IOException
close(closeMode);
}
+ @VisibleForTesting
+ void closeOnDemand() throws IOException
+ {
+ internalClose(closeMode, false);
+ }
+
/**
* Remove this instance from the leadership election. If this instance is the leader, leadership
* is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
@@ -198,9 +204,25 @@ public void close() throws IOException
* @param closeMode allows the default close mode to be overridden at the time the latch is closed.
* @throws IOException errors
*/
- public synchronized void close(CloseMode closeMode) throws IOException
+ public void close(CloseMode closeMode) throws IOException
{
- Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
+ internalClose(closeMode, true);
+ }
+
+ private synchronized void internalClose(CloseMode closeMode, boolean failOnClosed) throws IOException
+ {
+ if (!state.compareAndSet(State.STARTED, State.CLOSED))
+ {
+ if (failOnClosed)
+ {
+ throw new IllegalStateException("Already closed or has not been started");
+ }
+ else
+ {
+ return;
+ }
+ }
+
Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
cancelStartTask();
@@ -586,6 +608,9 @@ private void checkLeadership(List children) throws Exception
final String localOurPath = ourPath.get();
List sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
+
+ log.debug("checkLeadership with id: {}, ourPath: {}, children: {}", id, localOurPath, sortedChildren);
+
if ( ourIndex < 0 )
{
log.error("Can't find our node. Resetting. Index: " + ourIndex);
@@ -604,7 +629,7 @@ else if ( ourIndex == 0 )
@Override
public void process(WatchedEvent event)
{
- if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
+ if ( state.get() == State.STARTED && event.getType() == Event.EventType.NodeDeleted )
{
try
{
@@ -626,8 +651,8 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex
{
if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
{
- // previous node is gone - reset
- reset();
+ // previous node is gone - retry getChildren
+ getChildren();
}
}
};
@@ -669,7 +694,7 @@ protected void handleStateChange(ConnectionState newState)
{
if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get() )
{
- reset();
+ getChildren();
}
}
catch ( Exception e )
@@ -717,6 +742,7 @@ else if ( !oldValue && newValue )
private void setNode(String newValue) throws Exception
{
String oldPath = ourPath.getAndSet(newValue);
+ log.debug("setNode with id: {}, oldPath: {}, newValue: {}", id, oldPath, newValue);
if ( oldPath != null )
{
client.delete().guaranteed().inBackground().forPath(oldPath);
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index d64e7cfee5..671e3c4b03 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -29,6 +29,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.time.Duration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.TestCleanState;
@@ -46,6 +47,7 @@
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.test.compatibility.Timing2;
import org.apache.curator.utils.CloseableUtils;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -218,6 +220,62 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception
}
}
+ @Test
+ public void testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved() throws Exception
+ {
+ final String latchPath = "/foo/bar";
+ final Timing2 timing = new Timing2();
+ final Duration pollInterval = Duration.ofMillis(100);
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
+ {
+ client.start();
+ LeaderLatch latchInitialLeader = new LeaderLatch(client, latchPath, "initial-leader");
+ LeaderLatch latchCandidate0 = new LeaderLatch(client, latchPath, "candidate-0");
+ LeaderLatch latchCandidate1 = new LeaderLatch(client, latchPath, "candidate-1");
+
+ try
+ {
+ latchInitialLeader.start();
+
+ // we want to make sure that the leader gets leadership before other instances are going to join the party
+ waitForALeader(Collections.singletonList(latchInitialLeader), new Timing());
+ // candidate #0 will wait for the leader to go away - this should happen after the child nodes are retrieved by candidate #0
+ latchCandidate0.debugCheckLeaderShipLatch = new CountDownLatch(1);
+ latchCandidate0.start();
+
+ final int expectedChildrenAfterCandidate0Joins = 2;
+ Awaitility.await("There should be " + expectedChildrenAfterCandidate0Joins + " child nodes created after candidate #0 joins the leader election.")
+ .pollInterval(pollInterval)
+ .pollInSameThread()
+ .until(() -> client.getChildren().forPath(latchPath).size() == expectedChildrenAfterCandidate0Joins);
+ // no extra CountDownLatch needs to be set here because candidate #1 will rely on candidate #0
+ latchCandidate1.start();
+
+ final int expectedChildrenAfterCandidate1Joins = 3;
+ Awaitility.await("There should be " + expectedChildrenAfterCandidate1Joins + " child nodes created after candidate #1 joins the leader election.")
+ .pollInterval(pollInterval)
+ .pollInSameThread()
+ .until(() -> client.getChildren().forPath(latchPath).size() == expectedChildrenAfterCandidate1Joins);
+
+ // triggers the removal of the corresponding child node after candidate #0 retrieved the children
+ latchInitialLeader.close();
+
+ latchCandidate0.debugCheckLeaderShipLatch.countDown();
+
+ waitForALeader(Arrays.asList(latchCandidate0, latchCandidate1), new Timing());
+
+ assertTrue(latchCandidate0.hasLeadership() ^ latchCandidate1.hasLeadership());
+ }
+ finally
+ {
+ for (LeaderLatch latchToClose : Arrays.asList(latchInitialLeader, latchCandidate0, latchCandidate1))
+ {
+ latchToClose.closeOnDemand();
+ }
+ }
+ }
+ }
+
@Test
public void testSessionErrorPolicy() throws Exception
{
@@ -248,7 +306,8 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
client.getConnectionStateListenable().addListener(stateListener);
client.start();
- latch = new LeaderLatch(client, "/test");
+ final String latchPatch = "/test";
+ latch = new LeaderLatch(client, latchPatch);
LeaderLatchListener listener = new LeaderLatchListener()
{
@Override
@@ -267,6 +326,7 @@ public void notLeader()
latch.start();
assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+ final List beforeResetChildren = client.getChildren().forPath(latchPatch);
server.stop();
if ( isSessionIteration )
{
@@ -284,6 +344,8 @@ public void notLeader()
server.restart();
assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED.name());
assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+ final List afterResetChildren = client.getChildren().forPath(latchPatch);
+ assertEquals(beforeResetChildren, afterResetChildren);
}
}
finally
diff --git a/curator-test-zk35/pom.xml b/curator-test-zk35/pom.xml
index 2e22ac825a..088249e57b 100644
--- a/curator-test-zk35/pom.xml
+++ b/curator-test-zk35/pom.xml
@@ -166,6 +166,12 @@
slf4j-log4j12
test
+
+
+ org.awaitility
+ awaitility
+ test
+