Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ static class AuthData {

private int readTimeout;

private int expirationTimeout;

private final int sessionTimeout;

private final ZKWatchManager watchManager;
Expand Down Expand Up @@ -418,6 +420,7 @@ public ClientCnxn(

this.connectTimeout = sessionTimeout / hostProvider.size();
this.readTimeout = sessionTimeout * 2 / 3;
this.expirationTimeout = sessionTimeout * 4 / 3;

this.sendThread = new SendThread(clientCnxnSocket);
this.eventThread = new EventThread();
Expand Down Expand Up @@ -814,6 +817,12 @@ public String toString() {

}

private static class ConnectionTimeoutException extends IOException {
public ConnectionTimeoutException(String message) {
super(message);
}
}

private static class SessionTimeoutException extends IOException {

private static final long serialVersionUID = 824482094072071178L;
Expand Down Expand Up @@ -1192,7 +1201,6 @@ public void run() {
startConnect(serverAddress);
// Update now to start the connection timer right after we make a connection attempt
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you removing updateLastSendAndHeard ? (here and there)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semantically, it is because we are not heard(lastHeard) anything here and there. If we update lastHeard in these two places, then getIdleRecv will be reset to 0 in every re-connect which will cause no SessionTimeoutException.

For lastSend, I think it does not matter as it is only used for ping in CONNECTED state after successful ConnectRequest which will updateLastSend. I don't see a reason for updateLastSend in these two place.

}

if (state.isConnected()) {
Expand Down Expand Up @@ -1233,13 +1241,20 @@ public void run() {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}

if (to <= 0) {
if (expirationTimeout - clientCnxnSocket.getIdleRecv() <= 0) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7% of developers fix this issue

THREAD_SAFETY_VIOLATION: Read/Write race. Non-private method ClientCnxn$SendThread.run() reads without synchronization from this.this$0.expirationTimeout. Potentially races with write in method ClientCnxn$SendThread.onConnected(...).
Reporting because this access may occur on a background thread.


ℹ️ Expand to see all @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.

String warnInfo = String.format(
"Client session timed out, have not heard from server in %dms for session id 0x%s",
clientCnxnSocket.getIdleRecv(),
Long.toHexString(sessionId));
LOG.warn(warnInfo);
changeZkState(States.CLOSED);
throw new SessionTimeoutException(warnInfo);
} else if (to <= 0) {
String warnInfo = String.format(
"Client connection timed out, have not heard from server in %dms for session id 0x%s",
clientCnxnSocket.getIdleRecv(),
Long.toHexString(sessionId));
throw new ConnectionTimeoutException(warnInfo);
}
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
Expand Down Expand Up @@ -1284,7 +1299,7 @@ public void run() {
} else {
LOG.warn(
"Session 0x{} for server {}, Closing socket connection. "
+ "Attempting reconnect except it is a SessionExpiredException.",
+ "Attempting reconnect except it is a SessionExpiredException or SessionTimeoutException.",
Long.toHexString(getSessionId()),
serverAddress,
e);
Expand All @@ -1305,7 +1320,12 @@ public void run() {
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
}
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
if (closing) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, KeeperState.Closed, null));
} else if (state == States.CLOSED) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, KeeperState.Expired, null));
}
eventThread.queueEventOfDeath();

if (zooKeeperSaslClient != null) {
zooKeeperSaslClient.shutdown();
Expand All @@ -1322,7 +1342,6 @@ private void cleanAndNotifyState() {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
}
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
}

private void pingRwServer() throws RWServerFoundException {
Expand Down Expand Up @@ -1422,6 +1441,7 @@ void onConnected(
}

readTimeout = negotiatedSessionTimeout * 2 / 3;
expirationTimeout = negotiatedSessionTimeout * 4 / 3;
connectTimeout = negotiatedSessionTimeout / hostProvider.size();
hostProvider.onConnected();
sessionId = _sessionId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ private void testPortChangeToBlockedPort(boolean testLeader) throws Exception {
Thread.sleep(1000);
zkArr[serverIndex].setData("/test", "teststr".getBytes(), -1);
fail("New client connected to new client port!");
} catch (KeeperException.ConnectionLossException e) {
} catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
// Exception is expected
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.junit.jupiter.api.Assertions.fail;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -90,10 +91,18 @@ public void tearDown() throws Exception {
private static class CountdownWatcher implements Watcher {

volatile CountDownLatch clientConnected = new CountDownLatch(1);
final CountDownLatch sessionTerminated = new CountDownLatch(1);

public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
clientConnected.countDown();
switch (event.getState()) {
case SyncConnected:
clientConnected.countDown();
break;
case AuthFailed:
case Expired:
case Closed:
sessionTerminated.countDown();
break;
}
}

Expand Down Expand Up @@ -286,17 +295,15 @@ public void testSessionStateNoDupStateReporting() throws IOException, Interrupte
// shutdown the server
serverFactory.shutdown();

try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// ignore
}
watcher.sessionTerminated.await();

// verify that the size is just 2 - ie connect then disconnect
// if the client attempts reconnect and we are not handling current
// state correctly (ie eventing on duplicate disconnects) then we'll
// see a disconnect for each failed connection attempt
assertEquals(2, watcher.states.size());
// verify that there is no duplicated disconnected event.
List<KeeperState> states = Arrays.asList(
KeeperState.SyncConnected,
KeeperState.Disconnected,
KeeperState.Expired
);
assertEquals(states, watcher.states);

zk.close();
}
Expand Down Expand Up @@ -331,11 +338,11 @@ public void testSessionTimeoutAccess() throws Exception {

private class DupWatcher extends CountdownWatcher {

public List<WatchedEvent> states = new LinkedList<>();
public List<KeeperState> states = new LinkedList<>();
public void process(WatchedEvent event) {
super.process(event);
if (event.getType() == EventType.None) {
states.add(event);
states.add(event.getState());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -188,6 +189,19 @@ public void processResult(int rc, String path, Object ctx, Stat stat) {

}

@Test
public void testWatcherExpiredAfterAllServerDown() throws Exception {
ZooKeeper zk = createClient();
CompletableFuture<Void> expired = new CompletableFuture<>();
zk.register(event -> {
if (event.getState() == Watcher.Event.KeeperState.Expired) {
expired.complete(null);
}
});
stopServer();
expired.join();
}

@Test
public void testWatcherCount() throws IOException, InterruptedException, KeeperException {
ZooKeeper zk1 = null, zk2 = null;
Expand Down