From 7272b85165976a3e0d4dccee818388ab0a3728f8 Mon Sep 17 00:00:00 2001 From: shixiaoxiao Date: Fri, 29 Oct 2021 16:02:30 +0800 Subject: [PATCH 01/17] fix the bug of double master when use LeaderLatch to select the leader --- .../framework/recipes/leader/LeaderLatch.java | 9 +- .../recipes/leader/TestLeaderLatch.java | 99 +++++++++++++++++++ 2 files changed, 107 insertions(+), 1 deletion(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index 7d9ca3cadd..7e3f92bf64 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -496,10 +496,17 @@ public String getOurPath() @VisibleForTesting volatile CountDownLatch debugResetWaitLatch = null; + @VisibleForTesting + volatile CountDownLatch debugRestWaitBeforeNodeDelete = null; + @VisibleForTesting void reset() throws Exception { setLeadership(false); + if ( debugRestWaitBeforeNodeDelete != null ) + { + debugRestWaitBeforeNodeDelete.await(); + } setNode(null); BackgroundCallback callback = new BackgroundCallback() @@ -575,6 +582,7 @@ else if ( ourIndex == 0 ) } else { + setLeadership(false); String watchPath = sortedChildren.get(ourIndex - 1); Watcher watcher = new Watcher() { @@ -678,7 +686,6 @@ protected void handleStateChange(ConnectionState newState) private synchronized void setLeadership(boolean newValue) { boolean oldValue = hasLeadership.getAndSet(newValue); - if ( oldValue && !newValue ) { // Lost leadership, was true, now false listeners.forEach(LeaderLatchListener::notLeader); diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index d64e7cfee5..6307213c65 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -218,6 +218,105 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception } } + @Test + public void testCheckLeaderShipTiming() throws Exception + { + final String latchPath = "/test"; + Timing timing = new Timing(); + List latches = Lists.newArrayList(); + List clients = Lists.newArrayList(); + final BlockingQueue states = Queues.newLinkedBlockingQueue(); + ExecutorService executorService = Executors.newFixedThreadPool(2); + for ( int i = 0; i < 2; ++i ) { + try { + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(10000) + .sessionTimeoutMs(60000) + .retryPolicy(new RetryOneTime(1)) + .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy()) + .build(); + ConnectionStateListener stateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if (newState == ConnectionState.CONNECTED) { + states.add(newState.name()); + } + } + }; + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + clients.add(client); + LeaderLatch latch = new LeaderLatch(client, latchPath, String.valueOf(i)); + LeaderLatchListener listener = new LeaderLatchListener() { + @Override + public void isLeader() { + states.add("true"); + } + + @Override + public void notLeader() { + states.add("false"); + } + }; + latch.addListener(listener); + latch.start(); + latches.add(latch); + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + if (i == 0) { + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + } + } + catch (Exception e){ + return; + } + } + timing.forWaiting().sleepABit(); + // now latch1 is leader, latch2 is not leader. latch2 listens to the ephemeral node created by latch1 + LeaderLatch latch1 = latches.get(0); + LeaderLatch latch2 = latches.get(1); + assertTrue(latch1.hasLeadership()); + assertTrue(!latch2.hasLeadership()); + try { + latch2.debugRestWaitBeforeNodeDelete = new CountDownLatch(1); + latch2.debugResetWaitLatch = new CountDownLatch(1); + latch1.debugResetWaitLatch = new CountDownLatch(1); + server.restart(); + // latch1 and latch2 connection stat from suspend to reconnected + // latch1 set itself is not the leader state and will delete old path and create new path then wait before getChildren + // latch2 wait before delete its old path and receive nodeDeleteEvent and then getChildren find itself is leader + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), + "false"); //latch1 is not leader + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), + "true"); //latch2 is leader + assertTrue(latch2.hasLeadership()); + assertTrue(!latch1.hasLeadership()); + // latch1 continue and getChildren and find itself is not the leader and listen to the node created by latch2 + latch1.debugResetWaitLatch.countDown(); + timing.sleepABit(); + // latch2 continue and delete old path and create new path then wait before getChildren + latch2.debugRestWaitBeforeNodeDelete.countDown(); + // latch1 receive nodeDeleteEvent and then getChildren find itself is leader + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), + "true"); + assertTrue(latch1.hasLeadership()); + latch2.debugResetWaitLatch.countDown(); // latch2 continue and getChildren find itself is not leader + timing.forWaiting().sleepABit(); + + assertTrue(latch1.hasLeadership()); + assertTrue(!latch2.hasLeadership()); + } + finally { + for(int i = 0; i < clients.size(); ++i) { + CloseableUtils.closeQuietly(latches.get(i)); + CloseableUtils.closeQuietly(clients.get(i)); + } + executorService.shutdown(); + } + } + @Test public void testSessionErrorPolicy() throws Exception { From 492506c6dcee19c1ea02a7e15b09efdc923ad90e Mon Sep 17 00:00:00 2001 From: tison Date: Thu, 29 Sep 2022 21:17:27 +0800 Subject: [PATCH 02/17] fix test Signed-off-by: tison --- .../recipes/leader/TestLeaderLatch.java | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 5e477af82c..367dd99446 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -30,6 +30,8 @@ import com.google.common.collect.Queues; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.TestCleanState; @@ -48,6 +50,7 @@ import org.apache.curator.test.compatibility.Timing2; import org.apache.curator.utils.CloseableUtils; import org.awaitility.Awaitility; +import org.awaitility.core.ThrowingRunnable; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -280,35 +283,38 @@ public void notLeader() { LeaderLatch latch1 = latches.get(0); LeaderLatch latch2 = latches.get(1); assertTrue(latch1.hasLeadership()); - assertTrue(!latch2.hasLeadership()); + assertFalse(latch2.hasLeadership()); try { latch2.debugRestWaitBeforeNodeDelete = new CountDownLatch(1); latch2.debugResetWaitLatch = new CountDownLatch(1); latch1.debugResetWaitLatch = new CountDownLatch(1); - server.restart(); - // latch1 and latch2 connection stat from suspend to reconnected + + // force latch1 and latch2 reset + latch1.reset(); + ForkJoinPool.commonPool().submit(() -> { + latch2.reset(); + return null; + }); + // latch1 set itself is not the leader state and will delete old path and create new path then wait before getChildren // latch2 wait before delete its old path and receive nodeDeleteEvent and then getChildren find itself is leader - assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), - "false"); //latch1 is not leader - assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), - "true"); //latch2 is leader + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "false"); //latch1 is not leader + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); //latch2 is leader assertTrue(latch2.hasLeadership()); - assertTrue(!latch1.hasLeadership()); + assertFalse(latch1.hasLeadership()); // latch1 continue and getChildren and find itself is not the leader and listen to the node created by latch2 latch1.debugResetWaitLatch.countDown(); timing.sleepABit(); // latch2 continue and delete old path and create new path then wait before getChildren latch2.debugRestWaitBeforeNodeDelete.countDown(); // latch1 receive nodeDeleteEvent and then getChildren find itself is leader - assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), - "true"); + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); assertTrue(latch1.hasLeadership()); latch2.debugResetWaitLatch.countDown(); // latch2 continue and getChildren find itself is not leader timing.forWaiting().sleepABit(); assertTrue(latch1.hasLeadership()); - assertTrue(!latch2.hasLeadership()); + assertFalse(latch2.hasLeadership()); } finally { for(int i = 0; i < clients.size(); ++i) { From fd91873afe7239db7e8993bc5244fed9126e7117 Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Mon, 10 Oct 2022 07:04:42 +0200 Subject: [PATCH 03/17] [CURATOR-653] Fixes typo in name: rest -> reset --- .../curator/framework/recipes/leader/LeaderLatch.java | 6 +++--- .../curator/framework/recipes/leader/TestLeaderLatch.java | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index dc51082ed3..aab7c10d80 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -541,15 +541,15 @@ public String getLastPathIsLeader() volatile CountDownLatch debugResetWaitLatch = null; @VisibleForTesting - volatile CountDownLatch debugRestWaitBeforeNodeDelete = null; + volatile CountDownLatch debugResetWaitBeforeNodeDelete = null; @VisibleForTesting void reset() throws Exception { setLeadership(false); - if ( debugRestWaitBeforeNodeDelete != null ) + if ( debugResetWaitBeforeNodeDelete != null ) { - debugRestWaitBeforeNodeDelete.await(); + debugResetWaitBeforeNodeDelete.await(); } setNode(null); diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 367dd99446..04bbd19a9e 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -285,7 +285,7 @@ public void notLeader() { assertTrue(latch1.hasLeadership()); assertFalse(latch2.hasLeadership()); try { - latch2.debugRestWaitBeforeNodeDelete = new CountDownLatch(1); + latch2.debugResetWaitBeforeNodeDelete = new CountDownLatch(1); latch2.debugResetWaitLatch = new CountDownLatch(1); latch1.debugResetWaitLatch = new CountDownLatch(1); @@ -306,7 +306,7 @@ public void notLeader() { latch1.debugResetWaitLatch.countDown(); timing.sleepABit(); // latch2 continue and delete old path and create new path then wait before getChildren - latch2.debugRestWaitBeforeNodeDelete.countDown(); + latch2.debugResetWaitBeforeNodeDelete.countDown(); // latch1 receive nodeDeleteEvent and then getChildren find itself is leader assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); assertTrue(latch1.hasLeadership()); From d0a6ab5874dce0341d030f83e4f3b70f625ae698 Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Mon, 10 Oct 2022 07:05:35 +0200 Subject: [PATCH 04/17] [CURATOR-653] Aligns name of debugResetWaitForNodeDelete with other latch fields --- .../curator/framework/recipes/leader/LeaderLatch.java | 6 +++--- .../curator/framework/recipes/leader/TestLeaderLatch.java | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index aab7c10d80..553e50703e 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -541,15 +541,15 @@ public String getLastPathIsLeader() volatile CountDownLatch debugResetWaitLatch = null; @VisibleForTesting - volatile CountDownLatch debugResetWaitBeforeNodeDelete = null; + volatile CountDownLatch debugResetWaitBeforeNodeDeleteLatch = null; @VisibleForTesting void reset() throws Exception { setLeadership(false); - if ( debugResetWaitBeforeNodeDelete != null ) + if ( debugResetWaitBeforeNodeDeleteLatch != null ) { - debugResetWaitBeforeNodeDelete.await(); + debugResetWaitBeforeNodeDeleteLatch.await(); } setNode(null); diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 04bbd19a9e..9ad980f7a1 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -285,7 +285,7 @@ public void notLeader() { assertTrue(latch1.hasLeadership()); assertFalse(latch2.hasLeadership()); try { - latch2.debugResetWaitBeforeNodeDelete = new CountDownLatch(1); + latch2.debugResetWaitBeforeNodeDeleteLatch = new CountDownLatch(1); latch2.debugResetWaitLatch = new CountDownLatch(1); latch1.debugResetWaitLatch = new CountDownLatch(1); @@ -306,7 +306,7 @@ public void notLeader() { latch1.debugResetWaitLatch.countDown(); timing.sleepABit(); // latch2 continue and delete old path and create new path then wait before getChildren - latch2.debugResetWaitBeforeNodeDelete.countDown(); + latch2.debugResetWaitBeforeNodeDeleteLatch.countDown(); // latch1 receive nodeDeleteEvent and then getChildren find itself is leader assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); assertTrue(latch1.hasLeadership()); From d1d318866b312958cef6a8b9223b7a6df1d1e515 Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Mon, 10 Oct 2022 07:21:26 +0200 Subject: [PATCH 05/17] [CURATOR-653] Removes unused ExecutorService --- .../curator/framework/recipes/leader/TestLeaderLatch.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 9ad980f7a1..7d0f3cc429 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -231,7 +231,6 @@ public void testCheckLeaderShipTiming() throws Exception List latches = Lists.newArrayList(); List clients = Lists.newArrayList(); final BlockingQueue states = Queues.newLinkedBlockingQueue(); - ExecutorService executorService = Executors.newFixedThreadPool(2); for ( int i = 0; i < 2; ++i ) { try { CuratorFramework client = CuratorFrameworkFactory.builder() @@ -321,7 +320,6 @@ public void notLeader() { CloseableUtils.closeQuietly(latches.get(i)); CloseableUtils.closeQuietly(clients.get(i)); } - executorService.shutdown(); } } From 01788e5a47f427fe43765fc9ca15f541a193ffff Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Mon, 10 Oct 2022 07:23:09 +0200 Subject: [PATCH 06/17] [CURATOR-653] Adds final to local variables --- .../curator/framework/recipes/leader/TestLeaderLatch.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 7d0f3cc429..14638a091a 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -227,9 +227,9 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception public void testCheckLeaderShipTiming() throws Exception { final String latchPath = "/test"; - Timing timing = new Timing(); - List latches = Lists.newArrayList(); - List clients = Lists.newArrayList(); + final Timing timing = new Timing(); + final List latches = Lists.newArrayList(); + final List clients = Lists.newArrayList(); final BlockingQueue states = Queues.newLinkedBlockingQueue(); for ( int i = 0; i < 2; ++i ) { try { From e109b2e12087badead9fff3cfd2c6209b26d1e06 Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Mon, 10 Oct 2022 08:51:59 +0200 Subject: [PATCH 07/17] [CURATOR-653] Replace Timing with Timing2 and utilize it in timeout client config --- .../curator/framework/recipes/leader/TestLeaderLatch.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 14638a091a..5f16c393ff 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -227,7 +227,7 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception public void testCheckLeaderShipTiming() throws Exception { final String latchPath = "/test"; - final Timing timing = new Timing(); + final Timing2 timing = new Timing2(); final List latches = Lists.newArrayList(); final List clients = Lists.newArrayList(); final BlockingQueue states = Queues.newLinkedBlockingQueue(); @@ -235,8 +235,8 @@ public void testCheckLeaderShipTiming() throws Exception try { CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(server.getConnectString()) - .connectionTimeoutMs(10000) - .sessionTimeoutMs(60000) + .connectionTimeoutMs(timing.connection()) + .sessionTimeoutMs(timing.session()) .retryPolicy(new RetryOneTime(1)) .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy()) .build(); From ace59cdb706735c4679ba84f3540db59d3244a0f Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Mon, 10 Oct 2022 13:14:11 +0200 Subject: [PATCH 08/17] [CURATOR-653] Introduce a single finally block for closing any open resources --- .../recipes/leader/TestLeaderLatch.java | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 5f16c393ff..ec0cc0182c 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -231,8 +231,10 @@ public void testCheckLeaderShipTiming() throws Exception final List latches = Lists.newArrayList(); final List clients = Lists.newArrayList(); final BlockingQueue states = Queues.newLinkedBlockingQueue(); - for ( int i = 0; i < 2; ++i ) { - try { + try + { + for ( int i = 0; i < 2; ++i ) + { CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(server.getConnectString()) .connectionTimeoutMs(timing.connection()) @@ -273,17 +275,12 @@ public void notLeader() { assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); } } - catch (Exception e){ - return; - } - } - timing.forWaiting().sleepABit(); - // now latch1 is leader, latch2 is not leader. latch2 listens to the ephemeral node created by latch1 - LeaderLatch latch1 = latches.get(0); - LeaderLatch latch2 = latches.get(1); - assertTrue(latch1.hasLeadership()); - assertFalse(latch2.hasLeadership()); - try { + timing.forWaiting().sleepABit(); + // now latch1 is leader, latch2 is not leader. latch2 listens to the ephemeral node created by latch1 + LeaderLatch latch1 = latches.get(0); + LeaderLatch latch2 = latches.get(1); + assertTrue(latch1.hasLeadership()); + assertFalse(latch2.hasLeadership()); latch2.debugResetWaitBeforeNodeDeleteLatch = new CountDownLatch(1); latch2.debugResetWaitLatch = new CountDownLatch(1); latch1.debugResetWaitLatch = new CountDownLatch(1); From 25404ae59013d380b455edef7087557cf3c63b9d Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Mon, 10 Oct 2022 13:15:19 +0200 Subject: [PATCH 09/17] [CURATOR-653] Adds comment about wait after initialization --- .../curator/framework/recipes/leader/TestLeaderLatch.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index ec0cc0182c..d77bc0d27a 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -275,7 +275,9 @@ public void notLeader() { assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); } } + // waiting for the non-leading LeaderLatch instance to finalize its initialization timing.forWaiting().sleepABit(); + // now latch1 is leader, latch2 is not leader. latch2 listens to the ephemeral node created by latch1 LeaderLatch latch1 = latches.get(0); LeaderLatch latch2 = latches.get(1); From 8668dfdf43593b39b41a64069baaca7601338ef3 Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Mon, 10 Oct 2022 13:16:46 +0200 Subject: [PATCH 10/17] [CURATOR-653] Adds comment about calling reset from within a separate thread --- .../apache/curator/framework/recipes/leader/TestLeaderLatch.java | 1 + 1 file changed, 1 insertion(+) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index d77bc0d27a..02c3d68b8b 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -289,6 +289,7 @@ public void notLeader() { // force latch1 and latch2 reset latch1.reset(); + // latch2 needs to be reset in a separate thread because it will block the thread due to the beforeNodeDeletion latch ForkJoinPool.commonPool().submit(() -> { latch2.reset(); return null; From 293411e67d16fa5e89229bd6cb2c7de76130014d Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Mon, 10 Oct 2022 13:20:50 +0200 Subject: [PATCH 11/17] [CURATOR-653] Makes local variables latch1 and latch2 more descriptive --- .../recipes/leader/TestLeaderLatch.java | 56 +++++++++---------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 02c3d68b8b..bc4f76661b 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -278,42 +278,42 @@ public void notLeader() { // waiting for the non-leading LeaderLatch instance to finalize its initialization timing.forWaiting().sleepABit(); - // now latch1 is leader, latch2 is not leader. latch2 listens to the ephemeral node created by latch1 - LeaderLatch latch1 = latches.get(0); - LeaderLatch latch2 = latches.get(1); - assertTrue(latch1.hasLeadership()); - assertFalse(latch2.hasLeadership()); - latch2.debugResetWaitBeforeNodeDeleteLatch = new CountDownLatch(1); - latch2.debugResetWaitLatch = new CountDownLatch(1); - latch1.debugResetWaitLatch = new CountDownLatch(1); - - // force latch1 and latch2 reset - latch1.reset(); - // latch2 needs to be reset in a separate thread because it will block the thread due to the beforeNodeDeletion latch + // now initiallyLeadingLeaderLatch is leader, initiallyNotLeadingLeaderLatch is not leader. initiallyNotLeadingLeaderLatch listens to the ephemeral node created by initiallyLeadingLeaderLatch + LeaderLatch initiallyLeadingLeaderLatch = latches.get(0); + LeaderLatch initiallyNotLeadingLeaderLatch = latches.get(1); + assertTrue(initiallyLeadingLeaderLatch.hasLeadership()); + assertFalse(initiallyNotLeadingLeaderLatch.hasLeadership()); + initiallyNotLeadingLeaderLatch.debugResetWaitBeforeNodeDeleteLatch = new CountDownLatch(1); + initiallyNotLeadingLeaderLatch.debugResetWaitLatch = new CountDownLatch(1); + initiallyLeadingLeaderLatch.debugResetWaitLatch = new CountDownLatch(1); + + // force initiallyLeadingLeaderLatch and initiallyNotLeadingLeaderLatch reset + initiallyLeadingLeaderLatch.reset(); + // initiallyNotLeadingLeaderLatch needs to be reset in a separate thread because it will block the thread due to the beforeNodeDeletion latch ForkJoinPool.commonPool().submit(() -> { - latch2.reset(); + initiallyNotLeadingLeaderLatch.reset(); return null; }); - // latch1 set itself is not the leader state and will delete old path and create new path then wait before getChildren - // latch2 wait before delete its old path and receive nodeDeleteEvent and then getChildren find itself is leader - assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "false"); //latch1 is not leader - assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); //latch2 is leader - assertTrue(latch2.hasLeadership()); - assertFalse(latch1.hasLeadership()); - // latch1 continue and getChildren and find itself is not the leader and listen to the node created by latch2 - latch1.debugResetWaitLatch.countDown(); + // initiallyLeadingLeaderLatch set itself is not the leader state and will delete old path and create new path then wait before getChildren + // initiallyNotLeadingLeaderLatch wait before delete its old path and receive nodeDeleteEvent and then getChildren find itself is leader + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "false"); //initiallyLeadingLeaderLatch is not leader + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); //initiallyNotLeadingLeaderLatch is leader + assertTrue(initiallyNotLeadingLeaderLatch.hasLeadership()); + assertFalse(initiallyLeadingLeaderLatch.hasLeadership()); + // initiallyLeadingLeaderLatch continue and getChildren and find itself is not the leader and listen to the node created by initiallyNotLeadingLeaderLatch + initiallyLeadingLeaderLatch.debugResetWaitLatch.countDown(); timing.sleepABit(); - // latch2 continue and delete old path and create new path then wait before getChildren - latch2.debugResetWaitBeforeNodeDeleteLatch.countDown(); - // latch1 receive nodeDeleteEvent and then getChildren find itself is leader + // initiallyNotLeadingLeaderLatch continue and delete old path and create new path then wait before getChildren + initiallyNotLeadingLeaderLatch.debugResetWaitBeforeNodeDeleteLatch.countDown(); + // initiallyLeadingLeaderLatch receive nodeDeleteEvent and then getChildren find itself is leader assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); - assertTrue(latch1.hasLeadership()); - latch2.debugResetWaitLatch.countDown(); // latch2 continue and getChildren find itself is not leader + assertTrue(initiallyLeadingLeaderLatch.hasLeadership()); + initiallyNotLeadingLeaderLatch.debugResetWaitLatch.countDown(); // initiallyNotLeadingLeaderLatch continue and getChildren find itself is not leader timing.forWaiting().sleepABit(); - assertTrue(latch1.hasLeadership()); - assertFalse(latch2.hasLeadership()); + assertTrue(initiallyLeadingLeaderLatch.hasLeadership()); + assertFalse(initiallyNotLeadingLeaderLatch.hasLeadership()); } finally { for(int i = 0; i < clients.size(); ++i) { From 70fbca5c36964b5f5aeaaafd1272526200e79628 Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Mon, 10 Oct 2022 13:24:20 +0200 Subject: [PATCH 12/17] [CURATOR-653] Makes test method name more descriptive --- .../curator/framework/recipes/leader/TestLeaderLatch.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index bc4f76661b..287c18214f 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -224,7 +224,7 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception } @Test - public void testCheckLeaderShipTiming() throws Exception + public void testResettingOfLeadershipAfterConcurrentLeadershipChange() throws Exception { final String latchPath = "/test"; final Timing2 timing = new Timing2(); From 361ef7678b160243c392f5aed8c7c56c3858628d Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Mon, 10 Oct 2022 13:25:02 +0200 Subject: [PATCH 13/17] [CURATOR-653] Use lambda expression instead of more verbose anonymous class declaration --- .../framework/recipes/leader/TestLeaderLatch.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 287c18214f..89bd540514 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -242,14 +242,10 @@ public void testResettingOfLeadershipAfterConcurrentLeadershipChange() throws Ex .retryPolicy(new RetryOneTime(1)) .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy()) .build(); - ConnectionStateListener stateListener = new ConnectionStateListener() - { - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) + ConnectionStateListener stateListener = (client1, newState) -> { + if ( newState == ConnectionState.CONNECTED ) { - if (newState == ConnectionState.CONNECTED) { - states.add(newState.name()); - } + states.add(newState.name()); } }; client.getConnectionStateListenable().addListener(stateListener); From 5a62adb80e5f4b0e5625c4064d332bfe2b1479d5 Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Mon, 10 Oct 2022 14:20:46 +0200 Subject: [PATCH 14/17] [hotfix] Removes unused imports --- .../curator/framework/recipes/leader/TestLeaderLatch.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 89bd540514..d8deb3fd0f 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -30,7 +30,6 @@ import com.google.common.collect.Queues; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.time.Duration; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ForkJoinPool; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -50,7 +49,6 @@ import org.apache.curator.test.compatibility.Timing2; import org.apache.curator.utils.CloseableUtils; import org.awaitility.Awaitility; -import org.awaitility.core.ThrowingRunnable; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; From adaee91289014f06b60e512b8377de7f9fe4ebd6 Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Mon, 10 Oct 2022 14:53:48 +0200 Subject: [PATCH 15/17] [CURATOR-653] Refactors test to use proper event processing based on ID --- .../recipes/leader/TestLeaderLatch.java | 199 +++++++++++------- 1 file changed, 125 insertions(+), 74 deletions(-) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index d8deb3fd0f..37deae7f59 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -29,7 +30,9 @@ import com.google.common.collect.Lists; import com.google.common.collect.Queues; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.Closeable; import java.time.Duration; +import java.util.ArrayList; import java.util.concurrent.ForkJoinPool; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -226,94 +229,142 @@ public void testResettingOfLeadershipAfterConcurrentLeadershipChange() throws Ex { final String latchPath = "/test"; final Timing2 timing = new Timing2(); - final List latches = Lists.newArrayList(); - final List clients = Lists.newArrayList(); - final BlockingQueue states = Queues.newLinkedBlockingQueue(); + final BlockingQueue states = Queues.newLinkedBlockingQueue(); + + final List closeableResources = new ArrayList<>(); try { - for ( int i = 0; i < 2; ++i ) - { - CuratorFramework client = CuratorFrameworkFactory.builder() - .connectString(server.getConnectString()) - .connectionTimeoutMs(timing.connection()) - .sessionTimeoutMs(timing.session()) - .retryPolicy(new RetryOneTime(1)) - .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy()) - .build(); - ConnectionStateListener stateListener = (client1, newState) -> { - if ( newState == ConnectionState.CONNECTED ) - { - states.add(newState.name()); - } - }; - client.getConnectionStateListenable().addListener(stateListener); - client.start(); - clients.add(client); - LeaderLatch latch = new LeaderLatch(client, latchPath, String.valueOf(i)); - LeaderLatchListener listener = new LeaderLatchListener() { - @Override - public void isLeader() { - states.add("true"); - } + final String connectionLabel0 = "connection #0"; + final CuratorFramework client0 = createAndStartClient(server.getConnectString(), timing, connectionLabel0, states); + closeableResources.add(client0); + final LeaderLatch latch0 = createAndStartLeaderLatch(client0, latchPath, connectionLabel0, states); + closeableResources.add(latch0); - @Override - public void notLeader() { - states.add("false"); - } - }; - latch.addListener(listener); - latch.start(); - latches.add(latch); - assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); - if (i == 0) { - assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); - } - } - // waiting for the non-leading LeaderLatch instance to finalize its initialization + pollAndAssertNextEvent(states, timing.forWaiting().milliseconds(), connectionLabel0, TestEventType.GAINED_CONNECTION); + pollAndAssertNextEvent(states, timing.forWaiting().milliseconds(), connectionLabel0, TestEventType.GAINED_LEADERSHIP); + + final String connectionLabel1 = "connection #1"; + final CuratorFramework client1 = createAndStartClient(server.getConnectString(), timing, connectionLabel1, states); + closeableResources.add(client1); + final LeaderLatch latch1 = createAndStartLeaderLatch(client1, latchPath, connectionLabel1, states); + closeableResources.add(latch1); + + pollAndAssertNextEvent(states, timing.forWaiting().milliseconds(), connectionLabel1, TestEventType.GAINED_CONNECTION); + + // wait for the non-leading LeaderLatch (i.e. latch1) instance to be done with its creation + // this call is time-consuming but necessary because we don't have a handle to detect the end of the reset call timing.forWaiting().sleepABit(); - // now initiallyLeadingLeaderLatch is leader, initiallyNotLeadingLeaderLatch is not leader. initiallyNotLeadingLeaderLatch listens to the ephemeral node created by initiallyLeadingLeaderLatch - LeaderLatch initiallyLeadingLeaderLatch = latches.get(0); - LeaderLatch initiallyNotLeadingLeaderLatch = latches.get(1); - assertTrue(initiallyLeadingLeaderLatch.hasLeadership()); - assertFalse(initiallyNotLeadingLeaderLatch.hasLeadership()); - initiallyNotLeadingLeaderLatch.debugResetWaitBeforeNodeDeleteLatch = new CountDownLatch(1); - initiallyNotLeadingLeaderLatch.debugResetWaitLatch = new CountDownLatch(1); - initiallyLeadingLeaderLatch.debugResetWaitLatch = new CountDownLatch(1); - - // force initiallyLeadingLeaderLatch and initiallyNotLeadingLeaderLatch reset - initiallyLeadingLeaderLatch.reset(); - // initiallyNotLeadingLeaderLatch needs to be reset in a separate thread because it will block the thread due to the beforeNodeDeletion latch + assertTrue(latch0.hasLeadership()); + assertFalse(latch1.hasLeadership()); + + latch1.debugResetWaitBeforeNodeDeleteLatch = new CountDownLatch(1); + latch1.debugResetWaitLatch = new CountDownLatch(1); + latch0.debugResetWaitLatch = new CountDownLatch(1); + + // force latch0 and latch1 reset to trigger the actual test + latch0.reset(); + // latch1 needs to be called within a separate thread since it's going to be blocked by the CountDownLatch outside of an async call ForkJoinPool.commonPool().submit(() -> { - initiallyNotLeadingLeaderLatch.reset(); + latch1.reset(); return null; }); - // initiallyLeadingLeaderLatch set itself is not the leader state and will delete old path and create new path then wait before getChildren - // initiallyNotLeadingLeaderLatch wait before delete its old path and receive nodeDeleteEvent and then getChildren find itself is leader - assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "false"); //initiallyLeadingLeaderLatch is not leader - assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); //initiallyNotLeadingLeaderLatch is leader - assertTrue(initiallyNotLeadingLeaderLatch.hasLeadership()); - assertFalse(initiallyLeadingLeaderLatch.hasLeadership()); - // initiallyLeadingLeaderLatch continue and getChildren and find itself is not the leader and listen to the node created by initiallyNotLeadingLeaderLatch - initiallyLeadingLeaderLatch.debugResetWaitLatch.countDown(); + // latch0.reset() will result in it losing its leadership, deleting its old child node and creating a new child node before being blocked by its debugResetWaitLatch + pollAndAssertNextEvent(states, timing.forWaiting().milliseconds(), connectionLabel0, TestEventType.LOST_LEADERSHIP); + // latch1.reset() is blocked but latch1 will gain leadership due its node watching latch0's node to be deleted + pollAndAssertNextEvent(states, timing.forWaiting().milliseconds(), connectionLabel1, TestEventType.GAINED_LEADERSHIP); + assertFalse(latch0.hasLeadership()); + assertTrue(latch1.hasLeadership()); + + // latch0.reset() continues with the getChildren call, finds itself not being the leader and starts listening to the node created by latch1 + latch0.debugResetWaitLatch.countDown(); timing.sleepABit(); - // initiallyNotLeadingLeaderLatch continue and delete old path and create new path then wait before getChildren - initiallyNotLeadingLeaderLatch.debugResetWaitBeforeNodeDeleteLatch.countDown(); - // initiallyLeadingLeaderLatch receive nodeDeleteEvent and then getChildren find itself is leader - assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); - assertTrue(initiallyLeadingLeaderLatch.hasLeadership()); - initiallyNotLeadingLeaderLatch.debugResetWaitLatch.countDown(); // initiallyNotLeadingLeaderLatch continue and getChildren find itself is not leader + + // latch1.reset() continues, deletes its old child node and creates a new child node before being blocked by its debugResetWaitLatch + latch1.debugResetWaitBeforeNodeDeleteLatch.countDown(); + + // latch0 receives NodeDeleteEvent and then finds itself to be the leader + pollAndAssertNextEvent(states, timing.forWaiting().milliseconds(), connectionLabel0, TestEventType.GAINED_LEADERSHIP); + assertTrue(latch0.hasLeadership()); + + // latch1.reset() continues and finds itself not being the leader + latch1.debugResetWaitLatch.countDown(); + // this call is time-consuming but necessary because we don't have a handle to detect the end of the reset call timing.forWaiting().sleepABit(); - assertTrue(initiallyLeadingLeaderLatch.hasLeadership()); - assertFalse(initiallyNotLeadingLeaderLatch.hasLeadership()); + assertTrue(latch0.hasLeadership()); + assertFalse(latch1.hasLeadership()); + } finally + { + // reverse is necessary for closing the LeaderLatch instances before closing the corresponding client + Collections.reverse(closeableResources); + closeableResources.forEach(CloseableUtils::closeQuietly); } - finally { - for(int i = 0; i < clients.size(); ++i) { - CloseableUtils.closeQuietly(latches.get(i)); - CloseableUtils.closeQuietly(clients.get(i)); + } + + private static void pollAndAssertNextEvent(BlockingQueue eventQueue, long timeoutInMillis, String expectedConnectionLabel, TestEventType expectedEventType) throws InterruptedException + { + TestEvent nextEvent = eventQueue.poll(timeoutInMillis, TimeUnit.MILLISECONDS); + assertNotNull(nextEvent); + assertEquals(expectedConnectionLabel, nextEvent.id); + assertEquals(expectedEventType, nextEvent.eventType); + } + + private static CuratorFramework createAndStartClient(String zkConnectString, Timing2 timing, String connectionLabel, Collection eventQueue) { + final CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(zkConnectString) + .connectionTimeoutMs(timing.connection()) + .sessionTimeoutMs(timing.session()) + .retryPolicy(new RetryOneTime(1)) + .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy()) + .build(); + + client.getConnectionStateListenable().addListener((client1, newState) -> { + if ( newState == ConnectionState.CONNECTED ) + { + eventQueue.add(new TestEvent(connectionLabel, TestEventType.GAINED_CONNECTION)); + } + }); + client.start(); + + return client; + } + + private static LeaderLatch createAndStartLeaderLatch(CuratorFramework zkClient, String leaderLatchPath, String connectionLabel, Collection eventQueue) throws Exception + { + final LeaderLatch latch = new LeaderLatch(zkClient, leaderLatchPath, connectionLabel); + latch.addListener(new LeaderLatchListener() { + @Override + public void isLeader() { + eventQueue.add(new TestEvent(connectionLabel, TestEventType.GAINED_LEADERSHIP)); } + + @Override + public void notLeader() { + eventQueue.add(new TestEvent(connectionLabel, TestEventType.LOST_LEADERSHIP)); + } + }); + latch.start(); + + return latch; + } + + private enum TestEventType + { + GAINED_LEADERSHIP, + LOST_LEADERSHIP, + GAINED_CONNECTION; + } + + private static class TestEvent { + private final String id; + private final TestEventType eventType; + + public TestEvent(String id, TestEventType eventType) + { + this.id = id; + this.eventType = eventType; } } From f715c597720fee7a5bdccd214c5f17b76445f33d Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Wed, 12 Oct 2022 10:16:51 +0200 Subject: [PATCH 16/17] Empty commit to trigger CI once more From a65843196c389c3c91653117a9f2f650b07d6b71 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 12 Oct 2022 17:08:14 +0800 Subject: [PATCH 17/17] naming Signed-off-by: tison --- .../recipes/leader/TestLeaderLatch.java | 63 ++++++++++--------- 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 37deae7f59..69deb20750 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -21,7 +21,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -33,6 +32,7 @@ import java.io.Closeable; import java.time.Duration; import java.util.ArrayList; +import java.util.Objects; import java.util.concurrent.ForkJoinPool; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -229,27 +229,27 @@ public void testResettingOfLeadershipAfterConcurrentLeadershipChange() throws Ex { final String latchPath = "/test"; final Timing2 timing = new Timing2(); - final BlockingQueue states = Queues.newLinkedBlockingQueue(); + final BlockingQueue events = Queues.newLinkedBlockingQueue(); final List closeableResources = new ArrayList<>(); try { - final String connectionLabel0 = "connection #0"; - final CuratorFramework client0 = createAndStartClient(server.getConnectString(), timing, connectionLabel0, states); + final String id0 = "id0"; + final CuratorFramework client0 = createAndStartClient(server.getConnectString(), timing, id0, events); closeableResources.add(client0); - final LeaderLatch latch0 = createAndStartLeaderLatch(client0, latchPath, connectionLabel0, states); + final LeaderLatch latch0 = createAndStartLeaderLatch(client0, latchPath, id0, events); closeableResources.add(latch0); - pollAndAssertNextEvent(states, timing.forWaiting().milliseconds(), connectionLabel0, TestEventType.GAINED_CONNECTION); - pollAndAssertNextEvent(states, timing.forWaiting().milliseconds(), connectionLabel0, TestEventType.GAINED_LEADERSHIP); + assertEquals(new TestEvent(id0, TestEventType.GAINED_CONNECTION), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); + assertEquals(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); - final String connectionLabel1 = "connection #1"; - final CuratorFramework client1 = createAndStartClient(server.getConnectString(), timing, connectionLabel1, states); + final String id1 = "id1"; + final CuratorFramework client1 = createAndStartClient(server.getConnectString(), timing, id1, events); closeableResources.add(client1); - final LeaderLatch latch1 = createAndStartLeaderLatch(client1, latchPath, connectionLabel1, states); + final LeaderLatch latch1 = createAndStartLeaderLatch(client1, latchPath, id1, events); closeableResources.add(latch1); - pollAndAssertNextEvent(states, timing.forWaiting().milliseconds(), connectionLabel1, TestEventType.GAINED_CONNECTION); + assertEquals(new TestEvent(id1, TestEventType.GAINED_CONNECTION), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); // wait for the non-leading LeaderLatch (i.e. latch1) instance to be done with its creation // this call is time-consuming but necessary because we don't have a handle to detect the end of the reset call @@ -264,16 +264,17 @@ public void testResettingOfLeadershipAfterConcurrentLeadershipChange() throws Ex // force latch0 and latch1 reset to trigger the actual test latch0.reset(); - // latch1 needs to be called within a separate thread since it's going to be blocked by the CountDownLatch outside of an async call + // latch1 needs to be called within a separate thread since it's going to be blocked by the CountDownLatch outside an async call ForkJoinPool.commonPool().submit(() -> { latch1.reset(); return null; }); // latch0.reset() will result in it losing its leadership, deleting its old child node and creating a new child node before being blocked by its debugResetWaitLatch - pollAndAssertNextEvent(states, timing.forWaiting().milliseconds(), connectionLabel0, TestEventType.LOST_LEADERSHIP); + assertEquals(new TestEvent(id0, TestEventType.LOST_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); // latch1.reset() is blocked but latch1 will gain leadership due its node watching latch0's node to be deleted - pollAndAssertNextEvent(states, timing.forWaiting().milliseconds(), connectionLabel1, TestEventType.GAINED_LEADERSHIP); + assertEquals(new TestEvent(id1, TestEventType.GAINED_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); + assertFalse(latch0.hasLeadership()); assertTrue(latch1.hasLeadership()); @@ -285,7 +286,7 @@ public void testResettingOfLeadershipAfterConcurrentLeadershipChange() throws Ex latch1.debugResetWaitBeforeNodeDeleteLatch.countDown(); // latch0 receives NodeDeleteEvent and then finds itself to be the leader - pollAndAssertNextEvent(states, timing.forWaiting().milliseconds(), connectionLabel0, TestEventType.GAINED_LEADERSHIP); + assertEquals(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); assertTrue(latch0.hasLeadership()); // latch1.reset() continues and finds itself not being the leader @@ -295,7 +296,8 @@ public void testResettingOfLeadershipAfterConcurrentLeadershipChange() throws Ex assertTrue(latch0.hasLeadership()); assertFalse(latch1.hasLeadership()); - } finally + } + finally { // reverse is necessary for closing the LeaderLatch instances before closing the corresponding client Collections.reverse(closeableResources); @@ -303,15 +305,7 @@ public void testResettingOfLeadershipAfterConcurrentLeadershipChange() throws Ex } } - private static void pollAndAssertNextEvent(BlockingQueue eventQueue, long timeoutInMillis, String expectedConnectionLabel, TestEventType expectedEventType) throws InterruptedException - { - TestEvent nextEvent = eventQueue.poll(timeoutInMillis, TimeUnit.MILLISECONDS); - assertNotNull(nextEvent); - assertEquals(expectedConnectionLabel, nextEvent.id); - assertEquals(expectedEventType, nextEvent.eventType); - } - - private static CuratorFramework createAndStartClient(String zkConnectString, Timing2 timing, String connectionLabel, Collection eventQueue) { + private static CuratorFramework createAndStartClient(String zkConnectString, Timing2 timing, String id, Collection events) { final CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(zkConnectString) .connectionTimeoutMs(timing.connection()) @@ -323,26 +317,27 @@ private static CuratorFramework createAndStartClient(String zkConnectString, Tim client.getConnectionStateListenable().addListener((client1, newState) -> { if ( newState == ConnectionState.CONNECTED ) { - eventQueue.add(new TestEvent(connectionLabel, TestEventType.GAINED_CONNECTION)); + events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION)); } }); + client.start(); return client; } - private static LeaderLatch createAndStartLeaderLatch(CuratorFramework zkClient, String leaderLatchPath, String connectionLabel, Collection eventQueue) throws Exception + private static LeaderLatch createAndStartLeaderLatch(CuratorFramework client, String latchPath, String id, Collection events) throws Exception { - final LeaderLatch latch = new LeaderLatch(zkClient, leaderLatchPath, connectionLabel); + final LeaderLatch latch = new LeaderLatch(client, latchPath, id); latch.addListener(new LeaderLatchListener() { @Override public void isLeader() { - eventQueue.add(new TestEvent(connectionLabel, TestEventType.GAINED_LEADERSHIP)); + events.add(new TestEvent(latch.getId(), TestEventType.GAINED_LEADERSHIP)); } @Override public void notLeader() { - eventQueue.add(new TestEvent(connectionLabel, TestEventType.LOST_LEADERSHIP)); + events.add(new TestEvent(latch.getId(), TestEventType.LOST_LEADERSHIP)); } }); latch.start(); @@ -366,6 +361,14 @@ public TestEvent(String id, TestEventType eventType) this.id = id; this.eventType = eventType; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TestEvent testEvent = (TestEvent) o; + return Objects.equals(id, testEvent.id) && eventType == testEvent.eventType; + } } @Test