From d28806a170a4c8493902fd190c0f319353bebb5c Mon Sep 17 00:00:00 2001 From: David Jacot Date: Tue, 11 Apr 2023 10:18:33 +0200 Subject: [PATCH 1/8] Add CurrentAssignmentBuilder --- .../consumer/CurrentAssignmentBuilder.java | 415 +++++++++++++ .../CurrentAssignmentBuilderTest.java | 548 ++++++++++++++++++ 2 files changed, 963 insertions(+) create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java new file mode 100644 index 0000000000000..9aa5d3c3527c1 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + * the target/desired assignment. The member transitions to this epoch + * when it has revoked the partitions that it no longer owns or if it + * does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + * the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it can transition + * to the next state. + * - Assigning Set - The set of partitions that the member will eventually receive. The partitions + * in this set are still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts + * here when the next epoch does not match the target epoch. It means that + * a new target assignment has been installed so the reconciliation process + * must restart. In this state, the Assigned, Revoking and Assigning sets + * are computed.
If Revoking is not empty, the state machine transitions + * to REVOKING; if Assigning is not empty, it transitions to ASSIGNING; + * otherwise it transitions to STABLE. + * - REVOKING - This state means that the member must revoke partitions before it can + * transition to the next epoch and thus start receiving new partitions. + * The member transitions to the next state only when it has acknowledged + * the revocation. + * - ASSIGNING - This state means that the member waits on partitions which are still + * owned by other members in the group. It remains in this state until + * they are all freed up. + * - STABLE - This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { + /** + * The consumer group member which is reconciled. + */ + private final ConsumerGroupMember member; + + /** + * The target assignment epoch. + */ + private int targetAssignmentEpoch; + + /** + * The target assignment. + */ + private Assignment targetAssignment; + + /** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ + private BiFunction currentPartitionEpoch; + + /** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ + private List ownedTopicPartitions; + + /** + * Constructs the CurrentAssignmentBuilder based on the current state of the + * provided consumer group member. + * + * @param member The consumer group member that must be reconciled. + */ + public CurrentAssignmentBuilder(ConsumerGroupMember member) { + this.member = Objects.requireNonNull(member); + } + + /** + * Sets the target assignment epoch and the target assignment that the + * consumer group member must be reconciled to. + * + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment.
+ * @return This object. + */ + public CurrentAssignmentBuilder withTargetAssignment( + int targetAssignmentEpoch, + Assignment targetAssignment + ) { + this.targetAssignmentEpoch = targetAssignmentEpoch; + this.targetAssignment = Objects.requireNonNull(targetAssignment); + return this; + } + + /** + * Sets a BiFunction which retrieves the current epoch of a + * partition. This is used by the state machine to determine if a + * partition is free or still used by another member. + * + * @param currentPartitionEpoch A BiFunction which gets the epoch of a + * topic id / partition id pair. + * @return This object. + */ + public CurrentAssignmentBuilder withCurrentPartitionEpoch( + BiFunction currentPartitionEpoch + ) { + this.currentPartitionEpoch = Objects.requireNonNull(currentPartitionEpoch); + return this; + } + + /** + * Sets the partitions currently owned by the member. This comes directly + * from the last ConsumerGroupHeartbeat request. This is used to determine + * if the member has revoked the necessary partitions. + * + * @param ownedTopicPartitions A list of topic-partitions. + * @return This object. + */ + public CurrentAssignmentBuilder withOwnedTopicPartitions( + List ownedTopicPartitions + ) { + this.ownedTopicPartitions = ownedTopicPartitions; + return this; + } + + /** + * Builds the next state for the member or keeps the current one if it + * is not possible to move forward with the current state. + * + * @return A new ConsumerGroupMember or the current one. + */ + public ConsumerGroupMember build() { + // A new target assignment has been installed, we need to restart + // the reconciliation loop from the beginning. + if (targetAssignmentEpoch != member.nextMemberEpoch()) { + return transitionToInitialState(); + } + + switch (member.state()) { + // Check if the partitions have been revoked by the member. + case REVOKING: + return maybeTransitionFromRevokingToAssigningOrStable(); + + // Check if pending partitions have been freed up.
+ case ASSIGNING: + return maybeTransitionFromAssigningToAssigningOrStable(); + + // Nothing to do. + case STABLE: + return member; + } + + return member; + } + + /** + * Transitions to the initial state. Here we compute the Assigned, + * Revoking and Assigning sets. + * + * @return A new ConsumerGroupMember. + */ + private ConsumerGroupMember transitionToInitialState() { + Map> newAssignedSet = new HashMap<>(); + Map> newRevokingSet = new HashMap<>(); + Map> newAssigningSet = new HashMap<>(); + + // Compute the combined set of topics. + Set allTopicIds = new HashSet<>(targetAssignment.partitions().keySet()); + allTopicIds.addAll(member.assignedPartitions().keySet()); + allTopicIds.addAll(member.partitionsPendingRevocation().keySet()); + allTopicIds.addAll(member.partitionsPendingAssignment().keySet()); + + for (Uuid topicId : allTopicIds) { + Set target = targetAssignment.partitions() + .getOrDefault(topicId, Collections.emptySet()); + Set currentAssignedPartitions = member.assignedPartitions() + .getOrDefault(topicId, Collections.emptySet()); + Set currentRevokingPartitions = member.partitionsPendingRevocation() + .getOrDefault(topicId, Collections.emptySet()); + + // Assigned_1 = (Assigned_0 + Revoking_0) /\ Target + Set newAssignedPartitions = new HashSet<>(currentAssignedPartitions); + newAssignedPartitions.addAll(currentRevokingPartitions); + newAssignedPartitions.retainAll(target); + + // Revoking_1 = (Assigned_0 + Revoking_0) - Assigned_1 + Set newRevokingPartitions = new HashSet<>(currentAssignedPartitions); + newRevokingPartitions.addAll(currentRevokingPartitions); + newRevokingPartitions.removeAll(newAssignedPartitions); + + // Assigning_1 = Target - Assigned_1 + Set newAssigningPartitions = new HashSet<>(target); + newAssigningPartitions.removeAll(newAssignedPartitions); + + if (!newAssignedPartitions.isEmpty()) { + newAssignedSet.put(topicId, newAssignedPartitions); + } + + if (!newRevokingPartitions.isEmpty()) { + newRevokingSet.put(topicId, 
newRevokingPartitions); + } + + if (!newAssigningPartitions.isEmpty()) { + newAssigningSet.put(topicId, newAssigningPartitions); + } + } + + if (!newRevokingSet.isEmpty()) { + // If the revoking set is not empty, we transition to Revoking and we + // stay in the current epoch. + return new ConsumerGroupMember.Builder(member) + .setAssignedPartitions(newAssignedSet) + .setPartitionsPendingRevocation(newRevokingSet) + .setPartitionsPendingAssignment(newAssigningSet) + .setNextMemberEpoch(targetAssignmentEpoch) + .build(); + } else { + if (!newAssigningSet.isEmpty()) { + // If the assigning set is not empty, we check if some or all + // partitions are free to use. If they are, we move them to + // the assigned set. + maybeAssignPendingPartitions(newAssignedSet, newAssigningSet); + } + + // We transition to the target epoch. If the assigning set is empty, + // the member transition to stable, otherwise to assigning. + return new ConsumerGroupMember.Builder(member) + .setAssignedPartitions(newAssignedSet) + .setPartitionsPendingRevocation(Collections.emptyMap()) + .setPartitionsPendingAssignment(newAssigningSet) + .setPreviousMemberEpoch(member.memberEpoch()) + .setMemberEpoch(targetAssignmentEpoch) + .setNextMemberEpoch(targetAssignmentEpoch) + .build(); + } + } + + /** + * Tries to transition from Revoke to Assigning or Stable. This is only + * possible when the member acknowledges that it only owns the partition + * in the Assigned set. + * + * @return A new ConsumerGroupMember with the new state or the current one + * if the member stays in the current state. 
+ */ + private ConsumerGroupMember maybeTransitionFromRevokingToAssigningOrStable() { + if (member.partitionsPendingRevocation().isEmpty() || hasRevokedAllPartitions(ownedTopicPartitions)) { + Map> newAssignedSet = deepCopy(member.assignedPartitions()); + Map> newAssigningSet = deepCopy(member.partitionsPendingAssignment()); + + if (!newAssigningSet.isEmpty()) { + // If the assigning set is not empty, we check if some or all + // partitions are free to use. If they are, we move them to + // the assigned set. + maybeAssignPendingPartitions(newAssignedSet, newAssigningSet); + } + + // We transition to the target epoch. If the assigning set is empty, + // the member transition to stable, otherwise to assigning. + return new ConsumerGroupMember.Builder(member) + .setAssignedPartitions(newAssignedSet) + .setPartitionsPendingRevocation(Collections.emptyMap()) + .setPartitionsPendingAssignment(newAssigningSet) + .setPreviousMemberEpoch(member.memberEpoch()) + .setMemberEpoch(targetAssignmentEpoch) + .setNextMemberEpoch(targetAssignmentEpoch) + .build(); + } else { + return member; + } + } + + /** + * Tries to transition from Assigning to Assigning or Stable. This is only + * possible when one or more partitions in the Assigning set have been freed + * up by other members in the group. + * + * @return A new ConsumerGroupMember with the new state or the current one + * if the member stays in the current state. + */ + private ConsumerGroupMember maybeTransitionFromAssigningToAssigningOrStable() { + Map> newAssignedSet = deepCopy(member.assignedPartitions()); + Map> newAssigningSet = deepCopy(member.partitionsPendingAssignment()); + + // If any partition can transition from assigning to assigned, we update + // the member. Otherwise, we return the current one. 
+ if (maybeAssignPendingPartitions(newAssignedSet, newAssigningSet)) { + return new ConsumerGroupMember.Builder(member) + .setAssignedPartitions(newAssignedSet) + .setPartitionsPendingRevocation(Collections.emptyMap()) + .setPartitionsPendingAssignment(newAssigningSet) + .setPreviousMemberEpoch(member.memberEpoch()) + .setMemberEpoch(targetAssignmentEpoch) + .setNextMemberEpoch(targetAssignmentEpoch) + .build(); + } else { + return member; + } + } + + /** + * Makes a deep copy of an assignment map. + * + * @param map The Map to copy. + * @return The copy. + */ + private Map> deepCopy(Map> map) { + Map> copy = new HashMap<>(); + map.forEach((topicId, partitions) -> { + copy.put(topicId, new HashSet<>(partitions)); + }); + return copy; + } + + /** + * Tries to move partitions from the Assigning set to the Assigned set + * if they are no longer owned. + * + * @param newAssignedSet The Assigned set. + * @param newAssigningSet The Assigning set. + * @return A boolean indicating if any partitions were moved. 
+ */ + private boolean maybeAssignPendingPartitions( + Map> newAssignedSet, + Map> newAssigningSet + ) { + boolean changed = false; + + Iterator>> assigningSetIterator = newAssigningSet.entrySet().iterator(); + while (assigningSetIterator.hasNext()) { + Map.Entry> pair = assigningSetIterator.next(); + Uuid topicId = pair.getKey(); + Set assigning = pair.getValue(); + Iterator assigningIterator = assigning.iterator(); + while (assigningIterator.hasNext()) { + Integer partitionId = assigningIterator.next(); + Integer partitionEpoch = currentPartitionEpoch.apply(topicId, partitionId); + if (partitionEpoch == -1) { + assigningIterator.remove(); + put(newAssignedSet, topicId, partitionId); + changed = true; + } + } + if (assigning.isEmpty()) { + assigningSetIterator.remove(); + } + } + + return changed; + } + + /** + * Checks whether the owned topic partitions passed by the member to the state + * machine via the ConsumerGroupHeartbeat request correspond to the Assigned + * set. + * + * @param ownedTopicPartitions The topic partitions owned by the remote client. + * @return A boolean indicating if the owned partitions match the Assigned set. + */ + private boolean hasRevokedAllPartitions( + List ownedTopicPartitions + ) { + if (ownedTopicPartitions == null) return false; + if (ownedTopicPartitions.size() != member.assignedPartitions().size()) return false; + + for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : ownedTopicPartitions) { + Set partitions = member.assignedPartitions().get(topicPartitions.topicId()); + if (partitions == null) return false; + for (Integer partitionId : topicPartitions.partitions()) { + if (!partitions.contains(partitionId)) return false; + } + } + + return true; + } + + /** + * Adds the given partition id to the partitions stored under the given topic id in the given map.
+ */ + private void put(Map> map, Uuid topicId, Integer partitionId) { + map.compute(topicId, (__, partitionsOrNull) -> { + if (partitionsOrNull == null) partitionsOrNull = new HashSet<>(); + partitionsOrNull.add(partitionId); + return partitionsOrNull; + }); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java new file mode 100644 index 0000000000000..c7bb21e0a9c82 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CurrentAssignmentBuilderTest { + + @Test + public void testTransitionFromNewTargetToRevoke() { + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setNextMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3), + mkTopicAssignment(topicId2, 4, 5, 6))) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + + Assignment targetAssignment = new Assignment(mkAssignment( + mkTopicAssignment(topicId1, 3, 4, 5), + mkTopicAssignment(topicId2, 6, 7, 8) + )); + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(11, targetAssignment) + .withCurrentPartitionEpoch((topicId, partitionId) -> 10) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state()); + assertEquals(10, updatedMember.previousMemberEpoch()); + assertEquals(10, updatedMember.memberEpoch()); + assertEquals(11, updatedMember.nextMemberEpoch()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 3), + mkTopicAssignment(topicId2, 6) + ), updatedMember.assignedPartitions()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 1, 2), + mkTopicAssignment(topicId2, 4, 5) + ), 
updatedMember.partitionsPendingRevocation()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 4, 5), + mkTopicAssignment(topicId2, 7, 8) + ), updatedMember.partitionsPendingAssignment()); + } + + @Test + public void testTransitionFromNewTargetToAssigning() { + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setNextMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3), + mkTopicAssignment(topicId2, 4, 5, 6))) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + + Assignment targetAssignment = new Assignment(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3, 4, 5), + mkTopicAssignment(topicId2, 4, 5, 6, 7, 8) + )); + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(11, targetAssignment) + .withCurrentPartitionEpoch((topicId, partitionId) -> 10) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state()); + assertEquals(10, updatedMember.previousMemberEpoch()); + assertEquals(11, updatedMember.memberEpoch()); + assertEquals(11, updatedMember.nextMemberEpoch()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3), + mkTopicAssignment(topicId2, 4, 5, 6) + ), updatedMember.assignedPartitions()); + assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingRevocation()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 4, 5), + mkTopicAssignment(topicId2, 7, 8) + ), updatedMember.partitionsPendingAssignment()); + } + + @Test + public void testTransitionFromNewTargetToStable() { + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setNextMemberEpoch(10) + 
.setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3), + mkTopicAssignment(topicId2, 4, 5, 6))) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + + Assignment targetAssignment = new Assignment(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3), + mkTopicAssignment(topicId2, 4, 5, 6) + )); + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(10, targetAssignment) + .withCurrentPartitionEpoch((topicId, partitionId) -> 10) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.STABLE, updatedMember.state()); + assertEquals(10, updatedMember.previousMemberEpoch()); + assertEquals(10, updatedMember.memberEpoch()); + assertEquals(10, updatedMember.nextMemberEpoch()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3), + mkTopicAssignment(topicId2, 4, 5, 6) + ), updatedMember.assignedPartitions()); + assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingRevocation()); + assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingAssignment()); + } + + @Test + public void testTransitionFromRevokeToRevokeWithNull() { + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setNextMemberEpoch(11) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 3), + mkTopicAssignment(topicId2, 6))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(topicId1, 1, 2), + mkTopicAssignment(topicId2, 4, 5))) + .setPartitionsPendingAssignment(mkAssignment( + mkTopicAssignment(topicId1, 4, 5), + mkTopicAssignment(topicId2, 7, 8))) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.REVOKING, member.state()); + + Assignment targetAssignment = new Assignment(mkAssignment( + mkTopicAssignment(topicId1, 3, 4, 5), + mkTopicAssignment(topicId2, 6, 7, 8) + )); + + 
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(11, targetAssignment) + .withCurrentPartitionEpoch((topicId, partitionId) -> -1) + .withOwnedTopicPartitions(null) // The client has not revoked yet. + .build(); + + assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state()); + assertEquals(10, updatedMember.previousMemberEpoch()); + assertEquals(10, updatedMember.memberEpoch()); + assertEquals(11, updatedMember.nextMemberEpoch()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 3), + mkTopicAssignment(topicId2, 6) + ), updatedMember.assignedPartitions()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 1, 2), + mkTopicAssignment(topicId2, 4, 5) + ), updatedMember.partitionsPendingRevocation()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 4, 5), + mkTopicAssignment(topicId2, 7, 8) + ), updatedMember.partitionsPendingAssignment()); + } + + @Test + public void testTransitionFromRevokeToRevokeWithEmptyList() { + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setNextMemberEpoch(11) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 3), + mkTopicAssignment(topicId2, 6))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(topicId1, 1, 2), + mkTopicAssignment(topicId2, 4, 5))) + .setPartitionsPendingAssignment(mkAssignment( + mkTopicAssignment(topicId1, 4, 5), + mkTopicAssignment(topicId2, 7, 8))) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.REVOKING, member.state()); + + Assignment targetAssignment = new Assignment(mkAssignment( + mkTopicAssignment(topicId1, 3, 4, 5), + mkTopicAssignment(topicId2, 6, 7, 8) + )); + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(11, targetAssignment) + 
.withCurrentPartitionEpoch((topicId, partitionId) -> -1) + .withOwnedTopicPartitions(Collections.emptyList()) // The client has not revoked yet. + .build(); + + assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state()); + assertEquals(10, updatedMember.previousMemberEpoch()); + assertEquals(10, updatedMember.memberEpoch()); + assertEquals(11, updatedMember.nextMemberEpoch()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 3), + mkTopicAssignment(topicId2, 6) + ), updatedMember.assignedPartitions()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 1, 2), + mkTopicAssignment(topicId2, 4, 5) + ), updatedMember.partitionsPendingRevocation()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 4, 5), + mkTopicAssignment(topicId2, 7, 8) + ), updatedMember.partitionsPendingAssignment()); + } + + @Test + public void testTransitionFromRevokeToAssigning() { + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setNextMemberEpoch(11) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 3), + mkTopicAssignment(topicId2, 6))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(topicId1, 1, 2), + mkTopicAssignment(topicId2, 4, 5))) + .setPartitionsPendingAssignment(mkAssignment( + mkTopicAssignment(topicId1, 4, 5), + mkTopicAssignment(topicId2, 7, 8))) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.REVOKING, member.state()); + + Assignment targetAssignment = new Assignment(mkAssignment( + mkTopicAssignment(topicId1, 3, 4, 5), + mkTopicAssignment(topicId2, 6, 7, 8) + )); + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(11, targetAssignment) + .withCurrentPartitionEpoch((topicId, partitionId) -> 10) + .withOwnedTopicPartitions(requestFromAssignment(mkAssignment( + 
mkTopicAssignment(topicId1, 3), + mkTopicAssignment(topicId2, 6)))) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state()); + assertEquals(10, updatedMember.previousMemberEpoch()); + assertEquals(11, updatedMember.memberEpoch()); + assertEquals(11, updatedMember.nextMemberEpoch()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 3), + mkTopicAssignment(topicId2, 6) + ), updatedMember.assignedPartitions()); + assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingRevocation()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 4, 5), + mkTopicAssignment(topicId2, 7, 8) + ), updatedMember.partitionsPendingAssignment()); + } + + @Test + public void testTransitionFromRevokeToStable() { + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setNextMemberEpoch(11) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 3), + mkTopicAssignment(topicId2, 6))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(topicId1, 1, 2), + mkTopicAssignment(topicId2, 4, 5))) + .setPartitionsPendingAssignment(mkAssignment( + mkTopicAssignment(topicId1, 4, 5), + mkTopicAssignment(topicId2, 7, 8))) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.REVOKING, member.state()); + + Assignment targetAssignment = new Assignment(mkAssignment( + mkTopicAssignment(topicId1, 3, 4, 5), + mkTopicAssignment(topicId2, 6, 7, 8) + )); + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(11, targetAssignment) + .withCurrentPartitionEpoch((topicId, partitionId) -> -1) + .withOwnedTopicPartitions(requestFromAssignment(mkAssignment( + mkTopicAssignment(topicId1, 3), + mkTopicAssignment(topicId2, 6)))) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.STABLE, updatedMember.state()); + 
assertEquals(10, updatedMember.previousMemberEpoch()); + assertEquals(11, updatedMember.memberEpoch()); + assertEquals(11, updatedMember.nextMemberEpoch()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 3, 4, 5), + mkTopicAssignment(topicId2, 6, 7, 8) + ), updatedMember.assignedPartitions()); + assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingRevocation()); + assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingAssignment()); + } + + @Test + public void testTransitionFromAssigningToAssigning() { + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setMemberEpoch(10) + .setPreviousMemberEpoch(11) + .setNextMemberEpoch(11) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 3), + mkTopicAssignment(topicId2, 6))) + .setPartitionsPendingAssignment(mkAssignment( + mkTopicAssignment(topicId1, 4, 5), + mkTopicAssignment(topicId2, 7, 8))) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, member.state()); + + Assignment targetAssignment = new Assignment(mkAssignment( + mkTopicAssignment(topicId1, 3, 4, 5), + mkTopicAssignment(topicId2, 6, 7, 8) + )); + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(11, targetAssignment) + .withCurrentPartitionEpoch((topicId, partitionId) -> { + if (topicId.equals(topicId1)) + return -1; + else + return 10; + }) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state()); + assertEquals(10, updatedMember.previousMemberEpoch()); + assertEquals(11, updatedMember.memberEpoch()); + assertEquals(11, updatedMember.nextMemberEpoch()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 3, 4, 5), + mkTopicAssignment(topicId2, 6) + ), updatedMember.assignedPartitions()); + assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingRevocation()); + 
assertEquals(mkAssignment( + mkTopicAssignment(topicId2, 7, 8) + ), updatedMember.partitionsPendingAssignment()); + } + + @Test + public void testTransitionFromAssigningToStable() { + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setMemberEpoch(10) + .setPreviousMemberEpoch(11) + .setNextMemberEpoch(11) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 3), + mkTopicAssignment(topicId2, 6))) + .setPartitionsPendingAssignment(mkAssignment( + mkTopicAssignment(topicId1, 4, 5), + mkTopicAssignment(topicId2, 7, 8))) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, member.state()); + + Assignment targetAssignment = new Assignment(mkAssignment( + mkTopicAssignment(topicId1, 3, 4, 5), + mkTopicAssignment(topicId2, 6, 7, 8) + )); + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(11, targetAssignment) + .withCurrentPartitionEpoch((topicId, partitionId) -> -1) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.STABLE, updatedMember.state()); + assertEquals(10, updatedMember.previousMemberEpoch()); + assertEquals(11, updatedMember.memberEpoch()); + assertEquals(11, updatedMember.nextMemberEpoch()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 3, 4, 5), + mkTopicAssignment(topicId2, 6, 7, 8) + ), updatedMember.assignedPartitions()); + assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingRevocation()); + assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingAssignment()); + } + + @Test + public void testTransitionFromStableToStable() { + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setMemberEpoch(11) + .setPreviousMemberEpoch(11) + .setNextMemberEpoch(11) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 3), + 
mkTopicAssignment(topicId2, 6))) + .setPartitionsPendingAssignment(mkAssignment( + mkTopicAssignment(topicId1, 4, 5), + mkTopicAssignment(topicId2, 7, 8))) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, member.state()); + + Assignment targetAssignment = new Assignment(mkAssignment( + mkTopicAssignment(topicId1, 3, 4, 5), + mkTopicAssignment(topicId2, 6, 7, 8) + )); + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(11, targetAssignment) + .withCurrentPartitionEpoch((topicId, partitionId) -> -1) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.STABLE, updatedMember.state()); + assertEquals(11, updatedMember.previousMemberEpoch()); + assertEquals(11, updatedMember.memberEpoch()); + assertEquals(11, updatedMember.nextMemberEpoch()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 3, 4, 5), + mkTopicAssignment(topicId2, 6, 7, 8) + ), updatedMember.assignedPartitions()); + assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingRevocation()); + assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingAssignment()); + } + + @Test + public void testNewTargetRestartReconciliation() { + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setNextMemberEpoch(11) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 3), + mkTopicAssignment(topicId2, 6))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(topicId1, 1, 2), + mkTopicAssignment(topicId2, 4, 5))) + .setPartitionsPendingAssignment(mkAssignment( + mkTopicAssignment(topicId1, 4, 5), + mkTopicAssignment(topicId2, 7, 8))) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.REVOKING, member.state()); + + Assignment targetAssignment = new Assignment(mkAssignment( + mkTopicAssignment(topicId1, 6, 7, 8), + 
mkTopicAssignment(topicId2, 9, 10, 11) + )); + + ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(12, targetAssignment) + .withCurrentPartitionEpoch((topicId, partitionId) -> -1) + .build(); + + assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state()); + assertEquals(10, updatedMember.previousMemberEpoch()); + assertEquals(10, updatedMember.memberEpoch()); + assertEquals(12, updatedMember.nextMemberEpoch()); + assertEquals(Collections.emptyMap(), updatedMember.assignedPartitions()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3), + mkTopicAssignment(topicId2, 4, 5, 6) + ), updatedMember.partitionsPendingRevocation()); + assertEquals(mkAssignment( + mkTopicAssignment(topicId1, 6, 7, 8), + mkTopicAssignment(topicId2, 9, 10, 11) + ), updatedMember.partitionsPendingAssignment()); + } + + private static List requestFromAssignment( + Map> assignment + ) { + List topicPartitions = new ArrayList<>(); + + assignment.forEach((topicId, partitions) -> { + ConsumerGroupHeartbeatRequestData.TopicPartitions topic = new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(topicId) + .setPartitions(new ArrayList<>(partitions)); + topicPartitions.add(topic); + }); + + return topicPartitions; + } +} From adb8672ba286a6dfc17de9b0bb848de412fe6f33 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 27 Apr 2023 14:41:24 +0200 Subject: [PATCH 2/8] address minor comments --- .../group/consumer/CurrentAssignmentBuilder.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java index 9aa5d3c3527c1..9a56d43f68e8c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java +++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java @@ -37,13 +37,13 @@ * The member state has the following properties: * - Current Epoch - The current epoch of the member. * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of - * the target/desired assignment. The member transition to this epoch - * when it has revoked the partitions that it does not owned or if it + * the target/desired assignment. The member transitions to this epoch + * when it has revoked the partitions that it does not own or if it * does not have to revoke any. * - Previous Epoch - The previous epoch of the member when the state was updated. * - Assigned Set - The set of partitions currently assigned to the member. This represents what * the member should have. - * - Revoking Set - The set of partitions that the member should revoke before it could transition + * - Revoking Set - The set of partitions that the member should revoke before it can transition * to the next state. * - Assigning Set - The set of partitions that the member will eventually receive. The partitions * in this set are still owned by other members in the group. 
From cd2a8353a15dd27f72aa3c6acc76d002450db63d Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 3 May 2023 08:47:53 +0200 Subject: [PATCH 3/8] address minor comments and minor refactor --- .../consumer/CurrentAssignmentBuilder.java | 211 ++++++++++-------- 1 file changed, 116 insertions(+), 95 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java index 9a56d43f68e8c..d034d67f77955 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java @@ -35,35 +35,47 @@ * assignment state, the state machine takes the necessary steps to converge them. * * The member state has the following properties: - * - Current Epoch - The current epoch of the member. - * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of - * the target/desired assignment. The member transitions to this epoch - * when it has revoked the partitions that it does not own or if it - * does not have to revoke any. - * - Previous Epoch - The previous epoch of the member when the state was updated. - * - Assigned Set - The set of partitions currently assigned to the member. This represents what - * the member should have. - * - Revoking Set - The set of partitions that the member should revoke before it can transition - * to the next state. - * - Assigning Set - The set of partitions that the member will eventually receive. The partitions - * in this set are still owned by other members in the group. + * - Current Epoch: + * The current epoch of the member. + * + * - Next Epoch: + * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. 
+ * The member transitions to this epoch when it has revoked the partitions that it does not own + * or if it does not have to revoke any. + * + * - Previous Epoch: + * The previous epoch of the member when the state was updated. + * + * - Assigned Partitions: + * The set of partitions currently assigned to the member. This represents what the member should have. + * + * - Partitions Pending Revocation: + * The set of partitions that the member should revoke before it can transition to the next state. + * + * - Partitions Pending Assignment: + * The set of partitions that the member will eventually receive. The partitions in this set are + * still owned by other members in the group. * * The state machine has four states: - * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts - * here when the next epoch does not match the target epoch. It means that - * a new target assignment has been installed so the reconciliation process - * must restart. In this state, the Assigned, Revoking and Assigning sets - * are computed. If Revoking is not empty, the state machine transitions - * to REVOKE; if Assigning is not empty, it transitions to ASSIGNING; - * otherwise it transitions to STABLE. - * - REVOKE - This state means that the member must revoke partitions before it can - * transition to the next epoch and thus start receiving new partitions. - * The member transitions to the next state only when it has acknowledged - * the revocation. - * - ASSIGNING - This state means that the member waits on partitions which are still - * owned by other members in the group. It remains in this state until - * they are all freed up. - * - STABLE - This state means that the member has received all its assigned partitions. + * - NEW_TARGET_ASSIGNMENT: + * This is the initial state of the state machine. The state machine starts here when the next epoch + * does not match the target epoch. 
It means that a new target assignment has been installed so the + * reconciliation process must restart. In this state, the Assigned, Revoking and Assigning sets are + * computed. If Revoking is not empty, the state machine transitions to REVOKING; if Assigning is not + * empty, it transitions to ASSIGNING; otherwise it transitions to STABLE. + * + * - REVOKING: + * This state means that the member must revoke partitions before it can transition to the next epoch + * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions + * are committed with the current epoch. The member transitions to the next state only when it has + * acknowledged the revocation. + * + * - ASSIGNING: + * This state means that the member waits on partitions which are still owned by other members in the + * group. It remains in this state until they are all freed up. + * + * - STABLE: + * This state means that the member has received all its assigned partitions. */ public class CurrentAssignmentBuilder { /** @@ -188,9 +200,9 @@ public ConsumerGroupMember build() { * @return A new ConsumerGroupMember. */ private ConsumerGroupMember transitionToInitialState() { - Map> newAssignedSet = new HashMap<>(); - Map> newRevokingSet = new HashMap<>(); - Map> newAssigningSet = new HashMap<>(); + Map> newAssignedPartitions = new HashMap<>(); + Map> newPartitionsPendingRevocation = new HashMap<>(); + Map> newPartitionsPendingAssignment = new HashMap<>(); // Compute the combined set of topics. 
Set allTopicIds = new HashSet<>(targetAssignment.partitions().keySet()); @@ -207,55 +219,55 @@ private ConsumerGroupMember transitionToInitialState() { .getOrDefault(topicId, Collections.emptySet()); // Assigned_1 = (Assigned_0 + Revoking_0) /\ Target - Set newAssignedPartitions = new HashSet<>(currentAssignedPartitions); - newAssignedPartitions.addAll(currentRevokingPartitions); - newAssignedPartitions.retainAll(target); + Set assignedPartitions = new HashSet<>(currentAssignedPartitions); + assignedPartitions.addAll(currentRevokingPartitions); + assignedPartitions.retainAll(target); // Revoking_1 = (Assigned_0 + Revoking_0) - Assigned_1 - Set newRevokingPartitions = new HashSet<>(currentAssignedPartitions); - newRevokingPartitions.addAll(currentRevokingPartitions); - newRevokingPartitions.removeAll(newAssignedPartitions); + Set partitionsPendingRevocation = new HashSet<>(currentAssignedPartitions); + partitionsPendingRevocation.addAll(currentRevokingPartitions); + partitionsPendingRevocation.removeAll(assignedPartitions); // Assigning_1 = Target - Assigned_1 - Set newAssigningPartitions = new HashSet<>(target); - newAssigningPartitions.removeAll(newAssignedPartitions); + Set partitionsPendingAssignment = new HashSet<>(target); + partitionsPendingAssignment.removeAll(assignedPartitions); - if (!newAssignedPartitions.isEmpty()) { - newAssignedSet.put(topicId, newAssignedPartitions); + if (!assignedPartitions.isEmpty()) { + newAssignedPartitions.put(topicId, assignedPartitions); } - if (!newRevokingPartitions.isEmpty()) { - newRevokingSet.put(topicId, newRevokingPartitions); + if (!partitionsPendingRevocation.isEmpty()) { + newPartitionsPendingRevocation.put(topicId, partitionsPendingRevocation); } - if (!newAssigningPartitions.isEmpty()) { - newAssigningSet.put(topicId, newAssigningPartitions); + if (!partitionsPendingAssignment.isEmpty()) { + newPartitionsPendingAssignment.put(topicId, partitionsPendingAssignment); } } - if (!newRevokingSet.isEmpty()) { + if 
(!newPartitionsPendingRevocation.isEmpty()) { // If the revoking set is not empty, we transition to Revoking and we // stay in the current epoch. return new ConsumerGroupMember.Builder(member) - .setAssignedPartitions(newAssignedSet) - .setPartitionsPendingRevocation(newRevokingSet) - .setPartitionsPendingAssignment(newAssigningSet) + .setAssignedPartitions(newAssignedPartitions) + .setPartitionsPendingRevocation(newPartitionsPendingRevocation) + .setPartitionsPendingAssignment(newPartitionsPendingAssignment) .setNextMemberEpoch(targetAssignmentEpoch) .build(); } else { - if (!newAssigningSet.isEmpty()) { + if (!newPartitionsPendingAssignment.isEmpty()) { // If the assigning set is not empty, we check if some or all // partitions are free to use. If they are, we move them to // the assigned set. - maybeAssignPendingPartitions(newAssignedSet, newAssigningSet); + maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment); } // We transition to the target epoch. If the assigning set is empty, // the member transition to stable, otherwise to assigning. return new ConsumerGroupMember.Builder(member) - .setAssignedPartitions(newAssignedSet) + .setAssignedPartitions(newAssignedPartitions) .setPartitionsPendingRevocation(Collections.emptyMap()) - .setPartitionsPendingAssignment(newAssigningSet) + .setPartitionsPendingAssignment(newPartitionsPendingAssignment) .setPreviousMemberEpoch(member.memberEpoch()) .setMemberEpoch(targetAssignmentEpoch) .setNextMemberEpoch(targetAssignmentEpoch) @@ -266,29 +278,29 @@ private ConsumerGroupMember transitionToInitialState() { /** * Tries to transition from Revoke to Assigning or Stable. This is only * possible when the member acknowledges that it only owns the partition - * in the Assigned set. + * in the assigned partitions. * * @return A new ConsumerGroupMember with the new state or the current one * if the member stays in the current state. 
*/ private ConsumerGroupMember maybeTransitionFromRevokingToAssigningOrStable() { - if (member.partitionsPendingRevocation().isEmpty() || hasRevokedAllPartitions(ownedTopicPartitions)) { - Map> newAssignedSet = deepCopy(member.assignedPartitions()); - Map> newAssigningSet = deepCopy(member.partitionsPendingAssignment()); - - if (!newAssigningSet.isEmpty()) { - // If the assigning set is not empty, we check if some or all - // partitions are free to use. If they are, we move them to - // the assigned set. - maybeAssignPendingPartitions(newAssignedSet, newAssigningSet); + if (member.partitionsPendingRevocation().isEmpty() || matchesAssignedPartitions(ownedTopicPartitions)) { + Map> newAssignedPartitions = deepCopy(member.assignedPartitions()); + Map> newPartitionsPendingAssignment = deepCopy(member.partitionsPendingAssignment()); + + if (!newPartitionsPendingAssignment.isEmpty()) { + // If the partitions pending assignment set is not empty, we check + // if some or all partitions are free to use. If they are, we move + // them to the assigned set. + maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment); } // We transition to the target epoch. If the assigning set is empty, // the member transition to stable, otherwise to assigning. return new ConsumerGroupMember.Builder(member) - .setAssignedPartitions(newAssignedSet) + .setAssignedPartitions(newAssignedPartitions) .setPartitionsPendingRevocation(Collections.emptyMap()) - .setPartitionsPendingAssignment(newAssigningSet) + .setPartitionsPendingAssignment(newPartitionsPendingAssignment) .setPreviousMemberEpoch(member.memberEpoch()) .setMemberEpoch(targetAssignmentEpoch) .setNextMemberEpoch(targetAssignmentEpoch) @@ -300,23 +312,23 @@ private ConsumerGroupMember maybeTransitionFromRevokingToAssigningOrStable() { /** * Tries to transition from Assigning to Assigning or Stable. 
This is only - * possible when one or more partitions in the Assigning set have been freed - * up by other members in the group. + * possible when one or more partitions in the partitions pending assignment + * set have been freed up by other members in the group. * * @return A new ConsumerGroupMember with the new state or the current one * if the member stays in the current state. */ private ConsumerGroupMember maybeTransitionFromAssigningToAssigningOrStable() { - Map> newAssignedSet = deepCopy(member.assignedPartitions()); - Map> newAssigningSet = deepCopy(member.partitionsPendingAssignment()); + Map> newAssignedPartitions = deepCopy(member.assignedPartitions()); + Map> newPartitionsPendingAssignment = deepCopy(member.partitionsPendingAssignment()); // If any partition can transition from assigning to assigned, we update // the member. Otherwise, we return the current one. - if (maybeAssignPendingPartitions(newAssignedSet, newAssigningSet)) { + if (maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment)) { return new ConsumerGroupMember.Builder(member) - .setAssignedPartitions(newAssignedSet) + .setAssignedPartitions(newAssignedPartitions) .setPartitionsPendingRevocation(Collections.emptyMap()) - .setPartitionsPendingAssignment(newAssigningSet) + .setPartitionsPendingAssignment(newPartitionsPendingAssignment) .setPreviousMemberEpoch(member.memberEpoch()) .setMemberEpoch(targetAssignmentEpoch) .setNextMemberEpoch(targetAssignmentEpoch) @@ -326,49 +338,42 @@ private ConsumerGroupMember maybeTransitionFromAssigningToAssigningOrStable() { } } - /** - * Makes a deep copy of an assignment map. - * - * @param map The Map to copy. - * @return The copy. - */ - private Map> deepCopy(Map> map) { - Map> copy = new HashMap<>(); - map.forEach((topicId, partitions) -> { - copy.put(topicId, new HashSet<>(partitions)); - }); - return copy; - } - /** * Tries to move partitions from the Assigning set to the Assigned set * if they are no longer owned. 
* - * @param newAssignedSet The Assigned set. - * @param newAssigningSet The Assigning set. + * @param newAssignedPartitions The assigned partitions. + * @param newPartitionsPendingAssignment The partitions pending assignment. * @return A boolean indicating if any partitions were moved. */ private boolean maybeAssignPendingPartitions( - Map> newAssignedSet, - Map> newAssigningSet + Map> newAssignedPartitions, + Map> newPartitionsPendingAssignment ) { boolean changed = false; - Iterator>> assigningSetIterator = newAssigningSet.entrySet().iterator(); + Iterator>> assigningSetIterator = + newPartitionsPendingAssignment.entrySet().iterator(); + while (assigningSetIterator.hasNext()) { Map.Entry> pair = assigningSetIterator.next(); Uuid topicId = pair.getKey(); Set assigning = pair.getValue(); + Iterator assigningIterator = assigning.iterator(); while (assigningIterator.hasNext()) { Integer partitionId = assigningIterator.next(); + + // A partition can be assigned to this member iff it has been + // released by its previous owner. This is signaled by -1. Integer partitionEpoch = currentPartitionEpoch.apply(topicId, partitionId); if (partitionEpoch == -1) { assigningIterator.remove(); - put(newAssignedSet, topicId, partitionId); + put(newAssignedPartitions, topicId, partitionId); changed = true; } } + if (assigning.isEmpty()) { assigningSetIterator.remove(); } @@ -379,13 +384,13 @@ private boolean maybeAssignPendingPartitions( /** * Checks whether the owned topic partitions passed by the member to the state - * machine via the ConsumerGroupHeartbeat request corresponds to the Assigned - * set. + * machine via the ConsumerGroupHeartbeat request corresponds to the assigned + * partitions. * * @param ownedTopicPartitions The topic partitions owned by the remove client. * @return A boolean indicating if the owned partitions matches the Assigned set. 
*/ - private boolean hasRevokedAllPartitions( + private boolean matchesAssignedPartitions( List ownedTopicPartitions ) { if (ownedTopicPartitions == null) return false; @@ -402,10 +407,26 @@ private boolean hasRevokedAllPartitions( return true; } + /** + * Makes a deep copy of an assignment map. + * + * @param map The Map to copy. + * @return The copy. + */ + private Map> deepCopy(Map> map) { + Map> copy = new HashMap<>(); + map.forEach((topicId, partitions) -> copy.put(topicId, new HashSet<>(partitions))); + return copy; + } + /** * Puts the given TopicId and Partitions to the given map. */ - private void put(Map> map, Uuid topicId, Integer partitionId) { + private void put( + Map> map, + Uuid topicId, + Integer partitionId + ) { map.compute(topicId, (__, partitionsOrNull) -> { if (partitionsOrNull == null) partitionsOrNull = new HashSet<>(); partitionsOrNull.add(partitionId); From 1f8106db32c0ba35dcbfc5b345518ac3e1bd72e3 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 4 May 2023 10:08:34 +0200 Subject: [PATCH 4/8] address comments --- .../group/consumer/ConsumerGroupMember.java | 4 + .../consumer/CurrentAssignmentBuilder.java | 42 +++++---- .../CurrentAssignmentBuilderTest.java | 86 +++++++++++-------- 3 files changed, 75 insertions(+), 57 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java index b2e4fcb782e63..c40bb7c937c5f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java @@ -264,6 +264,10 @@ public ConsumerGroupMember build() { } } + /** + * The various states that a member can be in. For their definition, + * refer to the documentation of {{@link CurrentAssignmentBuilder}}. 
+ */ public enum MemberState { REVOKING("revoking"), ASSIGNING("assigning"), diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java index d034d67f77955..0ffac85a7915b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java @@ -44,7 +44,7 @@ * or if it does not have to revoke any. * * - Previous Epoch: - * The previous epoch of the member when the state was updated. + * The epoch of the member when the state was last updated. * * - Assigned Partitions: * The set of partitions currently assigned to the member. This represents what the member should have. @@ -173,7 +173,7 @@ public ConsumerGroupMember build() { // A new target assignment has been installed, we need to restart // the reconciliation loop from the beginning. if (targetAssignmentEpoch != member.nextMemberEpoch()) { - return transitionToInitialState(); + return reinitializeState(); } switch (member.state()) { @@ -194,12 +194,12 @@ public ConsumerGroupMember build() { } /** - * Transitions to the initial state. Here we compute the Assigned, - * Revoking and Assigning sets. + * Reinitialize the state machine. Here we compute the assigned partitions, + * the partitions pending revocation and the partitions pending assignment. * * @return A new ConsumerGroupMember. 
*/ - private ConsumerGroupMember transitionToInitialState() { + private ConsumerGroupMember reinitializeState() { Map> newAssignedPartitions = new HashMap<>(); Map> newPartitionsPendingRevocation = new HashMap<>(); Map> newPartitionsPendingAssignment = new HashMap<>(); @@ -218,17 +218,21 @@ private ConsumerGroupMember transitionToInitialState() { Set currentRevokingPartitions = member.partitionsPendingRevocation() .getOrDefault(topicId, Collections.emptySet()); - // Assigned_1 = (Assigned_0 + Revoking_0) /\ Target + // Assigned_1 = (Assigned_0 + Pending_Revocation_0) ∩ Target + // Assigned_0 + Pending_Revocation_0 is used here because the partitions + // being revoked are still owned until the revocation is acknowledged. Set assignedPartitions = new HashSet<>(currentAssignedPartitions); assignedPartitions.addAll(currentRevokingPartitions); assignedPartitions.retainAll(target); - // Revoking_1 = (Assigned_0 + Revoking_0) - Assigned_1 + // Pending_Revocation_1 = (Assigned_0 + Pending_Revocation_0) - Assigned_1 + // Assigned_0 + Pending_Revocation_0 is used here because the partitions + // being revoked are still owned until the revocation is acknowledged. Set partitionsPendingRevocation = new HashSet<>(currentAssignedPartitions); partitionsPendingRevocation.addAll(currentRevokingPartitions); partitionsPendingRevocation.removeAll(assignedPartitions); - // Assigning_1 = Target - Assigned_1 + // Pending_Assignment_1 = Target - Assigned_1 Set partitionsPendingAssignment = new HashSet<>(target); partitionsPendingAssignment.removeAll(assignedPartitions); @@ -246,8 +250,8 @@ private ConsumerGroupMember transitionToInitialState() { } if (!newPartitionsPendingRevocation.isEmpty()) { - // If the revoking set is not empty, we transition to Revoking and we - // stay in the current epoch. + // If the partition pending revocation set is not empty, we transition to + // Revoking and we stay in the current epoch. 
return new ConsumerGroupMember.Builder(member) .setAssignedPartitions(newAssignedPartitions) .setPartitionsPendingRevocation(newPartitionsPendingRevocation) @@ -256,14 +260,14 @@ private ConsumerGroupMember transitionToInitialState() { .build(); } else { if (!newPartitionsPendingAssignment.isEmpty()) { - // If the assigning set is not empty, we check if some or all - // partitions are free to use. If they are, we move them to - // the assigned set. + // If the partitions pending assignment set is not empty, we check + // if some or all partitions are free to use. If they are, we move + // them to the partitions assigned set. maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment); } - // We transition to the target epoch. If the assigning set is empty, - // the member transition to stable, otherwise to assigning. + // We transition to the target epoch. If the partitions pending assignment + // set is empty, the member transition to stable, otherwise to assigning. return new ConsumerGroupMember.Builder(member) .setAssignedPartitions(newAssignedPartitions) .setPartitionsPendingRevocation(Collections.emptyMap()) @@ -295,8 +299,8 @@ private ConsumerGroupMember maybeTransitionFromRevokingToAssigningOrStable() { maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment); } - // We transition to the target epoch. If the assigning set is empty, - // the member transition to stable, otherwise to assigning. + // We transition to the target epoch. If the partitions pending assignment + // set is empty, the member transition to stable, otherwise to assigning. return new ConsumerGroupMember.Builder(member) .setAssignedPartitions(newAssignedPartitions) .setPartitionsPendingRevocation(Collections.emptyMap()) @@ -339,8 +343,8 @@ private ConsumerGroupMember maybeTransitionFromAssigningToAssigningOrStable() { } /** - * Tries to move partitions from the Assigning set to the Assigned set - * if they are no longer owned. 
+ * Tries to move partitions from the partitions pending assignment set to + * the partitions assigned set if they are no longer owned. * * @param newAssignedPartitions The assigned partitions. * @param newPartitionsPendingAssignment The partitions pending assignment. diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java index c7bb21e0a9c82..f7d96ff8a4717 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java @@ -19,12 +19,16 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Stream; import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; @@ -155,8 +159,20 @@ public void testTransitionFromNewTargetToStable() { assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingAssignment()); } - @Test - public void testTransitionFromRevokeToRevokeWithNull() { + private static Stream ownedTopicPartitionsArguments() { + return Stream.of( + // Field not set in the heartbeat request. + null, + // Owned partitions does not match the assigned partitions. 
+ Collections.emptyList() + ).map(Arguments::of); + } + + @ParameterizedTest + @MethodSource("ownedTopicPartitionsArguments") + public void testTransitionFromRevokeToRevoke( + List ownedTopicPartitions + ) { Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); @@ -185,7 +201,7 @@ public void testTransitionFromRevokeToRevokeWithNull() { ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) .withTargetAssignment(11, targetAssignment) .withCurrentPartitionEpoch((topicId, partitionId) -> -1) - .withOwnedTopicPartitions(null) // The client has not revoked yet. + .withOwnedTopicPartitions(ownedTopicPartitions) .build(); assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state()); @@ -207,7 +223,7 @@ public void testTransitionFromRevokeToRevokeWithNull() { } @Test - public void testTransitionFromRevokeToRevokeWithEmptyList() { + public void testTransitionFromRevokeToAssigning() { Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); @@ -235,22 +251,21 @@ public void testTransitionFromRevokeToRevokeWithEmptyList() { ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) .withTargetAssignment(11, targetAssignment) - .withCurrentPartitionEpoch((topicId, partitionId) -> -1) - .withOwnedTopicPartitions(Collections.emptyList()) // The client has not revoked yet. 
+ .withCurrentPartitionEpoch((topicId, partitionId) -> 10) + .withOwnedTopicPartitions(requestFromAssignment(mkAssignment( + mkTopicAssignment(topicId1, 3), + mkTopicAssignment(topicId2, 6)))) .build(); - assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state()); + assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state()); assertEquals(10, updatedMember.previousMemberEpoch()); - assertEquals(10, updatedMember.memberEpoch()); + assertEquals(11, updatedMember.memberEpoch()); assertEquals(11, updatedMember.nextMemberEpoch()); assertEquals(mkAssignment( mkTopicAssignment(topicId1, 3), mkTopicAssignment(topicId2, 6) ), updatedMember.assignedPartitions()); - assertEquals(mkAssignment( - mkTopicAssignment(topicId1, 1, 2), - mkTopicAssignment(topicId2, 4, 5) - ), updatedMember.partitionsPendingRevocation()); + assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingRevocation()); assertEquals(mkAssignment( mkTopicAssignment(topicId1, 4, 5), mkTopicAssignment(topicId2, 7, 8) @@ -258,7 +273,7 @@ public void testTransitionFromRevokeToRevokeWithEmptyList() { } @Test - public void testTransitionFromRevokeToAssigning() { + public void testTransitionFromRevokeToStable() { Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); @@ -286,29 +301,26 @@ public void testTransitionFromRevokeToAssigning() { ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) .withTargetAssignment(11, targetAssignment) - .withCurrentPartitionEpoch((topicId, partitionId) -> 10) + .withCurrentPartitionEpoch((topicId, partitionId) -> -1) .withOwnedTopicPartitions(requestFromAssignment(mkAssignment( mkTopicAssignment(topicId1, 3), mkTopicAssignment(topicId2, 6)))) .build(); - assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state()); + assertEquals(ConsumerGroupMember.MemberState.STABLE, updatedMember.state()); assertEquals(10, updatedMember.previousMemberEpoch()); assertEquals(11, 
updatedMember.memberEpoch()); assertEquals(11, updatedMember.nextMemberEpoch()); assertEquals(mkAssignment( - mkTopicAssignment(topicId1, 3), - mkTopicAssignment(topicId2, 6) + mkTopicAssignment(topicId1, 3, 4, 5), + mkTopicAssignment(topicId2, 6, 7, 8) ), updatedMember.assignedPartitions()); assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingRevocation()); - assertEquals(mkAssignment( - mkTopicAssignment(topicId1, 4, 5), - mkTopicAssignment(topicId2, 7, 8) - ), updatedMember.partitionsPendingAssignment()); + assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingAssignment()); } @Test - public void testTransitionFromRevokeToStable() { + public void testTransitionFromRevokeToStableWhenPartitionsPendingRevocationAreReassignedBeforeBeingRevoked() { Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); @@ -329,26 +341,27 @@ public void testTransitionFromRevokeToStable() { assertEquals(ConsumerGroupMember.MemberState.REVOKING, member.state()); + // A new target assignment is computed (epoch 12) before the partitions + // pending revocation are revoked by the member and those partitions + // have been reassigned to the member. In this case, the member + // can keep them and jump to epoch 12. 
Assignment targetAssignment = new Assignment(mkAssignment( - mkTopicAssignment(topicId1, 3, 4, 5), - mkTopicAssignment(topicId2, 6, 7, 8) + mkTopicAssignment(topicId1, 1, 2, 3), + mkTopicAssignment(topicId2, 4, 5, 6) )); ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) - .withTargetAssignment(11, targetAssignment) + .withTargetAssignment(12, targetAssignment) .withCurrentPartitionEpoch((topicId, partitionId) -> -1) - .withOwnedTopicPartitions(requestFromAssignment(mkAssignment( - mkTopicAssignment(topicId1, 3), - mkTopicAssignment(topicId2, 6)))) .build(); assertEquals(ConsumerGroupMember.MemberState.STABLE, updatedMember.state()); assertEquals(10, updatedMember.previousMemberEpoch()); - assertEquals(11, updatedMember.memberEpoch()); - assertEquals(11, updatedMember.nextMemberEpoch()); + assertEquals(12, updatedMember.memberEpoch()); + assertEquals(12, updatedMember.nextMemberEpoch()); assertEquals(mkAssignment( - mkTopicAssignment(topicId1, 3, 4, 5), - mkTopicAssignment(topicId2, 6, 7, 8) + mkTopicAssignment(topicId1, 1, 2, 3), + mkTopicAssignment(topicId2, 4, 5, 6) ), updatedMember.assignedPartitions()); assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingRevocation()); assertEquals(Collections.emptyMap(), updatedMember.partitionsPendingAssignment()); @@ -453,14 +466,11 @@ public void testTransitionFromStableToStable() { .setPreviousMemberEpoch(11) .setNextMemberEpoch(11) .setAssignedPartitions(mkAssignment( - mkTopicAssignment(topicId1, 3), - mkTopicAssignment(topicId2, 6))) - .setPartitionsPendingAssignment(mkAssignment( - mkTopicAssignment(topicId1, 4, 5), - mkTopicAssignment(topicId2, 7, 8))) + mkTopicAssignment(topicId1, 3, 4, 5), + mkTopicAssignment(topicId2, 6, 7, 8))) .build(); - assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, member.state()); + assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); Assignment targetAssignment = new Assignment(mkAssignment( mkTopicAssignment(topicId1, 3, 4, 
5), From 83f9ccd4fef9a95db152795d776347e58af3707f Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 4 May 2023 17:35:41 +0200 Subject: [PATCH 5/8] update comments --- .../consumer/CurrentAssignmentBuilder.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java index 0ffac85a7915b..edf095215b410 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java @@ -173,7 +173,7 @@ public ConsumerGroupMember build() { // A new target assignment has been installed, we need to restart // the reconciliation loop from the beginning. if (targetAssignmentEpoch != member.nextMemberEpoch()) { - return reinitializeState(); + return transitionToNewTargetAssignmentState(); } switch (member.state()) { @@ -194,12 +194,13 @@ public ConsumerGroupMember build() { } /** - * Reinitialize the state machine. Here we compute the assigned partitions, - * the partitions pending revocation and the partitions pending assignment. + * Transitions to NewTargetAssignment state. This is a transient state where + * we compute the assigned partitions, the partitions pending revocation, + * the partitions pending assignment, and transition to the next state. * * @return A new ConsumerGroupMember. 
*/ - private ConsumerGroupMember reinitializeState() { + private ConsumerGroupMember transitionToNewTargetAssignmentState() { Map> newAssignedPartitions = new HashMap<>(); Map> newPartitionsPendingRevocation = new HashMap<>(); Map> newPartitionsPendingAssignment = new HashMap<>(); @@ -250,8 +251,9 @@ private ConsumerGroupMember reinitializeState() { } if (!newPartitionsPendingRevocation.isEmpty()) { - // If the partition pending revocation set is not empty, we transition to - // Revoking and we stay in the current epoch. + // If the partition pending revocation set is not empty, we transition the + // member to revoking and keep the current epoch. The transition to the new + // state is done when the member is updated. return new ConsumerGroupMember.Builder(member) .setAssignedPartitions(newAssignedPartitions) .setPartitionsPendingRevocation(newPartitionsPendingRevocation) @@ -268,6 +270,7 @@ private ConsumerGroupMember reinitializeState() { // We transition to the target epoch. If the partitions pending assignment // set is empty, the member transition to stable, otherwise to assigning. + // The transition to the new state is done when the member is updated. return new ConsumerGroupMember.Builder(member) .setAssignedPartitions(newAssignedPartitions) .setPartitionsPendingRevocation(Collections.emptyMap()) @@ -301,6 +304,7 @@ private ConsumerGroupMember maybeTransitionFromRevokingToAssigningOrStable() { // We transition to the target epoch. If the partitions pending assignment // set is empty, the member transition to stable, otherwise to assigning. + // The transition to the new state is done when the member is updated. 
return new ConsumerGroupMember.Builder(member) .setAssignedPartitions(newAssignedPartitions) .setPartitionsPendingRevocation(Collections.emptyMap()) @@ -327,7 +331,8 @@ private ConsumerGroupMember maybeTransitionFromAssigningToAssigningOrStable() { Map> newPartitionsPendingAssignment = deepCopy(member.partitionsPendingAssignment()); // If any partition can transition from assigning to assigned, we update - // the member. Otherwise, we return the current one. + // the member. Otherwise, we return the current one. The transition to the + // new state is done when the member is updated. if (maybeAssignPendingPartitions(newAssignedPartitions, newPartitionsPendingAssignment)) { return new ConsumerGroupMember.Builder(member) .setAssignedPartitions(newAssignedPartitions) From 0395a1780acd73e735bfee21da733075db85504d Mon Sep 17 00:00:00 2001 From: David Jacot Date: Fri, 5 May 2023 08:52:07 +0200 Subject: [PATCH 6/8] fix test --- .../group/consumer/CurrentAssignmentBuilderTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java index f7d96ff8a4717..b67c68e642534 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java @@ -143,14 +143,14 @@ public void testTransitionFromNewTargetToStable() { )); ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) - .withTargetAssignment(10, targetAssignment) + .withTargetAssignment(11, targetAssignment) .withCurrentPartitionEpoch((topicId, partitionId) -> 10) .build(); assertEquals(ConsumerGroupMember.MemberState.STABLE, updatedMember.state()); assertEquals(10, updatedMember.previousMemberEpoch()); - 
assertEquals(10, updatedMember.memberEpoch()); - assertEquals(10, updatedMember.nextMemberEpoch()); + assertEquals(11, updatedMember.memberEpoch()); + assertEquals(11, updatedMember.nextMemberEpoch()); assertEquals(mkAssignment( mkTopicAssignment(topicId1, 1, 2, 3), mkTopicAssignment(topicId2, 4, 5, 6) From d66917b7a0a0d66ddd801d867cd8308681b440d3 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Mon, 8 May 2023 18:08:32 +0200 Subject: [PATCH 7/8] update comment --- .../group/consumer/CurrentAssignmentBuilder.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java index edf095215b410..61dacf5675c69 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java @@ -56,14 +56,7 @@ * The set of partitions that the member will eventually receive. The partitions in this set are * still owned by other members in the group. * - * The state machine has four states: - * - NEW_TARGET_ASSIGNMENT: - * This is the initial state of the state machine. The state machine starts here when the next epoch - * does not match the target epoch. It means that a new target assignment has been installed so the - * reconciliation process must restart. In this state, the Assigned, Revoking and Assigning sets are - * computed. If Revoking is not empty, the state machine transitions to REVOKING; if Assigning is not - * empty, it transitions to ASSIGNING; otherwise it transitions to STABLE. - * + * The state machine has three states: * - REVOKING: * This state means that the member must revoke partitions before it can transition to the next epoch * and thus start receiving new partitions. 
This is to guarantee that offsets of revoked partitions @@ -76,6 +69,13 @@ * * - STABLE: * This state means that the member has received all its assigned partitions. + * + * The reconciliation process is started or re-started whenever a new target assignment is installed; + * the epoch of the next assignment is different from the next epoch of the member. In this transient + * state, the assigned partitions, the partitions pending revocation and the partitions pending assignment + * are updates. If the partitions pending revocation is not empty, the state machine transitions to + * REVOKING; if partitions pending assignment is not empty, it transitions to ASSIGNING; otherwise it + * transitions to STABLE. */ public class CurrentAssignmentBuilder { /** From 3f9961cfb710742dd6fb9fb6fe91e5f52dee5e61 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Mon, 8 May 2023 18:13:25 +0200 Subject: [PATCH 8/8] fix typos --- .../coordinator/group/consumer/CurrentAssignmentBuilder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java index 61dacf5675c69..6a255ae8e53a9 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java @@ -71,9 +71,9 @@ * This state means that the member has received all its assigned partitions. * * The reconciliation process is started or re-started whenever a new target assignment is installed; - * the epoch of the next assignment is different from the next epoch of the member. In this transient + * the epoch of the new target assignment is different from the next epoch of the member. 
In this transient * state, the assigned partitions, the partitions pending revocation and the partitions pending assignment - * are updates. If the partitions pending revocation is not empty, the state machine transitions to + * are updated. If the partitions pending revocation is not empty, the state machine transitions to * REVOKING; if partitions pending assignment is not empty, it transitions to ASSIGNING; otherwise it * transitions to STABLE. */