KAFKA-8179: Part 3, Add PartitionsLost API for resetGenerations and metadata/subscription change#6884
Conversation
…-client-algorithm
…pache#6511) As described in KIP-443 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-443%3A+Return+to+default+segment.ms+and+segment.index.bytes+in+Streams+repartition+topics). We want to remove the aggressive overrides of segment.ms and segment.index.bytes for repartition topics. The remaining segment.bytes should still be effective in bounding its footprint. Reviewers: Bill Bejeck <bbejeck@gmail.com>
…-consumer-protocol
…-consumer-protocol
…-consumer-protocol
…-consumer-protocol
|
triggered system tests for client: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2854/ for streams: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2855/ |
ableegoldman
left a comment
There was a problem hiding this comment.
Thanks for kicking off system tests -- everything LGTM
| * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)} or {@link #subscribe(Pattern)}. | ||
| * This also clears any partitions directly assigned through {@link #assign(Collection)}. | ||
| * | ||
| * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. rebalance callback errors) |
There was a problem hiding this comment.
It might be worth mentioning in the upgrade notes the fact that we now invoke revocation logic in unsubscribe and close.
There was a problem hiding this comment.
Yes, that's in my plan -- I have another PR on web docs for the changes related, but wanted to keep this one from continue growing.
| // heartbeat error handling is executed by heartbeat thread, in which case | ||
| // we would not drop partitions and trigger rebalance callback as it should | ||
| // only be triggered by the caller thread | ||
| if (api != ApiKeys.HEARTBEAT) { |
There was a problem hiding this comment.
Hmm.. I am not sure this is sufficient. Any of the responses could return from the heartbeat thread.
|
Jenkins failures are irrelevant. |
|
retest this please |
| .compose(new LeaveGroupResponseHandler()); | ||
| } | ||
|
|
||
| // we need to reset generation first in order to trigger the rebalance callback if necessary, before sending |
There was a problem hiding this comment.
This seems no longer relevant?
|
Addressed latest comment. |
|
Jenkins failures are irrelevant and non-overlapping. |
hachikuji
left a comment
There was a problem hiding this comment.
LGTM. I left a suggestion on an alternative name for onLeaveGroup.
| /** | ||
| * Invoked prior to each leave group event. This is typically used to cleanup assigned partitions | ||
| */ | ||
| protected void onLeaveGroup() {} |
There was a problem hiding this comment.
For consistency with onJoinPrepare and onJoinComplete, would it be reasonable to call this onLeavePrepare?
* apache-github/trunk: MINOR: Ignore dynamic log4j log level tests (apache#7183) KAFKA-8748: Fix flaky testDescribeLogDirsRequest (apache#7182) KAFKA-8598: Use automatic RPC generation in RenewDelegationToken KAFKA-8179: Part 3, Add PartitionsLost API for resetGenerations and metadata/subscription change (apache#6884)
…setGenerations and metadata/subscription change (apache#6884)" This reverts commit e867a58.
This is a cherry-pick of the bug-fix included in #6884 to 2.3 and older branch. Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <mjsax@apache.org>
When resetting generations proactively and subscription changes, we will trigger onPartitionsRevoked instead.
The ordering of the callback would be the following:
a. Callback onPartitionsRevoked / onPartitionsLost triggered.
b. Update the assignment (both revoked and added).
c. Callback onPartitionsAssigned triggered.
In this way we are assured that users can still access the partitions being revoked, whereas they can also access the partitions being added.
Semantical behavior change (KAFKA-4600): if the rebalance listener throws an exception, pass it along all the way to the
consumer.pollcaller, but still completes the rest of the actions. Also, the newly assigned partitions list does not gets affected with exception thrown since it is just for notifying the users.Semantical behavior change: the ConsumerCoordinator would not try to modify assignor's returned assignments, instead it will validate that assignments and set the error code accordingly: if there are overlaps between added / revoked partitions, it is a fatal error and would be communicated to all members to stop; if revoked is not empty, it is an error indicate re-join; otherwise, it is normal.
Minor: with the error code removed from the Assignment, ConsumerCoordinator will request re-join if the revoked partitions list is not empty.
Updated ConsumerCoordinatorTest accordingly. Also found a minor bug in MetadataUpdate that removed topic would still be retained with null value of num.partitions.
Updated a few other unit tests that are exposed due to this change.
Committer Checklist (excluded from commit message)