From 24daac91dedcc6ab75b3d6be063310f5b8210e0f Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Tue, 6 Jul 2021 20:28:29 -0700 Subject: [PATCH 1/9] several fixes for better handling of invalid input and bug in assignment with unfilledMembers at minQuota --- .../consumer/CooperativeStickyAssignor.java | 31 +++++- .../internals/AbstractStickyAssignor.java | 105 ++++++++++++++---- .../CooperativeStickyAssignorTest.java | 76 ++++++++++++- .../clients/consumer/StickyAssignorTest.java | 49 +++++++- .../internals/AbstractStickyAssignorTest.java | 73 ++++++++++-- 5 files changed, 292 insertions(+), 42 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java index c7c0679575a9b..3e9864a152532 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -43,6 +44,8 @@ */ public class CooperativeStickyAssignor extends AbstractStickyAssignor { + private int generation = DEFAULT_GENERATION; // consumer group generation + @Override public String name() { return "cooperative-sticky"; @@ -53,9 +56,35 @@ public List supportedProtocols() { return Arrays.asList(RebalanceProtocol.COOPERATIVE, RebalanceProtocol.EAGER); } + @Override + public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) { + this.generation = metadata.generationId(); + } + + @Override + public ByteBuffer subscriptionUserData(Set topics) { + if (generation == DEFAULT_GENERATION) { + return ByteBuffer.allocate(0); + } else { + ByteBuffer buffer = ByteBuffer.allocate(4); + buffer.putInt(generation); + buffer.flip(); + return buffer; + } + } + @Override protected MemberData memberData(Subscription subscription) { - return new MemberData(subscription.ownedPartitions(), Optional.empty()); + ByteBuffer buffer = subscription.userData(); + buffer.rewind(); + + Optional encodedGeneration; + if (buffer.hasRemaining()) { + encodedGeneration = Optional.of(buffer.getInt()); + } else { + encodedGeneration = Optional.empty(); + } + return new MemberData(subscription.ownedPartitions(), encodedGeneration); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java index e111aa62599e4..074ad64fc174b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java @@ -71,14 +71,16 @@ public MemberData(List partitions, Optional generation) public Map> assign(Map partitionsPerTopic, Map subscriptions) { Map> consumerToOwnedPartitions = new HashMap<>(); - if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions)) { + Set partitionsWithMultiplePreviousOwners = new HashSet<>(); + if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners)) { log.debug("Detected that all consumers were subscribed to same set of topics, invoking the " + "optimized assignment algorithm"); partitionsTransferringOwnership = new HashMap<>(); - return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions); + return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners); } else { log.debug("Detected that not all consumers were subscribed to same set of topics, falling back to the " + "general case assignment algorithm"); + // we must set this to null for the general case so the cooperative assignor knows to compute it from scratch partitionsTransferringOwnership = null; return generalAssign(partitionsPerTopic, subscriptions, consumerToOwnedPartitions); } @@ -86,17 +88,23 @@ public Map> assign(Map partitionsP /** * Returns true iff all consumers have an identical subscription. Also fills out the passed in - * {@code consumerToOwnedPartitions} with each consumer's previously owned and still-subscribed partitions + * {@code consumerToOwnedPartitions} with each consumer's previously owned and still-subscribed partitions, + * and the {@code partitionsWithMultiplePreviousOwners} with any partitions claimed by multiple previous owners */ private boolean allSubscriptionsEqual(Set allTopics, Map subscriptions, - Map> consumerToOwnedPartitions) { + Map> consumerToOwnedPartitions, + Set partitionsWithMultiplePreviousOwners) { Set membersWithOldGeneration = new HashSet<>(); Set membersOfCurrentHighestGeneration = new HashSet<>(); boolean isAllSubscriptionsEqual = true; Set subscribedTopics = new HashSet<>(); + // keep track of all previously owned partitions so we can invalidate them if invalid input is + // detected, eg two consumers somehow claiming the same partition in the same/current generation + Map allPreviousPartitionsToOwner = new HashMap<>(); + for (Map.Entry subscriptionEntry : subscriptions.entrySet()) { String consumer = subscriptionEntry.getKey(); Subscription subscription = subscriptionEntry.getValue(); @@ -122,6 +130,13 @@ private boolean allSubscriptionsEqual(Set allTopics, // If the current member's generation is higher, all the previously owned partitions are invalid if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) { membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration); + + allPreviousPartitionsToOwner.clear(); + partitionsWithMultiplePreviousOwners.clear(); + for (String droppedOutConsumer : membersWithOldGeneration) { + consumerToOwnedPartitions.get(droppedOutConsumer).clear(); + } + membersOfCurrentHighestGeneration.clear(); maxGeneration = memberData.generation.get(); } @@ -130,19 +145,26 @@ private boolean allSubscriptionsEqual(Set allTopics, for (final TopicPartition tp : memberData.partitions) { // filter out any topics that no longer exist or aren't part of the current subscription if (allTopics.contains(tp.topic())) { - ownedPartitions.add(tp); + + if (!allPreviousPartitionsToOwner.containsKey(tp)) { + allPreviousPartitionsToOwner.put(tp, consumer); + ownedPartitions.add(tp); + } else { + String otherConsumer = allPreviousPartitionsToOwner.get(tp); + log.warn("Found multiple consumers {} and {} claiming the same TopicPartition {} in the " + + "same generation, this will be invalidated and removed from their previous assignment.", + consumer, otherConsumer, tp); + consumerToOwnedPartitions.get(otherConsumer).remove(tp); + partitionsWithMultiplePreviousOwners.add(tp); + } } } } } - for (String consumer : membersWithOldGeneration) { - consumerToOwnedPartitions.get(consumer).clear(); - } return isAllSubscriptionsEqual; } - /** * This constrainedAssign optimizes the assignment algorithm when all consumers were subscribed to same set of topics. * The method includes the following steps: @@ -154,13 +176,15 @@ private boolean allSubscriptionsEqual(Set allTopics, * we're still under the number of expected max capacity members * 2. Fill remaining members up to the expected numbers of maxQuota partitions, otherwise, to minQuota partitions * - * @param partitionsPerTopic The number of partitions for each subscribed topic - * @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions + * @param partitionsPerTopic The number of partitions for each subscribed topic + * @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions + * @param partitionsWithMultiplePreviousOwners The partitions being claimed in the previous assignment of multiple consumers * * @return Map from each member to the list of partitions assigned to them. */ private Map> constrainedAssign(Map partitionsPerTopic, - Map> consumerToOwnedPartitions) { + Map> consumerToOwnedPartitions, + Set partitionsWithMultiplePreviousOwners) { if (log.isDebugEnabled()) { log.debug("performing constrained assign. partitionsPerTopic: {}, consumerToOwnedPartitions: {}", partitionsPerTopic, consumerToOwnedPartitions); @@ -169,7 +193,9 @@ private Map> constrainedAssign(Map Set allRevokedPartitions = new HashSet<>(); // the consumers not yet at expected capacity - List unfilledMembers = new LinkedList<>(); + Set unfilledMembers = new HashSet<>(); + // the consumers with exactly minQuota, who may be assigned more until we reach the expected number with maxQuota + Set unfilledMembersWithMinQuota = new HashSet<>(); int numberOfConsumers = consumerToOwnedPartitions.size(); int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum); @@ -186,16 +212,25 @@ private Map> constrainedAssign(Map consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota)))); List assignedPartitions = new ArrayList<>(); - // Reassign previously owned partitions to the expected number + // Reassign previously owned partitions, up to the expected number of partitions per consumer for (Map.Entry> consumerEntry : consumerToOwnedPartitions.entrySet()) { String consumer = consumerEntry.getKey(); List ownedPartitions = consumerEntry.getValue(); List consumerAssignment = assignment.get(consumer); + for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) { + if (ownedPartitions.contains(doublyClaimedPartition)) { + log.warn("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple" + + "consumers already in the same generation. Removing it from the ownedPartitions", + doublyClaimedPartition, consumer); + ownedPartitions.remove(doublyClaimedPartition); + } + } + if (ownedPartitions.size() < minQuota) { - // the expected assignment size is more than consumer have now, so keep all the owned partitions - // and put this member into unfilled member list + // the expected assignment size is more than this consumer has now, so keep all the owned partitions + // and put this member into the unfilled member list if (ownedPartitions.size() > 0) { consumerAssignment.addAll(ownedPartitions); assignedPartitions.addAll(ownedPartitions); @@ -205,6 +240,10 @@ private Map> constrainedAssign(Map // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected members // with more than the minQuota partitions, so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions numMembersAssignedOverMinQuota++; + if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) { + unfilledMembers.removeAll(unfilledMembersWithMinQuota); + unfilledMembersWithMinQuota.clear(); + } List maxQuotaPartitions = ownedPartitions.subList(0, maxQuota); consumerAssignment.addAll(maxQuotaPartitions); assignedPartitions.addAll(maxQuotaPartitions); @@ -218,43 +257,58 @@ private Map> constrainedAssign(Map allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size())); // this consumer is potential maxQuota candidate since we're still under the number of expected members // with more than the minQuota partitions. Note, if the number of expected members with more than - // the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers + // the minQuota partitions is 0, it means minQuota == maxQuota, and these members are definitely filled if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) { unfilledMembers.add(consumer); + unfilledMembersWithMinQuota.add(consumer); } } } List unassignedPartitions = getUnassignedPartitions(totalPartitionsCount, partitionsPerTopic, assignedPartitions); - assignedPartitions = null; if (log.isDebugEnabled()) { log.debug("After reassigning previously owned partitions, unfilled members: {}, unassigned partitions: {}, " + "current assignment: {}", unfilledMembers, unassignedPartitions, assignment); } - Collections.sort(unfilledMembers); - Iterator unfilledConsumerIter = unfilledMembers.iterator(); + // flag to inform that we need to remove all members with minQuota partitions from the unfilledMembers, after + // just reaching the expected number of members allowed to go up to maxQuota + boolean filterMinQuotaMembers = false; // Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota for (TopicPartition unassignedPartition : unassignedPartitions) { if (!unfilledConsumerIter.hasNext()) { if (unfilledMembers.isEmpty()) { - // Should not enter here since we have calculated the exact number to assign to each consumer - // There might be issues in the assigning algorithm, or maybe assigning the same partition to two owners. + // Should not enter here since we have calculated the exact number to assign to each consumer. + // This indicates issues in the assignment algorithm int currentPartitionIndex = unassignedPartitions.indexOf(unassignedPartition); log.error("No more unfilled consumers to be assigned. The remaining unassigned partitions are: {}", - unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size())); + unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size())); throw new IllegalStateException("No more unfilled consumers to be assigned."); } + if (filterMinQuotaMembers) { + unfilledMembers.removeAll(unfilledMembersWithMinQuota); + unfilledMembersWithMinQuota.clear(); + filterMinQuotaMembers = false; + } unfilledConsumerIter = unfilledMembers.iterator(); } + String consumer = unfilledConsumerIter.next(); + if (filterMinQuotaMembers && unfilledMembersWithMinQuota.contains(consumer)) { + unfilledConsumerIter.remove(); + unfilledMembersWithMinQuota.remove(consumer); + continue; + } + List consumerAssignment = assignment.get(consumer); consumerAssignment.add(unassignedPartition); // We already assigned all possible ownedPartitions, so we know this must be newly assigned to this consumer - if (allRevokedPartitions.contains(unassignedPartition)) + // or else the partition was actually claimed by multiple previous owners and had to be invalidated from all + // members claimed ownedPartitions + if (allRevokedPartitions.contains(unassignedPartition) || partitionsWithMultiplePreviousOwners.contains(unassignedPartition)) partitionsTransferringOwnership.put(unassignedPartition, consumer); int currentAssignedCount = consumerAssignment.size(); @@ -262,6 +316,9 @@ private Map> constrainedAssign(Map if (currentAssignedCount == expectedAssignedCount) { if (currentAssignedCount == maxQuota) { numMembersAssignedOverMinQuota++; + if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) { + filterMinQuotaMembers = true; + } } unfilledConsumerIter.remove(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java index d7d671e1a2abe..795a6b69b4e07 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java @@ -16,16 +16,23 @@ */ package org.apache.kafka.clients.consumer; -import static org.junit.jupiter.api.Assertions.assertTrue; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; +import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor; +import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest; +import org.apache.kafka.common.TopicPartition; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; -import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription; -import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor; -import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest; -import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static java.util.Collections.emptyList; public class CooperativeStickyAssignorTest extends AbstractStickyAssignorTest { @@ -39,6 +46,65 @@ public Subscription buildSubscription(List topics, List return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions); } + @Override + public Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation) { + assignor.onAssignment(null, new ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", Optional.empty())); + return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions); + } + + @Test + public void testEncodeAndDecodeGeneration() { + Subscription subscription = new Subscription(topics(topic), assignor.subscriptionUserData(new HashSet<>(topics(topic)))); + + // initially the generation defaults to NO_GENERATION, which gets encoded as Optional.empty() + assertFalse(((CooperativeStickyAssignor) assignor).memberData(subscription).generation.isPresent()); + + int generation = 10; + assignor.onAssignment(null, new ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", Optional.empty())); + + subscription = new Subscription(topics(topic), assignor.subscriptionUserData(new HashSet<>(topics(topic)))); + assertTrue(((CooperativeStickyAssignor) assignor).memberData(subscription).generation.isPresent()); + assertEquals(((CooperativeStickyAssignor) assignor).memberData(subscription).generation.get(), generation); + } + + @Test + public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithEqualPartitionsPerConsumer() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + + subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1)))); + subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2)))); + subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList())); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2)); + // In the cooperative assignor, topic-0 has to be considered "owned" and so it cant be assigned until both have "revoked" it + assertTrue(assignment.get(consumer3).isEmpty()); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithUnequalPartitionsPerConsumer() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 4); + + subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1)))); + subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2)))); + subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList())); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 2), tp(topic, 3)), assignment.get(consumer2)); + // In the cooperative assignor, topic-0 has to be considered "owned" and so it cant be assigned until both have "revoked" it + assertTrue(assignment.get(consumer3).isEmpty()); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + /** * The cooperative assignor must do some additional work and verification of some assignments relative to the eager * assignor, since it may or may not need to trigger a second follow-up rebalance. diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java index 684a421be1807..7ea6a08660f34 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java @@ -40,6 +40,8 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import static java.util.Collections.emptyList; + public class StickyAssignorTest extends AbstractStickyAssignorTest { @Override @@ -53,6 +55,48 @@ public Subscription buildSubscription(List topics, List serializeTopicPartitionAssignment(new MemberData(partitions, Optional.of(DEFAULT_GENERATION)))); } + @Override + public Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation) { + return new Subscription(topics, + serializeTopicPartitionAssignment(new MemberData(partitions, Optional.of(generation)))); + } + + @Test + public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithEqualPartitionsPerConsumer() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + + subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1)))); + subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2)))); + subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList())); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic, 0)), assignment.get(consumer3)); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithUnequalPartitionsPerConsumer() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 4); + + subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1)))); + subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2)))); + subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList())); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 2), tp(topic, 3)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic, 0)), assignment.get(consumer3)); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + @ParameterizedTest(name = "testAssignmentWithMultipleGenerations1 with isAllSubscriptionsEqual: {0}") @ValueSource(booleans = {true, false}) public void testAssignmentWithMultipleGenerations1(boolean isAllSubscriptionsEqual) { @@ -228,11 +272,6 @@ public void testSchemaBackwardCompatibility() { assertTrue(isFullyBalanced(assignment)); } - private Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation) { - return new Subscription(topics, - serializeTopicPartitionAssignment(new MemberData(partitions, Optional.of(generation)))); - } - private static Subscription buildSubscriptionWithOldSchema(List topics, List partitions) { Struct struct = new Struct(StickyAssignor.STICKY_ASSIGNOR_USER_DATA_V0); List topicAssignments = new ArrayList<>(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java index a650cbbf39a67..6e07c2dffda50 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java @@ -34,11 +34,13 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static java.util.Collections.emptyList; public abstract class AbstractStickyAssignorTest { protected AbstractStickyAssignor assignor; @@ -57,6 +59,8 @@ public abstract class AbstractStickyAssignorTest { protected abstract Subscription buildSubscription(List topics, List partitions); + protected abstract Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation); + @BeforeEach public void setUp() { assignor = createAssignor(); @@ -71,7 +75,7 @@ public void setUp() { @Test public void testOneConsumerNoTopic() { Map partitionsPerTopic = new HashMap<>(); - subscriptions = Collections.singletonMap(consumerId, new Subscription(Collections.emptyList())); + subscriptions = Collections.singletonMap(consumerId, new Subscription(emptyList())); Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); assertEquals(Collections.singleton(consumerId), assignment.keySet()); @@ -248,7 +252,7 @@ public void testMaxQuotaConsumerMoreThanNumExpectedMaxCapacityMembers() { buildSubscription(subscribedTopics, partitions(tp(topic1, 0), tp(topic2, 0)))); subscriptions.put(consumer2, buildSubscription(subscribedTopics, partitions(tp(topic1, 1), tp(topic2, 1)))); - subscriptions.put(consumer3, buildSubscription(subscribedTopics, Collections.emptyList())); + subscriptions.put(consumer3, buildSubscription(subscribedTopics, emptyList())); Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); @@ -275,7 +279,7 @@ public void testAllConsumerAreUnderMinQuota() { buildSubscription(subscribedTopics, partitions(tp(topic1, 0)))); subscriptions.put(consumer2, buildSubscription(subscribedTopics, partitions(tp(topic1, 1)))); - subscriptions.put(consumer3, buildSubscription(subscribedTopics, Collections.emptyList())); + subscriptions.put(consumer3, buildSubscription(subscribedTopics, emptyList())); Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); @@ -300,7 +304,7 @@ public void testAddRemoveConsumerOneTopic() { assertTrue(isFullyBalanced(assignment)); subscriptions.put(consumer1, buildSubscription(topics(topic), assignment.get(consumer1))); - subscriptions.put(consumer2, buildSubscription(topics(topic), Collections.emptyList())); + subscriptions.put(consumer2, buildSubscription(topics(topic), emptyList())); assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); @@ -338,8 +342,8 @@ public void testAddRemoveTwoConsumersTwoTopics() { // add 2 consumers subscriptions.put(consumer1, buildSubscription(allTopics, assignment.get(consumer1))); subscriptions.put(consumer2, buildSubscription(allTopics, assignment.get(consumer2))); - subscriptions.put(consumer3, buildSubscription(allTopics, Collections.emptyList())); - subscriptions.put(consumer4, buildSubscription(allTopics, Collections.emptyList())); + subscriptions.put(consumer3, buildSubscription(allTopics, emptyList())); + subscriptions.put(consumer4, buildSubscription(allTopics, emptyList())); assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); @@ -449,7 +453,6 @@ public void testAddRemoveTopicTwoConsumers() { assertTrue(consumer2assignment.containsAll(consumer2Assignment3)); } - @Test public void testReassignmentAfterOneConsumerLeaves() { Map partitionsPerTopic = new HashMap<>(); @@ -790,6 +793,62 @@ public void testReassignmentWithRandomSubscriptionsAndChanges() { } } + @Test + public void testAllConsumersReachExpectedQuotaAndAreConsideredFilled() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 4); + + subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1)))); + subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 2)))); + subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList())); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(partitions(tp(topic, 0), tp(topic, 1)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic, 3)), assignment.get(consumer3)); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testOwnedPartitionsAreInvalidatedForConsumerWithStaleGeneration() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + partitionsPerTopic.put(topic2, 3); + + int currentGeneration = 10; + + subscriptions.put(consumer1, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), currentGeneration)); + subscriptions.put(consumer2, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), currentGeneration - 1)); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1))), new HashSet<>(assignment.get(consumer1))); + assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 0), tp(topic2, 2))), new HashSet<>(assignment.get(consumer2))); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + + @Test + public void testOwnedPartitionsAreInvalidatedForConsumerWithNoGeneration() { + Map partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + partitionsPerTopic.put(topic2, 3); + + int currentGeneration = 10; + + subscriptions.put(consumer1, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), currentGeneration)); + subscriptions.put(consumer2, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), DEFAULT_GENERATION)); + + Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); + assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1))), new HashSet<>(assignment.get(consumer1))); + assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 0), tp(topic2, 2))), new HashSet<>(assignment.get(consumer2))); + + verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); + assertTrue(isFullyBalanced(assignment)); + } + private String getTopicName(int i, int maxNum) { return getCanonicalName("t", i, maxNum); } From 96c13783b450d1794731acc944b26c38474944b9 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Tue, 6 Jul 2021 21:09:18 -0700 Subject: [PATCH 2/9] clean up the fix, tests, checkstyle --- checkstyle/suppressions.xml | 2 +- .../consumer/CooperativeStickyAssignor.java | 3 +- .../internals/AbstractStickyAssignor.java | 65 +++++++++---------- .../internals/AbstractStickyAssignorTest.java | 12 ++-- 4 files changed, 40 insertions(+), 42 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index cfa727234aee0..1a2add86575a5 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -42,7 +42,7 @@ files="AbstractResponse.java"/> + files="(KerberosLogin|RequestResponseTest|ConnectMetricsRegistry|KafkaConsumer|AbstractStickyAssignor).java"/> diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java index 3e9864a152532..a7ac430b533ba 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java @@ -76,10 +76,9 @@ public ByteBuffer subscriptionUserData(Set topics) { @Override protected MemberData memberData(Subscription subscription) { ByteBuffer buffer = subscription.userData(); - buffer.rewind(); Optional encodedGeneration; - if (buffer.hasRemaining()) { + if (buffer != null && buffer.rewind().hasRemaining()) { encodedGeneration = Optional.of(buffer.getInt()); } else { encodedGeneration = Optional.empty(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java index 074ad64fc174b..5c9ef1c5e634f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.Queue; import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; @@ -152,7 +153,7 @@ private boolean allSubscriptionsEqual(Set allTopics, } else { String otherConsumer = allPreviousPartitionsToOwner.get(tp); log.warn("Found multiple consumers {} and {} claiming the same TopicPartition {} in the " - + "same generation, this will be invalidated and removed from their previous assignment.", + + "same generation, this will be invalidated and removed from their previous assignment.", consumer, otherConsumer, tp); consumerToOwnedPartitions.get(otherConsumer).remove(tp); partitionsWithMultiplePreviousOwners.add(tp); @@ -193,9 +194,8 @@ private Map> constrainedAssign(Map Set allRevokedPartitions = new HashSet<>(); // the consumers not yet at expected capacity - Set unfilledMembers = new HashSet<>(); - // the consumers with exactly minQuota, who may be assigned more until we reach the expected number with maxQuota - Set unfilledMembersWithMinQuota = new HashSet<>(); + List unfilledMembers = new LinkedList<>(); + Queue potentiallyUnfilledMembersAtMinQuota = new LinkedList<>(); int numberOfConsumers = consumerToOwnedPartitions.size(); int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum); @@ -241,8 +241,7 @@ private Map> constrainedAssign(Map // with more than the minQuota partitions, so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions numMembersAssignedOverMinQuota++; if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) { - unfilledMembers.removeAll(unfilledMembersWithMinQuota); - unfilledMembersWithMinQuota.clear(); + potentiallyUnfilledMembersAtMinQuota.clear(); } List maxQuotaPartitions = ownedPartitions.subList(0, maxQuota); consumerAssignment.addAll(maxQuotaPartitions); @@ -257,10 +256,9 @@ private Map> constrainedAssign(Map allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size())); // this consumer is potential maxQuota candidate since we're still under the number of expected members // with more than the minQuota partitions. Note, if the number of expected members with more than - // the minQuota partitions is 0, it means minQuota == maxQuota, and these members are definitely filled + // the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) { - unfilledMembers.add(consumer); - unfilledMembersWithMinQuota.add(consumer); + potentiallyUnfilledMembersAtMinQuota.add(consumer); } } } @@ -272,34 +270,28 @@ private Map> constrainedAssign(Map "current assignment: {}", unfilledMembers, unassignedPartitions, assignment); } + Collections.sort(unfilledMembers); + Iterator unfilledConsumerIter = unfilledMembers.iterator(); - // flag to inform that we need to remove all members with minQuota partitions from the unfilledMembers, after - // just reaching the expected number of members allowed to go up to maxQuota - boolean filterMinQuotaMembers = false; // Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota for (TopicPartition unassignedPartition : unassignedPartitions) { - if (!unfilledConsumerIter.hasNext()) { - if (unfilledMembers.isEmpty()) { + String consumer; + if (unfilledConsumerIter.hasNext()) { + consumer = unfilledConsumerIter.next(); + } else { + if (unfilledMembers.isEmpty() && potentiallyUnfilledMembersAtMinQuota.isEmpty()) { // Should not enter here since we have calculated the exact number to assign to each consumer. // This indicates issues in the assignment algorithm int currentPartitionIndex = unassignedPartitions.indexOf(unassignedPartition); log.error("No more unfilled consumers to be assigned. The remaining unassigned partitions are: {}", unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size())); throw new IllegalStateException("No more unfilled consumers to be assigned."); + } else if (unfilledMembers.isEmpty()) { + consumer = potentiallyUnfilledMembersAtMinQuota.poll(); + } else { + unfilledConsumerIter = unfilledMembers.iterator(); + consumer = unfilledConsumerIter.next(); } - if (filterMinQuotaMembers) { - unfilledMembers.removeAll(unfilledMembersWithMinQuota); - unfilledMembersWithMinQuota.clear(); - filterMinQuotaMembers = false; - } - unfilledConsumerIter = unfilledMembers.iterator(); - } - - String consumer = unfilledConsumerIter.next(); - if (filterMinQuotaMembers && unfilledMembersWithMinQuota.contains(consumer)) { - unfilledConsumerIter.remove(); - unfilledMembersWithMinQuota.remove(consumer); - continue; } List consumerAssignment = assignment.get(consumer); @@ -313,14 +305,21 @@ private Map> constrainedAssign(Map int currentAssignedCount = consumerAssignment.size(); int expectedAssignedCount = numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota; - if (currentAssignedCount == expectedAssignedCount) { - if (currentAssignedCount == maxQuota) { - numMembersAssignedOverMinQuota++; - if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) { - filterMinQuotaMembers = true; + if (currentAssignedCount == minQuota) { + unfilledConsumerIter.remove(); + potentiallyUnfilledMembersAtMinQuota.add(consumer); + } else if (currentAssignedCount == maxQuota) { + numMembersAssignedOverMinQuota++; + if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) { + // We only start to iterate over the "potentially unfilled" members at minQuota after we've filled + // all members up to at least minQuota, so once the last minQuota member reaches maxQuota, we + // should be done. But in case of some algorithmic error, just log a warning and continue to + // assign any remaining partitions within the assignment constraints + if (unassignedPartitions.indexOf(unassignedPartition) != unassignedPartitions.size() - 1) { + log.warn("Filled the last member up to maxQuota but still had partitions remaining to assign, " + + "will continue but this indicates a bug in the assignment."); } } - unfilledConsumerIter.remove(); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java index 6e07c2dffda50..35a0c3b4f8cb6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java @@ -230,8 +230,8 @@ public void testConsumerOwningMinQuotaExpectedMaxQuota() { buildSubscription(subscribedTopics, partitions(tp(topic1, 1), tp(topic2, 2)))); Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); - assertEquals(partitions(tp(topic1, 0), tp(topic2, 1), tp(topic2, 0)), assignment.get(consumer1)); - assertEquals(partitions(tp(topic1, 1), tp(topic2, 2)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic1, 0), tp(topic2, 1)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic1, 1), tp(topic2, 2), tp(topic2, 0)), assignment.get(consumer2)); verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); assertTrue(isFullyBalanced(assignment)); @@ -284,9 +284,9 @@ public void testAllConsumerAreUnderMinQuota() { Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); - assertEquals(partitions(tp(topic1, 0), tp(topic2, 0)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic1, 0), tp(topic2, 2)), assignment.get(consumer1)); assertEquals(partitions(tp(topic1, 1), tp(topic2, 1)), assignment.get(consumer2)); - assertEquals(partitions(tp(topic2, 2)), assignment.get(consumer3)); + assertEquals(partitions(tp(topic2, 0)), assignment.get(consumer3)); assertTrue(isFullyBalanced(assignment)); } @@ -359,8 +359,8 @@ public void testAddRemoveTwoConsumersTwoTopics() { subscriptions.put(consumer3, buildSubscription(allTopics, assignment.get(consumer3))); subscriptions.put(consumer4, buildSubscription(allTopics, assignment.get(consumer4))); assignment = assignor.assign(partitionsPerTopic, subscriptions); - assertEquals(partitions(tp(topic2, 1), tp(topic2, 3), tp(topic1, 0), tp(topic1, 2)), assignment.get(consumer3)); - assertEquals(partitions(tp(topic2, 2), tp(topic1, 1), tp(topic2, 0)), assignment.get(consumer4)); + assertEquals(partitions(tp(topic2, 1), tp(topic2, 3), tp(topic1, 0), tp(topic2, 0)), assignment.get(consumer3)); + assertEquals(partitions(tp(topic2, 2), tp(topic1, 1), tp(topic1, 2)), assignment.get(consumer4)); verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); assertTrue(isFullyBalanced(assignment)); From e56913cdd7afb504fbfdeb6706ee0161b73dbf3e Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Tue, 6 Jul 2021 21:24:03 -0700 Subject: [PATCH 3/9] bump up timeout --- .../clients/consumer/internals/AbstractStickyAssignorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java index 35a0c3b4f8cb6..bb316480384e1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java @@ -558,7 +558,7 @@ public void testLargeAssignmentAndGroupWithUniformSubscription() { assignor.assign(partitionsPerTopic, subscriptions); } - @Timeout(40) + @Timeout(60) @Test public void testLargeAssignmentAndGroupWithNonEqualSubscription() { // 1 million partitions! From 2bcc470be97b76aa08446d09ff0e8342a1dd2fa0 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Wed, 7 Jul 2021 16:22:52 -0700 Subject: [PATCH 4/9] encode generation with schema, add test --- .../consumer/CooperativeStickyAssignor.java | 39 +++++++++++++------ .../CooperativeStickyAssignorTest.java | 19 +++++++-- 2 files changed, 42 insertions(+), 16 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java index a7ac430b533ba..483903ad7be5a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java @@ -17,7 +17,9 @@ package org.apache.kafka.clients.consumer; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -26,6 +28,11 @@ import java.util.Set; import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.protocol.types.Type; +import org.apache.kafka.common.utils.CollectionUtils; /** * A cooperative version of the {@link AbstractStickyAssignor AbstractStickyAssignor}. This follows the same (sticky) @@ -44,6 +51,11 @@ */ public class CooperativeStickyAssignor extends AbstractStickyAssignor { + // these schemas are used for preserving useful metadata for the assignment, such as the last stable generation + private static final String GENERATION_KEY_NAME = "generation"; + private static final Schema COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0 = new Schema( + new Field(GENERATION_KEY_NAME, Type.INT32)); + private int generation = DEFAULT_GENERATION; // consumer group generation @Override @@ -63,25 +75,28 @@ public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) @Override public ByteBuffer subscriptionUserData(Set topics) { - if (generation == DEFAULT_GENERATION) { - return ByteBuffer.allocate(0); - } else { - ByteBuffer buffer = ByteBuffer.allocate(4); - buffer.putInt(generation); - buffer.flip(); - return buffer; - } + Struct struct = new Struct(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0); + + struct.set(GENERATION_KEY_NAME, generation); + ByteBuffer buffer = ByteBuffer.allocate(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.sizeOf(struct)); + COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.write(buffer, struct); + buffer.flip(); + return buffer; } @Override protected MemberData memberData(Subscription subscription) { ByteBuffer buffer = subscription.userData(); - Optional encodedGeneration; - if (buffer != null && buffer.rewind().hasRemaining()) { - encodedGeneration = Optional.of(buffer.getInt()); - } else { + if (buffer == null) { encodedGeneration = Optional.empty(); + } else { + try { + Struct struct = COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.read(buffer); + encodedGeneration = Optional.of(struct.getInt(GENERATION_KEY_NAME)); + } catch (Exception e) { + encodedGeneration = Optional.of(DEFAULT_GENERATION); + } } return new MemberData(subscription.ownedPartitions(), encodedGeneration); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java index 795a6b69b4e07..3e1f3f0437e5a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java @@ -29,6 +29,8 @@ import java.util.Set; import org.junit.jupiter.api.Test; +import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -56,15 +58,24 @@ public Subscription buildSubscriptionWithGeneration(List topics, List(topics(topic)))); - // initially the generation defaults to NO_GENERATION, which gets encoded as Optional.empty() - assertFalse(((CooperativeStickyAssignor) assignor).memberData(subscription).generation.isPresent()); + Optional encodedGeneration = ((CooperativeStickyAssignor) assignor).memberData(subscription).generation; + assertTrue(encodedGeneration.isPresent()); + assertEquals(encodedGeneration.get(), DEFAULT_GENERATION); int generation = 10; assignor.onAssignment(null, new ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", Optional.empty())); subscription = new Subscription(topics(topic), assignor.subscriptionUserData(new HashSet<>(topics(topic)))); - assertTrue(((CooperativeStickyAssignor) assignor).memberData(subscription).generation.isPresent()); - assertEquals(((CooperativeStickyAssignor) assignor).memberData(subscription).generation.get(), generation); + encodedGeneration = ((CooperativeStickyAssignor) assignor).memberData(subscription).generation; + + assertTrue(encodedGeneration.isPresent()); + assertEquals(encodedGeneration.get(), generation); + } + + @Test + public void testDecodeGeneration() { + Subscription subscription = new Subscription(topics(topic)); + assertFalse(((CooperativeStickyAssignor) assignor).memberData(subscription).generation.isPresent()); } @Test From 15acfa141abe41715f8fed7597c8a5d3d85e3c2c Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Wed, 7 Jul 2021 16:24:38 -0700 Subject: [PATCH 5/9] checkstyle --- .../kafka/clients/consumer/CooperativeStickyAssignor.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java index 483903ad7be5a..5f0bb0c7f912e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java @@ -17,9 +17,7 @@ package org.apache.kafka.clients.consumer; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -32,7 +30,6 @@ import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Type; -import org.apache.kafka.common.utils.CollectionUtils; /** * A cooperative version of the {@link AbstractStickyAssignor AbstractStickyAssignor}. This follows the same (sticky) From 5f53140b4b78c7085fcc1b583ef6d74222ee1654 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Wed, 7 Jul 2021 16:27:25 -0700 Subject: [PATCH 6/9] review feedback --- .../consumer/internals/AbstractStickyAssignor.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java index 5c9ef1c5e634f..2d4aadd14744d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java @@ -96,7 +96,6 @@ private boolean allSubscriptionsEqual(Set allTopics, Map subscriptions, Map> consumerToOwnedPartitions, Set partitionsWithMultiplePreviousOwners) { - Set membersWithOldGeneration = new HashSet<>(); Set membersOfCurrentHighestGeneration = new HashSet<>(); boolean isAllSubscriptionsEqual = true; @@ -130,11 +129,9 @@ private boolean allSubscriptionsEqual(Set allTopics, // If the current member's generation is higher, all the previously owned partitions are invalid if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) { - membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration); - allPreviousPartitionsToOwner.clear(); partitionsWithMultiplePreviousOwners.clear(); - for (String droppedOutConsumer : membersWithOldGeneration) { + for (String droppedOutConsumer : membersOfCurrentHighestGeneration) { consumerToOwnedPartitions.get(droppedOutConsumer).clear(); } @@ -153,8 +150,8 @@ private boolean allSubscriptionsEqual(Set allTopics, } else { String otherConsumer = allPreviousPartitionsToOwner.get(tp); log.warn("Found multiple consumers {} and {} claiming the same TopicPartition {} in the " - + "same generation, this will be invalidated and removed from their previous assignment.", - consumer, otherConsumer, tp); + + "same generation {}, this will be invalidated and removed from their previous assignment.", + consumer, otherConsumer, tp, maxGeneration); consumerToOwnedPartitions.get(otherConsumer).remove(tp); partitionsWithMultiplePreviousOwners.add(tp); } @@ -221,7 +218,7 @@ private Map> constrainedAssign(Map for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) { if (ownedPartitions.contains(doublyClaimedPartition)) { - log.warn("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple" + log.warn("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple " + "consumers already in the same generation. Removing it from the ownedPartitions", doublyClaimedPartition, consumer); ownedPartitions.remove(doublyClaimedPartition); @@ -304,7 +301,6 @@ private Map> constrainedAssign(Map partitionsTransferringOwnership.put(unassignedPartition, consumer); int currentAssignedCount = consumerAssignment.size(); - int expectedAssignedCount = numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota; if (currentAssignedCount == minQuota) { unfilledConsumerIter.remove(); potentiallyUnfilledMembersAtMinQuota.add(consumer); From b32a644a950928730bd277a2fbf00b2a0b59dfda Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Wed, 7 Jul 2021 19:30:35 -0700 Subject: [PATCH 7/9] log final assignment at INFO --- .../consumer/internals/AbstractStickyAssignor.java | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java index 2d4aadd14744d..3664c08ac2213 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java @@ -178,13 +178,13 @@ private boolean allSubscriptionsEqual(Set allTopics, * @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions * @param partitionsWithMultiplePreviousOwners The partitions being claimed in the previous assignment of multiple consumers * - * @return Map from each member to the list of partitions assigned to them. + * @return Map from each member to the list of partitions assigned to them. */ private Map> constrainedAssign(Map partitionsPerTopic, Map> consumerToOwnedPartitions, Set partitionsWithMultiplePreviousOwners) { if (log.isDebugEnabled()) { - log.debug("performing constrained assign. partitionsPerTopic: {}, consumerToOwnedPartitions: {}", + log.debug("Performing constrained assign with partitionsPerTopic: {}, consumerToOwnedPartitions: {}.", partitionsPerTopic, consumerToOwnedPartitions); } @@ -344,9 +344,7 @@ private Map> constrainedAssign(Map } } - if (log.isDebugEnabled()) { - log.debug("Final assignment of partitions to consumers: \n{}", assignment); - } + log.info("Final assignment of partitions to consumers: \n{}", assignment); return assignment; } @@ -464,7 +462,6 @@ private Map> generalAssign(Map par // all partitions that needed to be assigned List unassignedPartitions = getUnassignedPartitions(sortedAllPartitions, assignedPartitions, topic2AllPotentialConsumers); - assignedPartitions = null; if (log.isDebugEnabled()) { log.debug("unassigned Partitions: {}", unassignedPartitions); @@ -482,9 +479,7 @@ private Map> generalAssign(Map par consumer2AllPotentialTopics, topic2AllPotentialConsumers, currentPartitionConsumer, revocationRequired, partitionsPerTopic, totalPartitionsCount); - if (log.isDebugEnabled()) { - log.debug("final assignment: {}", currentAssignment); - } + log.info("Final assignment of partitions to consumers: \n{}", currentAssignment); return currentAssignment; } From 3b66fa883e8436369326388c541a877eb274292f Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Mon, 12 Jul 2021 21:05:40 -0700 Subject: [PATCH 8/9] review feedback --- .../internals/AbstractStickyAssignor.java | 64 +++++++++---------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java index 3664c08ac2213..39d752e1f8613 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java @@ -149,7 +149,7 @@ private boolean allSubscriptionsEqual(Set allTopics, ownedPartitions.add(tp); } else { String otherConsumer = allPreviousPartitionsToOwner.get(tp); - log.warn("Found multiple consumers {} and {} claiming the same TopicPartition {} in the " + log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the " + "same generation {}, this will be invalidated and removed from their previous assignment.", consumer, otherConsumer, tp, maxGeneration); consumerToOwnedPartitions.get(otherConsumer).remove(tp); @@ -190,19 +190,19 @@ private Map> constrainedAssign(Map Set allRevokedPartitions = new HashSet<>(); - // the consumers not yet at expected capacity - List unfilledMembers = new LinkedList<>(); - Queue potentiallyUnfilledMembersAtMinQuota = new LinkedList<>(); + // the consumers which may still be assigned one or more partitions to reach expected capacity + List unfilledMembersWithUnderMinQuotaPartitions = new LinkedList<>(); + Queue unfilledMembersWithExactlyMinQuotaPartitions = new LinkedList<>(); int numberOfConsumers = consumerToOwnedPartitions.size(); int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum); int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers); int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers); - // the expected number of members with over minQuota assignment - int expectedNumMembersAssignedOverMinQuota = totalPartitionsCount % numberOfConsumers; - // the number of members with over minQuota partitions assigned - int numMembersAssignedOverMinQuota = 0; + // the expected number of members receiving more than minQuota partitions (zero when minQuota == maxQuota) + int expectedNumMembersWithOverMinQuotaPartitions = totalPartitionsCount % numberOfConsumers; + // the current number of members receiving more than minQuota partitions (zero when minQuota == maxQuota) + int currentNumMembersWithOverMinQuotaPartitions = 0; // initialize the assignment map with an empty array of size maxQuota for all members Map> assignment = new HashMap<>( @@ -218,7 +218,7 @@ private Map> constrainedAssign(Map for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) { if (ownedPartitions.contains(doublyClaimedPartition)) { - log.warn("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple " + log.error("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple " + "consumers already in the same generation. Removing it from the ownedPartitions", doublyClaimedPartition, consumer); ownedPartitions.remove(doublyClaimedPartition); @@ -232,13 +232,13 @@ private Map> constrainedAssign(Map consumerAssignment.addAll(ownedPartitions); assignedPartitions.addAll(ownedPartitions); } - unfilledMembers.add(consumer); - } else if (ownedPartitions.size() >= maxQuota && numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) { + unfilledMembersWithUnderMinQuotaPartitions.add(consumer); + } else if (ownedPartitions.size() >= maxQuota && currentNumMembersWithOverMinQuotaPartitions < expectedNumMembersWithOverMinQuotaPartitions) { // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected members // with more than the minQuota partitions, so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions - numMembersAssignedOverMinQuota++; - if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) { - potentiallyUnfilledMembersAtMinQuota.clear(); + currentNumMembersWithOverMinQuotaPartitions++; + if (currentNumMembersWithOverMinQuotaPartitions == expectedNumMembersWithOverMinQuotaPartitions) { + unfilledMembersWithExactlyMinQuotaPartitions.clear(); } List maxQuotaPartitions = ownedPartitions.subList(0, maxQuota); consumerAssignment.addAll(maxQuotaPartitions); @@ -254,8 +254,8 @@ private Map> constrainedAssign(Map // this consumer is potential maxQuota candidate since we're still under the number of expected members // with more than the minQuota partitions. Note, if the number of expected members with more than // the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled - if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) { - potentiallyUnfilledMembersAtMinQuota.add(consumer); + if (currentNumMembersWithOverMinQuotaPartitions < expectedNumMembersWithOverMinQuotaPartitions) { + unfilledMembersWithExactlyMinQuotaPartitions.add(consumer); } } } @@ -264,29 +264,29 @@ private Map> constrainedAssign(Map if (log.isDebugEnabled()) { log.debug("After reassigning previously owned partitions, unfilled members: {}, unassigned partitions: {}, " + - "current assignment: {}", unfilledMembers, unassignedPartitions, assignment); + "current assignment: {}", unfilledMembersWithUnderMinQuotaPartitions, unassignedPartitions, assignment); } - Collections.sort(unfilledMembers); + Collections.sort(unfilledMembersWithUnderMinQuotaPartitions); - Iterator unfilledConsumerIter = unfilledMembers.iterator(); + Iterator unfilledConsumerIter = unfilledMembersWithUnderMinQuotaPartitions.iterator(); // Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota for (TopicPartition unassignedPartition : unassignedPartitions) { String consumer; if (unfilledConsumerIter.hasNext()) { consumer = unfilledConsumerIter.next(); } else { - if (unfilledMembers.isEmpty() && potentiallyUnfilledMembersAtMinQuota.isEmpty()) { + if (unfilledMembersWithUnderMinQuotaPartitions.isEmpty() && unfilledMembersWithExactlyMinQuotaPartitions.isEmpty()) { // Should not enter here since we have calculated the exact number to assign to each consumer. // This indicates issues in the assignment algorithm int currentPartitionIndex = unassignedPartitions.indexOf(unassignedPartition); log.error("No more unfilled consumers to be assigned. The remaining unassigned partitions are: {}", unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size())); throw new IllegalStateException("No more unfilled consumers to be assigned."); - } else if (unfilledMembers.isEmpty()) { - consumer = potentiallyUnfilledMembersAtMinQuota.poll(); + } else if (unfilledMembersWithUnderMinQuotaPartitions.isEmpty()) { + consumer = unfilledMembersWithExactlyMinQuotaPartitions.poll(); } else { - unfilledConsumerIter = unfilledMembers.iterator(); + unfilledConsumerIter = unfilledMembersWithUnderMinQuotaPartitions.iterator(); consumer = unfilledConsumerIter.next(); } } @@ -303,37 +303,37 @@ private Map> constrainedAssign(Map int currentAssignedCount = consumerAssignment.size(); if (currentAssignedCount == minQuota) { unfilledConsumerIter.remove(); - potentiallyUnfilledMembersAtMinQuota.add(consumer); + unfilledMembersWithExactlyMinQuotaPartitions.add(consumer); } else if (currentAssignedCount == maxQuota) { - numMembersAssignedOverMinQuota++; - if (numMembersAssignedOverMinQuota == expectedNumMembersAssignedOverMinQuota) { + currentNumMembersWithOverMinQuotaPartitions++; + if (currentNumMembersWithOverMinQuotaPartitions == expectedNumMembersWithOverMinQuotaPartitions) { // We only start to iterate over the "potentially unfilled" members at minQuota after we've filled // all members up to at least minQuota, so once the last minQuota member reaches maxQuota, we // should be done. But in case of some algorithmic error, just log a warning and continue to // assign any remaining partitions within the assignment constraints if (unassignedPartitions.indexOf(unassignedPartition) != unassignedPartitions.size() - 1) { - log.warn("Filled the last member up to maxQuota but still had partitions remaining to assign, " + log.error("Filled the last member up to maxQuota but still had partitions remaining to assign, " + "will continue but this indicates a bug in the assignment."); } } } } - if (!unfilledMembers.isEmpty()) { + if (!unfilledMembersWithUnderMinQuotaPartitions.isEmpty()) { // we expected all the remaining unfilled members have minQuota partitions and we're already at the expected number // of members with more than the minQuota partitions. Otherwise, there must be error here. - if (numMembersAssignedOverMinQuota != expectedNumMembersAssignedOverMinQuota) { + if (currentNumMembersWithOverMinQuotaPartitions != expectedNumMembersWithOverMinQuotaPartitions) { log.error("Current number of members with more than the minQuota partitions: {}, is less than the expected number " + "of members with more than the minQuota partitions: {}, and no more partitions to be assigned to the remaining unfilled consumers: {}", - numMembersAssignedOverMinQuota, expectedNumMembersAssignedOverMinQuota, unfilledMembers); + currentNumMembersWithOverMinQuotaPartitions, expectedNumMembersWithOverMinQuotaPartitions, unfilledMembersWithUnderMinQuotaPartitions); throw new IllegalStateException("We haven't reached the expected number of members with " + "more than the minQuota partitions, but no more partitions to be assigned"); } else { - for (String unfilledMember : unfilledMembers) { + for (String unfilledMember : unfilledMembersWithUnderMinQuotaPartitions) { int assignedPartitionsCount = assignment.get(unfilledMember).size(); if (assignedPartitionsCount != minQuota) { log.error("Consumer: [{}] should have {} partitions, but got {} partitions, and no more partitions " + - "to be assigned. The remaining unfilled consumers are: {}", unfilledMember, minQuota, assignedPartitionsCount, unfilledMembers); + "to be assigned. The remaining unfilled consumers are: {}", unfilledMember, minQuota, assignedPartitionsCount, unfilledMembersWithUnderMinQuotaPartitions); throw new IllegalStateException(String.format("Consumer: [%s] doesn't reach minQuota partitions, " + "and no more partitions to be assigned", unfilledMember)); } else { From e269e26c50d6e0aa61e9931c9cf3e160754e56a6 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Tue, 13 Jul 2021 15:56:08 -0700 Subject: [PATCH 9/9] sort the unfilledMembersWithExactlyMinQuotaPartitions for deterministic results --- .../internals/AbstractStickyAssignor.java | 4 +-- .../CooperativeStickyAssignorTest.java | 4 +-- .../clients/consumer/StickyAssignorTest.java | 4 +-- .../internals/AbstractStickyAssignorTest.java | 25 +++++++++---------- 4 files changed, 18 insertions(+), 19 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java index 39d752e1f8613..9534862b7f61c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import java.util.Queue; import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; @@ -192,7 +191,7 @@ private Map> constrainedAssign(Map // the consumers which may still be assigned one or more partitions to reach expected capacity List unfilledMembersWithUnderMinQuotaPartitions = new LinkedList<>(); - Queue unfilledMembersWithExactlyMinQuotaPartitions = new LinkedList<>(); + LinkedList unfilledMembersWithExactlyMinQuotaPartitions = new LinkedList<>(); int numberOfConsumers = consumerToOwnedPartitions.size(); int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum); @@ -268,6 +267,7 @@ private Map> constrainedAssign(Map } Collections.sort(unfilledMembersWithUnderMinQuotaPartitions); + Collections.sort(unfilledMembersWithExactlyMinQuotaPartitions); Iterator unfilledConsumerIter = unfilledMembersWithUnderMinQuotaPartitions.iterator(); // Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java index 3e1f3f0437e5a..f94aa23180049 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java @@ -107,8 +107,8 @@ public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleCo subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList())); Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); - assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1)); - assertEquals(partitions(tp(topic, 2), tp(topic, 3)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic, 1), tp(topic, 3)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2)); // In the cooperative assignor, topic-0 has to be considered "owned" and so it cant be assigned until both have "revoked" it assertTrue(assignment.get(consumer3).isEmpty()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java index 7ea6a08660f34..bb03de25a81a6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java @@ -89,8 +89,8 @@ public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleCo subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList())); Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); - assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1)); - assertEquals(partitions(tp(topic, 2), tp(topic, 3)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic, 1), tp(topic, 3)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2)); assertEquals(partitions(tp(topic, 0)), assignment.get(consumer3)); verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java index bb316480384e1..789e6f753b98f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java @@ -40,7 +40,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import static java.util.Collections.emptyList; public abstract class AbstractStickyAssignorTest { protected AbstractStickyAssignor assignor; @@ -75,7 +74,7 @@ public void setUp() { @Test public void testOneConsumerNoTopic() { Map partitionsPerTopic = new HashMap<>(); - subscriptions = Collections.singletonMap(consumerId, new Subscription(emptyList())); + subscriptions = Collections.singletonMap(consumerId, new Subscription(Collections.emptyList())); Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); assertEquals(Collections.singleton(consumerId), assignment.keySet()); @@ -230,8 +229,8 @@ public void testConsumerOwningMinQuotaExpectedMaxQuota() { buildSubscription(subscribedTopics, partitions(tp(topic1, 1), tp(topic2, 2)))); Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); - assertEquals(partitions(tp(topic1, 0), tp(topic2, 1)), assignment.get(consumer1)); - assertEquals(partitions(tp(topic1, 1), tp(topic2, 2), tp(topic2, 0)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic1, 0), tp(topic2, 1), tp(topic2, 0)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic1, 1), tp(topic2, 2)), assignment.get(consumer2)); verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); assertTrue(isFullyBalanced(assignment)); @@ -252,7 +251,7 @@ public void testMaxQuotaConsumerMoreThanNumExpectedMaxCapacityMembers() { buildSubscription(subscribedTopics, partitions(tp(topic1, 0), tp(topic2, 0)))); subscriptions.put(consumer2, buildSubscription(subscribedTopics, partitions(tp(topic1, 1), tp(topic2, 1)))); - subscriptions.put(consumer3, buildSubscription(subscribedTopics, emptyList())); + subscriptions.put(consumer3, buildSubscription(subscribedTopics, Collections.emptyList())); Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); @@ -268,7 +267,7 @@ public void testMaxQuotaConsumerMoreThanNumExpectedMaxCapacityMembers() { * This unit test is testing all consumers owned less than minQuota partitions situation */ @Test - public void testAllConsumerAreUnderMinQuota() { + public void testAllConsumersAreUnderMinQuota() { Map partitionsPerTopic = new HashMap<>(); partitionsPerTopic.put(topic1, 2); partitionsPerTopic.put(topic2, 3); @@ -279,13 +278,13 @@ public void testAllConsumerAreUnderMinQuota() { buildSubscription(subscribedTopics, partitions(tp(topic1, 0)))); subscriptions.put(consumer2, buildSubscription(subscribedTopics, partitions(tp(topic1, 1)))); - subscriptions.put(consumer3, buildSubscription(subscribedTopics, emptyList())); + subscriptions.put(consumer3, buildSubscription(subscribedTopics, Collections.emptyList())); Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); - assertEquals(partitions(tp(topic1, 0), tp(topic2, 2)), assignment.get(consumer1)); - assertEquals(partitions(tp(topic1, 1), tp(topic2, 1)), assignment.get(consumer2)); + assertEquals(partitions(tp(topic1, 0), tp(topic2, 1)), assignment.get(consumer1)); + assertEquals(partitions(tp(topic1, 1), tp(topic2, 2)), assignment.get(consumer2)); assertEquals(partitions(tp(topic2, 0)), assignment.get(consumer3)); assertTrue(isFullyBalanced(assignment)); @@ -304,7 +303,7 @@ public void testAddRemoveConsumerOneTopic() { assertTrue(isFullyBalanced(assignment)); subscriptions.put(consumer1, buildSubscription(topics(topic), assignment.get(consumer1))); - subscriptions.put(consumer2, buildSubscription(topics(topic), emptyList())); + subscriptions.put(consumer2, buildSubscription(topics(topic), Collections.emptyList())); assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); @@ -342,8 +341,8 @@ public void testAddRemoveTwoConsumersTwoTopics() { // add 2 consumers subscriptions.put(consumer1, buildSubscription(allTopics, assignment.get(consumer1))); subscriptions.put(consumer2, buildSubscription(allTopics, assignment.get(consumer2))); - subscriptions.put(consumer3, buildSubscription(allTopics, emptyList())); - subscriptions.put(consumer4, buildSubscription(allTopics, emptyList())); + subscriptions.put(consumer3, buildSubscription(allTopics, Collections.emptyList())); + subscriptions.put(consumer4, buildSubscription(allTopics, Collections.emptyList())); assignment = assignor.assign(partitionsPerTopic, subscriptions); verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic); @@ -800,7 +799,7 @@ public void testAllConsumersReachExpectedQuotaAndAreConsideredFilled() { subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1)))); subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 2)))); - subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList())); + subscriptions.put(consumer3, buildSubscription(topics(topic), Collections.emptyList())); Map> assignment = assignor.assign(partitionsPerTopic, subscriptions); assertEquals(partitions(tp(topic, 0), tp(topic, 1)), assignment.get(consumer1));