Merged
33 changes: 22 additions & 11 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -250,20 +250,31 @@ public boolean isValidTransition(final State newState) {
private boolean waitOnState(final State targetState, final long waitMs) {
final long begin = time.milliseconds();
synchronized (stateLock) {
boolean interrupted = false;
long elapsedMs = 0L;
while (state != targetState) {
if (waitMs > elapsedMs) {
final long remainingMs = waitMs - elapsedMs;
try {
stateLock.wait(remainingMs);
} catch (final InterruptedException e) {
// it is ok: just move on to the next iteration
try {
while (state != targetState) {
if (waitMs > elapsedMs) {
final long remainingMs = waitMs - elapsedMs;
try {
stateLock.wait(remainingMs);
} catch (final InterruptedException e) {
interrupted = true;
Member

Don't we want to break out of the loop here?

Member Author

I do not think we should break the loop, because we actually do not know what interruption means for the current thread since we do not own it in general. It might be cancellation -- for which breaking the loop might make sense -- but it could also be something else.

Member

@ableegoldman, Jan 13, 2021

Maybe we should have a quick sync on this. My understanding is that an interrupt means that the thread wants to regain control somewhere along the call stack. So the only way I can see to interpret it is as "(1) stop blocking/waiting on whatever you're doing, (2) get the system back into a consistent state, and then (3) reset the flag so the interrupt can be handled (or not) by the caller". Before this PR we were doing (1) and (2), now we're doing (2) and (3), but why not all three?

If we don't break out of the loop, then we've effectively ignored the interrupt, since we will go on waiting for it to reach NOT_RUNNING. But it's actually worse since, as you mentioned in another comment, it'll now be in a busy loop.
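
The "all three" reading proposed above can be sketched as follows. This is only an illustration of that proposal, not the code that was merged: the class, the `State` enum, and `setState` are hypothetical stand-ins for the state machine in the diff, and step (2) is trivial here because the sketch holds no partial state to clean up.

```java
// Hypothetical stand-in for the state machine in the diff; not the KafkaStreams class.
final class BreakOnInterruptSketch {
    enum State { RUNNING, NOT_RUNNING }

    private final Object stateLock = new Object();
    private State state = State.RUNNING;

    void setState(final State newState) {
        synchronized (stateLock) {
            state = newState;
            stateLock.notifyAll();
        }
    }

    /** Returns true if targetState was reached, false on timeout or interrupt. */
    boolean waitOnState(final State targetState, final long waitMs) {
        final long deadlineNs = System.nanoTime() + waitMs * 1_000_000L;
        synchronized (stateLock) {
            while (state != targetState) {
                final long remainingMs = (deadlineNs - System.nanoTime()) / 1_000_000L;
                if (remainingMs <= 0L) {
                    return false; // timed out; also avoids wait(0), which waits forever
                }
                try {
                    stateLock.wait(remainingMs);
                } catch (final InterruptedException e) {
                    // (1) stop waiting, (2) nothing to clean up in this sketch,
                    // (3) set the flag again so the caller can apply its own policy.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }
}
```

The trade-off is the one discussed in this thread: the caller gets control back promptly, but a `false` result no longer distinguishes a timeout from an interrupt unless the caller also checks the interruption status.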

Member Author

@ableegoldman Are there still open questions after our offline sync?

Member

Nope, I just forgot about this PR -- thanks for the reminder.

}
} else {
log.debug("Cannot transit to {} within {}ms", targetState, waitMs);
return false;
}
} else {
log.debug("Cannot transit to {} within {}ms", targetState, waitMs);
return false;
elapsedMs = time.milliseconds() - begin;
}
} finally {
// Make sure to restore the interruption status before returning.
Contributor

Do we want to interrupt before we return? If that is the case, why wait until the condition is fulfilled?

Member Author

We do not really interrupt; we restore the interruption status if the current thread was interrupted. What would be the alternative to restoring before we return?
If we do not wait until the condition is fulfilled, the while loop with the wait degenerates to busy waiting: the interruption status is usually checked at the beginning of the wait() method, so we would run into the InterruptedException in each iteration, which would defeat the purpose of the wait(). When the InterruptedException is thrown, the interruption status is reset.
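
One way to see the busy-waiting concern described above is a small, self-contained demo. It assumes the alternative being rejected is "restore the status inside the catch block and keep looping"; the class and method names are hypothetical.

```java
final class EagerRestoreDemo {
    /**
     * Restores the interruption status inside the catch block and keeps looping.
     * Returns how many of the maxIterations calls to wait() threw immediately.
     * Call this only with the interruption status already set; otherwise each
     * wait() really blocks for up to 10 seconds.
     */
    static int countImmediateWakeups(final int maxIterations) {
        final Object lock = new Object();
        int interruptedWaits = 0;
        synchronized (lock) {
            for (int i = 0; i < maxIterations; i++) {
                try {
                    lock.wait(10_000L);
                } catch (final InterruptedException e) {
                    interruptedWaits++;
                    // Eager restore: the status is set again, so the next wait() throws at once.
                    Thread.currentThread().interrupt();
                }
            }
        }
        return interruptedWaits;
    }

    public static void main(final String[] args) {
        Thread.currentThread().interrupt();
        final int wakeups = countImmediateWakeups(5);
        Thread.interrupted(); // clear the status so the demo exits cleanly
        System.out.println(wakeups); // prints 5: no call ever blocked
    }
}
```

Deferring the restore to a `finally` block, as the diff does, lets the later `wait()` calls block normally while still handing the status back to the caller.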

// We do not always own the current thread that executes this method, i.e., we do not know the
// interruption policy of the thread. The least we can do is restore the interruption status before
// the current thread exits this method.
if (interrupted) {
Thread.currentThread().interrupt();
Contributor

I am not 100% sure this is the purpose of the interrupt flag, but I think that this will do.

Member Author

Could you elaborate on why you are not 100% sure?

Contributor

I thought it was to break threads out of loops; I don't know if the flag is actually checked. So will passing it up be useful if they don't return?

Member Author

@cadonna, Jan 12, 2021

In the words of "Java Concurrency in Practice":
"Thread interruption is a cooperative mechanism for a thread to signal another thread that it should, at its convenience and if it feels like it, stop what it is doing and do something else."
Interruption consists of the interruption status that can be set by the current thread itself or by another thread with Thread#interrupt(). The interruption status can be checked with Thread#isInterrupted() and it is checked by some blocking methods, most notably Object#wait() and Thread#sleep(), which -- if the current thread is interrupted -- clear the interruption status and throw an InterruptedException.
The "something else" means the application of an interruption policy. Since we do not always own the thread that calls close(), we do not know the interruption policy. So, the minimum that we can do is to restore the interruption status if the current thread was interrupted, so that further up the call stack of the current thread, the possibly existing interruption policy of the current thread can be applied.
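
The mechanics described above (status set, blocking call throws and clears it, callee restores it) can be observed directly with standard `Thread` methods. A minimal sketch; the class name and the array layout are hypothetical:

```java
final class InterruptStatusDemo {
    /** Returns {statusBeforeSleep, statusInsideCatch, statusAfterRestore}. */
    static boolean[] observe() {
        final boolean[] seen = new boolean[3];
        Thread.currentThread().interrupt(); // a thread may set its own status
        seen[0] = Thread.currentThread().isInterrupted();
        try {
            Thread.sleep(1_000L); // throws at once because the status is already set
        } catch (final InterruptedException e) {
            // Throwing InterruptedException cleared the status.
            seen[1] = Thread.currentThread().isInterrupted();
            // Restore it so code further up the call stack can apply its policy.
            Thread.currentThread().interrupt();
        }
        // Thread.interrupted() reads and clears, so the demo leaves no status behind.
        seen[2] = Thread.interrupted();
        return seen;
    }

    public static void main(final String[] args) {
        final boolean[] seen = observe();
        System.out.println(seen[0] + " " + seen[1] + " " + seen[2]); // prints "true false true"
    }
}
```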

Contributor

This makes sense. Thanks for explaining. I see why it was previously ignored, but it's probably best to restore it.

}
elapsedMs = time.milliseconds() - begin;
}
return true;
Contributor

If we have this inside the try as well, then we should return and set the interrupted status. That should be better for the user if they want to ignore the exception, because they can be sure of the state change and avoid making this call again.

Member Author

As far as I see, moving the return into the try block would not change anything.
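
A small sketch of why the placement of the `return` should not matter: a `finally` block runs before the method returns in both variants, so the status is restored either way. The class, method names, and the `interrupted` parameter (standing in for the local flag in the diff) are hypothetical.

```java
final class ReturnPlacementDemo {
    /** Variant with the return inside the try block. */
    static boolean returnInsideTry(final boolean interrupted) {
        try {
            return true;
        } finally {
            // Runs after the return value is computed but before the method returns.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Variant with the return after the try/finally, as in the diff. */
    static boolean returnAfterTry(final boolean interrupted) {
        try {
            // the wait loop would go here
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return true;
    }
}
```

In both variants the caller sees `true` together with a set interruption status when `interrupted` is `true`.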

}
@@ -612,11 +612,22 @@ public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer

public void waitOnThreadState(final StreamThread.State targetState) {
synchronized (stateLock) {
while (state != targetState) {
try {
stateLock.wait();
} catch (final InterruptedException e) {
// it is ok: just move on to the next iteration
boolean interrupted = false;
try {
while (state != targetState) {
try {
stateLock.wait();
Member

I am still a bit concerned about #removeStreamThread potentially hanging indefinitely...maybe we should add a timeout to this API? If we want to do so, it would need to be soon since the 2.8 release is coming up. WDYT @cadonna @wcarlson5 ?

Contributor

This particular bit would only hang if the thread is not shutting down. If that is happening maybe we are already in an unrecoverable state?

Overall I wouldn't mind having some sort of timeout, but I am not very worried about it. @cadonna maybe you have some insights from writing the KIP?

Member

Maybe I'm just paranoid from going through some incidents where the thread really did never shut down. I would agree we're probably in a bad & unrecoverable state at that point, but hanging indefinitely is not the right answer. What happens when a program freezes on your laptop? I usually just try to force quit it -- the user might want to do the same. But if we never return control of their thread, then it's hard for them to know to shut it down.

Contributor

@wcarlson5, Jan 22, 2021

That is a fair concern. Based on the interrupt documentation, I would hope that force quit doesn't use the interrupt path. As Bruno said:

In the words of "Java Concurrency in Practice":
"Thread interruption is a cooperative mechanism for a thread to signal another thread that it should, at its convenience and if it feels like it, stop what it is doing and do something else."

The "at its convenience" is what makes me less concerned.

Member

That's right, I don't think force quit uses the interrupt path. But anyways I don't think it's possible to force quit a Kafka Streams application until we get around to making it a desktop application. I was just trying to draw a parallel -- my concern here is just that there is absolutely no way to recover should a StreamThread hang during shutdown, meaning you will just need to kill the process and do an unclean shutdown. With ALOS it's maybe not so bad, but with EOS an unclean shutdown requires wiping out all state stores and completely restoring them from scratch. So there is absolutely something to lose besides just the annoyance of having to manually kill the Streams process.

I just got a notification from a report of Streams being stuck during shutdown in an EOS application, so I promise I'm not making this up 🙂. I can point you to it if you'd like -- unfortunately we never even managed to determine the root cause, so there may be more bugs out there which cause this besides just the ones we currently know.
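
For illustration, a timeout along the lines suggested in this thread could look roughly like the sketch below. It is an assumption about one possible shape, not an API that exists in Kafka Streams: the class, the simplified `State` enum, `setState`, and the `timeoutMs` parameter are all hypothetical. It keeps the deferred-restore pattern from the diff.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in; not the real StreamThread state machine.
final class TimedThreadStateWaiter {
    enum State { RUNNING, PENDING_SHUTDOWN, DEAD }

    private final Object stateLock = new Object();
    private State state = State.RUNNING;

    void setState(final State newState) {
        synchronized (stateLock) {
            state = newState;
            stateLock.notifyAll();
        }
    }

    /** Returns true if targetState was reached within timeoutMs, false otherwise. */
    boolean waitOnThreadState(final State targetState, final long timeoutMs) {
        final long begin = System.nanoTime();
        final long timeoutNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        boolean interrupted = false;
        synchronized (stateLock) {
            try {
                while (state != targetState) {
                    final long remainingNs = timeoutNs - (System.nanoTime() - begin);
                    if (remainingNs <= 0L) {
                        return false; // give control back instead of hanging indefinitely
                    }
                    try {
                        // At least 1 ms, because wait(0) means "wait without a timeout".
                        stateLock.wait(Math.max(1L, TimeUnit.NANOSECONDS.toMillis(remainingNs)));
                    } catch (final InterruptedException e) {
                        interrupted = true; // remember it; restore once, on the way out
                    }
                }
                return true;
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
```

A caller that gets `false` back could then log the stuck thread and decide whether to retry or shut the process down, which is the recovery path the comment above is asking for.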

} catch (final InterruptedException e) {
interrupted = true;
}
}
} finally {
// Make sure to restore the interruption status before returning.
// We do not always own the current thread that executes this method, i.e., we do not know the
// interruption policy of the thread. The least we can do is restore the interruption status before
// the current thread exits this method.
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}