From dda96a8dc072a0c70e41941bea977f518d545578 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Tue, 11 Apr 2023 10:17:31 +0200 Subject: [PATCH 1/6] Add TargetAssignmentBuilder --- .../consumer/TargetAssignmentBuilder.java | 327 +++++++++ .../consumer/TargetAssignmentBuilderTest.java | 679 ++++++++++++++++++ 2 files changed, 1006 insertions(+) create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java new file mode 100644 index 0000000000000..be7b1bd69dbd9 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.Record; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; + +/** + * Build a new Target Assignment based on the provided parameters. As a result, + * it yields the records that must be persisted to the log and the new member + * assignments as a map. + * + * Records are only created for members which have a new target assignment. If + * their assignment did not change, no new record is needed. + * + * When a member is deleted, it is assumed that its target assignment record + * is deleted as part of the member deletion process. In other words, this class + * does not yield a tombstone for remove members. + */ +public class TargetAssignmentBuilder { + /** + * The assignment result returned by {{@link TargetAssignmentBuilder#build()}}. + */ + public static class TargetAssignmentResult { + /** + * The records that must be applied to the __consumer_offsets + * topics to persist the new target assignment. + */ + private final List records; + + /** + * The new target assignment for all members. + */ + private final Map assignments; + + TargetAssignmentResult( + List records, + Map assignments + ) { + Objects.requireNonNull(records); + Objects.requireNonNull(assignments); + this.records = records; + this.assignments = assignments; + } + + /** + * @return The records. + */ + public List records() { + return records; + } + + /** + * @return The assignment. + */ + public Map assignments() { + return assignments; + } + } + + /** + * The group id. + */ + private final String groupId; + + /** + * The group epoch. + */ + private final int groupEpoch; + + /** + * The partition assignor to compute the assignment. + */ + private final PartitionAssignor assignor; + + /** + * The members in the group. + */ + private Map members = Collections.emptyMap(); + + /** + * The subscription metadata. + */ + private Map subscriptionMetadata = Collections.emptyMap(); + + /** + * The current target assignment. + */ + private Map assignments = Collections.emptyMap(); + + /** + * The members which have been updated or deleted. Deleted members + * are signaled by a null value. + */ + private Map updatedMembers = new HashMap<>(); + + /** + * Constructs the object. + * + * @param groupId The group id. + * @param groupEpoch The group epoch to compute a target assignment for. + * @param assignor The assignor to use to compute the target assignment. + */ + public TargetAssignmentBuilder( + String groupId, + int groupEpoch, + PartitionAssignor assignor + ) { + this.groupId = Objects.requireNonNull(groupId); + this.groupEpoch = groupEpoch; + this.assignor = Objects.requireNonNull(assignor); + } + + /** + * Adds all the current members. + * + * @param members The current members in the consumer groups. + * @return This object. + */ + public TargetAssignmentBuilder withMembers( + Map members + ) { + this.members = members; + return this; + } + + /** + * Adds the subscription metadata to use. + * + * @param subscriptionMetadata The subscription metadata. + * @return This object. + */ + public TargetAssignmentBuilder withSubscriptionMetadata( + Map subscriptionMetadata + ) { + this.subscriptionMetadata = subscriptionMetadata; + return this; + } + + /** + * Adds the current target assignments. + * + * @param assignments The current assignments. + * @return This object. + */ + public TargetAssignmentBuilder withTargetAssignments( + Map assignments + ) { + this.assignments = assignments; + return this; + } + + /** + * Updates a member. This is useful when the updated member is + * not yet materialized in memory. + * + * @param memberId The member id. + * @param updatedMember The updated member. + * @return This object. + */ + public TargetAssignmentBuilder withUpdatedMember( + String memberId, + ConsumerGroupMember updatedMember + ) { + this.updatedMembers.put(memberId, updatedMember); + return this; + } + + /** + * Removes a member. This is useful when the removed member + * is not yet materialized in memory. + * + * @param memberId The member id. + * @return This object. + */ + public TargetAssignmentBuilder withRemoveMembers( + String memberId + ) { + return withUpdatedMember(memberId, null); + } + + /** + * Builds the new target assignment. + * + * @return A TargetAssignmentResult which contains the records to update + * the current target assignment. + * @throws PartitionAssignorException if the assignment can not be computed. + */ + public TargetAssignmentResult build() throws PartitionAssignorException { + Map memberSpecs = new HashMap<>(); + + // Prepare the member spec for all members. + members.forEach((memberId, member) -> addMemberSpec( + memberSpecs, + member, + assignments.getOrDefault(memberId, Assignment.EMPTY) + )); + + // Update the member spec if updated or deleted members. + updatedMembers.forEach((memberId, updatedMemberOrNull) -> { + if (updatedMemberOrNull == null) { + memberSpecs.remove(memberId); + } else { + addMemberSpec( + memberSpecs, + updatedMemberOrNull, + assignments.getOrDefault(memberId, Assignment.EMPTY) + ); + } + }); + + // Prepare the topic metadata. + Map topics = new HashMap<>(); + subscriptionMetadata.forEach((topicName, topicMetadata) -> + topics.put(topicMetadata.id(), new AssignmentTopicMetadata(topicMetadata.numPartitions())) + ); + + // Compute the assignment. + GroupAssignment newGroupAssignment = assignor.assign(new AssignmentSpec( + Collections.unmodifiableMap(memberSpecs), + Collections.unmodifiableMap(topics) + )); + + // Compute delta from previous to new assignment and create the + // relevant records. + List records = new ArrayList<>(); + Map newTargetAssignment = new HashMap<>(); + + memberSpecs.keySet().forEach(memberId -> { + Assignment oldMemberAssignment = assignments.get(memberId); + Assignment newMemberAssignment = newMemberAssignment(newGroupAssignment, memberId); + + newTargetAssignment.put(memberId, newMemberAssignment); + + if (oldMemberAssignment == null) { + // If the member had no assignment, we always create a record for him. + records.add(newTargetAssignmentRecord( + groupId, + memberId, + newMemberAssignment.partitions() + )); + } else { + // If the member had an assignment, we only create a record if the + // new assignment is different. + if (!newMemberAssignment.equals(oldMemberAssignment)) { + records.add(newTargetAssignmentRecord( + groupId, + memberId, + newMemberAssignment.partitions() + )); + } + } + }); + + // Bump the assignment epoch. + records.add(newTargetAssignmentEpochRecord(groupId, groupEpoch)); + + return new TargetAssignmentResult(records, newTargetAssignment); + } + + private Assignment newMemberAssignment( + GroupAssignment newGroupAssignment, + String memberId + ) { + MemberAssignment newMemberAssignment = newGroupAssignment.members().get(memberId); + if (newMemberAssignment != null) { + return new Assignment(newMemberAssignment.targetPartitions()); + } else { + return Assignment.EMPTY; + } + } + + private void addMemberSpec( + Map members, + ConsumerGroupMember member, + Assignment targetAssignment + ) { + Set subscribedTopics = new HashSet<>(); + member.subscribedTopicNames().forEach(topicName -> { + TopicMetadata topicMetadata = subscriptionMetadata.get(topicName); + if (topicMetadata != null) { + subscribedTopics.add(topicMetadata.id()); + } + }); + + members.put(member.memberId(), new AssignmentMemberSpec( + Optional.ofNullable(member.instanceId()), + Optional.ofNullable(member.rackId()), + subscribedTopics, + targetAssignment.partitions() + )); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java new file mode 100644 index 0000000000000..ceb38e549ad66 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java @@ -0,0 +1,679 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TargetAssignmentBuilderTest { + + public static class TargetAssignmentBuilderTestContext { + private final String groupId; + private final int groupEpoch; + private final PartitionAssignor assignor = mock(PartitionAssignor.class); + private final Map members = new HashMap<>(); + private final Map subscriptionMetadata = new HashMap<>(); + private final Map updatedMembers = new HashMap<>(); + private final Map targetAssignments = new HashMap<>(); + private final Map assignments = new HashMap<>(); + + public TargetAssignmentBuilderTestContext( + String groupId, + int groupEpoch + ) { + this.groupId = groupId; + this.groupEpoch = groupEpoch; + } + + public TargetAssignmentBuilderTestContext addGroupMember( + String memberId, + List subscriptions, + Map> targetPartitions + ) { + members.put(memberId, new ConsumerGroupMember.Builder(memberId) + .setSubscribedTopicNames(subscriptions) + .setRebalanceTimeoutMs(5000) + .build()); + + targetAssignments.put(memberId, new Assignment( + (byte) 0, + targetPartitions, + VersionedMetadata.EMPTY + )); + + return this; + } + + public TargetAssignmentBuilderTestContext addTopicMetadata( + Uuid topicId, + String topicName, + int numPartitions + ) { + subscriptionMetadata.put(topicName, new TopicMetadata( + topicId, + topicName, + numPartitions + )); + return this; + } + + public TargetAssignmentBuilderTestContext updateMemberSubscription( + String memberId, + List subscriptions + ) { + return updateMemberSubscription( + memberId, + subscriptions, + Optional.empty(), + Optional.empty() + ); + } + + public TargetAssignmentBuilderTestContext updateMemberSubscription( + String memberId, + List subscriptions, + Optional instanceId, + Optional rackId + ) { + ConsumerGroupMember existingMember = members.get(memberId); + ConsumerGroupMember.Builder builder; + if (existingMember != null) { + builder = new ConsumerGroupMember.Builder(existingMember); + } else { + builder = new ConsumerGroupMember.Builder(memberId); + } + updatedMembers.put(memberId, builder + .setSubscribedTopicNames(subscriptions) + .maybeUpdateInstanceId(instanceId) + .maybeUpdateRackId(rackId) + .build()); + return this; + } + + public TargetAssignmentBuilderTestContext removeMemberSubscription( + String memberId + ) { + this.updatedMembers.put(memberId, null); + return this; + } + + public TargetAssignmentBuilderTestContext prepareMemberAssignment( + String memberId, + Map> assignment + ) { + assignments.put(memberId, new org.apache.kafka.coordinator.group.assignor.MemberAssignment(assignment)); + return this; + } + + private AssignmentMemberSpec assignmentMemberSpec( + ConsumerGroupMember member, + Assignment targetAssignment + ) { + Set subscribedTopics = new HashSet<>(); + member.subscribedTopicNames().forEach(topicName -> { + TopicMetadata topicMetadata = subscriptionMetadata.get(topicName); + if (topicMetadata != null) { + subscribedTopics.add(topicMetadata.id()); + } + }); + + return new AssignmentMemberSpec( + Optional.ofNullable(member.instanceId()), + Optional.ofNullable(member.rackId()), + subscribedTopics, + targetAssignment.partitions() + ); + } + + public TargetAssignmentBuilder.TargetAssignmentResult build() { + // Prepare expected member specs. + Map memberSpecs = new HashMap<>(); + members.forEach((memberId, member) -> { + memberSpecs.put(memberId, assignmentMemberSpec( + member, + targetAssignments.getOrDefault(memberId, Assignment.EMPTY) + )); + }); + + updatedMembers.forEach((memberId, updatedMemberOrNull) -> { + if (updatedMemberOrNull == null) { + memberSpecs.remove(memberId); + } else { + memberSpecs.put(memberId, assignmentMemberSpec( + updatedMemberOrNull, + targetAssignments.getOrDefault(memberId, Assignment.EMPTY) + )); + } + }); + + Map topicMetadata = new HashMap<>(); + subscriptionMetadata.forEach((topicName, metadata) -> { + topicMetadata.put(metadata.id(), new AssignmentTopicMetadata(metadata.numPartitions())); + }); + + AssignmentSpec assignmentSpec = new AssignmentSpec( + memberSpecs, + topicMetadata + ); + + // We use `any` here to always return an assignment but use `verify` later on + // to ensure that the input was correct. + when(assignor.assign(any())).thenReturn(new GroupAssignment(assignments)); + + // Create and populate the assignment builder. + TargetAssignmentBuilder builder = new TargetAssignmentBuilder(groupId, groupEpoch, assignor) + .withMembers(members) + .withSubscriptionMetadata(subscriptionMetadata) + .withTargetAssignments(targetAssignments); + + updatedMembers.forEach((memberId, updatedMemberOrNull) -> { + if (updatedMemberOrNull != null) { + builder.withUpdatedMember(memberId, updatedMemberOrNull); + } else { + builder.withRemoveMembers(memberId); + } + }); + + TargetAssignmentBuilder.TargetAssignmentResult result = builder.build(); + + verify(assignor, times(1)).assign(assignmentSpec); + + return result; + } + } + + @Test + public void testEmpty() { + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + assertEquals(newTargetAssignmentEpochRecord( + "my-group", + 20 + ), result.records().get(result.records().size() - 1)); + assertEquals(Collections.emptyMap(), result.assignments()); + } + + @Test + public void testAssignmentIsNotChanged() { + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + context.addTopicMetadata(fooTopicId, "foo", 6); + context.addTopicMetadata(barTopicId, "bar", 6); + + context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + )); + + context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + )); + + context.prepareMemberAssignment("member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + )); + + context.prepareMemberAssignment("member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + )); + + TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + + assertEquals(Collections.singletonList(newTargetAssignmentEpochRecord( + "my-group", + 20 + )), result.records()); + + Map expectedAssignment = new HashMap<>(); + expectedAssignment.put("member-1", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + ))); + expectedAssignment.put("member-2", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + ))); + + assertEquals(expectedAssignment, result.assignments()); + } + + @Test + public void testAssignmentSwapped() { + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + context.addTopicMetadata(fooTopicId, "foo", 6); + context.addTopicMetadata(barTopicId, "bar", 6); + + context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + )); + + context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + )); + + context.prepareMemberAssignment("member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + )); + + context.prepareMemberAssignment("member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + )); + + TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + + assertUnorderedList(Arrays.asList( + newTargetAssignmentRecord("my-group", "member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + )), + newTargetAssignmentRecord("my-group", "member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + )) + ), result.records().subList(0, 2)); + + assertEquals(newTargetAssignmentEpochRecord( + "my-group", + 20 + ), result.records().get(result.records().size() - 1)); + + Map expectedAssignment = new HashMap<>(); + expectedAssignment.put("member-2", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + ))); + expectedAssignment.put("member-1", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + ))); + + assertEquals(expectedAssignment, result.assignments()); + } + + @Test + public void testNewMember() { + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + context.addTopicMetadata(fooTopicId, "foo", 6); + context.addTopicMetadata(barTopicId, "bar", 6); + + context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + )); + + context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + )); + + context.updateMemberSubscription("member-3", Arrays.asList("foo", "bar", "zar")); + + context.prepareMemberAssignment("member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + )); + + context.prepareMemberAssignment("member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4), + mkTopicAssignment(barTopicId, 3, 4) + )); + + context.prepareMemberAssignment("member-3", mkAssignment( + mkTopicAssignment(fooTopicId, 5, 6), + mkTopicAssignment(barTopicId, 5, 6) + )); + + TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + + assertUnorderedList(Arrays.asList( + newTargetAssignmentRecord("my-group", "member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + )), + newTargetAssignmentRecord("my-group", "member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4), + mkTopicAssignment(barTopicId, 3, 4) + )), + newTargetAssignmentRecord("my-group", "member-3", mkAssignment( + mkTopicAssignment(fooTopicId, 5, 6), + mkTopicAssignment(barTopicId, 5, 6) + )) + ), result.records().subList(0, result.records().size() - 1)); + + assertEquals(newTargetAssignmentEpochRecord( + "my-group", + 20 + ), result.records().get(result.records().size() - 1)); + + Map expectedAssignment = new HashMap<>(); + expectedAssignment.put("member-1", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + ))); + expectedAssignment.put("member-2", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4), + mkTopicAssignment(barTopicId, 3, 4) + ))); + expectedAssignment.put("member-3", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 5, 6), + mkTopicAssignment(barTopicId, 5, 6) + ))); + + assertEquals(expectedAssignment, result.assignments()); + } + + @Test + public void testUpdateMember() { + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + context.addTopicMetadata(fooTopicId, "foo", 6); + context.addTopicMetadata(barTopicId, "bar", 6); + + context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2) + )); + + context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 3, 4) + )); + + context.addGroupMember("member-3", Arrays.asList("bar", "zar"), mkAssignment( + mkTopicAssignment(barTopicId, 5, 6) + )); + + context.updateMemberSubscription( + "member-3", + Arrays.asList("foo", "bar", "zar"), + Optional.of("instance-id-3"), + Optional.of("rack-0") + ); + + context.prepareMemberAssignment("member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + )); + + context.prepareMemberAssignment("member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4), + mkTopicAssignment(barTopicId, 3, 4) + )); + + context.prepareMemberAssignment("member-3", mkAssignment( + mkTopicAssignment(fooTopicId, 5, 6), + mkTopicAssignment(barTopicId, 5, 6) + )); + + TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + + assertUnorderedList(Arrays.asList( + newTargetAssignmentRecord("my-group", "member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + )), + newTargetAssignmentRecord("my-group", "member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4), + mkTopicAssignment(barTopicId, 3, 4) + )), + newTargetAssignmentRecord("my-group", "member-3", mkAssignment( + mkTopicAssignment(fooTopicId, 5, 6), + mkTopicAssignment(barTopicId, 5, 6) + )) + ), result.records().subList(0, result.records().size() - 1)); + + assertEquals(newTargetAssignmentEpochRecord( + "my-group", + 20 + ), result.records().get(result.records().size() - 1)); + + Map expectedAssignment = new HashMap<>(); + expectedAssignment.put("member-1", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + ))); + expectedAssignment.put("member-2", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4), + mkTopicAssignment(barTopicId, 3, 4) + ))); + expectedAssignment.put("member-3", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 5, 6), + mkTopicAssignment(barTopicId, 5, 6) + ))); + + assertEquals(expectedAssignment, result.assignments()); + } + + @Test + public void testPartialAssignmentUpdate() { + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + context.addTopicMetadata(fooTopicId, "foo", 6); + context.addTopicMetadata(barTopicId, "bar", 6); + + context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + )); + + context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4), + mkTopicAssignment(barTopicId, 3, 4) + )); + + context.addGroupMember("member-3", Arrays.asList("bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 5, 6), + mkTopicAssignment(barTopicId, 5, 6) + )); + + context.prepareMemberAssignment("member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + )); + + context.prepareMemberAssignment("member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 3, 4, 5) + )); + + context.prepareMemberAssignment("member-3", mkAssignment( + mkTopicAssignment(fooTopicId, 6), + mkTopicAssignment(barTopicId, 6) + )); + + TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + + // Member 1 has not record because its assignment did not change. + assertUnorderedList(Arrays.asList( + newTargetAssignmentRecord("my-group", "member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 3, 4, 5) + )), + newTargetAssignmentRecord("my-group", "member-3", mkAssignment( + mkTopicAssignment(fooTopicId, 6), + mkTopicAssignment(barTopicId, 6) + )) + ), result.records().subList(0, result.records().size() - 1)); + + assertEquals(newTargetAssignmentEpochRecord( + "my-group", + 20 + ), result.records().get(result.records().size() - 1)); + + Map expectedAssignment = new HashMap<>(); + expectedAssignment.put("member-1", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + ))); + expectedAssignment.put("member-2", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 3, 4, 5) + ))); + expectedAssignment.put("member-3", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 6), + mkTopicAssignment(barTopicId, 6) + ))); + + assertEquals(expectedAssignment, result.assignments()); + } + + @Test + public void testDeleteMember() { + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + + TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( + "my-group", + 20 + ); + + context.addTopicMetadata(fooTopicId, "foo", 6); + context.addTopicMetadata(barTopicId, "bar", 6); + + context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2), + mkTopicAssignment(barTopicId, 1, 2) + )); + + context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4), + mkTopicAssignment(barTopicId, 3, 4) + )); + + context.addGroupMember("member-3", Arrays.asList("foo", "bar", "zar"), mkAssignment( + mkTopicAssignment(fooTopicId, 5, 6), + mkTopicAssignment(barTopicId, 5, 6) + )); + + context.removeMemberSubscription("member-3"); + + context.prepareMemberAssignment("member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + )); + + context.prepareMemberAssignment("member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + )); + + TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + + assertUnorderedList(Arrays.asList( + newTargetAssignmentRecord("my-group", "member-1", mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + )), + newTargetAssignmentRecord("my-group", "member-2", mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + )) + ), result.records().subList(0, result.records().size() - 1)); + + assertEquals(newTargetAssignmentEpochRecord( + "my-group", + 20 + ), result.records().get(result.records().size() - 1)); + + Map expectedAssignment = new HashMap<>(); + expectedAssignment.put("member-1", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + ))); + expectedAssignment.put("member-2", new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5, 6), + mkTopicAssignment(barTopicId, 4, 5, 6) + ))); + + assertEquals(expectedAssignment, result.assignments()); + } + + public static void assertUnorderedList( + List expected, + List actual + ) { + assertEquals(new HashSet<>(expected), new HashSet<>(actual)); + } +} From b150934d3738ccb943f8a85cfdc9e3d1dcf0fa5f Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 27 Apr 2023 14:38:24 +0200 Subject: [PATCH 2/6] address minor comments --- .../group/consumer/ConsumerGroupMember.java | 6 ++-- .../consumer/TargetAssignmentBuilder.java | 20 +++++------ .../consumer/TargetAssignmentBuilderTest.java | 33 ++++++++++++------- 3 files changed, 34 insertions(+), 25 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java index 823368e65e085..b2e4fcb782e63 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java @@ -38,8 +38,8 @@ */ public class ConsumerGroupMember { /** - * A builder allowing to create a new member or update an - * existing one. + * A builder that facilitates the creation of a new member or the update of + * an existing one. * * Please refer to the javadoc of {{@link ConsumerGroupMember}} for the * definition of the fields. @@ -521,7 +521,7 @@ public Map> partitionsPendingRevocation() { } /** - * @return The set of partitions awaiting assigning to the member. + * @return The set of partitions awaiting assignment to the member. */ public Map> partitionsPendingAssignment() { return partitionsPendingAssignment; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java index be7b1bd69dbd9..8a4e04980daa0 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java @@ -49,7 +49,7 @@ * * When a member is deleted, it is assumed that its target assignment record * is deleted as part of the member deletion process. In other words, this class - * does not yield a tombstone for remove members. + * does not yield a tombstone for removed members. */ public class TargetAssignmentBuilder { /** @@ -85,7 +85,7 @@ public List records() { } /** - * @return The assignment. + * @return The assignments. */ public Map assignments() { return assignments; @@ -103,7 +103,7 @@ public Map assignments() { private final int groupEpoch; /** - * The partition assignor to compute the assignment. + * The partition assignor used to compute the assignment. */ private final PartitionAssignor assignor; @@ -126,7 +126,7 @@ public Map assignments() { * The members which have been updated or deleted. Deleted members * are signaled by a null value. */ - private Map updatedMembers = new HashMap<>(); + private final Map updatedMembers = new HashMap<>(); /** * Constructs the object. @@ -148,7 +148,7 @@ public TargetAssignmentBuilder( /** * Adds all the current members. * - * @param members The current members in the consumer groups. + * @param members The current members in the consumer group. * @return This object. */ public TargetAssignmentBuilder withMembers( @@ -172,9 +172,9 @@ public TargetAssignmentBuilder withSubscriptionMetadata( } /** - * Adds the current target assignments. + * Adds the existing target assignments. * - * @param assignments The current assignments. + * @param assignments The existing target assignments. * @return This object. */ public TargetAssignmentBuilder withTargetAssignments( @@ -207,7 +207,7 @@ public TargetAssignmentBuilder withUpdatedMember( * @param memberId The member id. * @return This object. */ - public TargetAssignmentBuilder withRemoveMembers( + public TargetAssignmentBuilder withRemoveMember( String memberId ) { return withUpdatedMember(memberId, null); @@ -217,7 +217,7 @@ public TargetAssignmentBuilder withRemoveMembers( * Builds the new target assignment. * * @return A TargetAssignmentResult which contains the records to update - * the current target assignment. + * the existing target assignment. * @throws PartitionAssignorException if the assignment can not be computed. */ public TargetAssignmentResult build() throws PartitionAssignorException { @@ -267,7 +267,7 @@ public TargetAssignmentResult build() throws PartitionAssignorException { newTargetAssignment.put(memberId, newMemberAssignment); if (oldMemberAssignment == null) { - // If the member had no assignment, we always create a record for him. + // If the member had no assignment, we always create a record for it. records.add(newTargetAssignmentRecord( groupId, memberId, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java index ceb38e549ad66..1c12c370f8281 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java @@ -209,7 +209,7 @@ public TargetAssignmentBuilder.TargetAssignmentResult build() { if (updatedMemberOrNull != null) { builder.withUpdatedMember(memberId, updatedMemberOrNull); } else { - builder.withRemoveMembers(memberId); + builder.withRemoveMember(memberId); } }); @@ -229,15 +229,15 @@ public void testEmpty() { ); TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); - assertEquals(newTargetAssignmentEpochRecord( + assertEquals(Collections.singletonList(newTargetAssignmentEpochRecord( "my-group", 20 - ), result.records().get(result.records().size() - 1)); + )), result.records()); assertEquals(Collections.emptyMap(), result.assignments()); } @Test - public void testAssignmentIsNotChanged() { + public void testAssignmentHasNotChanged() { Uuid fooTopicId = Uuid.randomUuid(); Uuid barTopicId = Uuid.randomUuid(); @@ -324,6 +324,8 @@ public void testAssignmentSwapped() { TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + assertEquals(3, result.records().size()); + assertUnorderedList(Arrays.asList( newTargetAssignmentRecord("my-group", "member-1", mkAssignment( mkTopicAssignment(fooTopicId, 4, 5, 6), @@ -338,7 +340,7 @@ public void testAssignmentSwapped() { assertEquals(newTargetAssignmentEpochRecord( "my-group", 20 - ), result.records().get(result.records().size() - 1)); + ), result.records().get(2)); Map expectedAssignment = new HashMap<>(); expectedAssignment.put("member-2", new Assignment(mkAssignment( @@ -395,6 +397,8 @@ public void testNewMember() { TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + assertEquals(4, result.records().size()); + assertUnorderedList(Arrays.asList( newTargetAssignmentRecord("my-group", "member-1", mkAssignment( mkTopicAssignment(fooTopicId, 1, 2), @@ -408,12 +412,12 @@ public void testNewMember() { mkTopicAssignment(fooTopicId, 5, 6), mkTopicAssignment(barTopicId, 5, 6) )) - ), result.records().subList(0, result.records().size() - 1)); + ), result.records().subList(0, 3)); assertEquals(newTargetAssignmentEpochRecord( "my-group", 20 - ), result.records().get(result.records().size() - 1)); + ), result.records().get(3)); Map expectedAssignment = new HashMap<>(); expectedAssignment.put("member-1", new Assignment(mkAssignment( @@ -565,7 +569,9 @@ public void testPartialAssignmentUpdate() { TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); - // Member 1 has not record because its assignment did not change. + assertEquals(3, result.records().size()); + + // Member 1 has no record because its assignment did not change. assertUnorderedList(Arrays.asList( newTargetAssignmentRecord("my-group", "member-2", mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5), @@ -575,12 +581,12 @@ public void testPartialAssignmentUpdate() { mkTopicAssignment(fooTopicId, 6), mkTopicAssignment(barTopicId, 6) )) - ), result.records().subList(0, result.records().size() - 1)); + ), result.records().subList(0, 2)); assertEquals(newTargetAssignmentEpochRecord( "my-group", 20 - ), result.records().get(result.records().size() - 1)); + ), result.records().get(2)); Map expectedAssignment = new HashMap<>(); expectedAssignment.put("member-1", new Assignment(mkAssignment( @@ -641,6 +647,8 @@ public void testDeleteMember() { TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + assertEquals(3, result.records().size()); + assertUnorderedList(Arrays.asList( newTargetAssignmentRecord("my-group", "member-1", mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), @@ -650,12 +658,12 @@ public void testDeleteMember() { mkTopicAssignment(fooTopicId, 4, 5, 6), mkTopicAssignment(barTopicId, 4, 5, 6) )) - ), result.records().subList(0, result.records().size() - 1)); + ), result.records().subList(0, 2)); assertEquals(newTargetAssignmentEpochRecord( "my-group", 20 - ), result.records().get(result.records().size() - 1)); + ), result.records().get(2)); Map expectedAssignment = new HashMap<>(); expectedAssignment.put("member-1", new Assignment(mkAssignment( @@ -674,6 +682,7 @@ public static void assertUnorderedList( List expected, List actual ) { + assertEquals(expected.size(), actual.size()); assertEquals(new HashSet<>(expected), new HashSet<>(actual)); } } From 95c83515955ecb1d56d3bfaa936b9eff0a23dfcb Mon Sep 17 00:00:00 2001 From: David Jacot Date: Fri, 28 Apr 2023 16:03:30 +0200 Subject: [PATCH 3/6] address minor comments --- .../consumer/TargetAssignmentBuilder.java | 48 +++++------ .../consumer/TargetAssignmentBuilderTest.java | 82 +++++++++++++------ 2 files changed, 80 insertions(+), 50 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java index 8a4e04980daa0..c38b837ed49c5 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java @@ -118,7 +118,7 @@ public Map assignments() { private Map subscriptionMetadata = Collections.emptyMap(); /** - * The current target assignment. + * The existing target assignment. */ private Map assignments = Collections.emptyMap(); @@ -146,9 +146,9 @@ public TargetAssignmentBuilder( } /** - * Adds all the current members. + * Adds all the existing members. * - * @param members The current members in the consumer group. + * @param members The existing members in the consumer group. * @return This object. */ public TargetAssignmentBuilder withMembers( @@ -185,18 +185,18 @@ public TargetAssignmentBuilder withTargetAssignments( } /** - * Updates a member. This is useful when the updated member is + * Adds or updates a member. This is useful when the updated member is * not yet materialized in memory. * - * @param memberId The member id. - * @param updatedMember The updated member. + * @param memberId The member id. + * @param member The member to add or update. * @return This object. */ - public TargetAssignmentBuilder withUpdatedMember( + public TargetAssignmentBuilder addOrUpdateMember( String memberId, - ConsumerGroupMember updatedMember + ConsumerGroupMember member ) { - this.updatedMembers.put(memberId, updatedMember); + this.updatedMembers.put(memberId, member); return this; } @@ -207,10 +207,10 @@ public TargetAssignmentBuilder withUpdatedMember( * @param memberId The member id. * @return This object. */ - public TargetAssignmentBuilder withRemoveMember( + public TargetAssignmentBuilder removeMember( String memberId ) { - return withUpdatedMember(memberId, null); + return addOrUpdateMember(memberId, null); } /** @@ -224,22 +224,22 @@ public TargetAssignmentResult build() throws PartitionAssignorException { Map memberSpecs = new HashMap<>(); // Prepare the member spec for all members. - members.forEach((memberId, member) -> addMemberSpec( - memberSpecs, + members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec( member, - assignments.getOrDefault(memberId, Assignment.EMPTY) - )); + assignments.getOrDefault(memberId, Assignment.EMPTY), + subscriptionMetadata + ))); // Update the member spec if updated or deleted members. updatedMembers.forEach((memberId, updatedMemberOrNull) -> { if (updatedMemberOrNull == null) { memberSpecs.remove(memberId); } else { - addMemberSpec( - memberSpecs, + memberSpecs.put(memberId, createAssignmentMemberSpec( updatedMemberOrNull, - assignments.getOrDefault(memberId, Assignment.EMPTY) - ); + assignments.getOrDefault(memberId, Assignment.EMPTY), + subscriptionMetadata + )); } }); @@ -304,10 +304,10 @@ private Assignment newMemberAssignment( } } - private void addMemberSpec( - Map members, + public static AssignmentMemberSpec createAssignmentMemberSpec( ConsumerGroupMember member, - Assignment targetAssignment + Assignment targetAssignment, + Map subscriptionMetadata ) { Set subscribedTopics = new HashSet<>(); member.subscribedTopicNames().forEach(topicName -> { @@ -317,11 +317,11 @@ private void addMemberSpec( } }); - members.put(member.memberId(), new AssignmentMemberSpec( + return new AssignmentMemberSpec( Optional.ofNullable(member.instanceId()), Optional.ofNullable(member.rackId()), subscribedTopics, targetAssignment.partitions() - )); + ); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java index 1c12c370f8281..2a7a32b260537 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java @@ -37,6 +37,7 @@ import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; +import static org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createAssignmentMemberSpec; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -144,52 +145,40 @@ public TargetAssignmentBuilderTestContext prepareMemberAssignment( return this; } - private AssignmentMemberSpec assignmentMemberSpec( - ConsumerGroupMember member, - Assignment targetAssignment - ) { - Set subscribedTopics = new HashSet<>(); - member.subscribedTopicNames().forEach(topicName -> { - TopicMetadata topicMetadata = subscriptionMetadata.get(topicName); - if (topicMetadata != null) { - subscribedTopics.add(topicMetadata.id()); - } - }); - - return new AssignmentMemberSpec( - Optional.ofNullable(member.instanceId()), - Optional.ofNullable(member.rackId()), - subscribedTopics, - targetAssignment.partitions() - ); - } - public TargetAssignmentBuilder.TargetAssignmentResult build() { // Prepare expected member specs. Map memberSpecs = new HashMap<>(); + + // All the existing members are prepared. members.forEach((memberId, member) -> { - memberSpecs.put(memberId, assignmentMemberSpec( + memberSpecs.put(memberId, createAssignmentMemberSpec( member, - targetAssignments.getOrDefault(memberId, Assignment.EMPTY) + targetAssignments.getOrDefault(memberId, Assignment.EMPTY), + subscriptionMetadata )); }); + // All the updated are added and all the deleted + // members are removed. updatedMembers.forEach((memberId, updatedMemberOrNull) -> { if (updatedMemberOrNull == null) { memberSpecs.remove(memberId); } else { - memberSpecs.put(memberId, assignmentMemberSpec( + memberSpecs.put(memberId, createAssignmentMemberSpec( updatedMemberOrNull, - targetAssignments.getOrDefault(memberId, Assignment.EMPTY) + targetAssignments.getOrDefault(memberId, Assignment.EMPTY), + subscriptionMetadata )); } }); + // Prepare the expected topic metadata. Map topicMetadata = new HashMap<>(); subscriptionMetadata.forEach((topicName, metadata) -> { topicMetadata.put(metadata.id(), new AssignmentTopicMetadata(metadata.numPartitions())); }); + // Prepare the expected assignment spec. AssignmentSpec assignmentSpec = new AssignmentSpec( memberSpecs, topicMetadata @@ -205,22 +194,63 @@ public TargetAssignmentBuilder.TargetAssignmentResult build() { .withSubscriptionMetadata(subscriptionMetadata) .withTargetAssignments(targetAssignments); + // Add the updated members or delete the deleted members. updatedMembers.forEach((memberId, updatedMemberOrNull) -> { if (updatedMemberOrNull != null) { - builder.withUpdatedMember(memberId, updatedMemberOrNull); + builder.addOrUpdateMember(memberId, updatedMemberOrNull); } else { - builder.withRemoveMember(memberId); + builder.removeMember(memberId); } }); + // Execute the builder. TargetAssignmentBuilder.TargetAssignmentResult result = builder.build(); + // Verify that the assignor was called once with the expected + // assignment spec. verify(assignor, times(1)).assign(assignmentSpec); return result; } } + @Test + public void testCreateAssignmentMemberSpec() { + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id") + .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar")) + .setRackId("rackId") + .setInstanceId("instanceId") + .build(); + + Map subscriptionMetadata = new HashMap() { + { + put("foo", new TopicMetadata(fooTopicId, "foo", 5)); + put("bar", new TopicMetadata(barTopicId, "bar", 5)); + } + }; + + Assignment assignment = new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(barTopicId, 1, 2, 3) + )); + + AssignmentMemberSpec assignmentMemberSpec = createAssignmentMemberSpec( + member, + assignment, + subscriptionMetadata + ); + + assertEquals(new AssignmentMemberSpec( + Optional.of("instanceId"), + Optional.of("rackId"), + new HashSet<>(Arrays.asList(fooTopicId, barTopicId)), + assignment.partitions() + ), assignmentMemberSpec); + } + @Test public void testEmpty() { TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( From f2c41472f3f08903f144b871d575bcd1f0d48b43 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Fri, 28 Apr 2023 16:08:24 +0200 Subject: [PATCH 4/6] minor refactor --- .../consumer/TargetAssignmentBuilderTest.java | 70 +++++++------------ 1 file changed, 24 insertions(+), 46 deletions(-) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java index 2a7a32b260537..c405952a93d55 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; import org.junit.jupiter.api.Test; @@ -55,7 +56,7 @@ public static class TargetAssignmentBuilderTestContext { private final Map subscriptionMetadata = new HashMap<>(); private final Map updatedMembers = new HashMap<>(); private final Map targetAssignments = new HashMap<>(); - private final Map assignments = new HashMap<>(); + private final Map assignments = new HashMap<>(); public TargetAssignmentBuilderTestContext( String groupId, @@ -65,7 +66,7 @@ public TargetAssignmentBuilderTestContext( this.groupEpoch = groupEpoch; } - public TargetAssignmentBuilderTestContext addGroupMember( + public void addGroupMember( String memberId, List subscriptions, Map> targetPartitions @@ -80,28 +81,26 @@ public TargetAssignmentBuilderTestContext addGroupMember( targetPartitions, VersionedMetadata.EMPTY )); - - return this; } - public TargetAssignmentBuilderTestContext addTopicMetadata( - Uuid topicId, + public Uuid addTopicMetadata( String topicName, int numPartitions ) { + Uuid topicId = Uuid.randomUuid(); subscriptionMetadata.put(topicName, new TopicMetadata( topicId, topicName, numPartitions )); - return this; + return topicId; } - public TargetAssignmentBuilderTestContext updateMemberSubscription( + public void updateMemberSubscription( String memberId, List subscriptions ) { - return updateMemberSubscription( + updateMemberSubscription( memberId, subscriptions, Optional.empty(), @@ -109,7 +108,7 @@ public TargetAssignmentBuilderTestContext updateMemberSubscription( ); } - public TargetAssignmentBuilderTestContext updateMemberSubscription( + public void updateMemberSubscription( String memberId, List subscriptions, Optional instanceId, @@ -127,22 +126,19 @@ public TargetAssignmentBuilderTestContext updateMemberSubscription( .maybeUpdateInstanceId(instanceId) .maybeUpdateRackId(rackId) .build()); - return this; } - public TargetAssignmentBuilderTestContext removeMemberSubscription( + public void removeMemberSubscription( String memberId ) { this.updatedMembers.put(memberId, null); - return this; } - public TargetAssignmentBuilderTestContext prepareMemberAssignment( + public void prepareMemberAssignment( String memberId, Map> assignment ) { - assignments.put(memberId, new org.apache.kafka.coordinator.group.assignor.MemberAssignment(assignment)); - return this; + assignments.put(memberId, new MemberAssignment(assignment)); } public TargetAssignmentBuilder.TargetAssignmentResult build() { @@ -268,16 +264,13 @@ public void testEmpty() { @Test public void testAssignmentHasNotChanged() { - Uuid fooTopicId = Uuid.randomUuid(); - Uuid barTopicId = Uuid.randomUuid(); - TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( "my-group", 20 ); - context.addTopicMetadata(fooTopicId, "foo", 6); - context.addTopicMetadata(barTopicId, "bar", 6); + Uuid fooTopicId = context.addTopicMetadata("foo", 6); + Uuid barTopicId = context.addTopicMetadata("bar", 6); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), @@ -321,16 +314,13 @@ public void testAssignmentHasNotChanged() { @Test public void testAssignmentSwapped() { - Uuid fooTopicId = Uuid.randomUuid(); - Uuid barTopicId = Uuid.randomUuid(); - TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( "my-group", 20 ); - context.addTopicMetadata(fooTopicId, "foo", 6); - context.addTopicMetadata(barTopicId, "bar", 6); + Uuid fooTopicId = context.addTopicMetadata("foo", 6); + Uuid barTopicId = context.addTopicMetadata("bar", 6); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), @@ -387,16 +377,13 @@ public void testAssignmentSwapped() { @Test public void testNewMember() { - Uuid fooTopicId = Uuid.randomUuid(); - Uuid barTopicId = Uuid.randomUuid(); - TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( "my-group", 20 ); - context.addTopicMetadata(fooTopicId, "foo", 6); - context.addTopicMetadata(barTopicId, "bar", 6); + Uuid fooTopicId = context.addTopicMetadata("foo", 6); + Uuid barTopicId = context.addTopicMetadata("bar", 6); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), @@ -468,16 +455,13 @@ public void testNewMember() { @Test public void testUpdateMember() { - Uuid fooTopicId = Uuid.randomUuid(); - Uuid barTopicId = Uuid.randomUuid(); - TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( "my-group", 20 ); - context.addTopicMetadata(fooTopicId, "foo", 6); - context.addTopicMetadata(barTopicId, "bar", 6); + Uuid fooTopicId = context.addTopicMetadata("foo", 6); + Uuid barTopicId = context.addTopicMetadata("bar", 6); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), @@ -556,16 +540,13 @@ public void testUpdateMember() { @Test public void testPartialAssignmentUpdate() { - Uuid fooTopicId = Uuid.randomUuid(); - Uuid barTopicId = Uuid.randomUuid(); - TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( "my-group", 20 ); - context.addTopicMetadata(fooTopicId, "foo", 6); - context.addTopicMetadata(barTopicId, "bar", 6); + Uuid fooTopicId = context.addTopicMetadata("foo", 6); + Uuid barTopicId = context.addTopicMetadata("bar", 6); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2), @@ -637,16 +618,13 @@ public void testPartialAssignmentUpdate() { @Test public void testDeleteMember() { - Uuid fooTopicId = Uuid.randomUuid(); - Uuid barTopicId = Uuid.randomUuid(); - TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext( "my-group", 20 ); - context.addTopicMetadata(fooTopicId, "foo", 6); - context.addTopicMetadata(barTopicId, "bar", 6); + Uuid fooTopicId = context.addTopicMetadata("foo", 6); + Uuid barTopicId = context.addTopicMetadata("bar", 6); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2), From 1785865ad4a2bb47c4d44ccbd5492fa1626109a8 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Fri, 28 Apr 2023 16:30:40 +0200 Subject: [PATCH 5/6] small cleanup --- .../group/consumer/TargetAssignmentBuilderTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java index c405952a93d55..5cb0fa3a47ae4 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java @@ -501,6 +501,8 @@ public void testUpdateMember() { TargetAssignmentBuilder.TargetAssignmentResult result = context.build(); + assertEquals(4, result.records().size()); + assertUnorderedList(Arrays.asList( newTargetAssignmentRecord("my-group", "member-1", mkAssignment( mkTopicAssignment(fooTopicId, 1, 2), @@ -514,12 +516,12 @@ public void testUpdateMember() { mkTopicAssignment(fooTopicId, 5, 6), mkTopicAssignment(barTopicId, 5, 6) )) - ), result.records().subList(0, result.records().size() - 1)); + ), result.records().subList(0, 3)); assertEquals(newTargetAssignmentEpochRecord( "my-group", 20 - ), result.records().get(result.records().size() - 1)); + ), result.records().get(3)); Map expectedAssignment = new HashMap<>(); expectedAssignment.put("member-1", new Assignment(mkAssignment( From 9516e8cbce256b9ad5cce92521d81e42f85225b8 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Tue, 2 May 2023 14:07:15 +0200 Subject: [PATCH 6/6] address minor comments --- .../consumer/TargetAssignmentBuilder.java | 40 +++++++++---------- .../consumer/TargetAssignmentBuilderTest.java | 31 +++++++------- 2 files changed, 35 insertions(+), 36 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java index c38b837ed49c5..02b120db1ef32 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java @@ -63,18 +63,18 @@ public static class TargetAssignmentResult { private final List records; /** - * The new target assignment for all members. + * The new target assignment for the group. */ - private final Map assignments; + private final Map targetAssignment; TargetAssignmentResult( List records, - Map assignments + Map targetAssignment ) { Objects.requireNonNull(records); - Objects.requireNonNull(assignments); + Objects.requireNonNull(targetAssignment); this.records = records; - this.assignments = assignments; + this.targetAssignment = targetAssignment; } /** @@ -85,10 +85,10 @@ public List records() { } /** - * @return The assignments. + * @return The target assignment. */ - public Map assignments() { - return assignments; + public Map targetAssignment() { + return targetAssignment; } } @@ -120,7 +120,7 @@ public Map assignments() { /** * The existing target assignment. */ - private Map assignments = Collections.emptyMap(); + private Map targetAssignment = Collections.emptyMap(); /** * The members which have been updated or deleted. Deleted members @@ -172,15 +172,15 @@ public TargetAssignmentBuilder withSubscriptionMetadata( } /** - * Adds the existing target assignments. + * Adds the existing target assignment. * - * @param assignments The existing target assignments. + * @param targetAssignment The existing target assignment. * @return This object. */ - public TargetAssignmentBuilder withTargetAssignments( - Map assignments + public TargetAssignmentBuilder withTargetAssignment( + Map targetAssignment ) { - this.assignments = assignments; + this.targetAssignment = targetAssignment; return this; } @@ -218,7 +218,7 @@ public TargetAssignmentBuilder removeMember( * * @return A TargetAssignmentResult which contains the records to update * the existing target assignment. - * @throws PartitionAssignorException if the assignment can not be computed. + * @throws PartitionAssignorException if the target assignment cannot be computed. */ public TargetAssignmentResult build() throws PartitionAssignorException { Map memberSpecs = new HashMap<>(); @@ -226,7 +226,7 @@ public TargetAssignmentResult build() throws PartitionAssignorException { // Prepare the member spec for all members. members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec( member, - assignments.getOrDefault(memberId, Assignment.EMPTY), + targetAssignment.getOrDefault(memberId, Assignment.EMPTY), subscriptionMetadata ))); @@ -237,7 +237,7 @@ public TargetAssignmentResult build() throws PartitionAssignorException { } else { memberSpecs.put(memberId, createAssignmentMemberSpec( updatedMemberOrNull, - assignments.getOrDefault(memberId, Assignment.EMPTY), + targetAssignment.getOrDefault(memberId, Assignment.EMPTY), subscriptionMetadata )); } @@ -255,13 +255,13 @@ public TargetAssignmentResult build() throws PartitionAssignorException { Collections.unmodifiableMap(topics) )); - // Compute delta from previous to new assignment and create the + // Compute delta from previous to new target assignment and create the // relevant records. List records = new ArrayList<>(); Map newTargetAssignment = new HashMap<>(); memberSpecs.keySet().forEach(memberId -> { - Assignment oldMemberAssignment = assignments.get(memberId); + Assignment oldMemberAssignment = targetAssignment.get(memberId); Assignment newMemberAssignment = newMemberAssignment(newGroupAssignment, memberId); newTargetAssignment.put(memberId, newMemberAssignment); @@ -286,7 +286,7 @@ public TargetAssignmentResult build() throws PartitionAssignorException { } }); - // Bump the assignment epoch. + // Bump the target assignment epoch. records.add(newTargetAssignmentEpochRecord(groupId, groupEpoch)); return new TargetAssignmentResult(records, newTargetAssignment); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java index 5cb0fa3a47ae4..a498af6ab8a40 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java @@ -55,8 +55,8 @@ public static class TargetAssignmentBuilderTestContext { private final Map members = new HashMap<>(); private final Map subscriptionMetadata = new HashMap<>(); private final Map updatedMembers = new HashMap<>(); - private final Map targetAssignments = new HashMap<>(); - private final Map assignments = new HashMap<>(); + private final Map targetAssignment = new HashMap<>(); + private final Map memberAssignments = new HashMap<>(); public TargetAssignmentBuilderTestContext( String groupId, @@ -73,10 +73,9 @@ public void addGroupMember( ) { members.put(memberId, new ConsumerGroupMember.Builder(memberId) .setSubscribedTopicNames(subscriptions) - .setRebalanceTimeoutMs(5000) .build()); - targetAssignments.put(memberId, new Assignment( + targetAssignment.put(memberId, new Assignment( (byte) 0, targetPartitions, VersionedMetadata.EMPTY @@ -138,7 +137,7 @@ public void prepareMemberAssignment( String memberId, Map> assignment ) { - assignments.put(memberId, new MemberAssignment(assignment)); + memberAssignments.put(memberId, new MemberAssignment(assignment)); } public TargetAssignmentBuilder.TargetAssignmentResult build() { @@ -149,7 +148,7 @@ public TargetAssignmentBuilder.TargetAssignmentResult build() { members.forEach((memberId, member) -> { memberSpecs.put(memberId, createAssignmentMemberSpec( member, - targetAssignments.getOrDefault(memberId, Assignment.EMPTY), + targetAssignment.getOrDefault(memberId, Assignment.EMPTY), subscriptionMetadata )); }); @@ -162,7 +161,7 @@ public TargetAssignmentBuilder.TargetAssignmentResult build() { } else { memberSpecs.put(memberId, createAssignmentMemberSpec( updatedMemberOrNull, - targetAssignments.getOrDefault(memberId, Assignment.EMPTY), + targetAssignment.getOrDefault(memberId, Assignment.EMPTY), subscriptionMetadata )); } @@ -182,13 +181,13 @@ public TargetAssignmentBuilder.TargetAssignmentResult build() { // We use `any` here to always return an assignment but use `verify` later on // to ensure that the input was correct. - when(assignor.assign(any())).thenReturn(new GroupAssignment(assignments)); + when(assignor.assign(any())).thenReturn(new GroupAssignment(memberAssignments)); // Create and populate the assignment builder. TargetAssignmentBuilder builder = new TargetAssignmentBuilder(groupId, groupEpoch, assignor) .withMembers(members) .withSubscriptionMetadata(subscriptionMetadata) - .withTargetAssignments(targetAssignments); + .withTargetAssignment(targetAssignment); // Add the updated members or delete the deleted members. updatedMembers.forEach((memberId, updatedMemberOrNull) -> { @@ -259,7 +258,7 @@ public void testEmpty() { "my-group", 20 )), result.records()); - assertEquals(Collections.emptyMap(), result.assignments()); + assertEquals(Collections.emptyMap(), result.targetAssignment()); } @Test @@ -309,7 +308,7 @@ public void testAssignmentHasNotChanged() { mkTopicAssignment(barTopicId, 4, 5, 6) ))); - assertEquals(expectedAssignment, result.assignments()); + assertEquals(expectedAssignment, result.targetAssignment()); } @Test @@ -372,7 +371,7 @@ public void testAssignmentSwapped() { mkTopicAssignment(barTopicId, 4, 5, 6) ))); - assertEquals(expectedAssignment, result.assignments()); + assertEquals(expectedAssignment, result.targetAssignment()); } @Test @@ -450,7 +449,7 @@ public void testNewMember() { mkTopicAssignment(barTopicId, 5, 6) ))); - assertEquals(expectedAssignment, result.assignments()); + assertEquals(expectedAssignment, result.targetAssignment()); } @Test @@ -537,7 +536,7 @@ public void testUpdateMember() { mkTopicAssignment(barTopicId, 5, 6) ))); - assertEquals(expectedAssignment, result.assignments()); + assertEquals(expectedAssignment, result.targetAssignment()); } @Test @@ -615,7 +614,7 @@ public void testPartialAssignmentUpdate() { mkTopicAssignment(barTopicId, 6) ))); - assertEquals(expectedAssignment, result.assignments()); + assertEquals(expectedAssignment, result.targetAssignment()); } @Test @@ -685,7 +684,7 @@ public void testDeleteMember() { mkTopicAssignment(barTopicId, 4, 5, 6) ))); - assertEquals(expectedAssignment, result.assignments()); + assertEquals(expectedAssignment, result.targetAssignment()); } public static void assertUnorderedList(