KAFKA-9146 [WIP]: Add option to force delete active members in StreamsResetter#8020
KAFKA-9146 [WIP]: Add option to force delete active members in StreamsResetter#8020feyman2016 wants to merge 10 commits intoapache:trunkfrom
Conversation
|
Thanks for the PR. Could you rename it to |
|
I updated the PR title -- however, auto-linking only works when a new PR is opened (not when the PR title is changed...). I updated the Jira ticket to link to the PR. |
| private Set<LeaveGroupRequestData.MemberIdentity> members; | ||
|
|
||
| public RemoveMembersFromConsumerGroupOptions(Collection<MemberToRemove> members) { | ||
| public RemoveMembersFromConsumerGroupOptions(Collection<LeaveGroupRequestData.MemberIdentity> members) { |
There was a problem hiding this comment.
This is an breaking public API change -- we can't allow this.
There was a problem hiding this comment.
Thanks ,mjsax !
About the public API change, Boyang, Chen has also replied in the related JIRA, let me go through it first and will update soon. :)
| } | ||
|
|
||
| public Set<MemberToRemove> members() { | ||
| public Set<LeaveGroupRequestData.MemberIdentity> members() { |
There was a problem hiding this comment.
This is an breaking public API change -- we can't allow this.
|
|
||
| RemoveMembersFromConsumerGroupResult(KafkaFuture<Map<MemberIdentity, Errors>> future, | ||
| Set<MemberToRemove> memberInfos) { | ||
| Set<MemberIdentity> memberInfos) { |
There was a problem hiding this comment.
This is an breaking public API change -- we can't allow this.
| * Returns the selected member future. | ||
| */ | ||
| public KafkaFuture<Void> memberResult(MemberToRemove member) { | ||
| public KafkaFuture<Void> memberResult(MemberIdentity member) { |
There was a problem hiding this comment.
This is an breaking public API change -- we can't allow this.
| .withRequiredArg() | ||
| .ofType(String.class) | ||
| .describedAs("file name"); | ||
| forceDeleteMemberOption = optionParser.accepts("force-delete-member", "Force delete member when long session time out has been configured").withRequiredArg().ofType(Boolean.class).defaultsTo(false); |
There was a problem hiding this comment.
This is a public API change, too.
|
The related JIRA https://issues.apache.org/jira/browse/KAFKA-9146 needs a KIP and detailed plan, will mark this PR as WIP and update later on. |
| private void cleanGlobal(final boolean withIntermediateTopics, | ||
| final String resetScenario, | ||
| final String resetScenarioArg) throws Exception { | ||
| private int tryCleanGlobal(final boolean withIntermediateTopics, |
There was a problem hiding this comment.
No logic change here, just extract the main body of cleanGlobal out and put it in tryCleanGlobal.
|
@feyman2016 Are we ready for another review? |
|
@feyman2016 Is this still the PR you are working against? |
|
Hi, @abbccdda, sorry for the delay, I have created another PR, and request for review, thanks! https://github.com/apache/kafka/pull/8589/files |
|
@feyman2016 closing this PR in favor of #8589 -- I assume it's a replacement. |
|
@mjsax Thanks for closing this. |
This PR is mainly to enhance https://issues.apache.org/jira/browse/KAFKA-9146.
org.apache.kafka.clients.admin.Admin#removeMembersFromConsumerGrouphas been changed to support both static or dynamic members~StreamsResetteris introduced, if --force specified when using the StreamsResetter, then all the active static/dynamic members will be removed.Related KIP:
KIP-571: https://cwiki.apache.org/confluence/display/KAFKA/KIP-571%3A+Add+option+to+force+remove+members+in+StreamsResetter
Committer Checklist (excluded from commit message)