From a054902b3e7302616b43ced32f593f709baf044e Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 18 Dec 2024 00:13:55 +0800 Subject: [PATCH 1/3] CURATOR-724. Fix LeaderLatch recover on reconnected and missing leaderPath Signed-off-by: tison --- .../framework/recipes/leader/LeaderLatch.java | 31 +++++++++++++------ 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index 4d20f9afd..a1cddde7e 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -509,7 +509,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex getChildren(); } } else { - log.error("getChildren() failed. rc = {}", event.getResultCode()); + log.error("creatingParentContainersIfNeeded() failed (rc = {})", event.getResultCode()); } } }; @@ -528,7 +528,7 @@ private synchronized void internalStart() { reset(); } catch (Exception e) { ThreadUtils.checkInterrupted(e); - log.error("An error occurred checking resetting leadership.", e); + log.error("failed to check resetting leadership.", e); } } } @@ -548,7 +548,7 @@ private void checkLeadership(List children) throws Exception { log.debug("checkLeadership with id: {}, ourPath: {}, children: {}", id, localOurPath, sortedChildren); if (ourIndex < 0) { - log.error("Can't find our node. Resetting. Index: {}", ourIndex); + log.error("failed to find our node; resetting (index: {})", ourIndex); reset(); return; } @@ -582,7 +582,7 @@ public void process(WatchedEvent event) { getChildren(); } catch (Exception ex) { ThreadUtils.checkInterrupted(ex); - log.error("An error occurred checking the leadership.", ex); + log.error("failed to check the leadership.", ex); } } } @@ -607,6 +607,17 @@ private void getChildren() throws Exception { public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { if (event.getResultCode() == KeeperException.Code.OK.intValue()) { checkLeadership(event.getChildren()); + } else if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) { + // latchPath has gone - reset + // + // This is possible when RECONNECTED during: + // (1) Scale the zk cluster to 0 nodes. + // (2) Scale it back. + // + // See also https://issues.apache.org/jira/browse/CURATOR-724 + reset(); + } else { + log.error("getChildren() failed (rc = {})", event.getResultCode()); } } }; @@ -616,11 +627,6 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex @VisibleForTesting protected void handleStateChange(ConnectionState newState) { switch (newState) { - default: { - // NOP - break; - } - case RECONNECTED: { try { if (client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) @@ -629,7 +635,7 @@ protected void handleStateChange(ConnectionState newState) { } } catch (Exception e) { ThreadUtils.checkInterrupted(e); - log.error("Could not reset leader latch", e); + log.error("failed to reset leader latch", e); setLeadership(false); } break; @@ -646,6 +652,11 @@ protected void handleStateChange(ConnectionState newState) { setLeadership(false); break; } + + default: { + // NOP + break; + } } } From 5d9043f1752400e1abf1696c33dbd6df1f3749da Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 22 Dec 2024 02:38:58 +0800 Subject: [PATCH 2/3] add test Signed-off-by: tison --- .mvn/extensions.xml | 2 +- .../framework/recipes/leader/LeaderLatch.java | 11 ++-- .../recipes/leader/TestLeaderLatch.java | 57 +++++++++++++++++++ 3 files changed, 65 insertions(+), 5 deletions(-) diff --git a/.mvn/extensions.xml b/.mvn/extensions.xml index 1ca382883..98c357118 100644 --- a/.mvn/extensions.xml +++ b/.mvn/extensions.xml @@ -24,7 +24,7 @@ com.gradle develocity-maven-extension - 1.21.4 + 1.23 com.gradle diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index a1cddde7e..4226e8819 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -624,18 +624,21 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null)); } + @VisibleForTesting + volatile CountDownLatch debugHandleReconnectedLatch = null; + @VisibleForTesting protected void handleStateChange(ConnectionState newState) { switch (newState) { case RECONNECTED: { try { - if (client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) - || !hasLeadership.get()) { - getChildren(); + if (debugHandleReconnectedLatch != null) { + debugHandleReconnectedLatch.await(); } + getChildren(); } catch (Exception e) { ThreadUtils.checkInterrupted(e); - log.error("failed to reset leader latch", e); + log.error("failed to recheck leadership on reconnected", e); setLeadership(false); } break; diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 528b317ff..0dc375d16 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -179,6 +179,63 @@ public void testUncreatedPathGetLeader() throws Exception { } } + // @see https://issues.apache.org/jira/browse/CURATOR-724 + @Test + public void testGetChildrenHitsNoNode() throws Exception { + final String latchPath = "/testGetChildrenHitsNoNode"; + final Timing2 timing = new Timing2(); + final BlockingQueue events0 = new LinkedBlockingQueue<>(); + final BlockingQueue events1 = new LinkedBlockingQueue<>(); + + final List closeableResources = new ArrayList<>(); + try { + final String id0 = "id0"; + final CuratorFramework client0 = createAndStartClient(server.getConnectString(), timing, id0, null); + closeableResources.add(client0); + final LeaderLatch latch0 = createAndStartLeaderLatch(client0, latchPath, id0, events0); + closeableResources.add(latch0); + + assertThat(events0.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)) + .isNotNull() + .isEqualTo(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP)); + + final String id1 = "id1"; + final CuratorFramework client1 = createAndStartClient(server.getConnectString(), timing, id1, null); + closeableResources.add(client1); + final LeaderLatch latch1 = createAndStartLeaderLatch(client1, latchPath, id1, events1); + closeableResources.add(latch1); + + // wait for the non-leading LeaderLatch (i.e. latch1) instance to be done with its creation + // this call is time-consuming but necessary because we don't have a handle to detect the end of the reset + // call + timing.forWaiting().sleepABit(); + + assertTrue(latch0.hasLeadership()); + assertFalse(latch1.hasLeadership()); + + // ensure we can observe the leadership transferred to latch1 + latch0.debugHandleReconnectedLatch = new CountDownLatch(1); + + // scale to zero - recreate the cluster + final int port = server.getPort(); + server.close(); + server = new TestingServer(port, true); + + assertThat(events1.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)) + .isNotNull() + .isEqualTo(new TestEvent(id1, TestEventType.GAINED_LEADERSHIP)); + + latch0.debugHandleReconnectedLatch.countDown(); + assertThat(events0.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)) + .isNotNull() + .isEqualTo(new TestEvent(id0, TestEventType.LOST_LEADERSHIP)); + } finally { + // reverse is necessary for closing the LeaderLatch instances before closing the corresponding client + Collections.reverse(closeableResources); + closeableResources.forEach(CloseableUtils::closeQuietly); + } + } + @Test public void testWatchedNodeDeletedOnReconnect() throws Exception { final String latchPath = "/foo/bar"; From b0383f5d0ffdcb63e8ca75ddfc1c7dae63add77f Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 22 Dec 2024 02:48:18 +0800 Subject: [PATCH 3/3] improve logs Signed-off-by: tison --- .../framework/recipes/leader/LeaderLatch.java | 21 +++++++++---------- .../src/test/resources/log4j.properties | 2 +- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index 4226e8819..99e0fb9a4 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -509,7 +509,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex getChildren(); } } else { - log.error("creatingParentContainersIfNeeded() failed (rc = {})", event.getResultCode()); + log.error("[id={}] creatingParentContainersIfNeeded() failed (rc = {})", id, event.getResultCode()); } } }; @@ -528,7 +528,7 @@ private synchronized void internalStart() { reset(); } catch (Exception e) { ThreadUtils.checkInterrupted(e); - log.error("failed to check resetting leadership.", e); + log.error("[id={}] failed to check resetting leadership.", id, e); } } } @@ -545,10 +545,10 @@ private void checkLeadership(List children) throws Exception { List sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children); int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1; - log.debug("checkLeadership with id: {}, ourPath: {}, children: {}", id, localOurPath, sortedChildren); + log.debug("[id={}] checkLeadership with ourPath: {}, children: {}", id, localOurPath, sortedChildren); if (ourIndex < 0) { - log.error("failed to find our node; resetting (index: {})", ourIndex); + log.error("[id={}] failed to find our node; resetting (index: {})", id, ourIndex); reset(); return; } @@ -582,7 +582,7 @@ public void process(WatchedEvent event) { getChildren(); } catch (Exception ex) { ThreadUtils.checkInterrupted(ex); - log.error("failed to check the leadership.", ex); + log.error("[id={}] failed to check the leadership.", id, ex); } } } @@ -592,7 +592,7 @@ public void process(WatchedEvent event) { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) { - // previous node is gone - retry getChildren + log.debug("[id={}] previous node is gone; retry getChildren", id); getChildren(); } } @@ -608,8 +608,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex if (event.getResultCode() == KeeperException.Code.OK.intValue()) { checkLeadership(event.getChildren()); } else if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) { - // latchPath has gone - reset - // + log.debug("[id={}] latchPath has gone; reset", id); // This is possible when RECONNECTED during: // (1) Scale the zk cluster to 0 nodes. // (2) Scale it back. @@ -617,7 +616,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex // See also https://issues.apache.org/jira/browse/CURATOR-724 reset(); } else { - log.error("getChildren() failed (rc = {})", event.getResultCode()); + log.error("[id={}] getChildren() failed (rc = {})", id, event.getResultCode()); } } }; @@ -638,7 +637,7 @@ protected void handleStateChange(ConnectionState newState) { getChildren(); } catch (Exception e) { ThreadUtils.checkInterrupted(e); - log.error("failed to recheck leadership on reconnected", e); + log.error("[id={}] failed to recheck leadership on reconnected", id, e); setLeadership(false); } break; @@ -676,7 +675,7 @@ private synchronized void setLeadership(boolean newValue) { private void setNode(String newValue) throws Exception { String oldPath = ourPath.getAndSet(newValue); - log.debug("setNode with id: {}, oldPath: {}, newValue: {}", id, oldPath, newValue); + log.debug("[id={}] setNode with oldPath: {}, newValue: {}", id, oldPath, newValue); if (oldPath != null) { client.delete().guaranteed().inBackground().forPath(oldPath); } diff --git a/curator-recipes/src/test/resources/log4j.properties b/curator-recipes/src/test/resources/log4j.properties index 706484ce5..98720873e 100644 --- a/curator-recipes/src/test/resources/log4j.properties +++ b/curator-recipes/src/test/resources/log4j.properties @@ -22,4 +22,4 @@ log4j.additivity.org.apache.curator=false log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%-5p %c %x %m [%t]%n +log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p %c %x %m [%t]%n