HOTFIX: only try to clear discover-coordinator future upon commit#12244
Conversation
dajac
left a comment
There was a problem hiding this comment.
@guozhangwang Thanks for the PR. Overall, the approach looks good to me. I left a few comments, mainly regarding the test cases.
| // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop. | ||
| if (coordinatorUnknownAndUnready(timer)) { | ||
| return false; | ||
| if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) { |
There was a problem hiding this comment.
nit: I think that we need to remove if coordinator is unknown, make sure we lookup one and from the above comment (first sentence).
| public void testCommitAsyncWithUserAssignedType() { | ||
| subscriptions.assignFromUser(Collections.singleton(t1p)); | ||
| // should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error | ||
| client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE)); |
There was a problem hiding this comment.
Do we really need this? I thought that the whole point was to ensure that no requests are sent out when the manual mode is used.
| client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE)); | ||
| // set timeout to 0 because we don't want to retry after the error | ||
| coordinator.poll(time.timer(0)); | ||
| assertTrue(coordinator.coordinatorUnknown()); |
There was a problem hiding this comment.
Should we also assert that there is not inflight requests after calling poll?
| @Test | ||
| public void testAutoCommitAsyncWithUserAssignedType() { | ||
| try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions) | ||
| ) { |
There was a problem hiding this comment.
nit: I would bring back the closing parenthesis and the opening curly brace on the previous line.
| ) { | ||
| subscriptions.assignFromUser(Collections.singleton(t1p)); | ||
| // should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error | ||
| client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE)); |
…try-to-discovery-upon-commit
guozhangwang
left a comment
There was a problem hiding this comment.
@dajac I addressed you comments and modifies the tests, also along with that I found a minor issue which is not fixed in my first commit (thanks to your suggestion on the unit tests!).
| client.awaitMetadataUpdate(timer); | ||
| } | ||
|
|
||
| // if there is pending coordinator requests, ensure they have a chance to be transmitted. |
There was a problem hiding this comment.
This is a major change while addressing @dajac 's comment: previously the manual assignment, the coordinator.poll call would not call networkClient.poll, which means that if the coordinator discovery does not complete within the commitAsync (note we call networkClient.poll twice in that function, so it's possible that function would complete the discovery), we would have no other places to poll the network client to complete the pending requests.
showuon
left a comment
There was a problem hiding this comment.
LGTM! Thanks for fixing it in a better way. Yes, I agree with this change, we will only find coordinator when necessary, which is a good improvement! Left a minor comment. Thank you.
|
|
||
| // should try to find coordinator since we are commit async | ||
| coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), (offsets, exception) -> { | ||
| throw new AssertionError("Commit should not get responses"); |
There was a problem hiding this comment.
nit: use fail instead, and we might need to log the callback parameters for troubleshooting.
fail("Commit should not get responses, but got offsets:" + offsets +", and exception:" + exception)
…try-to-discovery-upon-commit
|
Cherry-picking to 3.2 |
…2244) This is another way of fixing KAFKA-13563 other than #11631. Instead of letting the consumer to always try to discover coordinator in pool with either mode (subscribe / assign), we defer the clearance of discover future upon committing async only. More specifically, under manual assign mode, there are only three places where we need the coordinator: * commitAsync (both by the consumer itself or triggered by caller), this is where we want to fix. * commitSync, which we already try to re-discovery coordinator. * committed (both by the consumer itself based on reset policy, or triggered by caller), which we already try to re-discovery coordinator. The benefits are that for manual assign mode that does not try to trigger any of the above three, then we never would be discovering coordinator. The original fix in #11631 would let the consumer to discover coordinator even if none of the above operations are required. Reviewers: Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
…2244) (#12259) This is a cherrypick commit of 3.1. Another way of fixing KAFKA-13563 other than #11631. Instead of letting the consumer to always try to discover coordinator in pool with either mode (subscribe / assign), we defer the clearance of discover future upon committing async only. More specifically, under manual assign mode, there are only three places where we need the coordinator: commitAsync (both by the consumer itself or triggered by caller), this is where we want to fix. commitSync, which we already try to re-discovery coordinator. committed (both by the consumer itself based on reset policy, or triggered by caller), which we already try to re-discovery coordinator. The benefits are that for manual assign mode that does not try to trigger any of the above three, then we never would be discovering coordinator. The original fix in #11631 would let the consumer to discover coordinator even if none of the above operations are required. Reviewers: Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
…ache#12244) (apache#12259) This is a cherrypick commit of 3.1. Another way of fixing KAFKA-13563 other than apache#11631. Instead of letting the consumer to always try to discover coordinator in pool with either mode (subscribe / assign), we defer the clearance of discover future upon committing async only. More specifically, under manual assign mode, there are only three places where we need the coordinator: commitAsync (both by the consumer itself or triggered by caller), this is where we want to fix. commitSync, which we already try to re-discovery coordinator. committed (both by the consumer itself based on reset policy, or triggered by caller), which we already try to re-discovery coordinator. The benefits are that for manual assign mode that does not try to trigger any of the above three, then we never would be discovering coordinator. The original fix in apache#11631 would let the consumer to discover coordinator even if none of the above operations are required. Reviewers: Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
…ache#12244) This is another way of fixing KAFKA-13563 other than apache#11631. Instead of letting the consumer to always try to discover coordinator in pool with either mode (subscribe / assign), we defer the clearance of discover future upon committing async only. More specifically, under manual assign mode, there are only three places where we need the coordinator: * commitAsync (both by the consumer itself or triggered by caller), this is where we want to fix. * commitSync, which we already try to re-discovery coordinator. * committed (both by the consumer itself based on reset policy, or triggered by caller), which we already try to re-discovery coordinator. The benefits are that for manual assign mode that does not try to trigger any of the above three, then we never would be discovering coordinator. The original fix in apache#11631 would let the consumer to discover coordinator even if none of the above operations are required. Reviewers: Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
* HOTFIX: only try to clear discover-coordinator future upon commit (apache#12244) This is another way of fixing KAFKA-13563 other than apache#11631. Instead of letting the consumer to always try to discover coordinator in pool with either mode (subscribe / assign), we defer the clearance of discover future upon committing async only. More specifically, under manual assign mode, there are only three places where we need the coordinator: * commitAsync (both by the consumer itself or triggered by caller), this is where we want to fix. * commitSync, which we already try to re-discovery coordinator. * committed (both by the consumer itself based on reset policy, or triggered by caller), which we already try to re-discovery coordinator. The benefits are that for manual assign mode that does not try to trigger any of the above three, then we never would be discovering coordinator. The original fix in apache#11631 would let the consumer to discover coordinator even if none of the above operations are required. Reviewers: Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io> * HOTFIX: add space to avoid checkstyle failure Co-authored-by: Guozhang Wang <wangguoz@gmail.com>
…ache#12244) (apache#12259) (#723) This is a cherrypick commit of 3.1. Another way of fixing KAFKA-13563 other than apache#11631. Instead of letting the consumer to always try to discover coordinator in pool with either mode (subscribe / assign), we defer the clearance of discover future upon committing async only. More specifically, under manual assign mode, there are only three places where we need the coordinator: commitAsync (both by the consumer itself or triggered by caller), this is where we want to fix. commitSync, which we already try to re-discovery coordinator. committed (both by the consumer itself based on reset policy, or triggered by caller), which we already try to re-discovery coordinator. The benefits are that for manual assign mode that does not try to trigger any of the above three, then we never would be discovering coordinator. The original fix in apache#11631 would let the consumer to discover coordinator even if none of the above operations are required. Reviewers: Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io> Co-authored-by: Guozhang Wang <wangguoz@gmail.com>
…et commits (#12626) Asynchronous offset commits may throw an unexpected WakeupException following #11631 and #12244. This patch fixes the problem by passing through a flag to ensureCoordinatorReady to indicate whether wakeups should be disabled. This is used to disable wakeups in the context of asynchronous offset commits. All other uses leave wakeups enabled. Note: this patch builds on top of #12611. Co-Authored-By: Guozhang Wang wangguoz@gmail.com Reviewers: Luke Chen <showuon@gmail.com>
…et commits (#12626) Asynchronous offset commits may throw an unexpected WakeupException following #11631 and #12244. This patch fixes the problem by passing through a flag to ensureCoordinatorReady to indicate whether wakeups should be disabled. This is used to disable wakeups in the context of asynchronous offset commits. All other uses leave wakeups enabled. Note: this patch builds on top of #12611. Co-Authored-By: Guozhang Wang wangguoz@gmail.com Reviewers: Luke Chen <showuon@gmail.com>
…et commits (#12626) Asynchronous offset commits may throw an unexpected WakeupException following #11631 and #12244. This patch fixes the problem by passing through a flag to ensureCoordinatorReady to indicate whether wakeups should be disabled. This is used to disable wakeups in the context of asynchronous offset commits. All other uses leave wakeups enabled. Note: this patch builds on top of #12611. Co-Authored-By: Guozhang Wang wangguoz@gmail.com Reviewers: Luke Chen <showuon@gmail.com>
This is another way of fixing KAFKA-13563 other than #11631.
Instead of letting the consumer to always try to discover coordinator in pool with either mode (subscribe / assign), we defer the clearance of discover future upon committing async only. More specifically, under manual assign mode, there are only three places where we need the coordinator:
The benefits are that for manual assign mode that does not try to trigger any of the above three, then we never would be discovering coordinator. The original fix in #11631 would let the consumer to discover coordinator even if none of the above operations are required.
Committer Checklist (excluded from commit message)