KAFKA-14208: Should not wake-up with non-blocking coordinator discovery#12611
guozhangwang wants to merge 1 commit into apache:trunk
Call @philipnee @showuon for reviews.
// if we do not want to block on discovering coordinator at all,
// then we should not try to poll in a loop, and should not throw wake-up exception either
if (timer.timeoutMs() == 0L) {
I wonder if this breaks the API in a different way. If the user calls wakeup(), would we expect the next blocking call to trigger a wakeup even if it were called with a timeout of zero? I wonder if it would be better to overload ensureCoordinatorReady with an additional flag?
would we expect the next blocking call to trigger a wakeup even if it were called with a timeout of zero?
Yes, I think as long as it's a blocking call, the wakeup exception should be thrown, even with a zero timeout. I've checked the existing javadoc in KafkaConsumer: every method declared to throw WakeupException will still throw it after this change. So I think this PR change (1) won't break the API, and (2) fixes the issue for commitAsync, which makes sense to me.
I wonder if it would be better to overload ensureCoordinatorReady with an additional flag?
Yes, as @guozhangwang mentioned in PR description:
In this PR I'm trying to fix it in a least intrusive way (a more general fix should be, potentially, to have two versions of ensureCoordinatorReady)
I think this fix is safer. But I don't have a strong opinion about it; I just want to raise the release timing issue here.
Yeah, I feel it's a bit slippery to leave the logic in place with just a TODO somewhere to go back and fix it. Too many times, the follow-up never happens. Is it really that much additional effort to add an overload?
protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
    return ensureCoordinatorReady(timer, true);
}
private synchronized boolean ensureCoordinatorReady(final Timer timer, boolean checkWakeup) {
    ...
showuon left a comment
The change makes sense to me. Thanks.
try {
    coordinator.joinGroupIfNeeded(mockTime.timer(0));
    fail("Should have woken up from joinGroupIfNeeded()");
} catch (WakeupException ignored) {
nit:
assertThrows(WakeupException.class,
    () -> coordinator.joinGroupIfNeeded(mockTime.timer(0)),
    "Should have woken up from joinGroupIfNeeded()")
try {
    coordinator.ensureCoordinatorReady(mockTime.timer(1));
    fail("Should have woken up from ensureCoordinatorReady()");
} catch (WakeupException ignored) {
}
nit: can be replaced with assertThrows
I'll go ahead and close this since we're going to merge #12626.
…et commits (#12626) Asynchronous offset commits may throw an unexpected WakeupException following #11631 and #12244. This patch fixes the problem by passing through a flag to ensureCoordinatorReady to indicate whether wakeups should be disabled. This is used to disable wakeups in the context of asynchronous offset commits. All other uses leave wakeups enabled. Note: this patch builds on top of #12611. Co-Authored-By: Guozhang Wang wangguoz@gmail.com Reviewers: Luke Chen <showuon@gmail.com>
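The flag-based approach described in the commit message above can be sketched roughly as follows. This is a simplified, self-contained model and not the Kafka client code: the classes FlagCoordinatorSketch and FlagWakeupException, the parameter name disableWakeup, and the helper commitAsyncLookup are all hypothetical names chosen for illustration, and the real method takes a Timer and polls the network client rather than returning immediately.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for Kafka's WakeupException (not the real class).
class FlagWakeupException extends RuntimeException {}

// Hypothetical, simplified model of a coordinator lookup with a wakeup flag.
class FlagCoordinatorSketch {
    private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
    private boolean coordinatorKnown = false;

    // Models a user calling wakeup(): the next blocking call should throw.
    void wakeup() {
        wakeupRequested.set(true);
    }

    boolean wakeupPending() {
        return wakeupRequested.get();
    }

    // Default overload: wakeups stay enabled for every existing caller.
    boolean ensureCoordinatorReady(long timeoutMs) {
        return ensureCoordinatorReady(timeoutMs, false);
    }

    // timeoutMs would bound the polling loop in a real client; this sketch
    // performs a single pass and does not use it.
    boolean ensureCoordinatorReady(long timeoutMs, boolean disableWakeup) {
        if (coordinatorKnown) {
            return true;
        }
        // Only callers that left wakeups enabled consume the pending wakeup.
        if (!disableWakeup && wakeupRequested.getAndSet(false)) {
            throw new FlagWakeupException();
        }
        return coordinatorKnown;
    }

    // Models the asynchronous commit path: non-blocking, wakeups disabled.
    boolean commitAsyncLookup() {
        return ensureCoordinatorReady(0L, true);
    }

    public static void main(String[] args) {
        FlagCoordinatorSketch coordinator = new FlagCoordinatorSketch();
        coordinator.wakeup();
        System.out.println("async lookup found coordinator: " + coordinator.commitAsyncLookup());
        try {
            coordinator.ensureCoordinatorReady(0L);
        } catch (FlagWakeupException e) {
            System.out.println("blocking-style call with zero timeout threw a wakeup");
        }
    }
}
```

Under these assumptions, the caller rather than the timeout value decides whether a wakeup applies, so a blocking API invoked with a zero timeout still observes a pending wakeup, which is the concern raised in the review thread.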
Today we may try to discover coordinator in both blocking (e.g. in poll) and non-blocking (e.g. in commitAsync) way. For the latter we would poll the underlying network client with timeout 0, and in this case we should not trigger wakeup since these are non-blocking calls and hence should not throw wake-ups.
In this PR I'm trying to fix it in a least intrusive way (a more general fix should be, potentially, to have two versions of ensureCoordinatorReady), since in our threading refactoring, the ensureCoordinatorReady function would not be called by the calling thread any more and only triggered by the background thread, and hence we would have a much simpler manner to ensure that non-blocking functions never throw wake-ups.
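As a rough illustration of the zero-timeout check this PR proposes, here is a minimal, self-contained sketch. It is not the Kafka implementation: ZeroTimeoutCoordinatorSketch and SketchWakeupException are hypothetical names, a plain long stands in for the Timer, and the blocking path does a single pass where the real code would poll the network client in a loop.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for Kafka's WakeupException (not the real class).
class SketchWakeupException extends RuntimeException {}

// Hypothetical, simplified model of the zero-timeout check.
class ZeroTimeoutCoordinatorSketch {
    private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
    private boolean coordinatorKnown = false;

    // Models a user calling wakeup(): the next blocking call should throw.
    void wakeup() {
        wakeupRequested.set(true);
    }

    boolean wakeupPending() {
        return wakeupRequested.get();
    }

    boolean ensureCoordinatorReady(long timeoutMs) {
        if (coordinatorKnown) {
            return true;
        }
        if (timeoutMs == 0L) {
            // Treated as non-blocking: one attempt, no wakeup check, and the
            // pending wakeup is left for the next blocking call.
            return coordinatorKnown;
        }
        // Blocking path: a pending wakeup is consumed and surfaces here.
        if (wakeupRequested.getAndSet(false)) {
            throw new SketchWakeupException();
        }
        return coordinatorKnown;
    }

    public static void main(String[] args) {
        ZeroTimeoutCoordinatorSketch coordinator = new ZeroTimeoutCoordinatorSketch();
        coordinator.wakeup();
        System.out.println("zero-timeout lookup: " + coordinator.ensureCoordinatorReady(0L));
        try {
            coordinator.ensureCoordinatorReady(100L);
        } catch (SketchWakeupException e) {
            System.out.println("blocking lookup threw a wakeup");
        }
    }
}
```

The trade-off discussed in the review is visible here: because the decision hinges on the timeout value alone, any caller passing a zero timeout skips the wakeup, whether or not its public API is documented as blocking.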