diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java deleted file mode 100644 index 2c1711330877e..0000000000000 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.coordinator.group.assignor; - -import org.apache.kafka.common.Uuid; -import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; -import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; -import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; -import org.apache.kafka.server.common.TopicIdPartition; - -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -/** - * The assignment builder is used to construct the target assignment based on the members' subscriptions. - */ -public abstract class AbstractUniformAssignmentBuilder { - protected abstract GroupAssignment buildAssignment(); - - /** - * Adds the topic's partition to the member's target assignment. - */ - protected static void addPartitionToAssignment( - Map memberAssignments, - String memberId, - Uuid topicId, - int partition - ) { - memberAssignments.get(memberId) - .partitions() - .computeIfAbsent(topicId, __ -> new HashSet<>()) - .add(partition); - } - - /** - * Constructs a set of {@code TopicIdPartition} including all the given topic Ids based on their partition counts. - * - * @param topicIds Collection of topic Ids. - * @param subscribedTopicDescriber Describer to fetch partition counts for topics. - * - * @return Set of {@code TopicIdPartition} including all the provided topic Ids. - */ - protected static Set topicIdPartitions( - Collection topicIds, - SubscribedTopicDescriber subscribedTopicDescriber - ) { - return topicIds.stream() - .flatMap(topic -> IntStream - .range(0, subscribedTopicDescriber.numPartitions(topic)) - .mapToObj(i -> new TopicIdPartition(topic, i)) - ).collect(Collectors.toSet()); - } -} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java index 8ae76313676fa..61f7d3aec4049 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java @@ -35,19 +35,19 @@ * subscriptions across the group members: *
    *
  • - * Optimized Uniform Assignment Builder: This strategy is used when all members have subscribed + * Uniform Homogeneous Assignment Builder: This strategy is used when all members have subscribed * to the same set of topics. *
  • *
  • - * General Uniform Assignment Builder: This strategy is used when members have varied topic + * Uniform Heterogeneous Assignment Builder: This strategy is used when members have varied topic * subscriptions. *
  • *
