KAFKA-7576: Fix shutdown of replica fetcher threads #5875
hachikuji merged 6 commits into apache:trunk
Is the bug being fixed here a regression?
@ijuma It is not exactly a regression, since the bug was originally introduced under KAFKA-6051, which was in 1.1, the same release as dynamic config updates. It can result in an exception being thrown from replica fetcher shutdown, either during broker shutdown or when the number of replica fetchers is reduced using a dynamic config update. In 2.0.1 and 2.1.0, the exception propagation was fixed under KAFKA-7464, which catches and ignores the exception. That may be ok during shutdown, but would result in resource leakage with a dynamic config update. So this PR is essentially redoing the changes under KAFKA-6051 and KAFKA-7464.
Wouldn't it be better to have a way to initiate an orderly shutdown of KafkaClient? It seems a bit more general.
@ijuma Thanks for the review. I have updated with a new method in KafkaClient to initiate shutdown. Not sure if it matches what you had in mind. Let me know what you think.
@hachikuji can you please review this one?
retest this please
hachikuji left a comment
Thanks @rajinisivaram. Looks good. Just had a few small comments.
Was there a strong reason to check aborted sends before this?
Not really, moved it to the start of the method.
nit: the "shutdown" at the end seems unintended?
This is kind of icky. I wonder if we should consider adding a field to ShutdownableThread to save the failure exception?
Agree this is not nice. The first log entry comparison was added under KAFKA-7464. And I extended that to add another of the same type. I think we want the test to ensure that exceptions are not propagated to the caller and that we are closing the blockingSend instance. And that is tested by this subset of that test:
thread.initiateShutdown()
thread.awaitShutdown()
verify(mockBlockingSend)
We don't really need to verify the log entry at all for this. I suppose the log entry comparison checks that the test is correct by verifying that it did inject that exception, but I'm not sure we really need to do that. It feels unnecessary to store away the exception in ShutdownableThread just to check this. What do you think?
Yes, that makes sense to me.
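For readers following the test discussion above, here is a minimal, self-contained sketch of the property being checked: an exception injected into `close()` is not propagated to the caller of shutdown, and `close()` is still invoked. It uses a hand-written fake rather than the mock library from the actual test, and `BlockingSend`, `FailingSend` and `WorkerThread` are hypothetical stand-ins, not Kafka's classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownTestSketch {
    // Hypothetical stand-in for the blocking send used by the fetcher.
    interface BlockingSend {
        void close();
    }

    // Fake that counts close() calls and always fails, to inject the exception.
    static final class FailingSend implements BlockingSend {
        final AtomicInteger closeCalls = new AtomicInteger();

        @Override
        public void close() {
            closeCalls.incrementAndGet();
            throw new IllegalStateException("injected close failure");
        }
    }

    // Hypothetical thread with the initiateShutdown/awaitShutdown shape from the discussion.
    static final class WorkerThread extends Thread {
        private final BlockingSend send;
        private final CountDownLatch shutdownRequested = new CountDownLatch(1);

        WorkerThread(BlockingSend send) {
            this.send = send;
        }

        @Override
        public void run() {
            try {
                shutdownRequested.await(); // idle until shutdown is requested
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        void initiateShutdown() {
            shutdownRequested.countDown();
        }

        void awaitShutdown() throws InterruptedException {
            join(); // the thread has exited before the resource is closed
            try {
                send.close();
            } catch (RuntimeException e) {
                // Assumption for this sketch: the failure is swallowed here, not rethrown.
            }
        }
    }

    // Returns how many times close() was called; throws if shutdown propagated a failure.
    static int closeCallsAfterShutdown() {
        FailingSend send = new FailingSend();
        WorkerThread thread = new WorkerThread(send);
        thread.start();
        try {
            thread.initiateShutdown();
            thread.awaitShutdown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return send.closeCalls.get();
    }
}
```

Counting calls on the fake plays the role of `verify(mockBlockingSend)`, and no log inspection is needed to establish either property.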
I guess there may not be a non-intrusive way to accomplish this more reliably. Perhaps we could check the size of the message queue?
I was being lazy here, updated the test to wait for the server to receive one byte.
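The updated test is not shown in this conversation. As a rough illustration of the idea of waiting until the server has received one byte before proceeding, the sketch below uses a plain loopback `ServerSocket` and a `CountDownLatch`. The class and method names are hypothetical, and the real test may synchronize differently.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class WaitForFirstByteSketch {
    // Returns true once the server side has observed at least one byte from the client.
    static boolean serverSawFirstByte() {
        CountDownLatch received = new CountDownLatch(1);
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread acceptor = new Thread(() -> {
                try (Socket peer = server.accept()) {
                    // read() blocks until a byte arrives or the stream ends (-1)
                    if (peer.getInputStream().read() != -1) {
                        received.countDown();
                    }
                } catch (IOException ignored) {
                    // The latch stays closed, so the caller simply times out.
                }
            });
            acceptor.setDaemon(true);
            acceptor.start();

            try (Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {
                OutputStream out = client.getOutputStream();
                out.write(42);
                out.flush();
                // Wait on an observable condition instead of sleeping for a fixed time.
                return received.await(5, TimeUnit.SECONDS);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Waiting on a condition the server can signal makes the test deterministic about the send being in flight, which a fixed sleep cannot guarantee.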
@hachikuji Thanks for the review. I addressed some of the comments and left a couple of questions on the remaining ones.
ReplicaFetcherThread.shutdown attempts to close the fetcher's Selector while the thread is running. This is unsafe and can result in `Selector.close()` failing with an exception. The exception is caught and logged at debug level, but this can lead to a socket leak if the shutdown is due to a dynamic config update rather than broker shutdown. This PR changes the shutdown logic to close the Selector after the replica fetcher thread has shut down, with a wakeup() and a flag to terminate blocking sends first. Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
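The ordering described in the commit message can be sketched as follows: set a flag, wake the blocked thread, wait for it to exit, and only then close the selector. This is an illustration under stated assumptions, not the PR's code. It uses the JDK's `java.nio.channels.Selector` as a stand-in for Kafka's own Selector, `FetcherThread` is a hypothetical class, and the sketch does not reproduce the original failure.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;

class FetcherShutdownSketch {
    // Hypothetical stand-in for a fetcher thread; not Kafka's ReplicaFetcherThread.
    static final class FetcherThread extends Thread {
        private final Selector selector;
        private final AtomicBoolean shuttingDown = new AtomicBoolean(false);

        FetcherThread(Selector selector) {
            this.selector = selector;
        }

        @Override
        public void run() {
            try {
                while (!shuttingDown.get()) {
                    selector.select(); // blocks until wakeup() is called
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        // Step 1: set the flag first, then wake the thread so it re-checks the flag.
        void initiateShutdown() {
            shuttingDown.set(true);
            selector.wakeup();
        }

        // Step 2: wait for the thread to leave its loop.
        void awaitShutdown() throws InterruptedException {
            join();
        }
    }

    // Runs the full sequence and reports whether the selector was closed only after the thread exited.
    static boolean shutdownInOrder() {
        try {
            Selector selector = Selector.open();
            FetcherThread thread = new FetcherThread(selector);
            thread.start();
            thread.initiateShutdown();
            thread.awaitShutdown();
            boolean stoppedBeforeClose = !thread.isAlive() && selector.isOpen();
            selector.close(); // Step 3: close once no thread is using the selector
            return stoppedBeforeClose && !selector.isOpen();
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Setting the flag before calling `wakeup()` matters: if the wakeup arrives before the thread enters `select()`, the next `select()` returns immediately and the loop still sees the flag.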