-
Notifications
You must be signed in to change notification settings - Fork 1.2k
CURATOR-653: fix potential double leader for LeaderLatch #398
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7272b85
cf3d66f
106f705
492506c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -30,6 +30,8 @@ | |||||||||
| import com.google.common.collect.Queues; | ||||||||||
| import com.google.common.util.concurrent.ThreadFactoryBuilder; | ||||||||||
| import java.time.Duration; | ||||||||||
| import java.util.concurrent.CompletableFuture; | ||||||||||
| import java.util.concurrent.ForkJoinPool; | ||||||||||
| import org.apache.curator.framework.CuratorFramework; | ||||||||||
| import org.apache.curator.framework.CuratorFrameworkFactory; | ||||||||||
| import org.apache.curator.framework.imps.TestCleanState; | ||||||||||
|
|
@@ -48,6 +50,7 @@ | |||||||||
| import org.apache.curator.test.compatibility.Timing2; | ||||||||||
| import org.apache.curator.utils.CloseableUtils; | ||||||||||
| import org.awaitility.Awaitility; | ||||||||||
| import org.awaitility.core.ThrowingRunnable; | ||||||||||
| import org.junit.jupiter.api.Tag; | ||||||||||
| import org.junit.jupiter.api.Test; | ||||||||||
|
|
||||||||||
|
|
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception | |||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Test | ||||||||||
| public void testCheckLeaderShipTiming() throws Exception | ||||||||||
| { | ||||||||||
| final String latchPath = "/test"; | ||||||||||
| Timing timing = new Timing(); | ||||||||||
| List<LeaderLatch> latches = Lists.newArrayList(); | ||||||||||
| List<CuratorFramework> clients = Lists.newArrayList(); | ||||||||||
| final BlockingQueue<String> states = Queues.newLinkedBlockingQueue(); | ||||||||||
| ExecutorService executorService = Executors.newFixedThreadPool(2); | ||||||||||
| for ( int i = 0; i < 2; ++i ) { | ||||||||||
| try { | ||||||||||
| CuratorFramework client = CuratorFrameworkFactory.builder() | ||||||||||
| .connectString(server.getConnectString()) | ||||||||||
| .connectionTimeoutMs(10000) | ||||||||||
| .sessionTimeoutMs(60000) | ||||||||||
| .retryPolicy(new RetryOneTime(1)) | ||||||||||
| .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy()) | ||||||||||
| .build(); | ||||||||||
| ConnectionStateListener stateListener = new ConnectionStateListener() | ||||||||||
| { | ||||||||||
| @Override | ||||||||||
| public void stateChanged(CuratorFramework client, ConnectionState newState) | ||||||||||
| { | ||||||||||
| if (newState == ConnectionState.CONNECTED) { | ||||||||||
| states.add(newState.name()); | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: Adding the |
||||||||||
| } | ||||||||||
| } | ||||||||||
| }; | ||||||||||
| client.getConnectionStateListenable().addListener(stateListener); | ||||||||||
| client.start(); | ||||||||||
| clients.add(client); | ||||||||||
| LeaderLatch latch = new LeaderLatch(client, latchPath, String.valueOf(i)); | ||||||||||
| LeaderLatchListener listener = new LeaderLatchListener() { | ||||||||||
| @Override | ||||||||||
| public void isLeader() { | ||||||||||
| states.add("true"); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Override | ||||||||||
| public void notLeader() { | ||||||||||
| states.add("false"); | ||||||||||
| } | ||||||||||
| }; | ||||||||||
|
Comment on lines
+258
to
+268
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Using the |
||||||||||
| latch.addListener(listener); | ||||||||||
| latch.start(); | ||||||||||
| latches.add(latch); | ||||||||||
| assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); | ||||||||||
| if (i == 0) { | ||||||||||
| assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
nit: maybe adding a bit more context to this polling here to describe the test case |
||||||||||
| } | ||||||||||
| } | ||||||||||
| catch (Exception e){ | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why are we hiding thrown exceptions here? Shouldn't we expose it as part of the test run if something went wrong? 🤔 The test would succeed if the Exception is thrown in this block and caught here. |
||||||||||
| return; | ||||||||||
| } | ||||||||||
| } | ||||||||||
| timing.forWaiting().sleepABit(); | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What's the purpose of waiting here? 🤔 |
||||||||||
| // now latch1 is leader, latch2 is not leader. latch2 listens to the ephemeral node created by latch1 | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: Moving comments into assert messages improves the test output and still works as some kind of comment. This comment could be added to the |
||||||||||
| LeaderLatch latch1 = latches.get(0); | ||||||||||
| LeaderLatch latch2 = latches.get(1); | ||||||||||
|
Comment on lines
+283
to
+284
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
nit: maybe, making the variable more descriptive to avoid confusion. Especially because we're switching the order here in comparison to what is described in the PR description and the corresponding ticket. |
||||||||||
| assertTrue(latch1.hasLeadership()); | ||||||||||
| assertFalse(latch2.hasLeadership()); | ||||||||||
| try { | ||||||||||
| latch2.debugRestWaitBeforeNodeDelete = new CountDownLatch(1); | ||||||||||
| latch2.debugResetWaitLatch = new CountDownLatch(1); | ||||||||||
| latch1.debugResetWaitLatch = new CountDownLatch(1); | ||||||||||
|
|
||||||||||
| // force latch1 and latch2 reset | ||||||||||
| latch1.reset(); | ||||||||||
| ForkJoinPool.commonPool().submit(() -> { | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we add a comment here on why we're calling |
||||||||||
| latch2.reset(); | ||||||||||
| return null; | ||||||||||
| }); | ||||||||||
|
|
||||||||||
| // latch1 set itself is not the leader state and will delete old path and create new path then wait before getChildren | ||||||||||
| // latch2 wait before delete its old path and receive nodeDeleteEvent and then getChildren find itself is leader | ||||||||||
| assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "false"); //latch1 is not leader | ||||||||||
| assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); //latch2 is leader | ||||||||||
| assertTrue(latch2.hasLeadership()); | ||||||||||
| assertFalse(latch1.hasLeadership()); | ||||||||||
| // latch1 continue and getChildren and find itself is not the leader and listen to the node created by latch2 | ||||||||||
| latch1.debugResetWaitLatch.countDown(); | ||||||||||
| timing.sleepABit(); | ||||||||||
| // latch2 continue and delete old path and create new path then wait before getChildren | ||||||||||
| latch2.debugRestWaitBeforeNodeDelete.countDown(); | ||||||||||
| // latch1 receive nodeDeleteEvent and then getChildren find itself is leader | ||||||||||
| assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); | ||||||||||
| assertTrue(latch1.hasLeadership()); | ||||||||||
| latch2.debugResetWaitLatch.countDown(); // latch2 continue and getChildren find itself is not leader | ||||||||||
| timing.forWaiting().sleepABit(); | ||||||||||
|
|
||||||||||
| assertTrue(latch1.hasLeadership()); | ||||||||||
| assertFalse(latch2.hasLeadership()); | ||||||||||
| } | ||||||||||
| finally { | ||||||||||
| for(int i = 0; i < clients.size(); ++i) { | ||||||||||
| CloseableUtils.closeQuietly(latches.get(i)); | ||||||||||
| CloseableUtils.closeQuietly(clients.get(i)); | ||||||||||
| } | ||||||||||
| executorService.shutdown(); | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Test | ||||||||||
| public void testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved() throws Exception | ||||||||||
| { | ||||||||||
|
|
||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a typo in the name. Additionally, we might want to add
`Latch` at the end to reflect the purpose of this member analogously to the other latches.