diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index cfa727234aee0..1a2add86575a5 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -42,7 +42,7 @@
files="AbstractResponse.java"/>
+ files="(KerberosLogin|RequestResponseTest|ConnectMetricsRegistry|KafkaConsumer|AbstractStickyAssignor).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
index c7c0679575a9b..5f0bb0c7f912e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -25,6 +26,10 @@
import java.util.Set;
import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
/**
* A cooperative version of the {@link AbstractStickyAssignor AbstractStickyAssignor}. This follows the same (sticky)
@@ -43,6 +48,13 @@
*/
public class CooperativeStickyAssignor extends AbstractStickyAssignor {
+ // these schemas are used for preserving useful metadata for the assignment, such as the last stable generation
+ private static final String GENERATION_KEY_NAME = "generation";
+ private static final Schema COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0 = new Schema(
+ new Field(GENERATION_KEY_NAME, Type.INT32));
+
+ private int generation = DEFAULT_GENERATION; // consumer group generation
+
@Override
public String name() {
return "cooperative-sticky";
@@ -53,9 +65,37 @@ public List supportedProtocols() {
return Arrays.asList(RebalanceProtocol.COOPERATIVE, RebalanceProtocol.EAGER);
}
+ @Override
+ public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
+ this.generation = metadata.generationId();
+ }
+
+ @Override
+ public ByteBuffer subscriptionUserData(Set topics) {
+ Struct struct = new Struct(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0);
+
+ struct.set(GENERATION_KEY_NAME, generation);
+ ByteBuffer buffer = ByteBuffer.allocate(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.sizeOf(struct));
+ COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.write(buffer, struct);
+ buffer.flip();
+ return buffer;
+ }
+
@Override
protected MemberData memberData(Subscription subscription) {
- return new MemberData(subscription.ownedPartitions(), Optional.empty());
+ ByteBuffer buffer = subscription.userData();
+ Optional encodedGeneration;
+ if (buffer == null) {
+ encodedGeneration = Optional.empty();
+ } else {
+ try {
+ Struct struct = COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.read(buffer);
+ encodedGeneration = Optional.of(struct.getInt(GENERATION_KEY_NAME));
+ } catch (Exception e) {
+ encodedGeneration = Optional.of(DEFAULT_GENERATION);
+ }
+ }
+ return new MemberData(subscription.ownedPartitions(), encodedGeneration);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index e111aa62599e4..9534862b7f61c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -71,14 +71,16 @@ public MemberData(List partitions, Optional generation)
public Map> assign(Map partitionsPerTopic,
Map subscriptions) {
Map> consumerToOwnedPartitions = new HashMap<>();
- if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions)) {
+ Set partitionsWithMultiplePreviousOwners = new HashSet<>();
+ if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners)) {
log.debug("Detected that all consumers were subscribed to same set of topics, invoking the "
+ "optimized assignment algorithm");
partitionsTransferringOwnership = new HashMap<>();
- return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions);
+ return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions, partitionsWithMultiplePreviousOwners);
} else {
log.debug("Detected that not all consumers were subscribed to same set of topics, falling back to the "
+ "general case assignment algorithm");
+ // we must set this to null for the general case so the cooperative assignor knows to compute it from scratch
partitionsTransferringOwnership = null;
return generalAssign(partitionsPerTopic, subscriptions, consumerToOwnedPartitions);
}
@@ -86,17 +88,22 @@ public Map> assign(Map partitionsP
/**
* Returns true iff all consumers have an identical subscription. Also fills out the passed in
- * {@code consumerToOwnedPartitions} with each consumer's previously owned and still-subscribed partitions
+ * {@code consumerToOwnedPartitions} with each consumer's previously owned and still-subscribed partitions,
+ * and the {@code partitionsWithMultiplePreviousOwners} with any partitions claimed by multiple previous owners
*/
private boolean allSubscriptionsEqual(Set allTopics,
Map subscriptions,
- Map> consumerToOwnedPartitions) {
- Set membersWithOldGeneration = new HashSet<>();
+ Map> consumerToOwnedPartitions,
+ Set partitionsWithMultiplePreviousOwners) {
Set membersOfCurrentHighestGeneration = new HashSet<>();
boolean isAllSubscriptionsEqual = true;
Set subscribedTopics = new HashSet<>();
+ // keep track of all previously owned partitions so we can invalidate them if invalid input is
+ // detected, eg two consumers somehow claiming the same partition in the same/current generation
+ Map allPreviousPartitionsToOwner = new HashMap<>();
+
for (Map.Entry subscriptionEntry : subscriptions.entrySet()) {
String consumer = subscriptionEntry.getKey();
Subscription subscription = subscriptionEntry.getValue();
@@ -121,7 +128,12 @@ private boolean allSubscriptionsEqual(Set allTopics,
// If the current member's generation is higher, all the previously owned partitions are invalid
if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
- membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+ allPreviousPartitionsToOwner.clear();
+ partitionsWithMultiplePreviousOwners.clear();
+ for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
+ consumerToOwnedPartitions.get(droppedOutConsumer).clear();
+ }
+
membersOfCurrentHighestGeneration.clear();
maxGeneration = memberData.generation.get();
}
@@ -130,19 +142,26 @@ private boolean allSubscriptionsEqual(Set allTopics,
for (final TopicPartition tp : memberData.partitions) {
// filter out any topics that no longer exist or aren't part of the current subscription
if (allTopics.contains(tp.topic())) {
- ownedPartitions.add(tp);
+
+ if (!allPreviousPartitionsToOwner.containsKey(tp)) {
+ allPreviousPartitionsToOwner.put(tp, consumer);
+ ownedPartitions.add(tp);
+ } else {
+ String otherConsumer = allPreviousPartitionsToOwner.get(tp);
+ log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
+ + "same generation {}, this will be invalidated and removed from their previous assignment.",
+ consumer, otherConsumer, tp, maxGeneration);
+ consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+ partitionsWithMultiplePreviousOwners.add(tp);
+ }
}
}
}
}
- for (String consumer : membersWithOldGeneration) {
- consumerToOwnedPartitions.get(consumer).clear();
- }
return isAllSubscriptionsEqual;
}
-
/**
* This constrainedAssign optimizes the assignment algorithm when all consumers were subscribed to same set of topics.
* The method includes the following steps:
@@ -154,57 +173,72 @@ private boolean allSubscriptionsEqual(Set allTopics,
* we're still under the number of expected max capacity members
* 2. Fill remaining members up to the expected numbers of maxQuota partitions, otherwise, to minQuota partitions
*
- * @param partitionsPerTopic The number of partitions for each subscribed topic
- * @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions
+ * @param partitionsPerTopic The number of partitions for each subscribed topic
+ * @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions
+ * @param partitionsWithMultiplePreviousOwners The partitions being claimed in the previous assignment of multiple consumers
*
- * @return Map from each member to the list of partitions assigned to them.
+ * @return Map from each member to the list of partitions assigned to them.
*/
private Map> constrainedAssign(Map partitionsPerTopic,
- Map> consumerToOwnedPartitions) {
+ Map> consumerToOwnedPartitions,
+ Set partitionsWithMultiplePreviousOwners) {
if (log.isDebugEnabled()) {
- log.debug("performing constrained assign. partitionsPerTopic: {}, consumerToOwnedPartitions: {}",
+ log.debug("Performing constrained assign with partitionsPerTopic: {}, consumerToOwnedPartitions: {}.",
partitionsPerTopic, consumerToOwnedPartitions);
}
Set allRevokedPartitions = new HashSet<>();
- // the consumers not yet at expected capacity
- List unfilledMembers = new LinkedList<>();
+ // the consumers which may still be assigned one or more partitions to reach expected capacity
+ List unfilledMembersWithUnderMinQuotaPartitions = new LinkedList<>();
+ LinkedList unfilledMembersWithExactlyMinQuotaPartitions = new LinkedList<>();
int numberOfConsumers = consumerToOwnedPartitions.size();
int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);
int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);
- // the expected number of members with over minQuota assignment
- int expectedNumMembersAssignedOverMinQuota = totalPartitionsCount % numberOfConsumers;
- // the number of members with over minQuota partitions assigned
- int numMembersAssignedOverMinQuota = 0;
+ // the expected number of members receiving more than minQuota partitions (zero when minQuota == maxQuota)
+ int expectedNumMembersWithOverMinQuotaPartitions = totalPartitionsCount % numberOfConsumers;
+ // the current number of members receiving more than minQuota partitions (zero when minQuota == maxQuota)
+ int currentNumMembersWithOverMinQuotaPartitions = 0;
// initialize the assignment map with an empty array of size maxQuota for all members
Map> assignment = new HashMap<>(
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));
List assignedPartitions = new ArrayList<>();
- // Reassign previously owned partitions to the expected number
+ // Reassign previously owned partitions, up to the expected number of partitions per consumer
for (Map.Entry> consumerEntry : consumerToOwnedPartitions.entrySet()) {
String consumer = consumerEntry.getKey();
List ownedPartitions = consumerEntry.getValue();
List consumerAssignment = assignment.get(consumer);
+ for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) {
+ if (ownedPartitions.contains(doublyClaimedPartition)) {
+ log.error("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple "
+ + "consumers already in the same generation. Removing it from the ownedPartitions",
+ doublyClaimedPartition, consumer);
+ ownedPartitions.remove(doublyClaimedPartition);
+ }
+ }
+
if (ownedPartitions.size() < minQuota) {
- // the expected assignment size is more than consumer have now, so keep all the owned partitions
- // and put this member into unfilled member list
+ // the expected assignment size is more than this consumer has now, so keep all the owned partitions
+ // and put this member into the unfilled member list
if (ownedPartitions.size() > 0) {
consumerAssignment.addAll(ownedPartitions);
assignedPartitions.addAll(ownedPartitions);
}
- unfilledMembers.add(consumer);
- } else if (ownedPartitions.size() >= maxQuota && numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
+ unfilledMembersWithUnderMinQuotaPartitions.add(consumer);
+ } else if (ownedPartitions.size() >= maxQuota && currentNumMembersWithOverMinQuotaPartitions < expectedNumMembersWithOverMinQuotaPartitions) {
// consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected members
// with more than the minQuota partitions, so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
- numMembersAssignedOverMinQuota++;
+ currentNumMembersWithOverMinQuotaPartitions++;
+ if (currentNumMembersWithOverMinQuotaPartitions == expectedNumMembersWithOverMinQuotaPartitions) {
+ unfilledMembersWithExactlyMinQuotaPartitions.clear();
+ }
List maxQuotaPartitions = ownedPartitions.subList(0, maxQuota);
consumerAssignment.addAll(maxQuotaPartitions);
assignedPartitions.addAll(maxQuotaPartitions);
@@ -218,70 +252,88 @@ private Map> constrainedAssign(Map
allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
// this consumer is potential maxQuota candidate since we're still under the number of expected members
// with more than the minQuota partitions. Note, if the number of expected members with more than
- // the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
- if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
- unfilledMembers.add(consumer);
+ // the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled
+ if (currentNumMembersWithOverMinQuotaPartitions < expectedNumMembersWithOverMinQuotaPartitions) {
+ unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
}
}
}
List unassignedPartitions = getUnassignedPartitions(totalPartitionsCount, partitionsPerTopic, assignedPartitions);
- assignedPartitions = null;
if (log.isDebugEnabled()) {
log.debug("After reassigning previously owned partitions, unfilled members: {}, unassigned partitions: {}, " +
- "current assignment: {}", unfilledMembers, unassignedPartitions, assignment);
+ "current assignment: {}", unfilledMembersWithUnderMinQuotaPartitions, unassignedPartitions, assignment);
}
- Collections.sort(unfilledMembers);
+ Collections.sort(unfilledMembersWithUnderMinQuotaPartitions);
+ Collections.sort(unfilledMembersWithExactlyMinQuotaPartitions);
- Iterator unfilledConsumerIter = unfilledMembers.iterator();
+ Iterator unfilledConsumerIter = unfilledMembersWithUnderMinQuotaPartitions.iterator();
// Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota
for (TopicPartition unassignedPartition : unassignedPartitions) {
- if (!unfilledConsumerIter.hasNext()) {
- if (unfilledMembers.isEmpty()) {
- // Should not enter here since we have calculated the exact number to assign to each consumer
- // There might be issues in the assigning algorithm, or maybe assigning the same partition to two owners.
+ String consumer;
+ if (unfilledConsumerIter.hasNext()) {
+ consumer = unfilledConsumerIter.next();
+ } else {
+ if (unfilledMembersWithUnderMinQuotaPartitions.isEmpty() && unfilledMembersWithExactlyMinQuotaPartitions.isEmpty()) {
+ // Should not enter here since we have calculated the exact number to assign to each consumer.
+ // This indicates issues in the assignment algorithm
int currentPartitionIndex = unassignedPartitions.indexOf(unassignedPartition);
log.error("No more unfilled consumers to be assigned. The remaining unassigned partitions are: {}",
- unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size()));
+ unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size()));
throw new IllegalStateException("No more unfilled consumers to be assigned.");
+ } else if (unfilledMembersWithUnderMinQuotaPartitions.isEmpty()) {
+ consumer = unfilledMembersWithExactlyMinQuotaPartitions.poll();
+ } else {
+ unfilledConsumerIter = unfilledMembersWithUnderMinQuotaPartitions.iterator();
+ consumer = unfilledConsumerIter.next();
}
- unfilledConsumerIter = unfilledMembers.iterator();
}
- String consumer = unfilledConsumerIter.next();
+
List consumerAssignment = assignment.get(consumer);
consumerAssignment.add(unassignedPartition);
// We already assigned all possible ownedPartitions, so we know this must be newly assigned to this consumer
- if (allRevokedPartitions.contains(unassignedPartition))
+ // or else the partition was actually claimed by multiple previous owners and had to be invalidated from all
+ // members claimed ownedPartitions
+ if (allRevokedPartitions.contains(unassignedPartition) || partitionsWithMultiplePreviousOwners.contains(unassignedPartition))
partitionsTransferringOwnership.put(unassignedPartition, consumer);
int currentAssignedCount = consumerAssignment.size();
- int expectedAssignedCount = numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota;
- if (currentAssignedCount == expectedAssignedCount) {
- if (currentAssignedCount == maxQuota) {
- numMembersAssignedOverMinQuota++;
- }
+ if (currentAssignedCount == minQuota) {
unfilledConsumerIter.remove();
+ unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
+ } else if (currentAssignedCount == maxQuota) {
+ currentNumMembersWithOverMinQuotaPartitions++;
+ if (currentNumMembersWithOverMinQuotaPartitions == expectedNumMembersWithOverMinQuotaPartitions) {
+ // We only start to iterate over the "potentially unfilled" members at minQuota after we've filled
+ // all members up to at least minQuota, so once the last minQuota member reaches maxQuota, we
+ // should be done. But in case of some algorithmic error, just log a warning and continue to
+ // assign any remaining partitions within the assignment constraints
+ if (unassignedPartitions.indexOf(unassignedPartition) != unassignedPartitions.size() - 1) {
+ log.error("Filled the last member up to maxQuota but still had partitions remaining to assign, "
+ + "will continue but this indicates a bug in the assignment.");
+ }
+ }
}
}
- if (!unfilledMembers.isEmpty()) {
+ if (!unfilledMembersWithUnderMinQuotaPartitions.isEmpty()) {
// we expected all the remaining unfilled members have minQuota partitions and we're already at the expected number
// of members with more than the minQuota partitions. Otherwise, there must be error here.
- if (numMembersAssignedOverMinQuota != expectedNumMembersAssignedOverMinQuota) {
+ if (currentNumMembersWithOverMinQuotaPartitions != expectedNumMembersWithOverMinQuotaPartitions) {
log.error("Current number of members with more than the minQuota partitions: {}, is less than the expected number " +
"of members with more than the minQuota partitions: {}, and no more partitions to be assigned to the remaining unfilled consumers: {}",
- numMembersAssignedOverMinQuota, expectedNumMembersAssignedOverMinQuota, unfilledMembers);
+ currentNumMembersWithOverMinQuotaPartitions, expectedNumMembersWithOverMinQuotaPartitions, unfilledMembersWithUnderMinQuotaPartitions);
throw new IllegalStateException("We haven't reached the expected number of members with " +
"more than the minQuota partitions, but no more partitions to be assigned");
} else {
- for (String unfilledMember : unfilledMembers) {
+ for (String unfilledMember : unfilledMembersWithUnderMinQuotaPartitions) {
int assignedPartitionsCount = assignment.get(unfilledMember).size();
if (assignedPartitionsCount != minQuota) {
log.error("Consumer: [{}] should have {} partitions, but got {} partitions, and no more partitions " +
- "to be assigned. The remaining unfilled consumers are: {}", unfilledMember, minQuota, assignedPartitionsCount, unfilledMembers);
+ "to be assigned. The remaining unfilled consumers are: {}", unfilledMember, minQuota, assignedPartitionsCount, unfilledMembersWithUnderMinQuotaPartitions);
throw new IllegalStateException(String.format("Consumer: [%s] doesn't reach minQuota partitions, " +
"and no more partitions to be assigned", unfilledMember));
} else {
@@ -292,9 +344,7 @@ private Map> constrainedAssign(Map
}
}
- if (log.isDebugEnabled()) {
- log.debug("Final assignment of partitions to consumers: \n{}", assignment);
- }
+ log.info("Final assignment of partitions to consumers: \n{}", assignment);
return assignment;
}
@@ -412,7 +462,6 @@ private Map> generalAssign(Map par
// all partitions that needed to be assigned
List unassignedPartitions = getUnassignedPartitions(sortedAllPartitions, assignedPartitions, topic2AllPotentialConsumers);
- assignedPartitions = null;
if (log.isDebugEnabled()) {
log.debug("unassigned Partitions: {}", unassignedPartitions);
@@ -430,9 +479,7 @@ private Map> generalAssign(Map par
consumer2AllPotentialTopics, topic2AllPotentialConsumers, currentPartitionConsumer, revocationRequired,
partitionsPerTopic, totalPartitionsCount);
- if (log.isDebugEnabled()) {
- log.debug("final assignment: {}", currentAssignment);
- }
+ log.info("Final assignment of partitions to consumers: \n{}", currentAssignment);
return currentAssignment;
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
index d7d671e1a2abe..f94aa23180049 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignorTest.java
@@ -16,16 +16,25 @@
*/
package org.apache.kafka.clients.consumer;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor;
+import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest;
+import org.apache.kafka.common.TopicPartition;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
-import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor;
-import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest;
-import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static java.util.Collections.emptyList;
public class CooperativeStickyAssignorTest extends AbstractStickyAssignorTest {
@@ -39,6 +48,74 @@ public Subscription buildSubscription(List topics, List
return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions);
}
+ @Override
+ public Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation) {
+ assignor.onAssignment(null, new ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", Optional.empty()));
+ return new Subscription(topics, assignor.subscriptionUserData(new HashSet<>(topics)), partitions);
+ }
+
+ @Test
+ public void testEncodeAndDecodeGeneration() {
+ Subscription subscription = new Subscription(topics(topic), assignor.subscriptionUserData(new HashSet<>(topics(topic))));
+
+ Optional encodedGeneration = ((CooperativeStickyAssignor) assignor).memberData(subscription).generation;
+ assertTrue(encodedGeneration.isPresent());
+ assertEquals(encodedGeneration.get(), DEFAULT_GENERATION);
+
+ int generation = 10;
+ assignor.onAssignment(null, new ConsumerGroupMetadata("dummy-group-id", generation, "dummy-member-id", Optional.empty()));
+
+ subscription = new Subscription(topics(topic), assignor.subscriptionUserData(new HashSet<>(topics(topic))));
+ encodedGeneration = ((CooperativeStickyAssignor) assignor).memberData(subscription).generation;
+
+ assertTrue(encodedGeneration.isPresent());
+ assertEquals(encodedGeneration.get(), generation);
+ }
+
+ @Test
+ public void testDecodeGeneration() {
+ Subscription subscription = new Subscription(topics(topic));
+ assertFalse(((CooperativeStickyAssignor) assignor).memberData(subscription).generation.isPresent());
+ }
+
+ @Test
+ public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithEqualPartitionsPerConsumer() {
+ Map partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 3);
+
+ subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1))));
+ subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2))));
+ subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList()));
+
+ Map> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+ // In the cooperative assignor, topic-0 has to be considered "owned" and so it cant be assigned until both have "revoked" it
+ assertTrue(assignment.get(consumer3).isEmpty());
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
+ @Test
+ public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithUnequalPartitionsPerConsumer() {
+ Map partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 4);
+
+ subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1))));
+ subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2))));
+ subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList()));
+
+ Map> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic, 1), tp(topic, 3)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+ // In the cooperative assignor, topic-0 has to be considered "owned" and so it cant be assigned until both have "revoked" it
+ assertTrue(assignment.get(consumer3).isEmpty());
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
/**
* The cooperative assignor must do some additional work and verification of some assignments relative to the eager
* assignor, since it may or may not need to trigger a second follow-up rebalance.
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
index 684a421be1807..bb03de25a81a6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
@@ -40,6 +40,8 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import static java.util.Collections.emptyList;
+
public class StickyAssignorTest extends AbstractStickyAssignorTest {
@Override
@@ -53,6 +55,48 @@ public Subscription buildSubscription(List topics, List
serializeTopicPartitionAssignment(new MemberData(partitions, Optional.of(DEFAULT_GENERATION))));
}
+ @Override
+ public Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation) {
+ return new Subscription(topics,
+ serializeTopicPartitionAssignment(new MemberData(partitions, Optional.of(generation))));
+ }
+
+ @Test
+ public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithEqualPartitionsPerConsumer() {
+ Map partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 3);
+
+ subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1))));
+ subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2))));
+ subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList()));
+
+ Map> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic, 1)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+ assertEquals(partitions(tp(topic, 0)), assignment.get(consumer3));
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
+ @Test
+ public void testAllConsumersHaveOwnedPartitionInvalidatedWhenClaimedByMultipleConsumersInSameGenerationWithUnequalPartitionsPerConsumer() {
+ Map partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 4);
+
+ subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1))));
+ subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 2))));
+ subscriptions.put(consumer3, buildSubscription(topics(topic), emptyList()));
+
+ Map> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic, 1), tp(topic, 3)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+ assertEquals(partitions(tp(topic, 0)), assignment.get(consumer3));
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
@ParameterizedTest(name = "testAssignmentWithMultipleGenerations1 with isAllSubscriptionsEqual: {0}")
@ValueSource(booleans = {true, false})
public void testAssignmentWithMultipleGenerations1(boolean isAllSubscriptionsEqual) {
@@ -228,11 +272,6 @@ public void testSchemaBackwardCompatibility() {
assertTrue(isFullyBalanced(assignment));
}
- private Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation) {
- return new Subscription(topics,
- serializeTopicPartitionAssignment(new MemberData(partitions, Optional.of(generation))));
- }
-
private static Subscription buildSubscriptionWithOldSchema(List topics, List partitions) {
Struct struct = new Struct(StickyAssignor.STICKY_ASSIGNOR_USER_DATA_V0);
List topicAssignments = new ArrayList<>();
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
index a650cbbf39a67..789e6f753b98f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
@@ -34,6 +34,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -57,6 +58,8 @@ public abstract class AbstractStickyAssignorTest {
protected abstract Subscription buildSubscription(List topics, List partitions);
+ protected abstract Subscription buildSubscriptionWithGeneration(List topics, List partitions, int generation);
+
@BeforeEach
public void setUp() {
assignor = createAssignor();
@@ -264,7 +267,7 @@ public void testMaxQuotaConsumerMoreThanNumExpectedMaxCapacityMembers() {
* This unit test is testing all consumers owned less than minQuota partitions situation
*/
@Test
- public void testAllConsumerAreUnderMinQuota() {
+ public void testAllConsumersAreUnderMinQuota() {
Map partitionsPerTopic = new HashMap<>();
partitionsPerTopic.put(topic1, 2);
partitionsPerTopic.put(topic2, 3);
@@ -280,9 +283,9 @@ public void testAllConsumerAreUnderMinQuota() {
Map> assignment = assignor.assign(partitionsPerTopic, subscriptions);
verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
- assertEquals(partitions(tp(topic1, 0), tp(topic2, 0)), assignment.get(consumer1));
- assertEquals(partitions(tp(topic1, 1), tp(topic2, 1)), assignment.get(consumer2));
- assertEquals(partitions(tp(topic2, 2)), assignment.get(consumer3));
+ assertEquals(partitions(tp(topic1, 0), tp(topic2, 1)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic1, 1), tp(topic2, 2)), assignment.get(consumer2));
+ assertEquals(partitions(tp(topic2, 0)), assignment.get(consumer3));
assertTrue(isFullyBalanced(assignment));
}
@@ -355,8 +358,8 @@ public void testAddRemoveTwoConsumersTwoTopics() {
subscriptions.put(consumer3, buildSubscription(allTopics, assignment.get(consumer3)));
subscriptions.put(consumer4, buildSubscription(allTopics, assignment.get(consumer4)));
assignment = assignor.assign(partitionsPerTopic, subscriptions);
- assertEquals(partitions(tp(topic2, 1), tp(topic2, 3), tp(topic1, 0), tp(topic1, 2)), assignment.get(consumer3));
- assertEquals(partitions(tp(topic2, 2), tp(topic1, 1), tp(topic2, 0)), assignment.get(consumer4));
+ assertEquals(partitions(tp(topic2, 1), tp(topic2, 3), tp(topic1, 0), tp(topic2, 0)), assignment.get(consumer3));
+ assertEquals(partitions(tp(topic2, 2), tp(topic1, 1), tp(topic1, 2)), assignment.get(consumer4));
verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
assertTrue(isFullyBalanced(assignment));
@@ -449,7 +452,6 @@ public void testAddRemoveTopicTwoConsumers() {
assertTrue(consumer2assignment.containsAll(consumer2Assignment3));
}
-
@Test
public void testReassignmentAfterOneConsumerLeaves() {
Map partitionsPerTopic = new HashMap<>();
@@ -555,7 +557,7 @@ public void testLargeAssignmentAndGroupWithUniformSubscription() {
assignor.assign(partitionsPerTopic, subscriptions);
}
- @Timeout(40)
+ @Timeout(60)
@Test
public void testLargeAssignmentAndGroupWithNonEqualSubscription() {
// 1 million partitions!
@@ -790,6 +792,62 @@ public void testReassignmentWithRandomSubscriptionsAndChanges() {
}
}
+ @Test
+ public void testAllConsumersReachExpectedQuotaAndAreConsideredFilled() {
+ Map partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 4);
+
+ subscriptions.put(consumer1, buildSubscription(topics(topic), partitions(tp(topic, 0), tp(topic, 1))));
+ subscriptions.put(consumer2, buildSubscription(topics(topic), partitions(tp(topic, 2))));
+ subscriptions.put(consumer3, buildSubscription(topics(topic), Collections.emptyList()));
+
+ Map> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(partitions(tp(topic, 0), tp(topic, 1)), assignment.get(consumer1));
+ assertEquals(partitions(tp(topic, 2)), assignment.get(consumer2));
+ assertEquals(partitions(tp(topic, 3)), assignment.get(consumer3));
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
+ @Test
+ public void testOwnedPartitionsAreInvalidatedForConsumerWithStaleGeneration() {
+ Map partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 3);
+ partitionsPerTopic.put(topic2, 3);
+
+ int currentGeneration = 10;
+
+ subscriptions.put(consumer1, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), currentGeneration));
+ subscriptions.put(consumer2, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), currentGeneration - 1));
+
+ Map> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1))), new HashSet<>(assignment.get(consumer1)));
+ assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 0), tp(topic2, 2))), new HashSet<>(assignment.get(consumer2)));
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
+ @Test
+ public void testOwnedPartitionsAreInvalidatedForConsumerWithNoGeneration() {
+ Map partitionsPerTopic = new HashMap<>();
+ partitionsPerTopic.put(topic, 3);
+ partitionsPerTopic.put(topic2, 3);
+
+ int currentGeneration = 10;
+
+ subscriptions.put(consumer1, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), currentGeneration));
+ subscriptions.put(consumer2, buildSubscriptionWithGeneration(topics(topic, topic2), partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1)), DEFAULT_GENERATION));
+
+ Map> assignment = assignor.assign(partitionsPerTopic, subscriptions);
+ assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic, 2), tp(topic2, 1))), new HashSet<>(assignment.get(consumer1)));
+ assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 0), tp(topic2, 2))), new HashSet<>(assignment.get(consumer2)));
+
+ verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
+ assertTrue(isFullyBalanced(assignment));
+ }
+
private String getTopicName(int i, int maxNum) {
return getCanonicalName("t", i, maxNum);
}