From 246fd698b2ea724014e0aba26a4c9f957cce34d0 Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Mon, 11 Jan 2021 16:11:43 +0100 Subject: [PATCH 1/2] MINOR: Restore interrupt status when closing We do not always own the thread that executes the close() method, i.e., we do not know the interruption policy of the thread. Thus, we should not swallow the interruption. The least we can do is restoring the interruption status before the current thread exits this method. --- .../apache/kafka/streams/KafkaStreams.java | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index ad9cdbeb7001a..aaa6e2f6e6201 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -250,20 +250,31 @@ public boolean isValidTransition(final State newState) { private boolean waitOnState(final State targetState, final long waitMs) { final long begin = time.milliseconds(); synchronized (stateLock) { + boolean interrupted = false; long elapsedMs = 0L; - while (state != targetState) { - if (waitMs > elapsedMs) { - final long remainingMs = waitMs - elapsedMs; - try { - stateLock.wait(remainingMs); - } catch (final InterruptedException e) { - // it is ok: just move on to the next iteration + try { + while (state != targetState) { + if (waitMs > elapsedMs) { + final long remainingMs = waitMs - elapsedMs; + try { + stateLock.wait(remainingMs); + } catch (final InterruptedException e) { + interrupted = true; + } + } else { + log.debug("Cannot transit to {} within {}ms", targetState, waitMs); + return false; } - } else { - log.debug("Cannot transit to {} within {}ms", targetState, waitMs); - return false; + elapsedMs = time.milliseconds() - begin; + } + } finally { + // Make sure to restore the interruption status before returning. + // We do not always own the current thread that executes this method, i.e., we do not know the + // interruption policy of the thread. The least we can do is restore the interruption status before + // the current thread exits this method. + if (interrupted) { + Thread.currentThread().interrupt(); } - elapsedMs = time.milliseconds() - begin; } return true; } From ae50d84983f51ca7d1340a36634a14607964579f Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Mon, 18 Jan 2021 16:38:57 +0100 Subject: [PATCH 2/2] Add additional restoration of interruption status --- .../processor/internals/StreamThread.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 0d156474d8ccd..559f8fc6bfc48 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -612,11 +612,22 @@ public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer public void waitOnThreadState(final StreamThread.State targetState) { synchronized (stateLock) { - while (state != targetState) { - try { - stateLock.wait(); - } catch (final InterruptedException e) { - // it is ok: just move on to the next iteration + boolean interrupted = false; + try { + while (state != targetState) { + try { + stateLock.wait(); + } catch (final InterruptedException e) { + interrupted = true; + } + } + } finally { + // Make sure to restore the interruption status before returning. + // We do not always own the current thread that executes this method, i.e., we do not know the + // interruption policy of the thread. The least we can do is restore the interruption status before + // the current thread exits this method. + if (interrupted) { + Thread.currentThread().interrupt(); } } }