From d741b0a63e47b540c2f19f6d64c85ecc1e70a0fe Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Fri, 1 Apr 2022 15:31:46 +0800 Subject: [PATCH 1/2] ZOOKEEPER-4508: Fix endless-loop in ClientCnxn.SendThread.run when all zk servers down The observable behavior is that client will not get expired event from watcher. The cause is twofold: 1. `updateLastSendAndHeard` is called in reconnection so the session will not timeout. 2. No `break` after session timeout in `ClientCnxn.SendThread.run`. --- .../java/org/apache/zookeeper/ClientCnxn.java | 10 ++++-- .../apache/zookeeper/test/ReconfigTest.java | 2 +- .../apache/zookeeper/test/SessionTest.java | 35 +++++++++++-------- .../apache/zookeeper/test/WatcherTest.java | 14 ++++++++ 4 files changed, 43 insertions(+), 18 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java index c29a2141d58..83d5e975c02 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -1192,7 +1192,6 @@ public void run() { startConnect(serverAddress); // Update now to start the connection timer right after we make a connection attempt clientCnxnSocket.updateNow(); - clientCnxnSocket.updateLastSendAndHeard(); } if (state.isConnected()) { @@ -1239,6 +1238,7 @@ public void run() { clientCnxnSocket.getIdleRecv(), Long.toHexString(sessionId)); LOG.warn(warnInfo); + changeZkState(States.CLOSED); throw new SessionTimeoutException(warnInfo); } if (state.isConnected()) { @@ -1305,7 +1305,12 @@ public void run() { if (state.isAlive()) { eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null)); } - eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null)); + if (closing) { + eventThread.queueEvent(new WatchedEvent(Event.EventType.None, KeeperState.Closed, null)); + } else if (state == States.CLOSED) { + eventThread.queueEvent(new WatchedEvent(Event.EventType.None, KeeperState.Expired, null)); + } + eventThread.queueEventOfDeath(); if (zooKeeperSaslClient != null) { zooKeeperSaslClient.shutdown(); @@ -1322,7 +1327,6 @@ private void cleanAndNotifyState() { eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null)); } clientCnxnSocket.updateNow(); - clientCnxnSocket.updateLastSendAndHeard(); } private void pingRwServer() throws RWServerFoundException { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java index 20ec82d0989..0ce363071c5 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java @@ -804,7 +804,7 @@ private void testPortChangeToBlockedPort(boolean testLeader) throws Exception { Thread.sleep(1000); zkArr[serverIndex].setData("/test", "teststr".getBytes(), -1); fail("New client connected to new client port!"); - } catch (KeeperException.ConnectionLossException e) { + } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) { // Exception is expected } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTest.java index 85f76c21381..1f288529da9 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTest.java @@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.fail; import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -90,10 +91,18 @@ public void tearDown() throws Exception { private static class CountdownWatcher implements Watcher { volatile CountDownLatch clientConnected = new CountDownLatch(1); + final CountDownLatch sessionTerminated = new CountDownLatch(1); public void process(WatchedEvent event) { - if (event.getState() == KeeperState.SyncConnected) { - clientConnected.countDown(); + switch (event.getState()) { + case SyncConnected: + clientConnected.countDown(); + break; + case AuthFailed: + case Expired: + case Closed: + sessionTerminated.countDown(); + break; } } @@ -286,17 +295,15 @@ public void testSessionStateNoDupStateReporting() throws IOException, Interrupte // shutdown the server serverFactory.shutdown(); - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - // ignore - } + watcher.sessionTerminated.await(); - // verify that the size is just 2 - ie connect then disconnect - // if the client attempts reconnect and we are not handling current - // state correctly (ie eventing on duplicate disconnects) then we'll - // see a disconnect for each failed connection attempt - assertEquals(2, watcher.states.size()); + // verify that there is no duplicated disconnected event. + List states = Arrays.asList( + KeeperState.SyncConnected, + KeeperState.Disconnected, + KeeperState.Expired + ); + assertEquals(states, watcher.states); zk.close(); } @@ -331,11 +338,11 @@ public void testSessionTimeoutAccess() throws Exception { private class DupWatcher extends CountdownWatcher { - public List states = new LinkedList<>(); + public List states = new LinkedList<>(); public void process(WatchedEvent event) { super.process(event); if (event.getType() == EventType.None) { - states.add(event); + states.add(event.getState()); } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherTest.java index 22da89a2e67..5afce28b012 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherTest.java @@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -188,6 +189,19 @@ public void processResult(int rc, String path, Object ctx, Stat stat) { } + @Test + public void testWatcherExpiredAfterAllServerDown() throws Exception { + ZooKeeper zk = createClient(); + CompletableFuture expired = new CompletableFuture<>(); + zk.register(event -> { + if (event.getState() == Watcher.Event.KeeperState.Expired) { + expired.complete(null); + } + }); + stopServer(); + expired.join(); + } + @Test public void testWatcherCount() throws IOException, InterruptedException, KeeperException { ZooKeeper zk1 = null, zk2 = null; From d46d3212b63d8acebf7a0c592de50e27a0ec2331 Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Fri, 19 May 2023 15:50:33 +0800 Subject: [PATCH 2/2] Use dedicated timeout value to expire session solely on client side timing --- .../java/org/apache/zookeeper/ClientCnxn.java | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java index 83d5e975c02..8c43961d796 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -164,6 +164,8 @@ static class AuthData { private int readTimeout; + private int expirationTimeout; + private final int sessionTimeout; private final ZKWatchManager watchManager; @@ -418,6 +420,7 @@ public ClientCnxn( this.connectTimeout = sessionTimeout / hostProvider.size(); this.readTimeout = sessionTimeout * 2 / 3; + this.expirationTimeout = sessionTimeout * 4 / 3; this.sendThread = new SendThread(clientCnxnSocket); this.eventThread = new EventThread(); @@ -814,6 +817,12 @@ public String toString() { } + private static class ConnectionTimeoutException extends IOException { + public ConnectionTimeoutException(String message) { + super(message); + } + } + private static class SessionTimeoutException extends IOException { private static final long serialVersionUID = 824482094072071178L; @@ -1232,7 +1241,7 @@ public void run() { to = connectTimeout - clientCnxnSocket.getIdleRecv(); } - if (to <= 0) { + if (expirationTimeout - clientCnxnSocket.getIdleRecv() <= 0) { String warnInfo = String.format( "Client session timed out, have not heard from server in %dms for session id 0x%s", clientCnxnSocket.getIdleRecv(), @@ -1240,6 +1249,12 @@ public void run() { LOG.warn(warnInfo); changeZkState(States.CLOSED); throw new SessionTimeoutException(warnInfo); + } else if (to <= 0) { + String warnInfo = String.format( + "Client connection timed out, have not heard from server in %dms for session id 0x%s", + clientCnxnSocket.getIdleRecv(), + Long.toHexString(sessionId)); + throw new ConnectionTimeoutException(warnInfo); } if (state.isConnected()) { //1000(1 second) is to prevent race condition missing to send the second ping @@ -1284,7 +1299,7 @@ public void run() { } else { LOG.warn( "Session 0x{} for server {}, Closing socket connection. " - + "Attempting reconnect except it is a SessionExpiredException.", + + "Attempting reconnect except it is a SessionExpiredException or SessionTimeoutException.", Long.toHexString(getSessionId()), serverAddress, e); @@ -1426,6 +1441,7 @@ void onConnected( } readTimeout = negotiatedSessionTimeout * 2 / 3; + expirationTimeout = negotiatedSessionTimeout * 4 / 3; connectTimeout = negotiatedSessionTimeout / hostProvider.size(); hostProvider.onConnected(); sessionId = _sessionId;