* * The appropriate strategy is automatically chosen based on the current members' topic subscriptions. * - * @see OptimizedUniformAssignmentBuilder - * @see GeneralUniformAssignmentBuilder + * @see UniformHomogeneousAssignmentBuilder + * @see UniformHeterogeneousAssignmentBuilder */ public class UniformAssignor implements ConsumerGroupPartitionAssignor { private static final Logger LOG = LoggerFactory.getLogger(UniformAssignor.class); @@ -76,14 +76,14 @@ public GroupAssignment assign( if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) { LOG.debug("Detected that all members are subscribed to the same set of topics, invoking the " - + "optimized assignment algorithm"); - return new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber) + + "homogeneous assignment algorithm"); + return new UniformHomogeneousAssignmentBuilder(groupSpec, subscribedTopicDescriber) .build(); } else { LOG.debug("Detected that the members are subscribed to different sets of topics, invoking the " - + "general assignment algorithm"); - return new GeneralUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber) - .buildAssignment(); + + "heterogeneous assignment algorithm"); + return new UniformHeterogeneousAssignmentBuilder(groupSpec, subscribedTopicDescriber) + .build(); } } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java similarity index 95% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java rename to group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java index 46c4e363e568d..682dbbd677d00 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java @@ -41,7 +41,7 @@ import java.util.stream.Collectors; /** - * The general uniform assignment builder is used to generate the target assignment for a consumer group with + * The heterogeneous uniform assignment builder is used to generate the target assignment for a consumer group with * at least one of its members subscribed to a different set of topics. * * Assignments are done according to the following principles: @@ -55,8 +55,8 @@ * This assignment builder prioritizes the above properties in the following order: * Balance > Stickiness. */ -public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder { - private static final Logger LOG = LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class); +public class UniformHeterogeneousAssignmentBuilder { + private static final Logger LOG = LoggerFactory.getLogger(UniformHeterogeneousAssignmentBuilder.class); /** * The group metadata specification. @@ -113,7 +113,7 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu */ private final PartitionMovements partitionMovements; - public GeneralUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) { + public UniformHeterogeneousAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) { this.groupSpec = groupSpec; this.subscribedTopicDescriber = subscribedTopicDescriber; this.subscribedTopicIds = new HashSet<>(); @@ -133,7 +133,7 @@ public GeneralUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescr targetAssignment.put(memberId, new MemberAssignmentImpl(new HashMap<>())); }) ); - this.unassignedPartitions = new HashSet<>(topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber)); + this.unassignedPartitions = topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber); this.assignedStickyPartitions = new HashSet<>(); this.assignmentManager = new AssignmentManager(this.subscribedTopicDescriber); this.sortedMembersByAssignmentSize = assignmentManager.sortMembersByAssignmentSize(groupSpec.memberIds()); @@ -148,8 +148,7 @@ public GeneralUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescr *
  • Allocate all the remaining unassigned partitions to the members in a balanced manner.
  • *
  • Iterate through the assignment until it is balanced.
  • */ - @Override - protected GroupAssignment buildAssignment() { + public GroupAssignment build() { if (subscribedTopicIds.isEmpty()) { LOG.info("The subscription list is empty, returning an empty assignment"); return new GroupAssignment(Collections.emptyMap()); @@ -462,6 +461,43 @@ private void processPartitionMovement(TopicIdPartition topicIdPartition, String assignmentManager.addPartitionToTargetAssignment(topicIdPartition, newMember); } + /** + * Adds the topic's partition to the member's target assignment. + */ + private static void addPartitionToAssignment( + Map memberAssignments, + String memberId, + Uuid topicId, + int partition + ) { + memberAssignments.get(memberId) + .partitions() + .computeIfAbsent(topicId, __ -> new HashSet<>()) + .add(partition); + } + + /** + * Constructs a set of {@code TopicIdPartition} including all the given topic Ids based on their partition counts. + * + * @param topicIds Collection of topic Ids. + * @param subscribedTopicDescriber Describer to fetch partition counts for topics. + * + * @return Set of {@code TopicIdPartition} including all the provided topic Ids. + */ + private static Set topicIdPartitions( + Collection topicIds, + SubscribedTopicDescriber subscribedTopicDescriber + ) { + Set topicIdPartitions = new HashSet<>(); + for (Uuid topicId : topicIds) { + int numPartitions = subscribedTopicDescriber.numPartitions(topicId); + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + topicIdPartitions.add(new TopicIdPartition(topicId, partitionId)); + } + } + return topicIdPartitions; + } + /** * This class represents a pair of member Ids involved in a partition reassignment. * Each pair contains a source and a destination member Id. diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHomogeneousAssignmentBuilder.java similarity index 97% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java rename to group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHomogeneousAssignmentBuilder.java index 8631b3ce9ae93..e69f2a4d82b55 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHomogeneousAssignmentBuilder.java @@ -34,7 +34,7 @@ import java.util.Set; /** - * The homogenous uniform assignment builder is used to generate the target assignment for a consumer group with + * The homogeneous uniform assignment builder is used to generate the target assignment for a consumer group with * all its members subscribed to the same set of topics. * * Assignments are done according to the following principles: @@ -48,7 +48,7 @@ * The assignment builder prioritizes the properties in the following order: * Balance > Stickiness. */ -public class OptimizedUniformAssignmentBuilder { +public class UniformHomogeneousAssignmentBuilder { private static final Class UNMODIFIABLE_MAP_CLASS = Collections.unmodifiableMap(new HashMap<>()).getClass(); private static final Class EMPTY_MAP_CLASS = Collections.emptyMap().getClass(); @@ -104,7 +104,7 @@ private static boolean isImmutableMap(Map map) { */ private int remainingMembersToGetAnExtraPartition; - OptimizedUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) { + UniformHomogeneousAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) { this.groupSpec = groupSpec; this.subscribedTopicDescriber = subscribedTopicDescriber; this.subscribedTopicIds = new HashSet<>(groupSpec.memberSubscription(groupSpec.memberIds().iterator().next()) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java similarity index 99% rename from group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java rename to group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java index d0275f8615a31..4e48de28d4815 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java @@ -44,7 +44,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -public class GeneralUniformAssignmentBuilderTest { +public class UniformHeterogeneousAssignmentBuilderTest { private final UniformAssignor assignor = new UniformAssignor(); private final Uuid topic1Uuid = Uuid.fromString("T1-A4s3VTwiI5CTbEp6POw"); private final Uuid topic2Uuid = Uuid.fromString("T2-B4s3VTwiI5YHbPp6YUe"); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ClientSideAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ClientSideAssignorBenchmark.java index 636f2a9e8a7c0..9cee98371a7b8 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ClientSideAssignorBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ClientSideAssignorBenchmark.java @@ -76,7 +76,7 @@ public ConsumerPartitionAssignor assignor() { /** * The subscription pattern followed by the members of the group. * - * A subscription model is considered homogenous if all the members of the group + * A subscription model is considered homogeneous if all the members of the group * are subscribed to the same set of topics, it is heterogeneous otherwise. */ public enum SubscriptionModel {