Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions curator-recipes/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@
<artifactId>commons-math</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ public void close() throws IOException
close(closeMode);
}

/**
 * Test-only variant of {@code close()} that tolerates a latch which is not in the
 * {@code STARTED} state: in that case the call returns without doing anything instead
 * of raising an {@link IllegalStateException}. The instance's configured close mode is used.
 *
 * @throws IOException errors
 */
@VisibleForTesting
void closeOnDemand() throws IOException
{
    // false => a latch that was never started (or is already closed) is skipped silently
    final boolean failOnClosed = false;
    internalClose(closeMode, failOnClosed);
}
Comment thread
tisonkun marked this conversation as resolved.

/**
* Remove this instance from the leadership election. If this instance is the leader, leadership
* is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
Expand All @@ -198,9 +204,25 @@ public void close() throws IOException
* @param closeMode allows the default close mode to be overridden at the time the latch is closed.
* @throws IOException errors
*/
public synchronized void close(CloseMode closeMode) throws IOException
public void close(CloseMode closeMode) throws IOException
{
Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
internalClose(closeMode, true);
}

private synchronized void internalClose(CloseMode closeMode, boolean failOnClosed) throws IOException
{
if (!state.compareAndSet(State.STARTED, State.CLOSED))
{
if (failOnClosed)
{
throw new IllegalStateException("Already closed or has not been started");
}
else
{
return;
}
Comment thread
tisonkun marked this conversation as resolved.
}

Preconditions.checkNotNull(closeMode, "closeMode cannot be null");

cancelStartTask();
Expand Down Expand Up @@ -586,6 +608,9 @@ private void checkLeadership(List<String> children) throws Exception
final String localOurPath = ourPath.get();
List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;

log.debug("checkLeadership with id: {}, ourPath: {}, children: {}", id, localOurPath, sortedChildren);

if ( ourIndex < 0 )
{
log.error("Can't find our node. Resetting. Index: " + ourIndex);
Expand All @@ -604,7 +629,7 @@ else if ( ourIndex == 0 )
@Override
public void process(WatchedEvent event)
{
if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
Comment thread
tisonkun marked this conversation as resolved.
if ( state.get() == State.STARTED && event.getType() == Event.EventType.NodeDeleted )
{
try
{
Expand All @@ -626,8 +651,8 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex
{
if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
{
// previous node is gone - reset
reset();
// previous node is gone - retry getChildren
getChildren();
}
}
};
Expand Down Expand Up @@ -669,7 +694,7 @@ protected void handleStateChange(ConnectionState newState)
{
if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get() )
Comment thread
tisonkun marked this conversation as resolved.
Comment thread
tisonkun marked this conversation as resolved.
{
reset();
getChildren();
Comment thread
tisonkun marked this conversation as resolved.
}
}
catch ( Exception e )
Expand Down Expand Up @@ -717,6 +742,7 @@ else if ( !oldValue && newValue )
private void setNode(String newValue) throws Exception
{
String oldPath = ourPath.getAndSet(newValue);
log.debug("setNode with id: {}, oldPath: {}, newValue: {}", id, oldPath, newValue);
if ( oldPath != null )
{
client.delete().guaranteed().inBackground().forPath(oldPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Duration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.TestCleanState;
Expand All @@ -46,6 +47,7 @@
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.test.compatibility.Timing2;
import org.apache.curator.utils.CloseableUtils;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -218,6 +220,62 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception
}
}

/**
 * Scenario: a leader and two candidates take part in the same election. The first candidate is
 * held back (via its debug latch) while the current leader closes, and only then released.
 * Afterwards exactly one of the two remaining candidates must hold leadership.
 */
@Test
public void testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved() throws Exception
{
    final String electionPath = "/foo/bar";
    final Timing2 timing = new Timing2();
    final Duration pollEvery = Duration.ofMillis(100);
    try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)))
    {
        client.start();
        final LeaderLatch initialLeader = new LeaderLatch(client, electionPath, "initial-leader");
        final LeaderLatch candidate0 = new LeaderLatch(client, electionPath, "candidate-0");
        final LeaderLatch candidate1 = new LeaderLatch(client, electionPath, "candidate-1");
        // every latch created by this test; all of them are closed in the finally block below
        final List<LeaderLatch> allLatches = Arrays.asList(initialLeader, candidate0, candidate1);

        try
        {
            // step 1: the initial leader must own leadership before anyone else joins
            initialLeader.start();
            waitForALeader(Collections.singletonList(initialLeader), new Timing());

            // step 2: candidate #0 joins, gated by its debug latch until the leader has gone away
            candidate0.debugCheckLeaderShipLatch = new CountDownLatch(1);
            candidate0.start();

            final int childCountWithCandidate0 = 2;
            Awaitility.await("There should be " + childCountWithCandidate0 + " child nodes created after candidate #0 joins the leader election.")
                .pollInterval(pollEvery)
                .pollInSameThread()
                .until(() -> client.getChildren().forPath(electionPath).size() == childCountWithCandidate0);

            // step 3: candidate #1 joins; it needs no gate of its own because it queues up behind candidate #0
            candidate1.start();

            final int childCountWithCandidate1 = 3;
            Awaitility.await("There should be " + childCountWithCandidate1 + " child nodes created after candidate #1 joins the leader election.")
                .pollInterval(pollEvery)
                .pollInSameThread()
                .until(() -> client.getChildren().forPath(electionPath).size() == childCountWithCandidate1);

            // step 4: close the leader while candidate #0 is still gated, then open the gate
            initialLeader.close();
            candidate0.debugCheckLeaderShipLatch.countDown();

            // step 5: one of the remaining candidates has to take over ...
            waitForALeader(Arrays.asList(candidate0, candidate1), new Timing());

            // ... and it must be exactly one of them
            final boolean candidate0Leads = candidate0.hasLeadership();
            final boolean candidate1Leads = candidate1.hasLeadership();
            assertTrue(candidate0Leads != candidate1Leads);
        }
        finally
        {
            // closeOnDemand() rather than close(): some latches may already be closed or never started
            for (LeaderLatch latch : allLatches)
            {
                latch.closeOnDemand();
            }
        }
    }
}

@Test
public void testSessionErrorPolicy() throws Exception
{
Expand Down Expand Up @@ -248,7 +306,8 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
client.getConnectionStateListenable().addListener(stateListener);
client.start();

latch = new LeaderLatch(client, "/test");
final String latchPatch = "/test";
latch = new LeaderLatch(client, latchPatch);
LeaderLatchListener listener = new LeaderLatchListener()
{
@Override
Expand All @@ -267,6 +326,7 @@ public void notLeader()
latch.start();
assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
final List<String> beforeResetChildren = client.getChildren().forPath(latchPatch);
server.stop();
if ( isSessionIteration )
{
Expand All @@ -284,6 +344,8 @@ public void notLeader()
server.restart();
assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED.name());
assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
final List<String> afterResetChildren = client.getChildren().forPath(latchPatch);
assertEquals(beforeResetChildren, afterResetChildren);
}
}
finally
Expand Down
6 changes: 6 additions & 0 deletions curator-test-zk35/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
Comment thread
tisonkun marked this conversation as resolved.
</dependencies>

<build>
Expand Down