diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index ad9cdbeb7001a..aaa6e2f6e6201 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -250,20 +250,31 @@ public boolean isValidTransition(final State newState) { private boolean waitOnState(final State targetState, final long waitMs) { final long begin = time.milliseconds(); synchronized (stateLock) { + boolean interrupted = false; long elapsedMs = 0L; - while (state != targetState) { - if (waitMs > elapsedMs) { - final long remainingMs = waitMs - elapsedMs; - try { - stateLock.wait(remainingMs); - } catch (final InterruptedException e) { - // it is ok: just move on to the next iteration + try { + while (state != targetState) { + if (waitMs > elapsedMs) { + final long remainingMs = waitMs - elapsedMs; + try { + stateLock.wait(remainingMs); + } catch (final InterruptedException e) { + interrupted = true; + } + } else { + log.debug("Cannot transit to {} within {}ms", targetState, waitMs); + return false; } - } else { - log.debug("Cannot transit to {} within {}ms", targetState, waitMs); - return false; + elapsedMs = time.milliseconds() - begin; + } + } finally { + // Make sure to restore the interruption status before returning. + // We do not always own the current thread that executes this method, i.e., we do not know the + // interruption policy of the thread. The least we can do is restore the interruption status before + // the current thread exits this method. + if (interrupted) { + Thread.currentThread().interrupt(); } - elapsedMs = time.milliseconds() - begin; } return true; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 0d156474d8ccd..559f8fc6bfc48 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -612,11 +612,22 @@ public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer public void waitOnThreadState(final StreamThread.State targetState) { synchronized (stateLock) { - while (state != targetState) { - try { - stateLock.wait(); - } catch (final InterruptedException e) { - // it is ok: just move on to the next iteration + boolean interrupted = false; + try { + while (state != targetState) { + try { + stateLock.wait(); + } catch (final InterruptedException e) { + interrupted = true; + } + } + } finally { + // Make sure to restore the interruption status before returning. + // We do not always own the current thread that executes this method, i.e., we do not know the + // interruption policy of the thread. The least we can do is restore the interruption status before + // the current thread exits this method. + if (interrupted) { + Thread.currentThread().interrupt(); } } }