From b95b94497a8205e19d0d52ebe240f59aedfbf8a1 Mon Sep 17 00:00:00 2001 From: Razin Bouzar Date: Fri, 31 May 2024 12:34:55 -0400 Subject: [PATCH 1/8] Addressing https://github.com/apache/druid/issues/16411 Added listener method that tracks ZK leader state --- .../discovery/CuratorDruidLeaderSelector.java | 160 ++++++++++-------- 1 file changed, 93 insertions(+), 67 deletions(-) diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java index 1aa77cec5fd1..acb9c72f1124 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -24,6 +24,7 @@ import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.apache.curator.framework.recipes.leader.Participant; +import org.apache.curator.framework.state.ConnectionState; import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.guice.annotations.Self; @@ -39,9 +40,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; -/** - * - */ public class CuratorDruidLeaderSelector implements DruidLeaderSelector { private static final EmittingLogger log = new EmittingLogger(CuratorDruidLeaderSelector.class); @@ -65,11 +63,10 @@ public CuratorDruidLeaderSelector(CuratorFramework curator, @Self DruidNode self this.curator = curator; this.self = self; this.latchPath = latchPath; - - // Creating a LeaderLatch here allows us to query for the current leader. We will not be considered for leadership - // election until LeaderLatch.start() is called in registerListener(). This allows clients to observe the current - // leader without being involved in the election. this.leaderLatch.set(createNewLeaderLatch()); + + // Adding ConnectionStateListener to handle session changes using a method reference + curator.getConnectionStateListenable().addListener(this::handleConnectionStateChanged); } private LeaderLatch createNewLeaderLatch() @@ -80,66 +77,62 @@ private LeaderLatch createNewLeaderLatch() private LeaderLatch createNewLeaderLatchWithListener() { final LeaderLatch newLeaderLatch = createNewLeaderLatch(); + newLeaderLatch.addListener(new LeaderLatchListener() + { + @Override + public void isLeader() + { + try { + if (leader) { + log.warn("I'm being asked to become leader. But I am already the leader. Ignored event."); + return; + } - newLeaderLatch.addListener( - new LeaderLatchListener() - { - @Override - public void isLeader() - { - try { - if (leader) { - log.warn("I'm being asked to become leader. But I am already the leader. Ignored event."); - return; - } - - leader = true; - term++; - listener.becomeLeader(); - } - catch (Exception ex) { - log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); - - // give others a chance to become leader. - CloseableUtils.closeAndSuppressExceptions( - createNewLeaderLatchWithListener(), - e -> log.warn("Could not close old leader latch; continuing with new one anyway.") - ); - - leader = false; - try { - //Small delay before starting the latch so that others waiting are chosen to become leader. - Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); - leaderLatch.get().start(); - } - catch (Exception e) { - // If an exception gets thrown out here, then the node will zombie out 'cause it won't be looking for - // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but - // Curator likes to have "throws Exception" on methods so it might happen... - log.makeAlert(e, "I am a zombie").emit(); - } - } + leader = true; + term++; + listener.becomeLeader(); + } + catch (Exception ex) { + log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); + + // give others a chance to become leader. + CloseableUtils.closeAndSuppressExceptions( + createNewLeaderLatchWithListener(), + e -> log.warn("Could not close old leader latch; continuing with new one anyway.") + ); + + leader = false; + try { + // Small delay before starting the latch so that others waiting are chosen to become leader. + Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); + leaderLatch.get().start(); + } + catch (Exception e) { + // If an exception gets thrown out here, then the node will zombie out 'cause it won't be looking for + // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but + // Curator likes to have "throws Exception" on methods so it might happen... + log.makeAlert(e, "I am a zombie").emit(); } + } + } - @Override - public void notLeader() - { - try { - if (!leader) { - log.warn("I'm being asked to stop being leader. But I am not the leader. Ignored event."); - return; - } - - leader = false; - listener.stopBeingLeader(); - } - catch (Exception ex) { - log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to stopBeingLeader").emit(); - } + @Override + public void notLeader() + { + try { + if (!leader) { + log.warn("I'm being asked to stop being leader. But I am not the leader. Ignored event."); + return; } - }, - listenerExecutor - ); + + leader = false; + listener.stopBeingLeader(); + } + catch (Exception ex) { + log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to stopBeingLeader").emit(); + } + } + }, listenerExecutor); return leaderLatch.getAndSet(newLeaderLatch); } @@ -186,10 +179,10 @@ public void registerListener(DruidLeaderSelector.Listener listener) try { this.listener = listener; this.listenerExecutor = Execs.singleThreaded( - StringUtils.format( - "LeaderSelector[%s]", - StringUtils.encodeForFormat(latchPath) - ) + StringUtils.format( + "LeaderSelector[%s]", + StringUtils.encodeForFormat(latchPath) + ) ); createNewLeaderLatchWithListener(); @@ -215,4 +208,37 @@ public void unregisterListener() CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> log.warn(e, "Failed to close LeaderLatch.")); listenerExecutor.shutdownNow(); } + + // Method to handle connection state changes + private void handleConnectionStateChanged(CuratorFramework client, ConnectionState newState) + { + switch (newState) { + case SUSPENDED: + case LOST: + recreateLeaderLatch(); + break; + case RECONNECTED: + // Connection reestablished, no action needed here + break; + default: + // Do nothing for other states + break; + } + } + + private void recreateLeaderLatch() + { + // Close existing leader latch + CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> log.warn(e, "Failed to close LeaderLatch.")); + + // Create and start a new leader latch + LeaderLatch newLeaderLatch = createNewLeaderLatchWithListener(); + try { + newLeaderLatch.start(); + } + catch (Exception ex) { + throw new RuntimeException("Failed to start new LeaderLatch after session change", ex); + } + leaderLatch.set(newLeaderLatch); + } } From c4ba9ff73a1934178e2cf6791eef4a0b97446fa9 Mon Sep 17 00:00:00 2001 From: Razin Bouzar Date: Fri, 31 May 2024 12:43:46 -0400 Subject: [PATCH 2/8] Eliminate whitespace --- .../curator/discovery/CuratorDruidLeaderSelector.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java index acb9c72f1124..3f394e0affbb 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -179,10 +179,10 @@ public void registerListener(DruidLeaderSelector.Listener listener) try { this.listener = listener; this.listenerExecutor = Execs.singleThreaded( - StringUtils.format( - "LeaderSelector[%s]", - StringUtils.encodeForFormat(latchPath) - ) + StringUtils.format( + "LeaderSelector[%s]", + StringUtils.encodeForFormat(latchPath) + ) ); createNewLeaderLatchWithListener(); From 90ea5e8c9126dcb4b5321abe1676ecc617480100 Mon Sep 17 00:00:00 2001 From: Razin Bouzar Date: Fri, 31 May 2024 17:56:23 -0400 Subject: [PATCH 3/8] Format cleanup --- .../discovery/CuratorDruidLeaderSelector.java | 37 ++++++++++++------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java index 3f394e0affbb..828add5e8ce9 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -65,7 +65,6 @@ public CuratorDruidLeaderSelector(CuratorFramework curator, @Self DruidNode self this.latchPath = latchPath; this.leaderLatch.set(createNewLeaderLatch()); - // Adding ConnectionStateListener to handle session changes using a method reference curator.getConnectionStateListenable().addListener(this::handleConnectionStateChanged); } @@ -97,13 +96,13 @@ public void isLeader() // give others a chance to become leader. CloseableUtils.closeAndSuppressExceptions( - createNewLeaderLatchWithListener(), - e -> log.warn("Could not close old leader latch; continuing with new one anyway.") + createNewLeaderLatchWithListener(), + e -> log.warn("Could not close old leader latch; continuing with new one anyway.") ); leader = false; try { - // Small delay before starting the latch so that others waiting are chosen to become leader. + //Small delay before starting the latch so that others waiting are chosen to become leader. Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); leaderLatch.get().start(); } @@ -132,7 +131,9 @@ public void notLeader() log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to stopBeingLeader").emit(); } } - }, listenerExecutor); + }, + listenerExecutor + ); return leaderLatch.getAndSet(newLeaderLatch); } @@ -209,7 +210,9 @@ public void unregisterListener() listenerExecutor.shutdownNow(); } - // Method to handle connection state changes + /** + * Handles connection state changes. Recreates the leader latch if connection to zookeeper is lost. + */ private void handleConnectionStateChanged(CuratorFramework client, ConnectionState newState) { switch (newState) { @@ -228,17 +231,23 @@ private void handleConnectionStateChanged(CuratorFramework client, ConnectionSta private void recreateLeaderLatch() { - // Close existing leader latch - CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> log.warn(e, "Failed to close LeaderLatch.")); + // give others a chance to become leader. + CloseableUtils.closeAndSuppressExceptions( + createNewLeaderLatchWithListener(), + e -> log.warn("Could not close old leader latch; continuing with new one anyway.") + ); - // Create and start a new leader latch - LeaderLatch newLeaderLatch = createNewLeaderLatchWithListener(); + leader = false; try { - newLeaderLatch.start(); + //Small delay before starting the latch so that others waiting are chosen to become leader. + Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); + leaderLatch.get().start(); } - catch (Exception ex) { - throw new RuntimeException("Failed to start new LeaderLatch after session change", ex); + catch (Exception e) { + // If an exception gets thrown out here, then the node will zombie out 'cause it won't be looking for + // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but + // Curator likes to have "throws Exception" on methods so it might happen... + log.makeAlert(e, "I am a zombie").emit(); } - leaderLatch.set(newLeaderLatch); } } From 1b55b69881ab5bd9ca2f4912c01b8045dc9badad Mon Sep 17 00:00:00 2001 From: razinbouzar Date: Fri, 31 May 2024 18:03:20 -0400 Subject: [PATCH 4/8] Revert "Format cleanup" This reverts commit 90ea5e8c9126dcb4b5321abe1676ecc617480100. --- .../discovery/CuratorDruidLeaderSelector.java | 37 +++++++------------ 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java index 828add5e8ce9..3f394e0affbb 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -65,6 +65,7 @@ public CuratorDruidLeaderSelector(CuratorFramework curator, @Self DruidNode self this.latchPath = latchPath; this.leaderLatch.set(createNewLeaderLatch()); + // Adding ConnectionStateListener to handle session changes using a method reference curator.getConnectionStateListenable().addListener(this::handleConnectionStateChanged); } @@ -96,13 +97,13 @@ public void isLeader() // give others a chance to become leader. CloseableUtils.closeAndSuppressExceptions( - createNewLeaderLatchWithListener(), - e -> log.warn("Could not close old leader latch; continuing with new one anyway.") + createNewLeaderLatchWithListener(), + e -> log.warn("Could not close old leader latch; continuing with new one anyway.") ); leader = false; try { - //Small delay before starting the latch so that others waiting are chosen to become leader. + // Small delay before starting the latch so that others waiting are chosen to become leader. Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); leaderLatch.get().start(); } @@ -131,9 +132,7 @@ public void notLeader() log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to stopBeingLeader").emit(); } } - }, - listenerExecutor - ); + }, listenerExecutor); return leaderLatch.getAndSet(newLeaderLatch); } @@ -210,9 +209,7 @@ public void unregisterListener() listenerExecutor.shutdownNow(); } - /** - * Handles connection state changes. Recreates the leader latch if connection to zookeeper is lost. - */ + // Method to handle connection state changes private void handleConnectionStateChanged(CuratorFramework client, ConnectionState newState) { switch (newState) { @@ -231,23 +228,17 @@ private void handleConnectionStateChanged(CuratorFramework client, ConnectionSta private void recreateLeaderLatch() { - // give others a chance to become leader. - CloseableUtils.closeAndSuppressExceptions( - createNewLeaderLatchWithListener(), - e -> log.warn("Could not close old leader latch; continuing with new one anyway.") - ); + // Close existing leader latch + CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> log.warn(e, "Failed to close LeaderLatch.")); - leader = false; + // Create and start a new leader latch + LeaderLatch newLeaderLatch = createNewLeaderLatchWithListener(); try { - //Small delay before starting the latch so that others waiting are chosen to become leader. - Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); - leaderLatch.get().start(); + newLeaderLatch.start(); } - catch (Exception e) { - // If an exception gets thrown out here, then the node will zombie out 'cause it won't be looking for - // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but - // Curator likes to have "throws Exception" on methods so it might happen... - log.makeAlert(e, "I am a zombie").emit(); + catch (Exception ex) { + throw new RuntimeException("Failed to start new LeaderLatch after session change", ex); } + leaderLatch.set(newLeaderLatch); } } From 2bd922f338b3d59700bf380014d04f9e22b8e551 Mon Sep 17 00:00:00 2001 From: razinbouzar Date: Sun, 2 Jun 2024 01:07:48 -0400 Subject: [PATCH 5/8] Incorporate feedback - Cleanup file formatting and comments - Reduce complexity of the first go by calling the recreateLeaderLatch in the notLeader() method --- .../discovery/CuratorDruidLeaderSelector.java | 139 ++++++++++-------- 1 file changed, 78 insertions(+), 61 deletions(-) diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java index 3f394e0affbb..9ea021cce71a 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -40,6 +40,9 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; +/** + * + */ public class CuratorDruidLeaderSelector implements DruidLeaderSelector { private static final EmittingLogger log = new EmittingLogger(CuratorDruidLeaderSelector.class); @@ -63,9 +66,12 @@ public CuratorDruidLeaderSelector(CuratorFramework curator, @Self DruidNode self this.curator = curator; this.self = self; this.latchPath = latchPath; + + // Creating a LeaderLatch here allows us to query for the current leader. We will not be considered for leadership + // election until LeaderLatch.start() is called in registerListener(). This allows clients to observe the current + // leader without being involved in the election. this.leaderLatch.set(createNewLeaderLatch()); - // Adding ConnectionStateListener to handle session changes using a method reference curator.getConnectionStateListenable().addListener(this::handleConnectionStateChanged); } @@ -77,62 +83,67 @@ private LeaderLatch createNewLeaderLatch() private LeaderLatch createNewLeaderLatchWithListener() { final LeaderLatch newLeaderLatch = createNewLeaderLatch(); - newLeaderLatch.addListener(new LeaderLatchListener() - { - @Override - public void isLeader() - { - try { - if (leader) { - log.warn("I'm being asked to become leader. But I am already the leader. Ignored event."); - return; - } - leader = true; - term++; - listener.becomeLeader(); - } - catch (Exception ex) { - log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); - - // give others a chance to become leader. - CloseableUtils.closeAndSuppressExceptions( - createNewLeaderLatchWithListener(), - e -> log.warn("Could not close old leader latch; continuing with new one anyway.") - ); - - leader = false; - try { - // Small delay before starting the latch so that others waiting are chosen to become leader. - Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); - leaderLatch.get().start(); - } - catch (Exception e) { - // If an exception gets thrown out here, then the node will zombie out 'cause it won't be looking for - // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but - // Curator likes to have "throws Exception" on methods so it might happen... - log.makeAlert(e, "I am a zombie").emit(); + newLeaderLatch.addListener( + new LeaderLatchListener() + { + @Override + public void isLeader() + { + try { + if (leader) { + log.warn("I'm being asked to become leader. But I am already the leader. Ignored event."); + return; + } + + leader = true; + term++; + listener.becomeLeader(); + } + catch (Exception ex) { + log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); + + // give others a chance to become leader. + CloseableUtils.closeAndSuppressExceptions( + createNewLeaderLatchWithListener(), + e -> log.warn("Could not close old leader latch; continuing with new one anyway.") + ); + + leader = false; + try { + //Small delay before starting the latch so that others waiting are chosen to become leader. + Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); + leaderLatch.get().start(); + } + catch (Exception e) { + // If an exception gets thrown out here, then the node will zombie out 'cause it won't be looking for + // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but + // Curator likes to have "throws Exception" on methods so it might happen... + log.makeAlert(e, "I am a zombie").emit(); + } + } } - } - } - @Override - public void notLeader() - { - try { - if (!leader) { - log.warn("I'm being asked to stop being leader. But I am not the leader. Ignored event."); - return; + @Override + public void notLeader() + { + try { + if (!leader) { + log.warn("I'm being asked to stop being leader. But I am not the leader. Ignored event."); + return; + } + + leader = false; + listener.stopBeingLeader(); + recreateLeaderLatch(); + } + catch (Exception ex) { + log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to stopBeingLeader").emit(); + } } - - leader = false; - listener.stopBeingLeader(); - } - catch (Exception ex) { - log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to stopBeingLeader").emit(); - } - } - }, listenerExecutor); + }, + listenerExecutor + ); return leaderLatch.getAndSet(newLeaderLatch); } @@ -228,17 +239,23 @@ private void handleConnectionStateChanged(CuratorFramework client, ConnectionSta private void recreateLeaderLatch() { - // Close existing leader latch - CloseableUtils.closeAndSuppressExceptions(leaderLatch.get(), e -> log.warn(e, "Failed to close LeaderLatch.")); + // give others a chance to become leader. + CloseableUtils.closeAndSuppressExceptions( + createNewLeaderLatchWithListener(), + e -> log.warn("Could not close old leader latch; continuing with new one anyway.") + ); - // Create and start a new leader latch - LeaderLatch newLeaderLatch = createNewLeaderLatchWithListener(); + leader = false; try { - newLeaderLatch.start(); + //Small delay before starting the latch so that others waiting are chosen to become leader. + Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); + leaderLatch.get().start(); } - catch (Exception ex) { - throw new RuntimeException("Failed to start new LeaderLatch after session change", ex); + catch (Exception e) { + // If an exception gets thrown out here, then the node will zombie out 'cause it won't be looking for + // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but + // Curator likes to have "throws Exception" on methods so it might happen... + log.makeAlert(e, "I am a zombie").emit(); } - leaderLatch.set(newLeaderLatch); } } From a037fafcae101a5f7201833a01702c15142d3f46 Mon Sep 17 00:00:00 2001 From: razinbouzar Date: Sun, 2 Jun 2024 01:11:49 -0400 Subject: [PATCH 6/8] Update server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java Co-authored-by: Kashif Faraz --- .../druid/curator/discovery/CuratorDruidLeaderSelector.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java index 9ea021cce71a..f20c4afa7ebb 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -220,7 +220,9 @@ public void unregisterListener() listenerExecutor.shutdownNow(); } - // Method to handle connection state changes + /** + * Handles connection state changes. Recreates the leader latch if connection to zookeeper is lost. + */ private void handleConnectionStateChanged(CuratorFramework client, ConnectionState newState) { switch (newState) { From d4ff87ccbbb0cd39072dd0f37057c8532aad4ed6 Mon Sep 17 00:00:00 2001 From: razinbouzar Date: Wed, 5 Jun 2024 10:33:56 -0400 Subject: [PATCH 7/8] Updates on feedback - Remove handleConnectionStateChagned method - Remove duplicate code and use recreate leader latch method - Handle LeaderLatch.State.CLOSED in the isLeader() function, log a warning. --- .../discovery/CuratorDruidLeaderSelector.java | 43 +++---------------- 1 file changed, 6 insertions(+), 37 deletions(-) diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java index 9ea021cce71a..b6b2edda9f6b 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -71,8 +71,6 @@ public CuratorDruidLeaderSelector(CuratorFramework curator, @Self DruidNode self // election until LeaderLatch.start() is called in registerListener(). This allows clients to observe the current // leader without being involved in the election. this.leaderLatch.set(createNewLeaderLatch()); - - curator.getConnectionStateListenable().addListener(this::handleConnectionStateChanged); } private LeaderLatch createNewLeaderLatch() @@ -91,6 +89,11 @@ private LeaderLatch createNewLeaderLatchWithListener() public void isLeader() { try { + if (newLeaderLatch.getState().equals(LeaderLatch.State.CLOSED)) { + log.warn("I'm being asked to become leader, but the latch is CLOSED. Ignored event."); + return; + } + if (leader) { log.warn("I'm being asked to become leader. But I am already the leader. Ignored event."); return; @@ -103,24 +106,7 @@ public void isLeader() catch (Exception ex) { log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); - // give others a chance to become leader. - CloseableUtils.closeAndSuppressExceptions( - createNewLeaderLatchWithListener(), - e -> log.warn("Could not close old leader latch; continuing with new one anyway.") - ); - - leader = false; - try { - //Small delay before starting the latch so that others waiting are chosen to become leader. - Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); - leaderLatch.get().start(); - } - catch (Exception e) { - // If an exception gets thrown out here, then the node will zombie out 'cause it won't be looking for - // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but - // Curator likes to have "throws Exception" on methods so it might happen... - log.makeAlert(e, "I am a zombie").emit(); - } + recreateLeaderLatch(); } } @@ -220,23 +206,6 @@ public void unregisterListener() listenerExecutor.shutdownNow(); } - // Method to handle connection state changes - private void handleConnectionStateChanged(CuratorFramework client, ConnectionState newState) - { - switch (newState) { - case SUSPENDED: - case LOST: - recreateLeaderLatch(); - break; - case RECONNECTED: - // Connection reestablished, no action needed here - break; - default: - // Do nothing for other states - break; - } - } - private void recreateLeaderLatch() { // give others a chance to become leader. From 4b63a2360de500b49084ed6ba119616b1acd42fa Mon Sep 17 00:00:00 2001 From: razinbouzar Date: Wed, 5 Jun 2024 10:49:04 -0400 Subject: [PATCH 8/8] Style check. Remove unused import --- .../druid/curator/discovery/CuratorDruidLeaderSelector.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java index b6b2edda9f6b..b2179f5d69d4 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -24,7 +24,6 @@ import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.apache.curator.framework.recipes.leader.Participant; -import org.apache.curator.framework.state.ConnectionState; import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.guice.annotations.Self;