Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
7272b85
fix the bug of double master when use LeaderLatch to select the leader
Oct 29, 2021
cf3d66f
Merge branch 'master' into fixbug/LeaderLatch-double-master
tisonkun Sep 25, 2022
106f705
Merge branch 'master' into fixbug/LeaderLatch-double-master
tisonkun Sep 27, 2022
492506c
fix test
tisonkun Sep 29, 2022
fd91873
[CURATOR-653] Fixes typo in name: rest -> reset
XComp Oct 10, 2022
d0a6ab5
[CURATOR-653] Aligns name of debugResetWaitForNodeDelete with other l…
XComp Oct 10, 2022
d1d3188
[CURATOR-653] Removes unused ExecutorService
XComp Oct 10, 2022
01788e5
[CURATOR-653] Adds final to local variables
XComp Oct 10, 2022
e109b2e
[CURATOR-653] Replace Timing with Timing2 and utilize it in timeout c…
XComp Oct 10, 2022
ace59cd
[CURATOR-653] Introduce a single finally block for closing any open r…
XComp Oct 10, 2022
25404ae
[CURATOR-653] Adds comment about wait after initialization
XComp Oct 10, 2022
8668dfd
[CURATOR-653] Adds comment about calling reset from within a separate…
XComp Oct 10, 2022
293411e
[CURATOR-653] Makes local variables latch1 and latch2 more descriptive
XComp Oct 10, 2022
70fbca5
[CURATOR-653] Makes test method name more descriptive
XComp Oct 10, 2022
361ef76
[CURATOR-653] Use lambda expression instead of more verbose anonymous…
XComp Oct 10, 2022
5a62adb
[hotfix] Removes unused imports
XComp Oct 10, 2022
adaee91
[CURATOR-653] Refactors test to use proper event processing based on ID
XComp Oct 10, 2022
f715c59
Empty commit to trigger CI once more
XComp Oct 12, 2022
a658431
naming
tisonkun Oct 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -540,10 +540,17 @@ public String getLastPathIsLeader()
@VisibleForTesting
volatile CountDownLatch debugResetWaitLatch = null;

@VisibleForTesting
volatile CountDownLatch debugResetWaitBeforeNodeDeleteLatch = null;

