KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode (#11631)

guozhangwang merged 5 commits into apache:trunk from
Conversation
…le FindCoordinatorFuture correctly
    - // For manually assigned partitions, if there are no ready nodes, await metadata.
    + // For manually assigned partitions, if coordinator is unknown, make sure we lookup one and await metadata.
      // If connections to all nodes fail, wakeups triggered while attempting to send fetch
      // requests result in polls returning immediately, causing a tight loop of polls. Without
      // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
    - // awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.
    + // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop.
      // When group management is used, metadata wait is already performed for this scenario as
      // coordinator is unknown, hence this check is not required.
    - if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
    -     client.awaitMetadataUpdate(timer);
    + if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
    +     return false;
Before the change, we waited for a metadata update when no nodes were available, to avoid a busy loop in non consumer group mode. After the change, we do the same as group management does: call ensureCoordinatorReady when the coordinator is unknown. This way, we also make sure the FindCoordinatorFuture is handled well (and cleared) inside ensureCoordinatorReady.
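As a rough illustration of the behavior change, here is a toy model (all names and the succeed-on-first-try behavior are simplifying assumptions, not the real client internals): the key point is that ensureCoordinatorReady() both discovers the coordinator and clears the pending find-coordinator future, which the old metadata wait never did.

```java
// Simplified sketch of the manual-assignment branch of poll() after this PR.
class PollSketch {
    boolean coordinatorKnown = false;
    boolean findCoordinatorFutureCleared = false;

    // Stand-in for AbstractCoordinator#ensureCoordinatorReady: looks up the
    // coordinator and, crucially, clears the FindCoordinator future.
    boolean ensureCoordinatorReady() {
        coordinatorKnown = true;             // pretend discovery succeeds
        findCoordinatorFutureCleared = true; // the side effect this fix relies on
        return true;
    }

    // The manual-assignment branch of poll() after the change.
    boolean poll() {
        if (!coordinatorKnown && !ensureCoordinatorReady()) {
            return false; // coordinator still unknown; caller retries later
        }
        return true;
    }
}
```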
I'm wondering if this fix is a bit overkill, e.g. for those consumers who do not even set the group ID and do not rely on the coordinator for anything, the coordinatorUnknown() would always return true while ensureCoordinatorReady would send a discover coordinator request with null group id.
@guozhangwang @ableegoldman , could you please take a look to see if this fix makes sense to you? I'll fix the broken tests after your first review. Thank you.
guozhangwang left a comment:
Hi @showuon thanks for reporting this bug. I agree with you that it's indeed an issue, but I'm not sure about the fix; maybe you can elaborate a bit more on the motivation?
@guozhangwang , thanks for your comment. Answering your question below:

No, if the consumer doesn't provide a group id config value (the default is null), we won't create a ConsumerCoordinator at all, so ensureCoordinatorReady is never reached in that case.

And before this PR, we waited for a metadata update when no nodes were available, to avoid a busy loop in non consumer group mode:

    if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
        client.awaitMetadataUpdate(timer);

After the change, we do the same as group management does: call ensureCoordinatorReady when the coordinator is unknown. Does that make sense?
@guozhangwang @ableegoldman , please take a look when available. Thanks.
@guozhangwang @ableegoldman @hachikuji , please take a look when available. This issue blocks some users when upgrading Kafka-clients. I think this should be fixed soon. |
@showuon Sorry for being late on this -- I thought it was not ready since the title still has
guozhangwang left a comment:
Hi @showuon thanks for the explanation! I now get it and agree with you about the fix.
One minor comment still though: the modified logic is exactly the same as
Could we move the whole block before the if-else block and also update the related comments?
Other than that, LGTM!
@guozhangwang , thanks for your comments. Sorry for confusing you! And it makes sense to remove the
      ConsumerPartitionAssignor assignor = new RoundRobinAssignor();

    - KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
    + KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, null, groupInstanceId, false);
This test case tests manual assignment without storing offsets, so we should not set a group ID (the third parameter from the end).
Test added for this issue: https://issues.apache.org/jira/browse/KAFKA-4034
    - GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
    + ConsumerCoordinator consumerCoordinator = null;
    + if (groupId != null) {
    +     GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
We should not create a consumerCoordinator when groupId is null.
      // in this case, we do an explicit seek, so there should be no need to query the coordinator at all
    - val consumer = createConsumer()
    + // remove the group.id config to avoid coordinator created
    + val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
Same as above.
    private final Time time = new MockTime();
    private final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
    private final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
Side cleanup: extract these into shared fields used across tests.
      // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
      // group proactively due to application inactivity even if (say) the coordinator cannot be found.
      pollHeartbeat(timer.currentTimeMs());
    - if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
    + if (coordinatorUnknownAndUnready(timer)) {
> Could we move the whole block before the if-else block and also update the related comments?

We can't, because in the group management case we should always look up the coordinator after the heartbeat poll, to avoid a heartbeat timeout.
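To make the ordering constraint concrete, here is a tiny toy model (the threshold constant, method names, and timings are invented for illustration): the "last poll" timestamp must be recorded before a possibly slow coordinator lookup, otherwise the heartbeat thread would read a stale timestamp mid-lookup and proactively leave the group.

```java
// Toy model of why pollHeartbeat() must run before the coordinator lookup.
class HeartbeatOrderSketch {
    static final long MAX_POLL_GAP_MS = 1000; // assumed inactivity threshold
    long lastPollMs = 0;

    void pollHeartbeat(long nowMs) { lastPollMs = nowMs; }

    // True if the heartbeat thread would proactively leave the group.
    boolean wouldLeaveGroup(long nowMs) { return nowMs - lastPollMs > MAX_POLL_GAP_MS; }

    // Correct order: record liveness first, then spend time on the lookup.
    boolean pollThenLookup(long nowMs, long lookupMs) {
        pollHeartbeat(nowMs);
        return wouldLeaveGroup(nowMs + lookupMs); // heartbeat check after the slow lookup
    }

    // Hypothetical wrong order: heartbeat check fires against a stale timestamp.
    boolean lookupThenPoll(long nowMs, long lookupMs) {
        boolean leaves = wouldLeaveGroup(nowMs + lookupMs); // checked during the lookup
        pollHeartbeat(nowMs + lookupMs);                    // too late to help
        return leaves;
    }
}
```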
@guozhangwang , this PR is good for review now. I've fixed broken tests and added tests. Thank you.
KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode (#11631)

After KAFKA-10793, we clear the findCoordinatorFuture in two places: (1) the heartbeat thread, and (2) AbstractCoordinator#ensureCoordinatorReady. But in non consumer group mode with a group id provided (for offset commits, so a ConsumerCoordinator is created), there is no heartbeat thread (1), and ensureCoordinatorReady (2) is only called the first time the consumer fetches the committed offset position. That is, after the second lookupCoordinator call, we have no chance to clear the findCoordinatorFuture, so offset commits never succeed.

To avoid the race condition mentioned in KAFKA-10793, it's not safe to clear the findCoordinatorFuture in the future listener. So we fix this issue by calling AbstractCoordinator#ensureCoordinatorReady when the coordinator is unknown in the non consumer group case, in each ConsumerCoordinator#poll.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
Merged to trunk, thanks @showuon ! Also cherry-picked to 3.1. |
HOTFIX: only try to clear discover-coordinator future upon commit (#12244)

This is another way of fixing KAFKA-13563 than #11631. Instead of letting the consumer always try to discover the coordinator in poll under either mode (subscribe / assign), we defer clearing the discover future to async commits only. More specifically, under manual assign mode there are only three places where we need the coordinator:

* commitAsync (either by the consumer itself or triggered by the caller); this is what we fix here.
* commitSync, where we already try to re-discover the coordinator.
* committed (either by the consumer itself based on the reset policy, or triggered by the caller), where we already try to re-discover the coordinator.

The benefit is that under manual assign mode, a consumer that never triggers any of the above three never discovers the coordinator. The original fix in #11631 would let the consumer discover the coordinator even if none of the above operations are required.

Reviewers: Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
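A toy model of the deferred-clearance idea (field and method names are invented for illustration, and the future is modeled as an already-completed Optional rather than a real async request): only the commit path drains and clears the pending find-coordinator result, so poll() under manual assignment never has to touch it.

```java
import java.util.Optional;

// Sketch of clearing the discover-coordinator future only on commit.
class CommitAsyncSketch {
    Optional<Boolean> findCoordinatorFuture = Optional.empty(); // completed result, if any
    boolean coordinatorKnown = false;

    // An earlier lookup completed "in the background" with a success result.
    void lookupCoordinator() {
        findCoordinatorFuture = Optional.of(true);
    }

    // commitAsync first drains the in-flight future; this is where it gets cleared.
    boolean commitAsync() {
        if (findCoordinatorFuture.isPresent()) {
            coordinatorKnown = findCoordinatorFuture.get();
            findCoordinatorFuture = Optional.empty(); // cleared here, not in poll()
        }
        return coordinatorKnown;
    }
}
```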
HOTFIX: only try to clear discover-coordinator future upon commit (#12244) (#12259)

This is a cherry-pick of the above commit to 3.1.
…et commits (#12626)

Asynchronous offset commits may throw an unexpected WakeupException following #11631 and #12244. This patch fixes the problem by passing a flag through to ensureCoordinatorReady indicating whether wakeups should be disabled. This is used to disable wakeups in the context of asynchronous offset commits; all other uses leave wakeups enabled. Note: this patch builds on top of #12611.

Co-Authored-By: Guozhang Wang <wangguoz@gmail.com>
Reviewers: Luke Chen <showuon@gmail.com>
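A minimal sketch of threading such a flag through (the class, the WakeupSignal exception, and the method shape are assumptions for illustration, not the real AbstractCoordinator API): callers with wakeups enabled surface the pending wakeup, while the async-commit path proceeds undisturbed.

```java
// Toy model of ensureCoordinatorReady with a disable-wakeup flag.
class CoordinatorReadySketch {
    boolean wakeupScheduled = true; // a consumer.wakeup() arrived mid-operation
    boolean coordinatorReady = false;

    static class WakeupSignal extends RuntimeException {}

    boolean ensureCoordinatorReady(boolean disableWakeup) {
        if (!disableWakeup && wakeupScheduled) {
            wakeupScheduled = false;
            throw new WakeupSignal(); // normal callers surface the wakeup
        }
        coordinatorReady = true; // async commits proceed undisturbed
        return true;
    }
}
```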
jira: KAFKA-13563
After KAFKA-10793, we clear the findCoordinatorFuture in two places:

1. the heartbeat thread
2. AbstractCoordinator#ensureCoordinatorReady

But in non consumer group mode with a group id provided (for offset commits, so a ConsumerCoordinator is created), there is no heartbeat thread (1), and ensureCoordinatorReady (2) is only called the first time the consumer fetches the committed offset position. That is, after the second lookupCoordinator call, we have no chance to clear the findCoordinatorFuture, so offset commits never succeed.

To avoid the race condition mentioned in KAFKA-10793, it's not safe to clear the findCoordinatorFuture in the future listener. So I think we can fix this issue by calling AbstractCoordinator#ensureCoordinatorReady when the coordinator is unknown in the non consumer group case, in each ConsumerCoordinator#poll.