From 7272b85165976a3e0d4dccee818388ab0a3728f8 Mon Sep 17 00:00:00 2001 From: shixiaoxiao Date: Fri, 29 Oct 2021 16:02:30 +0800 Subject: [PATCH 1/2] fix the bug of double master when use LeaderLatch to select the leader --- .../framework/recipes/leader/LeaderLatch.java | 9 +- .../recipes/leader/TestLeaderLatch.java | 99 +++++++++++++++++++ 2 files changed, 107 insertions(+), 1 deletion(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index 7d9ca3cadd..7e3f92bf64 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -496,10 +496,17 @@ public String getOurPath() @VisibleForTesting volatile CountDownLatch debugResetWaitLatch = null; + @VisibleForTesting + volatile CountDownLatch debugRestWaitBeforeNodeDelete = null; + @VisibleForTesting void reset() throws Exception { setLeadership(false); + if ( debugRestWaitBeforeNodeDelete != null ) + { + debugRestWaitBeforeNodeDelete.await(); + } setNode(null); BackgroundCallback callback = new BackgroundCallback() @@ -575,6 +582,7 @@ else if ( ourIndex == 0 ) } else { + setLeadership(false); String watchPath = sortedChildren.get(ourIndex - 1); Watcher watcher = new Watcher() { @@ -678,7 +686,6 @@ protected void handleStateChange(ConnectionState newState) private synchronized void setLeadership(boolean newValue) { boolean oldValue = hasLeadership.getAndSet(newValue); - if ( oldValue && !newValue ) { // Lost leadership, was true, now false listeners.forEach(LeaderLatchListener::notLeader); diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index d64e7cfee5..6307213c65 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -218,6 +218,105 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception } } + @Test + public void testCheckLeaderShipTiming() throws Exception + { + final String latchPath = "/test"; + Timing timing = new Timing(); + List latches = Lists.newArrayList(); + List clients = Lists.newArrayList(); + final BlockingQueue states = Queues.newLinkedBlockingQueue(); + ExecutorService executorService = Executors.newFixedThreadPool(2); + for ( int i = 0; i < 2; ++i ) { + try { + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(10000) + .sessionTimeoutMs(60000) + .retryPolicy(new RetryOneTime(1)) + .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy()) + .build(); + ConnectionStateListener stateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if (newState == ConnectionState.CONNECTED) { + states.add(newState.name()); + } + } + }; + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + clients.add(client); + LeaderLatch latch = new LeaderLatch(client, latchPath, String.valueOf(i)); + LeaderLatchListener listener = new LeaderLatchListener() { + @Override + public void isLeader() { + states.add("true"); + } + + @Override + public void notLeader() { + states.add("false"); + } + }; + latch.addListener(listener); + latch.start(); + latches.add(latch); + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + if (i == 0) { + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + } + } + catch (Exception e){ + return; + } + } + timing.forWaiting().sleepABit(); + // now latch1 is leader, latch2 is not leader. latch2 listens to the ephemeral node created by latch1 + LeaderLatch latch1 = latches.get(0); + LeaderLatch latch2 = latches.get(1); + assertTrue(latch1.hasLeadership()); + assertTrue(!latch2.hasLeadership()); + try { + latch2.debugRestWaitBeforeNodeDelete = new CountDownLatch(1); + latch2.debugResetWaitLatch = new CountDownLatch(1); + latch1.debugResetWaitLatch = new CountDownLatch(1); + server.restart(); + // latch1 and latch2 connection stat from suspend to reconnected + // latch1 set itself is not the leader state and will delete old path and create new path then wait before getChildren + // latch2 wait before delete its old path and receive nodeDeleteEvent and then getChildren find itself is leader + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), + "false"); //latch1 is not leader + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), + "true"); //latch2 is leader + assertTrue(latch2.hasLeadership()); + assertTrue(!latch1.hasLeadership()); + // latch1 continue and getChildren and find itself is not the leader and listen to the node created by latch2 + latch1.debugResetWaitLatch.countDown(); + timing.sleepABit(); + // latch2 continue and delete old path and create new path then wait before getChildren + latch2.debugRestWaitBeforeNodeDelete.countDown(); + // latch1 receive nodeDeleteEvent and then getChildren find itself is leader + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), + "true"); + assertTrue(latch1.hasLeadership()); + latch2.debugResetWaitLatch.countDown(); // latch2 continue and getChildren find itself is not leader + timing.forWaiting().sleepABit(); + + assertTrue(latch1.hasLeadership()); + assertTrue(!latch2.hasLeadership()); + } + finally { + for(int i = 0; i < clients.size(); ++i) { + CloseableUtils.closeQuietly(latches.get(i)); + CloseableUtils.closeQuietly(clients.get(i)); + } + executorService.shutdown(); + } + } + @Test public void testSessionErrorPolicy() throws Exception { From 492506c6dcee19c1ea02a7e15b09efdc923ad90e Mon Sep 17 00:00:00 2001 From: tison Date: Thu, 29 Sep 2022 21:17:27 +0800 Subject: [PATCH 2/2] fix test Signed-off-by: tison --- .../recipes/leader/TestLeaderLatch.java | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 5e477af82c..367dd99446 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -30,6 +30,8 @@ import com.google.common.collect.Queues; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.TestCleanState; @@ -48,6 +50,7 @@ import org.apache.curator.test.compatibility.Timing2; import org.apache.curator.utils.CloseableUtils; import org.awaitility.Awaitility; +import org.awaitility.core.ThrowingRunnable; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -280,35 +283,38 @@ public void notLeader() { LeaderLatch latch1 = latches.get(0); LeaderLatch latch2 = latches.get(1); assertTrue(latch1.hasLeadership()); - assertTrue(!latch2.hasLeadership()); + assertFalse(latch2.hasLeadership()); try { latch2.debugRestWaitBeforeNodeDelete = new CountDownLatch(1); latch2.debugResetWaitLatch = new CountDownLatch(1); latch1.debugResetWaitLatch = new CountDownLatch(1); - server.restart(); - // latch1 and latch2 connection stat from suspend to reconnected + + // force latch1 and latch2 reset + latch1.reset(); + ForkJoinPool.commonPool().submit(() -> { + latch2.reset(); + return null; + }); + // latch1 set itself is not the leader state and will delete old path and create new path then wait before getChildren // latch2 wait before delete its old path and receive nodeDeleteEvent and then getChildren find itself is leader - assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), - "false"); //latch1 is not leader - assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), - "true"); //latch2 is leader + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "false"); //latch1 is not leader + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); //latch2 is leader assertTrue(latch2.hasLeadership()); - assertTrue(!latch1.hasLeadership()); + assertFalse(latch1.hasLeadership()); // latch1 continue and getChildren and find itself is not the leader and listen to the node created by latch2 latch1.debugResetWaitLatch.countDown(); timing.sleepABit(); // latch2 continue and delete old path and create new path then wait before getChildren latch2.debugRestWaitBeforeNodeDelete.countDown(); // latch1 receive nodeDeleteEvent and then getChildren find itself is leader - assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), - "true"); + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); assertTrue(latch1.hasLeadership()); latch2.debugResetWaitLatch.countDown(); // latch2 continue and getChildren find itself is not leader timing.forWaiting().sleepABit(); assertTrue(latch1.hasLeadership()); - assertTrue(!latch2.hasLeadership()); + assertFalse(latch2.hasLeadership()); } finally { for(int i = 0; i < clients.size(); ++i) {