@VisibleForTesting
void reset() throws Exception
{
setLeadership(false);
if ( debugResetWaitBeforeNodeDeleteLatch != null )
{
debugResetWaitBeforeNodeDeleteLatch.await();
}
setNode(null);

BackgroundCallback callback = new BackgroundCallback()
Expand Down Expand Up @@ -623,6 +630,7 @@ else if ( ourIndex == 0 )
}
else
{
setLeadership(false);
String watchPath = sortedChildren.get(ourIndex - 1);
Watcher watcher = new Watcher()
{
Expand Down Expand Up @@ -726,7 +734,6 @@ protected void handleStateChange(ConnectionState newState)
private synchronized void setLeadership(boolean newValue)
{
boolean oldValue = hasLeadership.getAndSet(newValue);

if ( oldValue && !newValue )
{ // Lost leadership, was true, now false
listeners.forEach(LeaderLatchListener::notLeader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.ForkJoinPool;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.TestCleanState;
Expand Down Expand Up @@ -220,6 +224,153 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception
}
}

@Test
public void testResettingOfLeadershipAfterConcurrentLeadershipChange() throws Exception
{
final String latchPath = "/test";
final Timing2 timing = new Timing2();
final BlockingQueue<TestEvent> events = Queues.newLinkedBlockingQueue();

final List<Closeable> closeableResources = new ArrayList<>();
try
{
final String id0 = "id0";
final CuratorFramework client0 = createAndStartClient(server.getConnectString(), timing, id0, events);
closeableResources.add(client0);
final LeaderLatch latch0 = createAndStartLeaderLatch(client0, latchPath, id0, events);
closeableResources.add(latch0);

assertEquals(new TestEvent(id0, TestEventType.GAINED_CONNECTION), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
assertEquals(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));

final String id1 = "id1";
final CuratorFramework client1 = createAndStartClient(server.getConnectString(), timing, id1, events);
closeableResources.add(client1);
final LeaderLatch latch1 = createAndStartLeaderLatch(client1, latchPath, id1, events);
closeableResources.add(latch1);

assertEquals(new TestEvent(id1, TestEventType.GAINED_CONNECTION), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));

// wait for the non-leading LeaderLatch (i.e. latch1) instance to be done with its creation
// this call is time-consuming but necessary because we don't have a handle to detect the end of the reset call
timing.forWaiting().sleepABit();

assertTrue(latch0.hasLeadership());
assertFalse(latch1.hasLeadership());

latch1.debugResetWaitBeforeNodeDeleteLatch = new CountDownLatch(1);
latch1.debugResetWaitLatch = new CountDownLatch(1);
latch0.debugResetWaitLatch = new CountDownLatch(1);

// force latch0 and latch1 reset to trigger the actual test
latch0.reset();
// latch1 needs to be called within a separate thread since it's going to be blocked by the CountDownLatch outside an async call
ForkJoinPool.commonPool().submit(() -> {
latch1.reset();
return null;
});

// latch0.reset() will result in it losing its leadership, deleting its old child node and creating a new child node before being blocked by its debugResetWaitLatch
assertEquals(new TestEvent(id0, TestEventType.LOST_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
// latch1.reset() is blocked but latch1 will gain leadership due its node watching latch0's node to be deleted
assertEquals(new TestEvent(id1, TestEventType.GAINED_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));

assertFalse(latch0.hasLeadership());
assertTrue(latch1.hasLeadership());

// latch0.reset() continues with the getChildren call, finds itself not being the leader and starts listening to the node created by latch1
latch0.debugResetWaitLatch.countDown();
timing.sleepABit();

// latch1.reset() continues, deletes its old child node and creates a new child node before being blocked by its debugResetWaitLatch
latch1.debugResetWaitBeforeNodeDeleteLatch.countDown();

// latch0 receives NodeDeleteEvent and then finds itself to be the leader
assertEquals(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP), events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS));
assertTrue(latch0.hasLeadership());

// latch1.reset() continues and finds itself not being the leader
latch1.debugResetWaitLatch.countDown();
// this call is time-consuming but necessary because we don't have a handle to detect the end of the reset call
timing.forWaiting().sleepABit();

assertTrue(latch0.hasLeadership());
assertFalse(latch1.hasLeadership());
}
finally
{
// reverse is necessary for closing the LeaderLatch instances before closing the corresponding client
Collections.reverse(closeableResources);
closeableResources.forEach(CloseableUtils::closeQuietly);
}
}

private static CuratorFramework createAndStartClient(String zkConnectString, Timing2 timing, String id, Collection<TestEvent> events) {
final CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zkConnectString)
.connectionTimeoutMs(timing.connection())
.sessionTimeoutMs(timing.session())
.retryPolicy(new RetryOneTime(1))
.connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy())
.build();

client.getConnectionStateListenable().addListener((client1, newState) -> {
if ( newState == ConnectionState.CONNECTED )
{
events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION));
}
});

client.start();

return client;
}

private static LeaderLatch createAndStartLeaderLatch(CuratorFramework client, String latchPath, String id, Collection<TestEvent> events) throws Exception
{
final LeaderLatch latch = new LeaderLatch(client, latchPath, id);
latch.addListener(new LeaderLatchListener() {
@Override
public void isLeader() {
events.add(new TestEvent(latch.getId(), TestEventType.GAINED_LEADERSHIP));
}

@Override
public void notLeader() {
events.add(new TestEvent(latch.getId(), TestEventType.LOST_LEADERSHIP));
}
});
latch.start();

return latch;
}

private enum TestEventType
{
GAINED_LEADERSHIP,
LOST_LEADERSHIP,
GAINED_CONNECTION;
}

private static class TestEvent {
private final String id;
private final TestEventType eventType;

public TestEvent(String id, TestEventType eventType)
{
this.id = id;
this.eventType = eventType;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TestEvent testEvent = (TestEvent) o;
return Objects.equals(id, testEvent.id) && eventType == testEvent.eventType;
}
}

@Test
public void testLeadershipElectionWhenNodeDisappearsAfterChildrenAreRetrieved() throws Exception
{
Expand Down