From d25af9b3786d705e8bad2860aeb417af8754425e Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 10 May 2024 15:31:57 +0800 Subject: [PATCH 1/4] CURATOR-696. Fix double leader for LeaderLatch Signed-off-by: tison --- .../framework/recipes/leader/LeaderLatch.java | 79 +++++++++++-------- 1 file changed, 47 insertions(+), 32 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index 80509dbf8..4411c5815 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -509,7 +509,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex getChildren(); } } else { - log.error("getChildren() failed. rc = " + event.getResultCode()); + log.error("getChildren() failed. rc = {}", event.getResultCode()); } } }; @@ -548,43 +548,58 @@ private void checkLeadership(List children) throws Exception { log.debug("checkLeadership with id: {}, ourPath: {}, children: {}", id, localOurPath, sortedChildren); if (ourIndex < 0) { - log.error("Can't find our node. Resetting. Index: " + ourIndex); + log.error("Can't find our node. Resetting. Index: {}", ourIndex); reset(); - } else if (ourIndex == 0) { - lastPathIsLeader.set(localOurPath); - setLeadership(true); - } else { - setLeadership(false); - String watchPath = sortedChildren.get(ourIndex - 1); - Watcher watcher = new Watcher() { - @Override - public void process(WatchedEvent event) { - if (state.get() == State.STARTED && event.getType() == Event.EventType.NodeDeleted) { - try { - getChildren(); - } catch (Exception ex) { - ThreadUtils.checkInterrupted(ex); - log.error("An error occurred checking the leadership.", ex); + return; + } + + if (ourIndex == 0) { + client.getData() + .inBackground((client, event) -> { + final long ephemeralOwner = + event.getStat() != null ? event.getStat().getEphemeralOwner() : -1; + final long thisSessionId = + client.getZookeeperClient().getZooKeeper().getSessionId(); + if (ephemeralOwner != thisSessionId) { + // this node is gone - reset + reset(); + } else { + lastPathIsLeader.set(localOurPath); + setLeadership(true); } - } - } - }; + }) + .forPath(ZKPaths.makePath(latchPath, localOurPath)); - BackgroundCallback callback = new BackgroundCallback() { - @Override - public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { - if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) { - // previous node is gone - retry getChildren + return; + } + + setLeadership(false); + String watchPath = sortedChildren.get(ourIndex - 1); + Watcher watcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + if (state.get() == State.STARTED && event.getType() == Event.EventType.NodeDeleted) { + try { getChildren(); + } catch (Exception ex) { + ThreadUtils.checkInterrupted(ex); + log.error("An error occurred checking the leadership.", ex); } } - }; - // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak - client.getData() - .usingWatcher(watcher) - .inBackground(callback) - .forPath(ZKPaths.makePath(latchPath, watchPath)); - } + } + }; + + BackgroundCallback callback = new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) { + // previous node is gone - retry getChildren + getChildren(); + } + } + }; + // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak + client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath)); } private void getChildren() throws Exception { From 7c3fe06e5d0cb05896b4f31819678592eb48535d Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 10 May 2024 18:53:41 +0800 Subject: [PATCH 2/4] fixup path Signed-off-by: tison --- .../apache/curator/framework/recipes/leader/LeaderLatch.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index 4411c5815..4d20f9afd 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -568,8 +568,7 @@ private void checkLeadership(List children) throws Exception { setLeadership(true); } }) - .forPath(ZKPaths.makePath(latchPath, localOurPath)); - + .forPath(localOurPath); return; } From 78851b325af0a7bf068b0604db3d560ee85dc9b2 Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 12 May 2024 21:53:23 +0800 Subject: [PATCH 3/4] try add a test Signed-off-by: tison --- curator-recipes/pom.xml | 6 ++ .../recipes/leader/TestLeaderLatch.java | 67 ++++++++++++++++++- 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/curator-recipes/pom.xml b/curator-recipes/pom.xml index 927af0c28..1ac5a8b75 100644 --- a/curator-recipes/pom.xml +++ b/curator-recipes/pom.xml @@ -83,6 +83,12 @@ test + + org.assertj + assertj-core + test + + org.awaitility awaitility diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index dc79f6668..a2e61c972 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -19,6 +19,7 @@ package org.apache.curator.framework.recipes.leader; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@ -52,6 +53,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; +import javax.annotation.Nonnull; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.TestCleanState; @@ -72,9 +74,12 @@ import org.awaitility.Awaitility; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Tag(CuratorTestBase.zk35TestCompatibilityGroup) public class TestLeaderLatch extends BaseClassForTests { + private static final Logger LOG = LoggerFactory.getLogger(TestLeaderLatch.class); private static final String PATH_NAME = "/one/two/me"; private static final int MAX_LOOPS = 5; @@ -208,6 +213,59 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception { } } + @Test + public void testSessionInterruptionDoNotCauseBrainSplit() throws Exception { + final String latchPath = "/testSessionInterruptionDoNotCauseBrainSplit"; + final Timing2 timing = new Timing2(); + final BlockingQueue events = new LinkedBlockingQueue() { + @Override + public boolean add(@Nonnull TestEvent testEvent) { + LOG.debug("Add event: {}", testEvent); + return super.add(testEvent); + } + }; + + final List closeableResources = new ArrayList<>(); + try { + final String id0 = "id0"; + final CuratorFramework client0 = createAndStartClient(server.getConnectString(), timing, id0, null); + closeableResources.add(client0); + final LeaderLatch latch0 = createAndStartLeaderLatch(client0, latchPath, id0, events); + closeableResources.add(latch0); + + assertThat(events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)) + .isNotNull() + .isEqualTo(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP)); + + final String id1 = "id1"; + final CuratorFramework client1 = createAndStartClient(server.getConnectString(), timing, id1, null); + closeableResources.add(client1); + final LeaderLatch latch1 = createAndStartLeaderLatch(client1, latchPath, id1, events); + closeableResources.add(latch1); + + // wait for the non-leading LeaderLatch (i.e. latch1) instance to be done with its creation + // this call is time-consuming but necessary because we don't have a handle to detect the end of the reset + // call + timing.forWaiting().sleepABit(); + + assertTrue(latch0.hasLeadership()); + assertFalse(latch1.hasLeadership()); + + client0.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration(); + + assertThat(events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)) + .isNotNull() + .isEqualTo(new TestEvent(id0, TestEventType.LOST_LEADERSHIP)); + assertThat(events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)) + .isNotNull() + .isEqualTo(new TestEvent(id1, TestEventType.GAINED_LEADERSHIP)); + } finally { + // reverse is necessary for closing the LeaderLatch instances before closing the corresponding client + Collections.reverse(closeableResources); + closeableResources.forEach(CloseableUtils::closeQuietly); + } + } + @Test public void testResettingOfLeadershipAfterConcurrentLeadershipChange() throws Exception { final String latchPath = "/test"; @@ -316,7 +374,9 @@ private static CuratorFramework createAndStartClient( client.getConnectionStateListenable().addListener((client1, newState) -> { if (newState == ConnectionState.CONNECTED) { - events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION)); + if (events != null) { + events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION)); + } } }); @@ -366,6 +426,11 @@ public boolean equals(Object o) { TestEvent testEvent = (TestEvent) o; return Objects.equals(id, testEvent.id) && eventType == testEvent.eventType; } + + @Override + public String toString() { + return "TestEvent{" + "eventType=" + eventType + ", id='" + id + '\'' + '}'; + } } @Test From 969f689e653e6235db513599b722a6310634c913 Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Mon, 13 May 2024 17:54:34 +0800 Subject: [PATCH 4/4] separate events for latches to make assertion a bit obvious --- .../recipes/leader/TestLeaderLatch.java | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index a2e61c972..528b317ff 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -53,7 +53,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; -import javax.annotation.Nonnull; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.TestCleanState; @@ -217,30 +216,25 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception { public void testSessionInterruptionDoNotCauseBrainSplit() throws Exception { final String latchPath = "/testSessionInterruptionDoNotCauseBrainSplit"; final Timing2 timing = new Timing2(); - final BlockingQueue events = new LinkedBlockingQueue() { - @Override - public boolean add(@Nonnull TestEvent testEvent) { - LOG.debug("Add event: {}", testEvent); - return super.add(testEvent); - } - }; + final BlockingQueue events0 = new LinkedBlockingQueue<>(); + final BlockingQueue events1 = new LinkedBlockingQueue<>(); final List closeableResources = new ArrayList<>(); try { final String id0 = "id0"; final CuratorFramework client0 = createAndStartClient(server.getConnectString(), timing, id0, null); closeableResources.add(client0); - final LeaderLatch latch0 = createAndStartLeaderLatch(client0, latchPath, id0, events); + final LeaderLatch latch0 = createAndStartLeaderLatch(client0, latchPath, id0, events0); closeableResources.add(latch0); - assertThat(events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)) + assertThat(events0.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)) .isNotNull() .isEqualTo(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP)); final String id1 = "id1"; final CuratorFramework client1 = createAndStartClient(server.getConnectString(), timing, id1, null); closeableResources.add(client1); - final LeaderLatch latch1 = createAndStartLeaderLatch(client1, latchPath, id1, events); + final LeaderLatch latch1 = createAndStartLeaderLatch(client1, latchPath, id1, events1); closeableResources.add(latch1); // wait for the non-leading LeaderLatch (i.e. latch1) instance to be done with its creation @@ -253,12 +247,16 @@ public boolean add(@Nonnull TestEvent testEvent) { client0.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration(); - assertThat(events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)) - .isNotNull() - .isEqualTo(new TestEvent(id0, TestEventType.LOST_LEADERSHIP)); - assertThat(events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)) + assertThat(events1.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)) .isNotNull() .isEqualTo(new TestEvent(id1, TestEventType.GAINED_LEADERSHIP)); + + assertThat(events0.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)) + .isNotNull() + .isEqualTo(new TestEvent(id0, TestEventType.LOST_LEADERSHIP)); + // No leadership grained to old leader after session changed, hence no brain split. + assertThat(events0.poll(20, TimeUnit.MILLISECONDS)) + .isNotEqualTo(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP)); } finally { // reverse is necessary for closing the LeaderLatch instances before closing the corresponding client Collections.reverse(closeableResources);