KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled #12603
hachikuji
left a comment
Thanks, left a few questions.
| if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) { | ||
| autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync(); | ||
| if (autoCommitEnabled) { | ||
| pausePartitions(partitionsToRevoke); |
It seems like this bug (and some of the complexity in this patch) is due to the fact that we do the auto-commit prior to revoking partitions. I wonder if that is really necessary. If we revoke first, then the partitions would be removed from SubscriptionState and we wouldn't have to worry about fetches for these partitions returning. Could that work as well?
But if we don't commit these offsets, I wonder if that breaks the poll contract, i.e. that we need to commit the acked data. Note that, I believe, some of the data could already have been returned to the client from the previous poll loop.
I think what Jason suggested is to change from
"committing offset for subscribed partitions -> revoking",
to "revoking -> committing offset for subscribed partitions + revoked partitions".
I don't think that is a good idea, because it would open the door to a consumer committing offsets for partitions it doesn't own.
c01cbac to
0cc360e
Compare
Thanks @showuon @hachikuji for the inputs, I just made this PR reviewable, so please review it if possible :)
@philipnee Thanks for the PR. Could you please update the description to explain the change?
nit: how about we make SubscriptionState.markUnconsumable batched? Then we just need to grab the lock once.
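A minimal sketch of what the batched form suggested above could look like. This is a toy stand-in, not Kafka's `SubscriptionState`: the class name `BatchedMarkSketch`, the use of plain `String` partition names, and the `lockAcquisitions` counter are all assumptions made for illustration. The point is only that a `synchronized` method taking a collection enters the monitor once for the whole batch rather than once per partition.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for a subscription-state class; hypothetical, not Kafka's implementation. */
final class BatchedMarkSketch {
    // partition name -> whether it has been marked unconsumable
    private final Map<String, Boolean> unconsumable = new HashMap<>();
    // Counts calls to the mutating synchronized methods, purely for illustration.
    private int lockAcquisitions = 0;

    synchronized void assign(Collection<String> partitions) {
        lockAcquisitions++;
        for (String tp : partitions) {
            unconsumable.put(tp, false);
        }
    }

    /** Batched form: one monitor acquisition covers the whole collection. */
    synchronized void markUnconsumable(Collection<String> partitions) {
        lockAcquisitions++;
        for (String tp : partitions) {
            // Only flag partitions that are currently assigned; unknown ones are ignored.
            unconsumable.computeIfPresent(tp, (name, flag) -> true);
        }
    }

    /** A partition is fetchable only if it is assigned and not marked. */
    synchronized boolean isFetchable(String tp) {
        return Boolean.FALSE.equals(unconsumable.get(tp));
    }

    synchronized int lockAcquisitions() {
        return lockAcquisitions;
    }
}
```

With a per-partition method, marking N partitions would enter the monitor N times; here a single call handles the whole set being revoked.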
clean up clean up clean up clean up nit delay the cooperative partition revocation PR comment: Use switch statement modify test PR comments: only precompute EAGER partitions PR comments clean up Reverting to the previous form to simply patching clean up rename function
aa9fdb7 to
66531f8
Compare
| } | ||
|
|
||
| @Test | ||
| public void testPendingRevacationPartitionFetching() { |
nit: Revocation is misspelled
I did not find the name very clear. It looks like the main difference between this and testFetchingPendingPartitions is that this method tests that the pending state gets reset after reassignment? Perhaps the name should reflect that?
showuon
left a comment
Sorry for the delayed review. LGTM! Left some minor comments, which can be addressed in a follow-up PR. Thank you.
| subscriptions.seek(tp0, 100); | ||
| subscriptions.seek(tp0, 100); | ||
| subscriptions.seek(tp0, 100); |
Any reason we seek tp0 to offset 100 three times?
| subscriptions.seek(tp0, 100); | ||
| assertEquals(100, subscriptions.position(tp0).offset); | ||
|
|
||
| assertTrue(subscriptions.isFetchable(tp0)); // because tp is paused |
I don't understand the comment here. Where do we pause tp0? And it is fetchable now, right?
I'll clean it up.
Oops, this is a typo. Probably some residue left over after modifying the tests.
… prior to revocation (#12603) When auto-commit is enabled with the "eager" rebalance strategy, the consumer will commit all offsets prior to revocation. Following recent changes, this offset commit is done asynchronously, which means there is an opportunity for fetches to continue returning data to the application. When this happens, the progress is lost following revocation, which results in duplicate consumption. This patch fixes the problem by adding a flag in `SubscriptionState` to ensure that partitions which are awaiting revocation will not continue being fetched. Reviewers: Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
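The race described in the commit message can be sketched as a small simulation. Everything here is hypothetical and heavily simplified: `RevocationRaceSketch`, `run`, and `poll` are illustrative names, offsets are plain `long` values, and a "poll" always delivers three records. It only models the ordering of events (async commit snapshot, one more poll, revocation, new owner resuming from the committed offset), not the real consumer.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy timeline of the duplication race; hypothetical, not the Kafka consumer. */
final class RevocationRaceSketch {

    /** Returns every offset delivered to the application, across both consumers. */
    static List<Long> run(boolean stopFetchingWhileCommitInFlight) {
        List<Long> delivered = new ArrayList<>();
        long position = 0L;

        // Consumer A: a normal poll delivers offsets 0..2.
        position = poll(position, 3, true, delivered);

        // Rebalance starts: the async auto-commit snapshots the current position.
        long committed = position;
        boolean fetchable = !stopFetchingWhileCommitInFlight;

        // The commit is still in flight, so the poll loop runs once more before revocation.
        position = poll(position, 3, fetchable, delivered);

        // Revocation: A's local position is discarded and the new owner (consumer B)
        // resumes from the committed offset.
        poll(committed, 3, true, delivered);
        return delivered;
    }

    /** Delivers maxRecords consecutive offsets if the partition is fetchable. */
    private static long poll(long position, int maxRecords, boolean fetchable, List<Long> out) {
        if (!fetchable) {
            return position;
        }
        for (int i = 0; i < maxRecords; i++) {
            out.add(position + i);
        }
        return position + maxRecords;
    }
}
```

In this model, `run(false)` delivers offsets 3..5 twice (once to the old owner after the commit snapshot, once to the new owner), while `run(true)` delivers each offset exactly once.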
Minor cleanups in `FetcherTest` following #12603. Reviewers: Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
…2-14-SEP-2022
* apache-kafka/3.2: (45 commits)
  MINOR: Bump version in upgrade guide to 3.2.3
  KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits (apache#12626)
  KAFKA-14196; Do not continue fetching partitions awaiting auto-commit prior to revocation (apache#12603)
  MINOR: 3.2 branch version to 3.2.3-SNAPSHOT
  Bump version to 3.2.2
  Upgrade Netty and Jackson versions for CVE fixes [KAFKA-14044] (apache#12376)
  KAFKA-14194: Fix NPE in Cluster.nodeIfOnline (apache#12584)
  MINOR: Update LICENSE-binary
  MINOR: Align Scala version to 2.13.8
  MINOR: Bump version in upgrade guide to 3.2.2
  ...
…eptember 2022)
`Jenkinsfile` was the only conflict and we ignore the changes since they are not relevant to the Confluent build.
* apache-github/3.3: (61 commits)
  KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads. (apache#12628)
  KAFKA-14243: Temporarily disable unsafe downgrade (apache#12664)
  KAFKA-14240; Validate kraft snapshot state on startup (apache#12653)
  KAFKA-14233: disable testReloadUpdatedFilesWithoutConfigChange first to fix the build (apache#12658)
  KAFKA-14238; KRaft metadata log should not delete segment past the latest snapshot (apache#12655)
  KAFKA-14156: Built-in partitioner may create suboptimal batches (apache#12570)
  MINOR: Adds KRaft versions of most streams system tests (apache#12458)
  MINOR; Add missing li end tag (apache#12640)
  MINOR: Mention that kraft is production ready in upgrade notes (apache#12635)
  MINOR: Add upgrade note regarding the Strictly Uniform Sticky Partitioner (KIP-794) (apache#12630)
  KAFKA-14222; KRaft's memory pool should always allocate a buffer (apache#12625)
  KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits (apache#12626)
  KAFKA-14196; Do not continue fetching partitions awaiting auto-commit prior to revocation (apache#12603)
  KAFKA-14215; Ensure forwarded requests are applied to broker request quota (apache#12624)
  MINOR; Remove end html tag from upgrade (apache#12605)
  Remove the html end tag from upgrade.html
  KAFKA-14205; Document how to replace the disk for the KRaft Controller (apache#12597)
  KAFKA-14203 Disable snapshot generation on broker after metadata errors (apache#12596)
  KAFKA-14216: Remove ZK reference from org.apache.kafka.server.quota.ClientQuotaCallback javadoc (apache#12617)
  KAFKA-14217: app-reset-tool.html should not show --zookeeper flag that no longer exists (apache#12618)
  ...
Problem
Several flaky tests under OffsetValidationTest indicate an underlying consumer duplication issue. It was introduced by KAFKA-14024, which switched the auto-commit to async: the fetcher can now move ahead before the previous offsets are committed, and that progress is lost after partition revocation.
Fix
Add an internal flag that stops the fetcher from sending fetches for partitions that are about to be revoked. We didn't use pause() because we don't want the user-facing API to interfere with this behavior, which could introduce more bugs.
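To illustrate why a separate internal flag is used rather than pause(), here is a rough sketch under stated assumptions. `PartitionStateSketch`, the `awaitingRevocation` field, and `markAwaitingRevocation()` are hypothetical names chosen for this example and are not taken from the patch. The idea is that the user-controlled paused flag and the internal flag are tracked independently, so a user call to resume() during a rebalance cannot make the partition fetchable again.

```java
/** Toy per-partition state; hypothetical names, not Kafka's implementation. */
final class PartitionStateSketch {
    private boolean paused;              // toggled by the user-facing pause()/resume()
    private boolean awaitingRevocation;  // internal flag, not reachable from the user API

    void pause() {
        paused = true;
    }

    void resume() {
        // Deliberately leaves awaitingRevocation untouched.
        paused = false;
    }

    /** Internal hook: called just before the async auto-commit is sent. */
    void markAwaitingRevocation() {
        awaitingRevocation = true;
    }

    boolean isFetchable() {
        return !paused && !awaitingRevocation;
    }
}
```

If the fix had reused the paused flag instead, a resume() call issued by the application while the commit was in flight would have re-enabled fetching and reopened the duplication window.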