Skip to content

MINOR: Restore interrupt status when closing#9863

Merged
ableegoldman merged 2 commits intoapache:trunkfrom
cadonna:improve_interruption
Jan 22, 2021
Merged

MINOR: Restore interrupt status when closing#9863
ableegoldman merged 2 commits intoapache:trunkfrom
cadonna:improve_interruption

Conversation

@cadonna
Copy link
Copy Markdown
Member

@cadonna cadonna commented Jan 11, 2021

We do not always own the thread that executes the close() method, i.e.,
we do not know the interruption policy of the thread. Thus, we should
not swallow the interruption. The least we can do is restoring the
interruption status before the current thread exits this method.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@cadonna
Copy link
Copy Markdown
Member Author

cadonna commented Jan 11, 2021

Call for review: @ableegoldman @wcarlson5

Copy link
Copy Markdown
Contributor

@wcarlson5 wcarlson5 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense to keep the interrupted status, I have a few questions on the best way to do that though

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not 100% sure this is the purpose of the interrupt flag but I think that this will do

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate on why you are not 100% sure?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it was to break threads out loops, I don't know if the flag is actually checked. So will passing it up be useful if the don't retrun

Copy link
Copy Markdown
Member Author

@cadonna cadonna Jan 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the words of "Java Concurrency in Practice":
"Thread interruption is a cooperative mechanism for a thread to signal another thread that it should, at its convenience and if it feels like it, stop what it is doing and do something else."
Interruption consists of the interruption status that can be set by the current thread itself or by another thread with Thread#interrupt(). The interruption status can be checked with Thread#isInterrupted() and it is checked by some blocking methods, most notably Object#wait() and Thread#sleep(), which -- if the current thread is interrupted -- clear the interruption status and throw an InterruptedException.
The "something else" means the application of an interruption policy. Since we do not always own the thread that calls close(), we do not know the interruption policy. So, the minimum that we can do is restoring the interrupted status if the current thread was interrupted so that further up the call stack of the current thread, the possibly existing interruption policy of the current thread can be applied.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense. thanks for explaining. I see why previously it was ignored but its probably best to restore it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to interrupt before we return? if that is the case why wait until the condition is full-filled?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not really interrupt, we restore the interruption state if the current thread was interrupted. What would be the alternative to restoring before we return?
If we do not wait until the condition is fulfilled, the while loop with the wait degenerates to busy waiting because usually the interruption status is checked at the beginning of the wait() method and we will run into the InterruptedException in each iteration which would defeat the purpose of the wait(). When the InterruptedException is thrown, the interruption status is reset.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we have this inside the try as well then we should return and set the interrupted status. That should be better for the user if they want to ignore the exception because they can be sure of the state change and avoid making this call again

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I see, moving the return into the try block would not change anything.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want to break out of the loop here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think we should break the loop, because we actually do not know what interruption means for the current thread since we do not own it in general. It might be cancellation -- for which breaking the loop might make sense -- but it could also be something else.

Copy link
Copy Markdown
Member

@ableegoldman ableegoldman Jan 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should have a quick sync on this. My understanding is that an interrupt means that the thread wants to regain control somewhere along the callstack. So the only way I can see to interpret it is as "(1) stop blocking/waiting on whatever you're doing, (2) get the system back into a consistent state, and then (3) reset the flag so the interrupt can be handled (or not) by the caller". Before this PR we were doing (1) and (2), now we're doing (2) and (3), but why not all three?

If we don't break out of the loop then we've effectively ignored the interrupt, since we will go on waiting for it to reach NOT_RUNNING. But it's actually worse, since as you mentioned in another comment, it'll now be in a busy loop

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ableegoldman Are there still open questions after our offline sync?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, I just forget about this PR -- thanks for the reminder

@ableegoldman
Copy link
Copy Markdown
Member

By the way, we now also swallow an InterruptedException in one other place: StreamThread#waitOnThreadState. Similar pattern, used for #removeThread instead of #close. Just wanted to bring it up since it seems like we should apply the same principle to that usage, but you may have missed it if you haven't rebased on trunk in the past 24 hours.

@cadonna cadonna force-pushed the improve_interruption branch from 7da96d9 to 435c66a Compare January 18, 2021 15:55
We do not always own the thread that executes the close()  method, i.e.,
we do not know the interruption policy of the thread. Thus, we should
not swallow the interruption. The least we can do is restoring the
interruption status before the current thread exits this method.
@cadonna cadonna force-pushed the improve_interruption branch from 435c66a to ae50d84 Compare January 22, 2021 16:49
Copy link
Copy Markdown
Member

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, will merge once the tests pass

