Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
15 changes: 15 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,21 @@ jobs:
- name: Check license header
run: docker run --rm -v $(pwd):/github/workspace -u $(id -u):$(id -g) ghcr.io/korandoru/hawkeye-native:v1 check

- name: Cache Local Maven Repository
uses: actions/cache@v3
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-maven-
- name: Set up JDK
uses: actions/setup-java@v3
with:
java-version: 17
distribution: 'temurin'
- name: Check code style
run: ./mvnw spotless:check

unittest:
name: Unit tests
runs-on: ubuntu-latest
Expand Down
213 changes: 83 additions & 130 deletions curator-client/src/main/java/org/apache/curator/ConnectionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@

package org.apache.curator;

import java.io.Closeable;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.drivers.EventTrace;
import org.apache.curator.drivers.OperationTrace;
import org.apache.curator.drivers.TracerDriver;
Expand All @@ -32,17 +40,8 @@
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

class ConnectionState implements Watcher, Closeable
{
class ConnectionState implements Watcher, Closeable {
private static final int MAX_BACKGROUND_EXCEPTIONS = 10;
private static final boolean LOG_EVENTS = Boolean.getBoolean(DebugUtils.PROPERTY_LOG_EVENTS);
private static final Logger log = LoggerFactory.getLogger(ConnectionState.class);
Expand All @@ -56,48 +55,46 @@ class ConnectionState implements Watcher, Closeable
private final AtomicLong instanceIndex = new AtomicLong();
private volatile long connectionStartMs = 0;

ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
{
ConnectionState(
ZookeeperFactory zookeeperFactory,
EnsembleProvider ensembleProvider,
int sessionTimeoutMs,
Watcher parentWatcher,
AtomicReference<TracerDriver> tracer,
boolean canBeReadOnly) {
this.ensembleProvider = ensembleProvider;
this.tracer = tracer;
if ( parentWatcher != null )
{
if (parentWatcher != null) {
parentWatchers.offer(parentWatcher);
}

handleHolder = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
}

ZooKeeper getZooKeeper() throws Exception
{
if ( SessionFailRetryLoop.sessionForThreadHasFailed() )
{
ZooKeeper getZooKeeper() throws Exception {
if (SessionFailRetryLoop.sessionForThreadHasFailed()) {
throw new SessionFailRetryLoop.SessionFailedException();
}

Exception exception = backgroundExceptions.poll();
if ( exception != null )
{
if (exception != null) {
new EventTrace("background-exceptions", tracer.get()).commit();
throw exception;
}

boolean localIsConnected = isConnected.get();
if ( !localIsConnected )
{
if (!localIsConnected) {
checkNewConnectionString();
}

return handleHolder.getZooKeeper();
}

boolean isConnected()
{
boolean isConnected() {
return isConnected.get();
}

void start() throws Exception
{
void start() throws Exception {
log.debug("Starting");
ensembleProvider.start();
reset();
Expand All @@ -107,101 +104,82 @@ void start() throws Exception
public void close() throws IOException {
close(0);
}

public void close(int waitForShutdownTimeoutMs) throws IOException {
log.debug("Closing");

CloseableUtils.closeQuietly(ensembleProvider);
try
{
try {
handleHolder.closeAndClear(waitForShutdownTimeoutMs);
}
catch ( Exception e )
{
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
throw new IOException(e);
}
finally
{
} finally {
isConnected.set(false);
}
}

void addParentWatcher(Watcher watcher)
{
void addParentWatcher(Watcher watcher) {
parentWatchers.offer(watcher);
}

void removeParentWatcher(Watcher watcher)
{
void removeParentWatcher(Watcher watcher) {
parentWatchers.remove(watcher);
}

long getInstanceIndex()
{
long getInstanceIndex() {
return instanceIndex.get();
}

int getLastNegotiatedSessionTimeoutMs()
{
int getLastNegotiatedSessionTimeoutMs() {
return lastNegotiatedSessionTimeoutMs.get();
}

@Override
public void process(WatchedEvent event)
{
if ( LOG_EVENTS )
{
public void process(WatchedEvent event) {
if (LOG_EVENTS) {
log.debug("ConnectState watcher: " + event);
}

if ( event.getType() == Watcher.Event.EventType.None )
{
if (event.getType() == Watcher.Event.EventType.None) {
boolean wasConnected = isConnected.get();
boolean newIsConnected = checkState(event.getState(), wasConnected);
if ( newIsConnected != wasConnected )
{
if (newIsConnected != wasConnected) {
isConnected.set(newIsConnected);
connectionStartMs = System.currentTimeMillis();
if ( newIsConnected )
{
if (newIsConnected) {
lastNegotiatedSessionTimeoutMs.set(handleHolder.getNegotiatedSessionTimeoutMs());
log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
}
}
}

for ( Watcher parentWatcher : parentWatchers )
{
for (Watcher parentWatcher : parentWatchers) {
OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
parentWatcher.process(event);
trace.commit();
}
}

EnsembleProvider getEnsembleProvider()
{
EnsembleProvider getEnsembleProvider() {
return ensembleProvider;
}

synchronized void reset() throws Exception
{
synchronized void reset() throws Exception {
log.debug("reset");

instanceIndex.incrementAndGet();

isConnected.set(false);
connectionStartMs = System.currentTimeMillis();
handleHolder.closeAndReset();
handleHolder.getZooKeeper(); // initiate connection
handleHolder.getZooKeeper(); // initiate connection
}

private synchronized void checkNewConnectionString()
{
private synchronized void checkNewConnectionString() {
final String newConnectionString = handleHolder.getNewConnectionString();

if (newConnectionString != null)
{
if (newConnectionString != null) {
handleNewConnectionString(newConnectionString);
}
}
Expand All @@ -222,117 +200,92 @@ public long getSessionId() {
return sessionId;
}

private boolean checkState(Event.KeeperState state, boolean wasConnected)
{
private boolean checkState(Event.KeeperState state, boolean wasConnected) {
boolean isConnected = wasConnected;
boolean checkNewConnectionString = true;
switch ( state )
{
default:
case Disconnected:
{
isConnected = false;
break;
}
switch (state) {
default:
case Disconnected: {
isConnected = false;
break;
}

case SyncConnected:
case ConnectedReadOnly:
{
isConnected = true;
break;
}
case SyncConnected:
case ConnectedReadOnly: {
isConnected = true;
break;
}

case AuthFailed:
{
isConnected = false;
log.error("Authentication failed");
break;
}
case AuthFailed: {
isConnected = false;
log.error("Authentication failed");
break;
}

case Expired:
{
isConnected = false;
checkNewConnectionString = false;
handleExpiredSession();
break;
}
case Expired: {
isConnected = false;
checkNewConnectionString = false;
handleExpiredSession();
break;
}

case SaslAuthenticated:
{
// NOP
break;
}
case SaslAuthenticated: {
// NOP
break;
}
}
// the session expired is logged in handleExpiredSession, so not log here
if (state != Event.KeeperState.Expired) {
new EventTrace(state.toString(), tracer.get(), getSessionId()).commit();
}

if ( checkNewConnectionString )
{
if (checkNewConnectionString) {
String newConnectionString = handleHolder.getNewConnectionString();
if ( newConnectionString != null )
{
if (newConnectionString != null) {
handleNewConnectionString(newConnectionString);
}
}

return isConnected;
}

private void handleNewConnectionString(String newConnectionString)
{
private void handleNewConnectionString(String newConnectionString) {
log.info("Connection string changed to: " + newConnectionString);
new EventTrace("connection-string-changed", tracer.get(), getSessionId()).commit();

try
{
try {
ZooKeeper zooKeeper = handleHolder.getZooKeeper();
if ( zooKeeper == null )
{
if (zooKeeper == null) {
log.warn("Could not update the connection string because getZooKeeper() returned null.");
}
else
{
if ( ensembleProvider.updateServerListEnabled() )
{
} else {
if (ensembleProvider.updateServerListEnabled()) {
zooKeeper.updateServerList(newConnectionString);
handleHolder.resetConnectionString(newConnectionString);
}
else
{
} else {
reset();
}
}
}
catch ( Exception e )
{
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
queueBackgroundException(e);
}
}

private void handleExpiredSession()
{
private void handleExpiredSession() {
log.warn("Session expired event received");
new EventTrace("session-expired", tracer.get(), getSessionId()).commit();

try
{
try {
reset();
}
catch ( Exception e )
{
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
queueBackgroundException(e);
}
}

@SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
private void queueBackgroundException(Exception e)
{
while ( backgroundExceptions.size() >= MAX_BACKGROUND_EXCEPTIONS )
{
private void queueBackgroundException(Exception e) {
while (backgroundExceptions.size() >= MAX_BACKGROUND_EXCEPTIONS) {
backgroundExceptions.poll();
}
backgroundExceptions.offer(e);
Expand Down
Loading