KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter#8589
KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter#8589mjsax merged 19 commits intoapache:trunkfrom
Conversation
|
Retest this please. Call for review @abbccdda |
|
oh, sorry I didn't run the Checkstyle and spotbugs quality checks locally, I will update shortly with these fixed |
|
Retest this please. |
|
@feyman2016 Only committers can trigger Jenkins retesting... Retest this please. |
|
Retest this please |
|
@mjsax I see, thanks! |
abbccdda
left a comment
There was a problem hiding this comment.
Thanks for the PR, left some initial comments.
| if (options.removeAll()) { | ||
| List<MemberIdentity> members = getMembersFromGroup(groupId); | ||
| findCoordinatorCall = getFindCoordinatorCall(context, | ||
| () -> getRemoveMembersFromGroupCall(context, members)); |
There was a problem hiding this comment.
could we pass the members into the context?
There was a problem hiding this comment.
My initial thought was to put the members in the context, but hesitated to do so because the ConsumerGroupOperationContext seems to be for generic usage. So I just refer to KafkaAdminClient#getAlterConsumerGroupOffsetsCall and make the members as a separate input param. Anyway, I'm glad to make the change if we think it's preferred to put the members in context.
There was a problem hiding this comment.
Yes, I feel this is more consistent for internal calls not to do a second round of interpretation for which members set to use.
There was a problem hiding this comment.
And to be clear, I'm not suggesting we have to put stuff into the context, just always passing in the intended removal list and do not depend on context.removeAll again inside internal function.
| try { | ||
| members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members(); | ||
| } catch (Throwable ex) { | ||
| System.out.println("Encounter exception when trying to get members from group: " + groupId); |
| members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members(); | ||
| } catch (Throwable ex) { | ||
| System.out.println("Encounter exception when trying to get members from group: " + groupId); | ||
| ex.printStackTrace(); |
There was a problem hiding this comment.
Curious why we are still continuing in this case, as the member lookup already fails.
There was a problem hiding this comment.
Thanks, will fix this .
| @@ -32,12 +32,23 @@ | |||
| public class RemoveMembersFromConsumerGroupOptions extends AbstractOptions<RemoveMembersFromConsumerGroupOptions> { | |||
|
|
|||
| private Set<MemberToRemove> members; | |||
There was a problem hiding this comment.
Could we just make members to be Optional<Set<MemberToRemove>> so that we don't need a separate removeAll parameter?
There was a problem hiding this comment.
Sure. Taking a step further, can we just keep the the type Set<MemberToRemove> for members unchanged and treat it as removeAll if the members is empty set?
| } | ||
|
|
||
| List<MemberIdentity> memberToRemove = new ArrayList<>(); | ||
| for (MemberDescription member: members) { |
There was a problem hiding this comment.
style error here.
I would recommend doing a self style check like:
./gradlew checkstyleMain checkstyleTest spotbugsMain spotbugsTest spotbugsScoverage compileTestJava otherwise we still need to fix those failures after we do jenkins build.
There was a problem hiding this comment.
Thanks for the advice, will fix it in the next commit.
There was a problem hiding this comment.
I reran the self style check, but didn't capture any error. I assume the error would be the missed final in for loop, updated.
|
@abbccdda Thanks a lot for the review, will update soon. |
|
@abbccdda Hey, updated based on comments, and also left some comments there, thanks. |
|
Call for retest and review, thanks! |
|
@feyman2016 Sure thing! |
abbccdda
left a comment
There was a problem hiding this comment.
Thanks for the update. I could see we are still having a couple of places with style violation, did you properly configure and run checkstyle locally?
| } | ||
|
|
||
| List<MemberIdentity> memberToRemove = new ArrayList<>(); | ||
| for (final MemberDescription member: members) { |
| if (options.removeAll()) { | ||
| List<MemberIdentity> members = getMembersFromGroup(groupId); | ||
| findCoordinatorCall = getFindCoordinatorCall(context, | ||
| () -> getRemoveMembersFromGroupCall(context, members)); |
There was a problem hiding this comment.
Yes, I feel this is more consistent for internal calls not to do a second round of interpretation for which members set to use.
| public boolean removeAll() { | ||
| return members.isEmpty(); | ||
| } | ||
|
|
| () -> getRemoveMembersFromGroupCall(context, members)); | ||
| } else { | ||
| findCoordinatorCall = getFindCoordinatorCall(context, | ||
| () -> getRemoveMembersFromGroupCall(context, new ArrayList<>())); |
| // If coordinator changed since we fetched it, retry | ||
| if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) { | ||
| Call call = getRemoveMembersFromGroupCall(context); | ||
| Call call = getRemoveMembersFromGroupCall(context, allMembers); |
There was a problem hiding this comment.
Why do we blindly put allMembers? I believe we base on context to interpret, but like discussed earlier, this is easy to make mistake, we should rely on one source for members.
There was a problem hiding this comment.
Fixed, now we explicitly pass in the members to be deleted to the private getRemoveMembersFromGroupCall
| Utils.closeQuietly(producer, "producer") | ||
| } | ||
|
|
||
| val EMPTY_GROUP_INSTANCE_ID = "" |
There was a problem hiding this comment.
I don't think we really need this struct, could we just put null in groupInstanceSet?
There was a problem hiding this comment.
I feel like this is more informative, so didn't update it, but yeah, I can update if we really not prefer this~
| try { | ||
| while (true) { | ||
| consumer.poll(JDuration.ofSeconds(5)) | ||
| if ( !consumer.assignment.isEmpty && latch.getCount > 0L) |
| consumer.commitSync() | ||
| } | ||
| } catch { | ||
| case _: InterruptException => // Suppress the output to stderr |
There was a problem hiding this comment.
Didn't change the exception handling logic here, just extract the Thread creation logic to reuse~
| assertFalse(testGroupDescription.isSimpleConsumerGroup) | ||
| assertEquals(consumerSet.size -1, testGroupDescription.members().size()) | ||
|
|
||
| // Delete all active members remained (a static member + a dynamic member) |
| .withRequiredArg() | ||
| .ofType(String.class) | ||
| .describedAs("file name"); | ||
| forceOption = optionParser.accepts("force", "Force remove members when long session time out has been configured, " + |
There was a problem hiding this comment.
Do we also want to edit the usage info on top to mention the force delete option?
There was a problem hiding this comment.
I think so, updated
|
@abbccdda Sorry for causing too much trouble about style....will make sure style check correctly executed before requesting for review next time. |
|
@feyman2016 no worry! It's just for first time :) |
|
@abbccdda I'm fixing the style error, and I found that it showed success if I run the below cmd locally : |
|
@feyman2016 Thanks for the context. I don't worry too much for comment vanish if you change the commit history, as they would not be gone but just show as Also just a reminder that we are one week away from the feature freeze, so let's try to ramp up and get this into 2.6. |
|
@abbccdda Thanks, will try my best to get this into 2.6 |
|
@abbccdda Updated, call for review, thanks! |
abbccdda
left a comment
There was a problem hiding this comment.
Thanks for the update @feyman2016 ! Got some more comments, but I think we are getting there :)
| Collection<MemberDescription> members; | ||
| try { | ||
| members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members(); | ||
| } catch (Throwable ex) { |
There was a problem hiding this comment.
I think we should catch Exception here:
https://stackoverflow.com/questions/2274102/difference-between-using-throwable-and-exception-in-a-try-catch
There was a problem hiding this comment.
Make sense, fixed~
| for (final MemberDescription member : members) { | ||
| if (member.groupInstanceId().isPresent()) { | ||
| memberToRemove.add(new MemberIdentity().setGroupInstanceId(member.groupInstanceId().get()) | ||
| ); |
There was a problem hiding this comment.
This indentation is a bit weird, let's just merge L3625-3626
|
|
||
| ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context = | ||
| new ConsumerGroupOperationContext<>(groupId, options, deadline, future); | ||
| new ConsumerGroupOperationContext<>(groupId, options, deadline, future); |
There was a problem hiding this comment.
Let's get back the original indentation.
| return new LeaveGroupRequest.Builder(context.groupId(), | ||
| context.options().members().stream().map( | ||
| MemberToRemove::toMemberIdentity).collect(Collectors.toList())); | ||
| return new LeaveGroupRequest.Builder(context.groupId(), |
|
|
||
| private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context) { | ||
| private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, | ||
| RemoveMembersFromConsumerGroupOptions> context, List<MemberIdentity> allMembers) { |
| consumerThread.join() | ||
| } | ||
| } | ||
| }finally { |
There was a problem hiding this comment.
nit: format
I'm pretty surprised this wasn't caught in my previous template. Let me check how to cover this in style test as well.
There was a problem hiding this comment.
Thanks, but I wonder what does the template refer to here?
| public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() throws Exception { | ||
| appID = testId + "-with-force-option"; | ||
| streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); | ||
| streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT * 100); |
There was a problem hiding this comment.
Without the "" + to convert the value to String, we will get exception like: it is because STREAMS_CONSUMER_TIMEOUT = 2000L, ""+ is widely used in this test, just follow it here without any change to not enlarge the scope of this PR, I can help to create a Jira to enhance it if we think this workaround is not quite intuitive~
org.apache.kafka.common.config.ConfigException: Invalid value 200000 for configuration session.timeout.ms: Expected value to be a 32-bit integer, but it was a java.lang.Long
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:672)
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:606)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630)
at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getRestoreConsumer(DefaultKafkaClientSupplier.java:56)
at org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:313)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:766)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:652)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:562)
at org.apache.kafka.streams.integration.AbstractResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(AbstractResetIntegrationTest.java:270)
at org.apache.kafka.streams.integration.ResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(ResetIntegrationTest.java:77)
There was a problem hiding this comment.
I see, this is indeed weird, please file a JIRA so that we could clean in a follow-up PR if others feel the same way.
There was a problem hiding this comment.
Created https://issues.apache.org/jira/browse/KAFKA-10035 for tracking, thanks!
|
|
||
| private void cleanGlobal(final boolean withIntermediateTopics, | ||
| private int tryCleanGlobal(final boolean withIntermediateTopics, | ||
| final String resetScenario, |
| cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); | ||
|
|
||
| final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); | ||
| return exitCode; |
There was a problem hiding this comment.
Like said earlier, I think we could just return
return new StreamsResetter().run(parameters, cleanUpConfig) == 0
| } | ||
|
|
||
| private void cleanGlobal(final boolean withIntermediateTopics, | ||
| private int tryCleanGlobal(final boolean withIntermediateTopics, |
There was a problem hiding this comment.
We could add meta comment for the return value here, and instead of returning an exit code, I feel a boolean is suffice to indicate whether the clean operation was successful or not.
There was a problem hiding this comment.
Indeed, updated as suggested
| */ | ||
| public KafkaFuture<Void> memberResult(MemberToRemove member) { | ||
| if (removeAll()) { | ||
| throw new IllegalArgumentException("The method: memberResult is not applicable in 'removeAll' mode"); |
There was a problem hiding this comment.
Why that? I understand that we expect that users don't know the memberId if the so a "remove all"; however, I don't see why we need to disallow this call? Can you elaborate?
There was a problem hiding this comment.
Since in the removeAll scenario, we don't save the members to be deleted in RemoveMembersFromConsumerGroupResult, so I think calling memberResult doesn't seem applicative.
| MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED); | ||
| } | ||
|
|
||
| private static DescribeGroupsResponseData prepareDescribeGroupsResponseData(String groupId, List<String> groupInstances, |
There was a problem hiding this comment.
Nit: formatting
private static DescribeGroupsResponseData prepareDescribeGroupsResponseData(String groupId,
List<String> groupInstances,
List<TopicPartition> topicPartitions) {
| new NewTopic(testTopicName, testNumPartitions, 1.toShort))).all().get() | ||
| waitForTopics(client, List(testTopicName), List()) | ||
|
|
||
| client.createTopics(util.Arrays.asList(new NewTopic(testTopicName, testNumPartitions, 1.toShort), |
There was a problem hiding this comment.
nit: formatting: move new NewTopic(...) to next line
mjsax
left a comment
There was a problem hiding this comment.
Overall, LGTM. Couple of comments/questions.
Co-authored-by: Matthias J. Sax <mjsax@apache.org>
Co-authored-by: Matthias J. Sax <mjsax@apache.org>
Co-authored-by: Matthias J. Sax <mjsax@apache.org>
Co-authored-by: Matthias J. Sax <mjsax@apache.org>
|
Retest this please. |
1 similar comment
|
Retest this please. |
|
@mjsax Thanks a lot for the review and tests triggering! |
|
Java 8: Java 11: Java 14: |
|
Retest this please. |
3 similar comments
|
Retest this please. |
|
Retest this please. |
|
Retest this please. |
|
Jenkins does not cooperate... will try again later |
|
Retest this please. |
|
Finally, jenkins cooperated :) |
|
Thanks for the KIP and PR @feyman2016! |
|
Great work, @feyman2016 @mjsax ! |
|
FYI, since we took a slightly different implementation(leveraging the empty members rather than introducing a new field to imply the |
I use IntelliJ that does have some auto-formatting. Use it basically with default settings. In doubt, disable auto-formatting to avoid unnecessary reformatting. -- Some things are also a little bit of "personal taste"...
Thanks! Can you send a follow up email to the voting thread explaining the change? To make sure nobody has concerns about it. |
|
@mjsax Sure, updated in the voting thread of KIP-571 |
* 'trunk' of github.com:apache/kafka: (36 commits) Remove redundant `containsKey` call in KafkaProducer (apache#8761) KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (apache#8723) KAFKA-10061; Fix flaky `ReassignPartitionsIntegrationTest.testCancellation` (apache#8749) KAFKA-9130; KIP-518 Allow listing consumer groups per state (apache#8238) KAFKA-9501: convert between active and standby without closing stores (apache#8248) KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (apache#8739) MINOR: Log the reason for coordinator discovery failure (apache#8747) KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException (apache#8705) MINOR: remove unnecessary timeout for admin request (apache#8738) MINOR: Relax Percentiles test (apache#8748) MINOR: regression test for task assignor config (apache#8743) MINOR: Update documentation.html to refer to 2.6 (apache#8745) MINOR: Update documentation.html to refer to 2.5 (apache#8744) KAFKA-9673: Filter and Conditional SMTs (apache#8699) KAFKA-9971: Error Reporting in Sink Connectors (KIP-610) (apache#8720) KAFKA-10052: Harden assertion of topic settings in Connect integration tests (apache#8735) MINOR: Slight MetadataCache tweaks to avoid unnecessary work (apache#8728) KAFKA-9802; Increase transaction timeout in system tests to reduce flakiness (apache#8736) KAFKA-10050: kafka_log4j_appender.py fixed for JDK11 (apache#8731) KAFKA-9146: Add option to force delete active members in StreamsResetter (apache#8589) ... # Conflicts: # core/src/main/scala/kafka/log/Log.scala
This PR is mainly to enhance https://issues.apache.org/jira/browse/KAFKA-9146.
KafkaAdminClient#removeMembersFromConsumerGroupnow support "removeAll" members in a given groupRelated KIP:
KIP-571: https://cwiki.apache.org/confluence/display/KAFKA/KIP-571%3A+Add+option+to+force+remove+members+in+StreamsResetter
Committer Checklist (excluded from commit message)