try {
while (state != targetState) {
try {
stateLock.wait();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still a bit concerned about #removeStreamThread potentially hanging indefinitely...maybe we should add a timeout to this API? If we want to do so, it would need to be soon since the 2.8 release is coming up. WDYT @cadonna @wcarlson5 ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This particular bit would only hang if the thread is not shutting down. If that is happening maybe we are already in an unrecoverable state?

Overall I wouldn't mind having some sort of timeout but I am not very worried about it. @cadonna maybe you have any insights from writing the KIP?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm just paranoid from going through some incidents where the thread really did never shut down. I would agree we're probably in a bad & unrecoverable state at that point, but hanging indefinitely is not the right answer. What happens when a program freezes on your laptop? I usually just try to force quit it -- the user might want to do the same. But if we never return control of their thread then it's hard for them to know to shut it down

Copy link
Copy Markdown
Contributor

@wcarlson5 wcarlson5 Jan 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a fair concern. Based on the interrupt documentation I would hope that force quit doesn't use the interrupt path. As Bruno said

In the words of "Java Concurrency in Practice":
"Thread interruption is a cooperative mechanism for a thread to signal another thread that it should, at its convenience and if it feels like it, stop what it is doing and do something else."

The "at its convenience" is what makes me less concerned

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right, I don't think force quit uses the interrupt path. But anyways I don't think it's possible to force quit a Kafka Streams application until we get around to making it a desktop application. I was just trying to draw a parallel -- my concern here is just that there is absolutely no way to recover should a StreamThread hang during shutdown, meaning you will just need to kill the process and do an unclean shutdown. With ALOS it's maybe not so bad, but with EOS an unclean shutdown requires wiping out all state stores and completely restoring them from scratch. So there is absolutely something to lose besides just the annoyance of having to manually kill the Streams process.

I just got a notification from a report of Streams being stuck during shutdown in an EOS application, so I promise I'm not making this up 🙂 . I can point you to it if you'd like -- unfortunately we never even managed to determine the root cause, so there may be more bugs out there which cause this besides just the ones we currently know

@ableegoldman
Copy link
Copy Markdown
Member

ableegoldman commented Jan 22, 2021

Two unrelated fail tests:

org.apache.kafka.clients.consumer.internals.FetcherTest.testEarlierOffsetResetArrivesLate() 
org.apache.kafka.clients.producer.KafkaProducerTest.testHeadersWithExtendedClasses()

@ableegoldman ableegoldman merged commit 7b06a24 into apache:trunk Jan 22, 2021
@mjsax mjsax added the streams label Jan 22, 2021
ijuma added a commit to ijuma/kafka that referenced this pull request Jan 26, 2021
…e-allocations-lz4

* apache-github/trunk: (562 commits)
  MINOR: remove unused code from MessageTest (apache#9961)
  MINOR: Fix visibility of Log.{unflushedMessages, addSegment} methods (apache#9966)
  KAFKA-12229: Restore original class loader in integration tests using EmbeddedConnectCluster during shutdown  (apache#9942)
  KAFKA-12190: Fix setting of file permissions on non-POSIX filesystems (apache#9947)
  MINOR: Remove `toStruct` and `fromStruct` methods from generated protocol classes (apache#9960)
  MINOR: Fix typo in Utils#toPositive (apache#9943)
  MINOR: MessageUtil: remove some deadcode (apache#9931)
  MINOR: Update zstd-jni to 1.4.8-2 (apache#9957)
  MINOR: Revert assertion in MockProducerTest (apache#9956)
  MINOR: Optimize assertions in unit tests (apache#9955)
  MINOR: Tag `RaftEventSimulationTest` as `integration` and tweak it (apache#9925)
  MINOR: Update to Gradle 6.8.1 (apache#9953)
  MINOR: A few small group coordinator cleanups (apache#9952)
  MINOR: Upgrade ducktape to version 0.8.1  (apache#9933)
  MINOR: fix record time in test shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing (apache#9948)
  MINOR: Restore interrupt status when closing (apache#9863)
  KAFKA-10357: Extract setup of repartition topics from Streams partition assignor (apache#9848)
  KAFKA-12212; Bump Metadata API version to remove `ClusterAuthorizedOperations` fields (KIP-700) (apache#9945)
  MINOR: log 2min processing summary of StreamThread loop (apache#9941)
  MINOR: Drop enable.metadata.quorum config (apache#9934)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants