From d45e257d75741597062a4ecff9121aba23496169 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 12 Jul 2022 22:46:35 +0800 Subject: [PATCH 01/20] call getChildren instead of reset when preceding node not found in callback Signed-off-by: tison --- .../curator/framework/recipes/leader/LeaderLatch.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index 5d1c249b08..c05fac1a44 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -604,7 +604,7 @@ else if ( ourIndex == 0 ) @Override public void process(WatchedEvent event) { - if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) ) + if ( state.get() == State.STARTED && event.getType() == Event.EventType.NodeDeleted ) { try { @@ -626,8 +626,8 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex { if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) { - // previous node is gone - reset - reset(); + // previous node is gone - retry getChildren + getChildren(); } } }; From a2060c1dac5d46fd9d14e07fb9ea3f3435c4922e Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 12 Jul 2022 22:53:06 +0800 Subject: [PATCH 02/20] call getChildren instead of reset when recovered from connection loss or session expire Signed-off-by: tison --- .../apache/curator/framework/recipes/leader/LeaderLatch.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index c05fac1a44..be78a72d24 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -667,9 +667,9 @@ protected void handleStateChange(ConnectionState newState) { try { - if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get() ) + if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) ) { - reset(); + getChildren(); } } catch ( Exception e ) From 0d4a7b75c34fc06ea36ae0cd4a0fb48d263d96cb Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 12 Jul 2022 23:55:21 +0800 Subject: [PATCH 03/20] synchronize checkLeadership Signed-off-by: tison --- .../apache/curator/framework/recipes/leader/LeaderLatch.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index be78a72d24..a069e45cdc 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -576,7 +576,7 @@ private synchronized void internalStart() @VisibleForTesting volatile CountDownLatch debugCheckLeaderShipLatch = null; - private void checkLeadership(List children) throws Exception + private synchronized void checkLeadership(List children) throws Exception { if ( debugCheckLeaderShipLatch != null ) { From 8a76593d9a06c9594cadb836de7e46d8e2ebbab2 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 13 Jul 2022 09:32:33 +0800 Subject: [PATCH 04/20] reformat Signed-off-by: tison --- .../framework/recipes/leader/LeaderLatch.java | 377 ++++++------------ 1 file changed, 124 insertions(+), 253 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index a069e45cdc..bc5f65d20e 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -21,10 +21,20 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import java.io.Closeable; +import java.io.EOFException; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.WatcherRemoveCuratorFramework; import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.listen.StandardListenerManager; import org.apache.curator.framework.recipes.AfterConnectionEstablished; import org.apache.curator.framework.recipes.locks.LockInternals; @@ -32,26 +42,14 @@ import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.PathUtils; import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; -import java.io.EOFException; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.curator.utils.PathUtils; /** *

@@ -61,42 +59,24 @@ * group will randomly be chosen *

*/ -public class LeaderLatch implements Closeable -{ - private final Logger log = LoggerFactory.getLogger(getClass()); +public class LeaderLatch implements Closeable { + private static final String LOCK_NAME = "latch-"; + private static final LockInternalsSorter sorter = StandardLockInternalsDriver::standardFixForSorting; + private static final Logger log = LoggerFactory.getLogger(LeaderLatch.class); + private final WatcherRemoveCuratorFramework client; private final String latchPath; private final String id; - private final AtomicReference state = new AtomicReference(State.LATENT); + private final AtomicReference state = new AtomicReference<>(State.LATENT); private final AtomicBoolean hasLeadership = new AtomicBoolean(false); - private final AtomicReference ourPath = new AtomicReference(); - private final AtomicReference lastPathIsLeader = new AtomicReference(); + private final AtomicReference ourPath = new AtomicReference<>(); + private final AtomicReference lastPathIsLeader = new AtomicReference<>(); private final StandardListenerManager listeners = StandardListenerManager.standard(); private final CloseMode closeMode; - private final AtomicReference> startTask = new AtomicReference>(); - - private final ConnectionStateListener listener = new ConnectionStateListener() - { - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) - { - handleStateChange(newState); - } - }; - - private static final String LOCK_NAME = "latch-"; - - private static final LockInternalsSorter sorter = new LockInternalsSorter() - { - @Override - public String fixForSorting(String str, String lockName) - { - return StandardLockInternalsDriver.standardFixForSorting(str, lockName); - } - }; + private final AtomicReference> startTask = new AtomicReference<>(); + private final ConnectionStateListener listener = (client, newState) -> handleStateChange(newState); - public enum State - { + public enum State { LATENT, STARTED, CLOSED @@ -105,8 +85,7 @@ public enum State /** * How to handle listeners when the latch is closed */ - public enum CloseMode - { + public enum CloseMode { /** * When the latch is closed, listeners will *not* be notified (default behavior) */ @@ -122,8 +101,7 @@ public enum CloseMode * @param client the client * @param latchPath the path for this leadership group */ - public LeaderLatch(CuratorFramework client, String latchPath) - { + public LeaderLatch(CuratorFramework client, String latchPath) { this(client, latchPath, "", CloseMode.SILENT); } @@ -132,8 +110,7 @@ public LeaderLatch(CuratorFramework client, String latchPath) * @param latchPath the path for this leadership group * @param id participant ID */ - public LeaderLatch(CuratorFramework client, String latchPath, String id) - { + public LeaderLatch(CuratorFramework client, String latchPath, String id) { this(client, latchPath, id, CloseMode.SILENT); } @@ -143,8 +120,7 @@ public LeaderLatch(CuratorFramework client, String latchPath, String id) * @param id participant ID * @param closeMode behaviour of listener on explicit close. */ - public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode) - { + public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode) { this.client = Preconditions.checkNotNull(client, "client cannot be null").newWatcherRemoveCuratorFramework(); this.latchPath = PathUtils.validatePath(latchPath); this.id = Preconditions.checkNotNull(id, "id cannot be null"); @@ -156,23 +132,14 @@ public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMo * * @throws Exception errors */ - public void start() throws Exception - { + public void start() throws Exception { Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); - startTask.set(AfterConnectionEstablished.execute(client, new Runnable() - { - @Override - public void run() - { - try - { - internalStart(); - } - finally - { - startTask.set(null); - } + startTask.set(AfterConnectionEstablished.execute(client, () -> { + try { + internalStart(); + } finally { + startTask.set(null); } })); } @@ -185,8 +152,7 @@ public void run() * @throws IOException errors */ @Override - public void close() throws IOException - { + public void close() throws IOException { close(closeMode); } @@ -198,52 +164,41 @@ public void close() throws IOException * @param closeMode allows the default close mode to be overridden at the time the latch is closed. * @throws IOException errors */ - public synchronized void close(CloseMode closeMode) throws IOException - { + public synchronized void close(CloseMode closeMode) throws IOException { Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started"); Preconditions.checkNotNull(closeMode, "closeMode cannot be null"); cancelStartTask(); - try - { + try { setNode(null); client.removeWatchers(); - } - catch ( Exception e ) - { + } catch (Exception e) { ThreadUtils.checkInterrupted(e); throw new IOException(e); - } - finally - { + } finally { client.getConnectionStateListenable().removeListener(listener); - switch ( closeMode ) - { - case NOTIFY_LEADER: - { - setLeadership(false); - listeners.clear(); - break; - } + switch (closeMode) { + case NOTIFY_LEADER: { + setLeadership(false); + listeners.clear(); + break; + } - default: - { - listeners.clear(); - setLeadership(false); - break; - } + case SILENT: { + listeners.clear(); + setLeadership(false); + break; + } } } } @VisibleForTesting - protected boolean cancelStartTask() - { + protected boolean cancelStartTask() { Future localStartTask = startTask.getAndSet(null); - if ( localStartTask != null ) - { + if (localStartTask != null) { localStartTask.cancel(true); return true; } @@ -262,8 +217,7 @@ protected boolean cancelStartTask() * * @param listener the listener to attach */ - public void addListener(LeaderLatchListener listener) - { + public void addListener(LeaderLatchListener listener) { listeners.addListener(listener); } @@ -280,8 +234,7 @@ public void addListener(LeaderLatchListener listener) * @param listener the listener to attach * @param executor An executor to run the methods for the listener on. */ - public void addListener(LeaderLatchListener listener, Executor executor) - { + public void addListener(LeaderLatchListener listener, Executor executor) { listeners.addListener(listener, executor); } @@ -290,8 +243,7 @@ public void addListener(LeaderLatchListener listener, Executor executor) * * @param listener the listener to remove */ - public void removeListener(LeaderLatchListener listener) - { + public void removeListener(LeaderLatchListener listener) { listeners.removeListener(listener); } @@ -322,17 +274,13 @@ public void removeListener(LeaderLatchListener listener) * @throws EOFException if the instance is {@linkplain #close() closed} * while waiting */ - public void await() throws InterruptedException, EOFException - { - synchronized(this) - { - while ( (state.get() == State.STARTED) && !hasLeadership.get() ) - { + public void await() throws InterruptedException, EOFException { + synchronized (this) { + while ((state.get() == State.STARTED) && !hasLeadership.get()) { wait(); } } - if ( state.get() != State.STARTED ) - { + if (state.get() != State.STARTED) { throw new EOFException(); } } @@ -375,26 +323,20 @@ public void await() throws InterruptedException, EOFException * @throws InterruptedException if the current thread is interrupted * while waiting */ - public boolean await(long timeout, TimeUnit unit) throws InterruptedException - { + public boolean await(long timeout, TimeUnit unit) throws InterruptedException { long waitNanos = TimeUnit.NANOSECONDS.convert(timeout, unit); - synchronized(this) - { - while ( true ) - { - if ( state.get() != State.STARTED ) - { + synchronized (this) { + while (true) { + if (state.get() != State.STARTED) { return false; } - if ( hasLeadership() ) - { + if (hasLeadership()) { return true; } - if ( waitNanos <= 0 ) - { + if (waitNanos <= 0) { return false; } @@ -411,8 +353,7 @@ public boolean await(long timeout, TimeUnit unit) throws InterruptedException * * @return participant Id */ - public String getId() - { + public String getId() { return id; } @@ -424,8 +365,7 @@ public String getId() * * @return the state of the current instance */ - public State getState() - { + public State getState() { return state.get(); } @@ -443,8 +383,7 @@ public State getState() * @return participants * @throws Exception ZK errors, interruptions, etc. */ - public Collection getParticipants() throws Exception - { + public Collection getParticipants() throws Exception { Collection participantNodes = LockInternals.getParticipantNodes(client, latchPath, LOCK_NAME, sorter); return LeaderSelector.getParticipants(client, participantNodes); } @@ -464,8 +403,7 @@ public Collection getParticipants() throws Exception * @return leader * @throws Exception ZK errors, interruptions, etc. */ - public Participant getLeader() throws Exception - { + public Participant getLeader() throws Exception { Collection participantNodes = LockInternals.getParticipantNodes(client, latchPath, LOCK_NAME, sorter); return LeaderSelector.getLeader(client, participantNodes); } @@ -475,8 +413,7 @@ public Participant getLeader() throws Exception * * @return true/false */ - public boolean hasLeadership() - { + public boolean hasLeadership() { return (state.get() == State.STARTED) && hasLeadership.get(); } @@ -494,8 +431,7 @@ public boolean hasLeadership() * * @return lock node path or null */ - public String getOurPath() - { + public String getOurPath() { return ourPath.get(); } @@ -510,8 +446,7 @@ public String getOurPath() * * @return last lock node path that was leader ever or null */ - public String getLastPathIsLeader() - { + public String getLastPathIsLeader() { return lastPathIsLeader.get(); } @@ -519,54 +454,36 @@ public String getLastPathIsLeader() volatile CountDownLatch debugResetWaitLatch = null; @VisibleForTesting - void reset() throws Exception - { + void reset() throws Exception { setLeadership(false); setNode(null); - BackgroundCallback callback = new BackgroundCallback() - { - @Override - public void processResult(CuratorFramework client, CuratorEvent event) throws Exception - { - if ( debugResetWaitLatch != null ) - { - debugResetWaitLatch.await(); - debugResetWaitLatch = null; - } + BackgroundCallback callback = (client, event) -> { + if (debugResetWaitLatch != null) { + debugResetWaitLatch.await(); + debugResetWaitLatch = null; + } - if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) - { - setNode(event.getName()); - if ( state.get() == State.CLOSED ) - { - setNode(null); - } - else - { - getChildren(); - } - } - else - { - log.error("getChildren() failed. rc = " + event.getResultCode()); + if (event.getResultCode() == KeeperException.Code.OK.intValue()) { + setNode(event.getName()); + if (state.get() == State.CLOSED) { + setNode(null); + } else { + getChildren(); } + } else { + log.error("getChildren() failed. rc = " + event.getResultCode()); } }; client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id)); } - private synchronized void internalStart() - { - if ( state.get() == State.STARTED ) - { + private synchronized void internalStart() { + if (state.get() == State.STARTED) { client.getConnectionStateListenable().addListener(listener); - try - { + try { reset(); - } - catch ( Exception e ) - { + } catch (Exception e) { ThreadUtils.checkInterrupted(e); log.error("An error occurred checking resetting leadership.", e); } @@ -576,59 +493,37 @@ private synchronized void internalStart() @VisibleForTesting volatile CountDownLatch debugCheckLeaderShipLatch = null; - private synchronized void checkLeadership(List children) throws Exception - { - if ( debugCheckLeaderShipLatch != null ) - { + private synchronized void checkLeadership(List children) throws Exception { + if (debugCheckLeaderShipLatch != null) { debugCheckLeaderShipLatch.await(); } final String localOurPath = ourPath.get(); List sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children); int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1; - if ( ourIndex < 0 ) - { + if (ourIndex < 0) { log.error("Can't find our node. Resetting. Index: " + ourIndex); reset(); - } - else if ( ourIndex == 0 ) - { + } else if (ourIndex == 0) { lastPathIsLeader.set(localOurPath); setLeadership(true); - } - else - { + } else { String watchPath = sortedChildren.get(ourIndex - 1); - Watcher watcher = new Watcher() - { - @Override - public void process(WatchedEvent event) - { - if ( state.get() == State.STARTED && event.getType() == Event.EventType.NodeDeleted ) - { - try - { - getChildren(); - } - catch ( Exception ex ) - { - ThreadUtils.checkInterrupted(ex); - log.error("An error occurred checking the leadership.", ex); - } + Watcher watcher = event -> { + if (state.get() == State.STARTED && event.getType() == Watcher.Event.EventType.NodeDeleted) { + try { + getChildren(); + } catch (Exception ex) { + ThreadUtils.checkInterrupted(ex); + log.error("An error occurred checking the leadership.", ex); } } }; - BackgroundCallback callback = new BackgroundCallback() - { - @Override - public void processResult(CuratorFramework client, CuratorEvent event) throws Exception - { - if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) - { - // previous node is gone - retry getChildren - getChildren(); - } + BackgroundCallback callback = (client, event) -> { + if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) { + // previous node is gone - retry getChildren + getChildren(); } }; // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak @@ -636,44 +531,29 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex } } - private void getChildren() throws Exception - { - BackgroundCallback callback = new BackgroundCallback() - { - @Override - public void processResult(CuratorFramework client, CuratorEvent event) throws Exception - { - if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) - { - checkLeadership(event.getChildren()); - } + private void getChildren() throws Exception { + BackgroundCallback callback = (client, event) -> { + if (event.getResultCode() == KeeperException.Code.OK.intValue()) { + checkLeadership(event.getChildren()); } }; client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null)); } @VisibleForTesting - protected void handleStateChange(ConnectionState newState) - { - switch ( newState ) - { - default: - { + protected void handleStateChange(ConnectionState newState) { + switch (newState) { + default: { // NOP break; } - case RECONNECTED: - { - try - { - if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) ) - { + case RECONNECTED: { + try { + if (client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED)) { getChildren(); } - } - catch ( Exception e ) - { + } catch (Exception e) { ThreadUtils.checkInterrupted(e); log.error("Could not reset leader latch", e); setLeadership(false); @@ -681,44 +561,35 @@ protected void handleStateChange(ConnectionState newState) break; } - case SUSPENDED: - { - if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) ) - { + case SUSPENDED: { + if (client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED)) { setLeadership(false); } break; } - case LOST: - { + case LOST: { setLeadership(false); break; } } } - private synchronized void setLeadership(boolean newValue) - { + private synchronized void setLeadership(boolean newValue) { boolean oldValue = hasLeadership.getAndSet(newValue); - if ( oldValue && !newValue ) - { // Lost leadership, was true, now false + if (oldValue && !newValue) { // Lost leadership, was true, now false listeners.forEach(LeaderLatchListener::notLeader); - } - else if ( !oldValue && newValue ) - { // Gained leadership, was false, now true + } else if (!oldValue && newValue) { // Gained leadership, was false, now true listeners.forEach(LeaderLatchListener::isLeader); } notifyAll(); } - private void setNode(String newValue) throws Exception - { + private void setNode(String newValue) throws Exception { String oldPath = ourPath.getAndSet(newValue); - if ( oldPath != null ) - { + if (oldPath != null) { client.delete().guaranteed().inBackground().forPath(oldPath); } } From 037bf81901abf4aa89b7d6646abae5d07233a5ff Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 13 Jul 2022 09:36:32 +0800 Subject: [PATCH 05/20] lock by leadershipLock object Signed-off-by: tison --- .../framework/recipes/leader/LeaderLatch.java | 158 ++++++++++-------- 1 file changed, 84 insertions(+), 74 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index bc5f65d20e..51b92f860f 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -76,6 +76,8 @@ public class LeaderLatch implements Closeable { private final AtomicReference> startTask = new AtomicReference<>(); private final ConnectionStateListener listener = (client, newState) -> handleStateChange(newState); + private final Object leadershipLock = new Object(); + public enum State { LATENT, STARTED, @@ -164,32 +166,34 @@ public void close() throws IOException { * @param closeMode allows the default close mode to be overridden at the time the latch is closed. * @throws IOException errors */ - public synchronized void close(CloseMode closeMode) throws IOException { - Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started"); - Preconditions.checkNotNull(closeMode, "closeMode cannot be null"); - - cancelStartTask(); - - try { - setNode(null); - client.removeWatchers(); - } catch (Exception e) { - ThreadUtils.checkInterrupted(e); - throw new IOException(e); - } finally { - client.getConnectionStateListenable().removeListener(listener); - - switch (closeMode) { - case NOTIFY_LEADER: { - setLeadership(false); - listeners.clear(); - break; - } + public void close(CloseMode closeMode) throws IOException { + synchronized (leadershipLock) { + Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started"); + Preconditions.checkNotNull(closeMode, "closeMode cannot be null"); - case SILENT: { - listeners.clear(); - setLeadership(false); - break; + cancelStartTask(); + + try { + setNode(null); + client.removeWatchers(); + } catch (Exception e) { + ThreadUtils.checkInterrupted(e); + throw new IOException(e); + } finally { + client.getConnectionStateListenable().removeListener(listener); + + switch (closeMode) { + case NOTIFY_LEADER: { + setLeadership(false); + listeners.clear(); + break; + } + + case SILENT: { + listeners.clear(); + setLeadership(false); + break; + } } } } @@ -275,9 +279,9 @@ public void removeListener(LeaderLatchListener listener) { * while waiting */ public void await() throws InterruptedException, EOFException { - synchronized (this) { + synchronized (leadershipLock) { while ((state.get() == State.STARTED) && !hasLeadership.get()) { - wait(); + leadershipLock.wait(); } } if (state.get() != State.STARTED) { @@ -326,7 +330,7 @@ public void await() throws InterruptedException, EOFException { public boolean await(long timeout, TimeUnit unit) throws InterruptedException { long waitNanos = TimeUnit.NANOSECONDS.convert(timeout, unit); - synchronized (this) { + synchronized (leadershipLock) { while (true) { if (state.get() != State.STARTED) { return false; @@ -341,7 +345,7 @@ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { } long startNanos = System.nanoTime(); - TimeUnit.NANOSECONDS.timedWait(this, waitNanos); + TimeUnit.NANOSECONDS.timedWait(leadershipLock, waitNanos); long elapsed = System.nanoTime() - startNanos; waitNanos -= elapsed; } @@ -478,14 +482,16 @@ void reset() throws Exception { client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id)); } - private synchronized void internalStart() { - if (state.get() == State.STARTED) { - client.getConnectionStateListenable().addListener(listener); - try { - reset(); - } catch (Exception e) { - ThreadUtils.checkInterrupted(e); - log.error("An error occurred checking resetting leadership.", e); + private void internalStart() { + synchronized (leadershipLock) { + if (state.get() == State.STARTED) { + client.getConnectionStateListenable().addListener(listener); + try { + reset(); + } catch (Exception e) { + ThreadUtils.checkInterrupted(e); + log.error("An error occurred checking resetting leadership.", e); + } } } } @@ -493,41 +499,43 @@ private synchronized void internalStart() { @VisibleForTesting volatile CountDownLatch debugCheckLeaderShipLatch = null; - private synchronized void checkLeadership(List children) throws Exception { + private void checkLeadership(List children) throws Exception { if (debugCheckLeaderShipLatch != null) { debugCheckLeaderShipLatch.await(); } - final String localOurPath = ourPath.get(); - List sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children); - int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1; - if (ourIndex < 0) { - log.error("Can't find our node. Resetting. Index: " + ourIndex); - reset(); - } else if (ourIndex == 0) { - lastPathIsLeader.set(localOurPath); - setLeadership(true); - } else { - String watchPath = sortedChildren.get(ourIndex - 1); - Watcher watcher = event -> { - if (state.get() == State.STARTED && event.getType() == Watcher.Event.EventType.NodeDeleted) { - try { - getChildren(); - } catch (Exception ex) { - ThreadUtils.checkInterrupted(ex); - log.error("An error occurred checking the leadership.", ex); + synchronized (leadershipLock) { + final String localOurPath = ourPath.get(); + List sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children); + int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1; + if (ourIndex < 0) { + log.error("Can't find our node. Resetting. Index: " + ourIndex); + reset(); + } else if (ourIndex == 0) { + lastPathIsLeader.set(localOurPath); + setLeadership(true); + } else { + String watchPath = sortedChildren.get(ourIndex - 1); + Watcher watcher = event -> { + if (state.get() == State.STARTED && event.getType() == Watcher.Event.EventType.NodeDeleted) { + try { + getChildren(); + } catch (Exception ex) { + ThreadUtils.checkInterrupted(ex); + log.error("An error occurred checking the leadership.", ex); + } } - } - }; + }; - BackgroundCallback callback = (client, event) -> { - if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) { - // previous node is gone - retry getChildren - getChildren(); - } - }; - // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak - client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath)); + BackgroundCallback callback = (client, event) -> { + if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) { + // previous node is gone - retry getChildren + getChildren(); + } + }; + // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak + client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath)); + } } } @@ -575,16 +583,18 @@ protected void handleStateChange(ConnectionState newState) { } } - private synchronized void setLeadership(boolean newValue) { - boolean oldValue = hasLeadership.getAndSet(newValue); + private void setLeadership(boolean newValue) { + synchronized (leadershipLock) { + boolean oldValue = hasLeadership.getAndSet(newValue); - if (oldValue && !newValue) { // Lost leadership, was true, now false - listeners.forEach(LeaderLatchListener::notLeader); - } else if (!oldValue && newValue) { // Gained leadership, was false, now true - listeners.forEach(LeaderLatchListener::isLeader); - } + if (oldValue && !newValue) { // Lost leadership, was true, now false + listeners.forEach(LeaderLatchListener::notLeader); + } else if (!oldValue && newValue) { // Gained leadership, was false, now true + listeners.forEach(LeaderLatchListener::isLeader); + } - notifyAll(); + leadershipLock.notifyAll(); + } } private void setNode(String newValue) throws Exception { From 7b080a34b0dad5022c7a709d1919fa40ed0c50e5 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 13 Jul 2022 10:19:30 +0800 Subject: [PATCH 06/20] Revert "lock by leadershipLock object" This reverts commit 037bf81901abf4aa89b7d6646abae5d07233a5ff. --- .../framework/recipes/leader/LeaderLatch.java | 158 ++++++++---------- 1 file changed, 74 insertions(+), 84 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index 51b92f860f..bc5f65d20e 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -76,8 +76,6 @@ public class LeaderLatch implements Closeable { private final AtomicReference> startTask = new AtomicReference<>(); private final ConnectionStateListener listener = (client, newState) -> handleStateChange(newState); - private final Object leadershipLock = new Object(); - public enum State { LATENT, STARTED, @@ -166,34 +164,32 @@ public void close() throws IOException { * @param closeMode allows the default close mode to be overridden at the time the latch is closed. * @throws IOException errors */ - public void close(CloseMode closeMode) throws IOException { - synchronized (leadershipLock) { - Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started"); - Preconditions.checkNotNull(closeMode, "closeMode cannot be null"); - - cancelStartTask(); - - try { - setNode(null); - client.removeWatchers(); - } catch (Exception e) { - ThreadUtils.checkInterrupted(e); - throw new IOException(e); - } finally { - client.getConnectionStateListenable().removeListener(listener); - - switch (closeMode) { - case NOTIFY_LEADER: { - setLeadership(false); - listeners.clear(); - break; - } + public synchronized void close(CloseMode closeMode) throws IOException { + Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started"); + Preconditions.checkNotNull(closeMode, "closeMode cannot be null"); + + cancelStartTask(); + + try { + setNode(null); + client.removeWatchers(); + } catch (Exception e) { + ThreadUtils.checkInterrupted(e); + throw new IOException(e); + } finally { + client.getConnectionStateListenable().removeListener(listener); + + switch (closeMode) { + case NOTIFY_LEADER: { + setLeadership(false); + listeners.clear(); + break; + } - case SILENT: { - listeners.clear(); - setLeadership(false); - break; - } + case SILENT: { + listeners.clear(); + setLeadership(false); + break; } } } @@ -279,9 +275,9 @@ public void removeListener(LeaderLatchListener listener) { * while waiting */ public void await() throws InterruptedException, EOFException { - synchronized (leadershipLock) { + synchronized (this) { while ((state.get() == State.STARTED) && !hasLeadership.get()) { - leadershipLock.wait(); + wait(); } } if (state.get() != State.STARTED) { @@ -330,7 +326,7 @@ public void await() throws InterruptedException, EOFException { public boolean await(long timeout, TimeUnit unit) throws InterruptedException { long waitNanos = TimeUnit.NANOSECONDS.convert(timeout, unit); - synchronized (leadershipLock) { + synchronized (this) { while (true) { if (state.get() != State.STARTED) { return false; @@ -345,7 +341,7 @@ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { } long startNanos = System.nanoTime(); - TimeUnit.NANOSECONDS.timedWait(leadershipLock, waitNanos); + TimeUnit.NANOSECONDS.timedWait(this, waitNanos); long elapsed = System.nanoTime() - startNanos; waitNanos -= elapsed; } @@ -482,16 +478,14 @@ void reset() throws Exception { client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id)); } - private void internalStart() { - synchronized (leadershipLock) { - if (state.get() == State.STARTED) { - client.getConnectionStateListenable().addListener(listener); - try { - reset(); - } catch (Exception e) { - ThreadUtils.checkInterrupted(e); - log.error("An error occurred checking resetting leadership.", e); - } + private synchronized void internalStart() { + if (state.get() == State.STARTED) { + client.getConnectionStateListenable().addListener(listener); + try { + reset(); + } catch (Exception e) { + ThreadUtils.checkInterrupted(e); + log.error("An error occurred checking resetting leadership.", e); } } } @@ -499,43 +493,41 @@ private void internalStart() { @VisibleForTesting volatile CountDownLatch debugCheckLeaderShipLatch = null; - private void checkLeadership(List children) throws Exception { + private synchronized void checkLeadership(List children) throws Exception { if (debugCheckLeaderShipLatch != null) { debugCheckLeaderShipLatch.await(); } - synchronized (leadershipLock) { - final String localOurPath = ourPath.get(); - List sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children); - int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1; - if (ourIndex < 0) { - log.error("Can't find our node. Resetting. Index: " + ourIndex); - reset(); - } else if (ourIndex == 0) { - lastPathIsLeader.set(localOurPath); - setLeadership(true); - } else { - String watchPath = sortedChildren.get(ourIndex - 1); - Watcher watcher = event -> { - if (state.get() == State.STARTED && event.getType() == Watcher.Event.EventType.NodeDeleted) { - try { - getChildren(); - } catch (Exception ex) { - ThreadUtils.checkInterrupted(ex); - log.error("An error occurred checking the leadership.", ex); - } - } - }; - - BackgroundCallback callback = (client, event) -> { - if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) { - // previous node is gone - retry getChildren + final String localOurPath = ourPath.get(); + List sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children); + int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1; + if (ourIndex < 0) { + log.error("Can't find our node. Resetting. Index: " + ourIndex); + reset(); + } else if (ourIndex == 0) { + lastPathIsLeader.set(localOurPath); + setLeadership(true); + } else { + String watchPath = sortedChildren.get(ourIndex - 1); + Watcher watcher = event -> { + if (state.get() == State.STARTED && event.getType() == Watcher.Event.EventType.NodeDeleted) { + try { getChildren(); + } catch (Exception ex) { + ThreadUtils.checkInterrupted(ex); + log.error("An error occurred checking the leadership.", ex); } - }; - // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak - client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath)); - } + } + }; + + BackgroundCallback callback = (client, event) -> { + if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) { + // previous node is gone - retry getChildren + getChildren(); + } + }; + // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak + client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath)); } } @@ -583,18 +575,16 @@ protected void handleStateChange(ConnectionState newState) { } } - private void setLeadership(boolean newValue) { - synchronized (leadershipLock) { - boolean oldValue = hasLeadership.getAndSet(newValue); - - if (oldValue && !newValue) { // Lost leadership, was true, now false - listeners.forEach(LeaderLatchListener::notLeader); - } else if (!oldValue && newValue) { // Gained leadership, was false, now true - listeners.forEach(LeaderLatchListener::isLeader); - } + private synchronized void setLeadership(boolean newValue) { + boolean oldValue = hasLeadership.getAndSet(newValue); - leadershipLock.notifyAll(); + if (oldValue && !newValue) { // Lost leadership, was true, now false + listeners.forEach(LeaderLatchListener::notLeader); + } else if (!oldValue && newValue) { // Gained leadership, was false, now true + listeners.forEach(LeaderLatchListener::isLeader); } + + notifyAll(); } private void setNode(String newValue) throws Exception { From b172f5fafd781f14ec896c470d21b185cea72c43 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 13 Jul 2022 10:19:33 +0800 Subject: [PATCH 07/20] Revert "reformat" This reverts commit 8a76593d9a06c9594cadb836de7e46d8e2ebbab2. --- .../framework/recipes/leader/LeaderLatch.java | 377 ++++++++++++------ 1 file changed, 253 insertions(+), 124 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index bc5f65d20e..a069e45cdc 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -21,20 +21,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import java.io.Closeable; -import java.io.EOFException; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.WatcherRemoveCuratorFramework; import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.listen.StandardListenerManager; import org.apache.curator.framework.recipes.AfterConnectionEstablished; import org.apache.curator.framework.recipes.locks.LockInternals; @@ -42,14 +32,26 @@ import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.curator.utils.PathUtils; import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.io.EOFException; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.curator.utils.PathUtils; /** *

@@ -59,24 +61,42 @@ * group will randomly be chosen *

*/ -public class LeaderLatch implements Closeable { - private static final String LOCK_NAME = "latch-"; - private static final LockInternalsSorter sorter = StandardLockInternalsDriver::standardFixForSorting; - private static final Logger log = LoggerFactory.getLogger(LeaderLatch.class); - +public class LeaderLatch implements Closeable +{ + private final Logger log = LoggerFactory.getLogger(getClass()); private final WatcherRemoveCuratorFramework client; private final String latchPath; private final String id; - private final AtomicReference state = new AtomicReference<>(State.LATENT); + private final AtomicReference state = new AtomicReference(State.LATENT); private final AtomicBoolean hasLeadership = new AtomicBoolean(false); - private final AtomicReference ourPath = new AtomicReference<>(); - private final AtomicReference lastPathIsLeader = new AtomicReference<>(); + private final AtomicReference ourPath = new AtomicReference(); + private final AtomicReference lastPathIsLeader = new AtomicReference(); private final StandardListenerManager listeners = StandardListenerManager.standard(); private final CloseMode closeMode; - private final AtomicReference> startTask = new AtomicReference<>(); - private final ConnectionStateListener listener = (client, newState) -> handleStateChange(newState); + private final AtomicReference> startTask = new AtomicReference>(); + + private final ConnectionStateListener listener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + handleStateChange(newState); + } + }; + + private static final String LOCK_NAME = "latch-"; + + private static final LockInternalsSorter sorter = new LockInternalsSorter() + { + @Override + public String fixForSorting(String str, String lockName) + { + return StandardLockInternalsDriver.standardFixForSorting(str, lockName); + } + }; - public enum State { + public enum State + { LATENT, STARTED, CLOSED @@ -85,7 +105,8 @@ public enum State { /** * How to handle listeners when the latch is closed */ - public enum CloseMode { + public enum CloseMode + { /** * When the latch is closed, listeners will *not* be notified (default behavior) */ @@ -101,7 +122,8 @@ public enum CloseMode { * @param client the client * @param latchPath the path for this leadership group */ - public LeaderLatch(CuratorFramework client, String latchPath) { + public LeaderLatch(CuratorFramework client, String latchPath) + { this(client, latchPath, "", CloseMode.SILENT); } @@ -110,7 +132,8 @@ public LeaderLatch(CuratorFramework client, String latchPath) { * @param latchPath the path for this leadership group * @param id participant ID */ - public LeaderLatch(CuratorFramework client, String latchPath, String id) { + public LeaderLatch(CuratorFramework client, String latchPath, String id) + { this(client, latchPath, id, CloseMode.SILENT); } @@ -120,7 +143,8 @@ public LeaderLatch(CuratorFramework client, String latchPath, String id) { * @param id participant ID * @param closeMode behaviour of listener on explicit close. */ - public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode) { + public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode) + { this.client = Preconditions.checkNotNull(client, "client cannot be null").newWatcherRemoveCuratorFramework(); this.latchPath = PathUtils.validatePath(latchPath); this.id = Preconditions.checkNotNull(id, "id cannot be null"); @@ -132,14 +156,23 @@ public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMo * * @throws Exception errors */ - public void start() throws Exception { + public void start() throws Exception + { Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); - startTask.set(AfterConnectionEstablished.execute(client, () -> { - try { - internalStart(); - } finally { - startTask.set(null); + startTask.set(AfterConnectionEstablished.execute(client, new Runnable() + { + @Override + public void run() + { + try + { + internalStart(); + } + finally + { + startTask.set(null); + } } })); } @@ -152,7 +185,8 @@ public void start() throws Exception { * @throws IOException errors */ @Override - public void close() throws IOException { + public void close() throws IOException + { close(closeMode); } @@ -164,41 +198,52 @@ public void close() throws IOException { * @param closeMode allows the default close mode to be overridden at the time the latch is closed. * @throws IOException errors */ - public synchronized void close(CloseMode closeMode) throws IOException { + public synchronized void close(CloseMode closeMode) throws IOException + { Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started"); Preconditions.checkNotNull(closeMode, "closeMode cannot be null"); cancelStartTask(); - try { + try + { setNode(null); client.removeWatchers(); - } catch (Exception e) { + } + catch ( Exception e ) + { ThreadUtils.checkInterrupted(e); throw new IOException(e); - } finally { + } + finally + { client.getConnectionStateListenable().removeListener(listener); - switch (closeMode) { - case NOTIFY_LEADER: { - setLeadership(false); - listeners.clear(); - break; - } + switch ( closeMode ) + { + case NOTIFY_LEADER: + { + setLeadership(false); + listeners.clear(); + break; + } - case SILENT: { - listeners.clear(); - setLeadership(false); - break; - } + default: + { + listeners.clear(); + setLeadership(false); + break; + } } } } @VisibleForTesting - protected boolean cancelStartTask() { + protected boolean cancelStartTask() + { Future localStartTask = startTask.getAndSet(null); - if (localStartTask != null) { + if ( localStartTask != null ) + { localStartTask.cancel(true); return true; } @@ -217,7 +262,8 @@ protected boolean cancelStartTask() { * * @param listener the listener to attach */ - public void addListener(LeaderLatchListener listener) { + public void addListener(LeaderLatchListener listener) + { listeners.addListener(listener); } @@ -234,7 +280,8 @@ public void addListener(LeaderLatchListener listener) { * @param listener the listener to attach * @param executor An executor to run the methods for the listener on. */ - public void addListener(LeaderLatchListener listener, Executor executor) { + public void addListener(LeaderLatchListener listener, Executor executor) + { listeners.addListener(listener, executor); } @@ -243,7 +290,8 @@ public void addListener(LeaderLatchListener listener, Executor executor) { * * @param listener the listener to remove */ - public void removeListener(LeaderLatchListener listener) { + public void removeListener(LeaderLatchListener listener) + { listeners.removeListener(listener); } @@ -274,13 +322,17 @@ public void removeListener(LeaderLatchListener listener) { * @throws EOFException if the instance is {@linkplain #close() closed} * while waiting */ - public void await() throws InterruptedException, EOFException { - synchronized (this) { - while ((state.get() == State.STARTED) && !hasLeadership.get()) { + public void await() throws InterruptedException, EOFException + { + synchronized(this) + { + while ( (state.get() == State.STARTED) && !hasLeadership.get() ) + { wait(); } } - if (state.get() != State.STARTED) { + if ( state.get() != State.STARTED ) + { throw new EOFException(); } } @@ -323,20 +375,26 @@ public void await() throws InterruptedException, EOFException { * @throws InterruptedException if the current thread is interrupted * while waiting */ - public boolean await(long timeout, TimeUnit unit) throws InterruptedException { + public boolean await(long timeout, TimeUnit unit) throws InterruptedException + { long waitNanos = TimeUnit.NANOSECONDS.convert(timeout, unit); - synchronized (this) { - while (true) { - if (state.get() != State.STARTED) { + synchronized(this) + { + while ( true ) + { + if ( state.get() != State.STARTED ) + { return false; } - if (hasLeadership()) { + if ( hasLeadership() ) + { return true; } - if (waitNanos <= 0) { + if ( waitNanos <= 0 ) + { return false; } @@ -353,7 +411,8 @@ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { * * @return participant Id */ - public String getId() { + public String getId() + { return id; } @@ -365,7 +424,8 @@ public String getId() { * * @return the state of the current instance */ - public State getState() { + public State getState() + { return state.get(); } @@ -383,7 +443,8 @@ public State getState() { * @return participants * @throws Exception ZK errors, interruptions, etc. */ - public Collection getParticipants() throws Exception { + public Collection getParticipants() throws Exception + { Collection participantNodes = LockInternals.getParticipantNodes(client, latchPath, LOCK_NAME, sorter); return LeaderSelector.getParticipants(client, participantNodes); } @@ -403,7 +464,8 @@ public Collection getParticipants() throws Exception { * @return leader * @throws Exception ZK errors, interruptions, etc. */ - public Participant getLeader() throws Exception { + public Participant getLeader() throws Exception + { Collection participantNodes = LockInternals.getParticipantNodes(client, latchPath, LOCK_NAME, sorter); return LeaderSelector.getLeader(client, participantNodes); } @@ -413,7 +475,8 @@ public Participant getLeader() throws Exception { * * @return true/false */ - public boolean hasLeadership() { + public boolean hasLeadership() + { return (state.get() == State.STARTED) && hasLeadership.get(); } @@ -431,7 +494,8 @@ public boolean hasLeadership() { * * @return lock node path or null */ - public String getOurPath() { + public String getOurPath() + { return ourPath.get(); } @@ -446,7 +510,8 @@ public String getOurPath() { * * @return last lock node path that was leader ever or null */ - public String getLastPathIsLeader() { + public String getLastPathIsLeader() + { return lastPathIsLeader.get(); } @@ -454,36 +519,54 @@ public String getLastPathIsLeader() { volatile CountDownLatch debugResetWaitLatch = null; @VisibleForTesting - void reset() throws Exception { + void reset() throws Exception + { setLeadership(false); setNode(null); - BackgroundCallback callback = (client, event) -> { - if (debugResetWaitLatch != null) { - debugResetWaitLatch.await(); - debugResetWaitLatch = null; - } + BackgroundCallback callback = new BackgroundCallback() + { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + if ( debugResetWaitLatch != null ) + { + debugResetWaitLatch.await(); + debugResetWaitLatch = null; + } - if (event.getResultCode() == KeeperException.Code.OK.intValue()) { - setNode(event.getName()); - if (state.get() == State.CLOSED) { - setNode(null); - } else { - getChildren(); + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) + { + setNode(event.getName()); + if ( state.get() == State.CLOSED ) + { + setNode(null); + } + else + { + getChildren(); + } + } + else + { + log.error("getChildren() failed. rc = " + event.getResultCode()); } - } else { - log.error("getChildren() failed. rc = " + event.getResultCode()); } }; client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id)); } - private synchronized void internalStart() { - if (state.get() == State.STARTED) { + private synchronized void internalStart() + { + if ( state.get() == State.STARTED ) + { client.getConnectionStateListenable().addListener(listener); - try { + try + { reset(); - } catch (Exception e) { + } + catch ( Exception e ) + { ThreadUtils.checkInterrupted(e); log.error("An error occurred checking resetting leadership.", e); } @@ -493,37 +576,59 @@ private synchronized void internalStart() { @VisibleForTesting volatile CountDownLatch debugCheckLeaderShipLatch = null; - private synchronized void checkLeadership(List children) throws Exception { - if (debugCheckLeaderShipLatch != null) { + private synchronized void checkLeadership(List children) throws Exception + { + if ( debugCheckLeaderShipLatch != null ) + { debugCheckLeaderShipLatch.await(); } final String localOurPath = ourPath.get(); List sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children); int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1; - if (ourIndex < 0) { + if ( ourIndex < 0 ) + { log.error("Can't find our node. Resetting. Index: " + ourIndex); reset(); - } else if (ourIndex == 0) { + } + else if ( ourIndex == 0 ) + { lastPathIsLeader.set(localOurPath); setLeadership(true); - } else { + } + else + { String watchPath = sortedChildren.get(ourIndex - 1); - Watcher watcher = event -> { - if (state.get() == State.STARTED && event.getType() == Watcher.Event.EventType.NodeDeleted) { - try { - getChildren(); - } catch (Exception ex) { - ThreadUtils.checkInterrupted(ex); - log.error("An error occurred checking the leadership.", ex); + Watcher watcher = new Watcher() + { + @Override + public void process(WatchedEvent event) + { + if ( state.get() == State.STARTED && event.getType() == Event.EventType.NodeDeleted ) + { + try + { + getChildren(); + } + catch ( Exception ex ) + { + ThreadUtils.checkInterrupted(ex); + log.error("An error occurred checking the leadership.", ex); + } } } }; - BackgroundCallback callback = (client, event) -> { - if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) { - // previous node is gone - retry getChildren - getChildren(); + BackgroundCallback callback = new BackgroundCallback() + { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) + { + // previous node is gone - retry getChildren + getChildren(); + } } }; // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak @@ -531,29 +636,44 @@ private synchronized void checkLeadership(List children) throws Exceptio } } - private void getChildren() throws Exception { - BackgroundCallback callback = (client, event) -> { - if (event.getResultCode() == KeeperException.Code.OK.intValue()) { - checkLeadership(event.getChildren()); + private void getChildren() throws Exception + { + BackgroundCallback callback = new BackgroundCallback() + { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) + { + checkLeadership(event.getChildren()); + } } }; client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null)); } @VisibleForTesting - protected void handleStateChange(ConnectionState newState) { - switch (newState) { - default: { + protected void handleStateChange(ConnectionState newState) + { + switch ( newState ) + { + default: + { // NOP break; } - case RECONNECTED: { - try { - if (client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED)) { + case RECONNECTED: + { + try + { + if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) ) + { getChildren(); } - } catch (Exception e) { + } + catch ( Exception e ) + { ThreadUtils.checkInterrupted(e); log.error("Could not reset leader latch", e); setLeadership(false); @@ -561,35 +681,44 @@ protected void handleStateChange(ConnectionState newState) { break; } - case SUSPENDED: { - if (client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED)) { + case SUSPENDED: + { + if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) ) + { setLeadership(false); } break; } - case LOST: { + case LOST: + { setLeadership(false); break; } } } - private synchronized void setLeadership(boolean newValue) { + private synchronized void setLeadership(boolean newValue) + { boolean oldValue = hasLeadership.getAndSet(newValue); - if (oldValue && !newValue) { // Lost leadership, was true, now false + if ( oldValue && !newValue ) + { // Lost leadership, was true, now false listeners.forEach(LeaderLatchListener::notLeader); - } else if (!oldValue && newValue) { // Gained leadership, was false, now true + } + else if ( !oldValue && newValue ) + { // Gained leadership, was false, now true listeners.forEach(LeaderLatchListener::isLeader); } notifyAll(); } - private void setNode(String newValue) throws Exception { + private void setNode(String newValue) throws Exception + { String oldPath = ourPath.getAndSet(newValue); - if (oldPath != null) { + if ( oldPath != null ) + { client.delete().guaranteed().inBackground().forPath(oldPath); } } From 99a9009eb77fe2f297e4f6a1a41b4cea8d3a3809 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 13 Jul 2022 10:19:34 +0800 Subject: [PATCH 08/20] Revert "synchronize checkLeadership" This reverts commit 0d4a7b75c34fc06ea36ae0cd4a0fb48d263d96cb. --- .../apache/curator/framework/recipes/leader/LeaderLatch.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index a069e45cdc..be78a72d24 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -576,7 +576,7 @@ private synchronized void internalStart() @VisibleForTesting volatile CountDownLatch debugCheckLeaderShipLatch = null; - private synchronized void checkLeadership(List children) throws Exception + private void checkLeadership(List children) throws Exception { if ( debugCheckLeaderShipLatch != null ) { From 425598cb6bf6a5b227a5fdd293fe46c7978d6578 Mon Sep 17 00:00:00 2001 From: tison Date: Thu, 14 Jul 2022 08:10:52 +0800 Subject: [PATCH 09/20] log debug and add test case testOurPathDeletedOnReconnect Signed-off-by: tison --- .../framework/recipes/leader/LeaderLatch.java | 4 ++++ .../recipes/leader/TestLeaderLatch.java | 24 +++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index be78a72d24..56975088fd 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -586,6 +586,9 @@ private void checkLeadership(List children) throws Exception final String localOurPath = ourPath.get(); List sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children); int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1; + + log.debug("checkLeadership with ourPath: {}, children: {}", localOurPath, sortedChildren); + if ( ourIndex < 0 ) { log.error("Can't find our node. Resetting. Index: " + ourIndex); @@ -717,6 +720,7 @@ else if ( !oldValue && newValue ) private void setNode(String newValue) throws Exception { String oldPath = ourPath.getAndSet(newValue); + log.debug("setNode with oldPath: {}, newValue: {}", oldPath, newValue); if ( oldPath != null ) { client.delete().guaranteed().inBackground().forPath(oldPath); diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index d64e7cfee5..eb32c5faa3 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -218,6 +218,30 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception } } + @Test + public void testOurPathDeletedOnReconnect() throws Exception + { + final String latchPath = "/foo/bar"; + Timing2 timing = new Timing2(); + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) ) + { + client.start(); + try ( LeaderLatch latch = new LeaderLatch(client, latchPath, "0") ) + { + latch.debugCheckLeaderShipLatch = new CountDownLatch(1); + latch.start(); // hold before checkLeadership() + timing.sleepABit(); + latch.reset(); // force the internal "ourPath" to get reset + latch.debugCheckLeaderShipLatch.countDown(); // allow checkLeadership() to continue + + assertTrue(latch.await(timing.forSessionSleep().forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); + timing.sleepABit(); + assertTrue(latch.hasLeadership()); + assertEquals(client.getChildren().forPath(latchPath).size(), 1); + } + } + } + @Test public void testSessionErrorPolicy() throws Exception { From 50a65159b54944790f0b2afed3bf651adad14a26 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 12 Sep 2022 00:06:39 +0800 Subject: [PATCH 10/20] testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved Signed-off-by: tison Co-authored-by: Matthias Pohl --- .../recipes/leader/TestLeaderLatch.java | 48 ++++++++++++++----- 1 file changed, 37 insertions(+), 11 deletions(-) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index eb32c5faa3..cefee53a2e 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -219,25 +219,51 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception } @Test - public void testOurPathDeletedOnReconnect() throws Exception + public void testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved() throws Exception { final String latchPath = "/foo/bar"; - Timing2 timing = new Timing2(); - try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)) ) + final Timing2 timing = new Timing2(); + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) { client.start(); - try ( LeaderLatch latch = new LeaderLatch(client, latchPath, "0") ) + LeaderLatch latchInitialLeader = new LeaderLatch(client, latchPath, "initial-leader"); + LeaderLatch latchCandidate0 = new LeaderLatch(client, latchPath, "candidate-0"); + LeaderLatch latchCandidate1 = new LeaderLatch(client, latchPath, "candidate-1"); + + try { - latch.debugCheckLeaderShipLatch = new CountDownLatch(1); - latch.start(); // hold before checkLeadership() + latchInitialLeader.start(); + + // we want to make sure that the leader gets leadership before other instances joining the party + waitForALeader(Collections.singletonList(latchInitialLeader), new Timing()); + + // candidate #0 will wait for the leader to go away - this should happen after the child nodes are retrieved by candidate #0 + latchCandidate0.debugCheckLeaderShipLatch = new CountDownLatch(1); + + latchCandidate0.start(); timing.sleepABit(); - latch.reset(); // force the internal "ourPath" to get reset - latch.debugCheckLeaderShipLatch.countDown(); // allow checkLeadership() to continue - assertTrue(latch.await(timing.forSessionSleep().forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); + // no extract CountDownLatch needs to be set here because candidate #1 will rely on candidate #0 + latchCandidate1.start(); timing.sleepABit(); - assertTrue(latch.hasLeadership()); - assertEquals(client.getChildren().forPath(latchPath).size(), 1); + + // triggers the removal of the corresponding child node after candidate #0 retrieved the children + latchInitialLeader.close(); + + latchCandidate0.debugCheckLeaderShipLatch.countDown(); + + waitForALeader(Arrays.asList(latchCandidate0, latchCandidate1), new Timing()); + + assertTrue(latchCandidate0.hasLeadership() ^ latchCandidate1.hasLeadership()); + } finally + { + for (LeaderLatch latchToClose : Arrays.asList(latchInitialLeader, latchCandidate0, latchCandidate1)) + { + if ( latchToClose.getState() != LeaderLatch.State.CLOSED ) + { + latchToClose.close(); + } + } } } } From 2775855fc80fdea8433d5dedbb746f58ddd443c4 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 12 Sep 2022 00:07:48 +0800 Subject: [PATCH 11/20] print id in debug logs Signed-off-by: tison Co-authored-by: Matthias Pohl --- .../apache/curator/framework/recipes/leader/LeaderLatch.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index 56975088fd..e491a7347b 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -720,7 +720,7 @@ else if ( !oldValue && newValue ) private void setNode(String newValue) throws Exception { String oldPath = ourPath.getAndSet(newValue); - log.debug("setNode with oldPath: {}, newValue: {}", oldPath, newValue); + log.debug("setNode with id: {}, oldPath: {}, newValue: {}", id, oldPath, newValue); if ( oldPath != null ) { client.delete().guaranteed().inBackground().forPath(oldPath); From 54dae88f049be5082e5ab7e2ae5520a326ff1e40 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 14 Sep 2022 10:01:31 +0800 Subject: [PATCH 12/20] Apply suggestions from code review Co-authored-by: Matthias Pohl --- .../curator/framework/recipes/leader/TestLeaderLatch.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index cefee53a2e..96d065056d 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -234,7 +234,7 @@ public void testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved() { latchInitialLeader.start(); - // we want to make sure that the leader gets leadership before other instances joining the party + // we want to make sure that the leader gets leadership before other instances are joining the party waitForALeader(Collections.singletonList(latchInitialLeader), new Timing()); // candidate #0 will wait for the leader to go away - this should happen after the child nodes are retrieved by candidate #0 @@ -243,7 +243,7 @@ public void testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved() latchCandidate0.start(); timing.sleepABit(); - // no extract CountDownLatch needs to be set here because candidate #1 will rely on candidate #0 + // no extra CountDownLatch needs to be set here because candidate #1 will rely on candidate #0 latchCandidate1.start(); timing.sleepABit(); From cc4c148a1ee3d6513f4b9f2b5fe462caee8e41fe Mon Sep 17 00:00:00 2001 From: tison Date: Sat, 17 Sep 2022 09:39:06 +0800 Subject: [PATCH 13/20] print id in log Signed-off-by: tison --- .../apache/curator/framework/recipes/leader/LeaderLatch.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index e491a7347b..c3bb72346f 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -587,7 +587,7 @@ private void checkLeadership(List children) throws Exception List sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children); int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1; - log.debug("checkLeadership with ourPath: {}, children: {}", localOurPath, sortedChildren); + log.debug("checkLeadership with id: {},ourPath: {}, children: {}", id, localOurPath, sortedChildren); if ( ourIndex < 0 ) { From ec84f755ea073b7bccc0298f77a070182131b246 Mon Sep 17 00:00:00 2001 From: tison Date: Sat, 17 Sep 2022 10:01:46 +0800 Subject: [PATCH 14/20] update test Signed-off-by: tison --- curator-recipes/pom.xml | 6 +++++ .../recipes/leader/TestLeaderLatch.java | 23 ++++++++++++++----- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/curator-recipes/pom.xml b/curator-recipes/pom.xml index d27d7fa3cb..b84a94896b 100644 --- a/curator-recipes/pom.xml +++ b/curator-recipes/pom.xml @@ -86,6 +86,12 @@ commons-math test + + + org.awaitility + awaitility + test + diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 96d065056d..9c63635d65 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -29,6 +29,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Queues; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.time.Duration; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.TestCleanState; @@ -46,6 +47,7 @@ import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.test.compatibility.Timing2; import org.apache.curator.utils.CloseableUtils; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -223,6 +225,7 @@ public void testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved() { final String latchPath = "/foo/bar"; final Timing2 timing = new Timing2(); + final Duration pollInterval = Duration.ofMillis(100); try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) { client.start(); @@ -234,18 +237,25 @@ public void testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved() { latchInitialLeader.start(); - // we want to make sure that the leader gets leadership before other instances are joining the party + // we want to make sure that the leader gets leadership before other instances are going to join the party waitForALeader(Collections.singletonList(latchInitialLeader), new Timing()); - // candidate #0 will wait for the leader to go away - this should happen after the child nodes are retrieved by candidate #0 latchCandidate0.debugCheckLeaderShipLatch = new CountDownLatch(1); - latchCandidate0.start(); - timing.sleepABit(); + final int expectedChildrenAfterCandidate0Joins = 2; + Awaitility.await("There should be " + expectedChildrenAfterCandidate0Joins + " child nodes created after candidate #0 joins the leader election.") + .pollInterval(pollInterval) + .pollInSameThread() + .until(() -> client.getChildren().forPath(latchPath).size() == expectedChildrenAfterCandidate0Joins); // no extra CountDownLatch needs to be set here because candidate #1 will rely on candidate #0 latchCandidate1.start(); - timing.sleepABit(); + + final int expectedChildrenAfterCandidate1Joins = 3; + Awaitility.await("There should be " + expectedChildrenAfterCandidate1Joins + " child nodes created after candidate #1 joins the leader election.") + .pollInterval(pollInterval) + .pollInSameThread() + .until(() -> client.getChildren().forPath(latchPath).size() == expectedChildrenAfterCandidate1Joins); // triggers the removal of the corresponding child node after candidate #0 retrieved the children latchInitialLeader.close(); @@ -255,7 +265,8 @@ public void testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved() waitForALeader(Arrays.asList(latchCandidate0, latchCandidate1), new Timing()); assertTrue(latchCandidate0.hasLeadership() ^ latchCandidate1.hasLeadership()); - } finally + } + finally { for (LeaderLatch latchToClose : Arrays.asList(latchInitialLeader, latchCandidate0, latchCandidate1)) { From bfec2325e82091b2fef81830acf27b3538e6f97d Mon Sep 17 00:00:00 2001 From: tison Date: Sat, 17 Sep 2022 11:13:51 +0800 Subject: [PATCH 15/20] harden tests Signed-off-by: tison --- .../framework/recipes/leader/LeaderLatch.java | 26 +++++++++++++++++-- .../recipes/leader/TestLeaderLatch.java | 5 +--- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index c3bb72346f..0852b543e7 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -190,6 +190,12 @@ public void close() throws IOException close(closeMode); } + // for testing + void closeOnDemand() throws IOException + { + internalClose(closeMode, false); + } + /** * Remove this instance from the leadership election. If this instance is the leader, leadership * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch @@ -198,9 +204,25 @@ public void close() throws IOException * @param closeMode allows the default close mode to be overridden at the time the latch is closed. * @throws IOException errors */ - public synchronized void close(CloseMode closeMode) throws IOException + public void close(CloseMode closeMode) throws IOException { - Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started"); + internalClose(closeMode, true); + } + + private synchronized void internalClose(CloseMode closeMode, boolean failOnClosed) throws IOException + { + if (!state.compareAndSet(State.STARTED, State.CLOSED)) + { + if (failOnClosed) + { + throw new IllegalStateException("Already closed or has not been started"); + } + else + { + return; + } + } + Preconditions.checkNotNull(closeMode, "closeMode cannot be null"); cancelStartTask(); diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index 9c63635d65..c263a6c7cc 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -270,10 +270,7 @@ public void testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved() { for (LeaderLatch latchToClose : Arrays.asList(latchInitialLeader, latchCandidate0, latchCandidate1)) { - if ( latchToClose.getState() != LeaderLatch.State.CLOSED ) - { - latchToClose.close(); - } + latchToClose.closeOnDemand(); } } } From ccf5e5239c4e4616fc9acf346b9e506cc38a4ef7 Mon Sep 17 00:00:00 2001 From: tison Date: Sat, 17 Sep 2022 12:23:11 +0800 Subject: [PATCH 16/20] add dependencies Signed-off-by: tison --- curator-test-zk35/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/curator-test-zk35/pom.xml b/curator-test-zk35/pom.xml index 2e22ac825a..088249e57b 100644 --- a/curator-test-zk35/pom.xml +++ b/curator-test-zk35/pom.xml @@ -166,6 +166,12 @@ slf4j-log4j12 test + + + org.awaitility + awaitility + test + From b7e0e8ed5cd958dd9f18301464237189a0523251 Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Wed, 21 Sep 2022 05:07:23 +0300 Subject: [PATCH 17/20] [CURATOR-644][test] Extends test to verify that no new child is created if we haven't lost the child node after a reconnect. (#2) --- .../curator/framework/recipes/leader/TestLeaderLatch.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index c263a6c7cc..671e3c4b03 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -306,7 +306,8 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) client.getConnectionStateListenable().addListener(stateListener); client.start(); - latch = new LeaderLatch(client, "/test"); + final String latchPatch = "/test"; + latch = new LeaderLatch(client, latchPatch); LeaderLatchListener listener = new LeaderLatchListener() { @Override @@ -325,6 +326,7 @@ public void notLeader() latch.start(); assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + final List beforeResetChildren = client.getChildren().forPath(latchPatch); server.stop(); if ( isSessionIteration ) { @@ -342,6 +344,8 @@ public void notLeader() server.restart(); assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED.name()); assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + final List afterResetChildren = client.getChildren().forPath(latchPatch); + assertEquals(beforeResetChildren, afterResetChildren); } } finally From bb72ecf0a6e5ab07fe565d2cf9de487a7824edc9 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 21 Sep 2022 16:22:02 +0800 Subject: [PATCH 18/20] Update curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java Co-authored-by: Matthias Pohl --- .../apache/curator/framework/recipes/leader/LeaderLatch.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index 0852b543e7..a289a7a6cb 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -609,7 +609,7 @@ private void checkLeadership(List children) throws Exception List sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children); int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1; - log.debug("checkLeadership with id: {},ourPath: {}, children: {}", id, localOurPath, sortedChildren); + log.debug("checkLeadership with id: {}, ourPath: {}, children: {}", id, localOurPath, sortedChildren); if ( ourIndex < 0 ) { From 781664f77fab32950dc29776f5d3aa90068eb2f2 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 27 Sep 2022 07:27:38 +0800 Subject: [PATCH 19/20] use @VisibleForTesting Signed-off-by: tison --- .../apache/curator/framework/recipes/leader/LeaderLatch.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index a289a7a6cb..5ad8f21138 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -190,7 +190,7 @@ public void close() throws IOException close(closeMode); } - // for testing + @VisibleForTesting void closeOnDemand() throws IOException { internalClose(closeMode, false); From e19e8f1c2375dc31238e44009903925da8b46a58 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 27 Sep 2022 09:08:06 +0800 Subject: [PATCH 20/20] revert looser condition Signed-off-by: tison --- .../apache/curator/framework/recipes/leader/LeaderLatch.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java index 5ad8f21138..e8187cecb4 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java @@ -692,7 +692,7 @@ protected void handleStateChange(ConnectionState newState) { try { - if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) ) + if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get() ) { getChildren(); }