From 252dda27c90a11db4e11d6d1247bcacf5e8958ce Mon Sep 17 00:00:00 2001 From: dongnuolyu Date: Thu, 30 May 2024 16:24:36 -0400 Subject: [PATCH 01/10] validateOffsetCommit --- .../group/OffsetMetadataManager.java | 24 +++------ .../group/classic/ClassicGroup.java | 1 + .../group/consumer/ConsumerGroup.java | 53 ++++++++++++++++++- 3 files changed, 59 insertions(+), 19 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index 01c861c038655..ed2928eb3c9f3 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -325,24 +325,12 @@ private Group validateOffsetCommit( } } - try { - group.validateOffsetCommit( - request.memberId(), - request.groupInstanceId(), - request.generationIdOrMemberEpoch(), - false - ); - } catch (StaleMemberEpochException ex) { - // The STALE_MEMBER_EPOCH error is only returned for new consumer group (KIP-848). When - // it is, the member should be using the OffsetCommit API version >= 9. As we don't - // support upgrading from the old to the new protocol yet, we return UNSUPPORTED_VERSION - // error if an older version is used. We will revise this when the upgrade path is implemented. - if (context.header.apiVersion() >= 9) { - throw ex; - } else { - throw Errors.UNSUPPORTED_VERSION.exception(); - } - } + group.validateOffsetCommit( + request.memberId(), + request.groupInstanceId(), + request.generationIdOrMemberEpoch(), + false + ); return group; } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java index c9f5bc75ca98f..fe80a2b043571 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java @@ -857,6 +857,7 @@ public void validateOffsetCommit( throw Errors.UNKNOWN_MEMBER_ID.exception(); } + // TODO: A temp marker. Will remove it when the pr is open. if (!isTransactional && isInState(COMPLETING_REBALANCE)) { // We should not receive a commit request if the group has not completed rebalance; // but since the consumer's member.id and generation is valid, it means it has received diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index bbc544289b26b..7ff608df2561f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -21,6 +21,9 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.IllegalGenerationException; +import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; @@ -806,7 +809,29 @@ public void validateOffsetCommit( if (memberEpoch < 0 && members().isEmpty()) return; final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false); - validateMemberEpoch(memberEpoch, member.memberEpoch()); + if (member.useClassicProtocol()) { + validateMemberInstanceId(member, groupInstanceId); + + try { + validateMemberEpoch(memberEpoch, member.memberEpoch()); + } catch (StaleMemberEpochException ex) { + // StaleMemberEpochException is not supported in the classic protocol. We throw + // IllegalGenerationException instead for compatibility. + throw new IllegalGenerationException(String.format("Invalid offset commit because the " + + "received generation id %d does not match the expected member epoch %d.", + memberEpoch, member.memberEpoch())); + } + + if (member.memberEpoch() < groupEpoch() || + member.state() == MemberState.UNREVOKED_PARTITIONS || + (member.state() == MemberState.UNRELEASED_PARTITIONS && !waitingOnUnreleasedPartition(member))) { + throw new RebalanceInProgressException(String.format("Invalid offset commit because" + + " a new rebalance has been triggered in group %s and member %s should rejoin to catch up.", + groupId(), member.memberId())); + } + } else { + validateMemberEpoch(memberEpoch, member.memberEpoch()); + } } /** @@ -1421,4 +1446,30 @@ public boolean waitingOnUnreleasedPartition(ConsumerGroupMember member) { } return false; } + + /** + * Validates that the instance id exists and is mapped to the member id + * if the group instance id is provided. + * + * @param member The ConsumerGroupMember. + * @param instanceId The instance id. + */ + private void validateMemberInstanceId( + ConsumerGroupMember member, + String instanceId + ) throws UnknownMemberIdException, FencedInstanceIdException { + if (instanceId != null) { + ConsumerGroupMember staticMember = staticMember(instanceId); + if (member == null) { + throw new UnknownMemberIdException( + String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId()) + ); + } + + if (!staticMember.memberId().equals(member.memberId())) { + throw Errors.FENCED_INSTANCE_ID.exception("Static member " + member.memberId() + " with instance id " + + instanceId + " was fenced by member " + staticMember.memberId() + "."); + } + } + } } From 44f1952e86a67810421badb26eaa06b8e58e5b80 Mon Sep 17 00:00:00 2001 From: dongnuolyu Date: Fri, 31 May 2024 14:24:33 -0400 Subject: [PATCH 02/10] comments --- .../group/classic/ClassicGroup.java | 1 - .../group/consumer/ConsumerGroup.java | 55 ++++++------------- 2 files changed, 17 insertions(+), 39 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java index fe80a2b043571..c9f5bc75ca98f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java @@ -857,7 +857,6 @@ public void validateOffsetCommit( throw Errors.UNKNOWN_MEMBER_ID.exception(); } - // TODO: A temp marker. Will remove it when the pr is open. if (!isTransactional && isInState(COMPLETING_REBALANCE)) { // We should not receive a commit request if the group has not completed rebalance; // but since the consumer's member.id and generation is valid, it means it has received diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index 7ff608df2561f..131ef3ae4f71f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -795,6 +795,13 @@ public DeadlineAndEpoch metadataRefreshDeadline() { * @param memberEpoch The member epoch. * @param isTransactional Whether the offset commit is transactional or not. It has no * impact when a consumer group is used. + * @throws UnknownMemberIdException If the member is not found. + * @throws StaleMemberEpochException If the member uses the consumer protocol and the provided + * member epoch doesn't match the actual member epoch. + * @throws FencedInstanceIdException If the member uses the classic protocol and the provided + * instance id is fenced by an existing member. + * @throws IllegalGenerationException If the member uses the classic protocol and the provided + * generation id is not equal to the member epoch. */ @Override public void validateOffsetCommit( @@ -802,7 +809,8 @@ public void validateOffsetCommit( String groupInstanceId, int memberEpoch, boolean isTransactional - ) throws UnknownMemberIdException, StaleMemberEpochException { + ) throws UnknownMemberIdException, StaleMemberEpochException, + FencedInstanceIdException, IllegalGenerationException { // When the member epoch is -1, the request comes from either the admin client // or a consumer which does not use the group management facility. In this case, // the request can commit offsets if the group is empty. @@ -810,8 +818,6 @@ public void validateOffsetCommit( final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false); if (member.useClassicProtocol()) { - validateMemberInstanceId(member, groupInstanceId); - try { validateMemberEpoch(memberEpoch, member.memberEpoch()); } catch (StaleMemberEpochException ex) { @@ -821,14 +827,6 @@ public void validateOffsetCommit( + "received generation id %d does not match the expected member epoch %d.", memberEpoch, member.memberEpoch())); } - - if (member.memberEpoch() < groupEpoch() || - member.state() == MemberState.UNREVOKED_PARTITIONS || - (member.state() == MemberState.UNRELEASED_PARTITIONS && !waitingOnUnreleasedPartition(member))) { - throw new RebalanceInProgressException(String.format("Invalid offset commit because" + - " a new rebalance has been triggered in group %s and member %s should rejoin to catch up.", - groupId(), member.memberId())); - } } else { validateMemberEpoch(memberEpoch, member.memberEpoch()); } @@ -840,13 +838,18 @@ public void validateOffsetCommit( * @param memberId The member id for consumer groups. * @param memberEpoch The member epoch for consumer groups. * @param lastCommittedOffset The last committed offsets in the timeline. + * @throws UnknownMemberIdException If the member is not found. + * @throws StaleMemberEpochException If the member uses the consumer protocol and the provided + * member epoch doesn't match the actual member epoch. + * @throws IllegalGenerationException If the member uses the classic protocol and the provided + * generation id is not equal to the member epoch. */ @Override public void validateOffsetFetch( String memberId, int memberEpoch, long lastCommittedOffset - ) throws UnknownMemberIdException, StaleMemberEpochException { + ) throws UnknownMemberIdException, StaleMemberEpochException, IllegalGenerationException { // When the member id is null and the member epoch is -1, the request either comes // from the admin client or from a client which does not provide them. In this case, // the fetch request is accepted. @@ -858,6 +861,8 @@ public void validateOffsetFetch( memberId, groupId)); } validateMemberEpoch(memberEpoch, member.memberEpoch()); + // If the member uses the old protocol and the member epoch doesn't match, return illegal_generation + // same as the } /** @@ -1446,30 +1451,4 @@ public boolean waitingOnUnreleasedPartition(ConsumerGroupMember member) { } return false; } - - /** - * Validates that the instance id exists and is mapped to the member id - * if the group instance id is provided. - * - * @param member The ConsumerGroupMember. - * @param instanceId The instance id. - */ - private void validateMemberInstanceId( - ConsumerGroupMember member, - String instanceId - ) throws UnknownMemberIdException, FencedInstanceIdException { - if (instanceId != null) { - ConsumerGroupMember staticMember = staticMember(instanceId); - if (member == null) { - throw new UnknownMemberIdException( - String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId()) - ); - } - - if (!staticMember.memberId().equals(member.memberId())) { - throw Errors.FENCED_INSTANCE_ID.exception("Static member " + member.memberId() + " with instance id " - + instanceId + " was fenced by member " + staticMember.memberId() + "."); - } - } - } } From 52c51781735808338fd016858fbbd273dbce1eec Mon Sep 17 00:00:00 2001 From: dongnuolyu Date: Fri, 31 May 2024 14:27:37 -0400 Subject: [PATCH 03/10] validate offset fetch --- .../group/consumer/ConsumerGroup.java | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index 131ef3ae4f71f..98473a5f04ec3 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -798,8 +798,6 @@ public DeadlineAndEpoch metadataRefreshDeadline() { * @throws UnknownMemberIdException If the member is not found. * @throws StaleMemberEpochException If the member uses the consumer protocol and the provided * member epoch doesn't match the actual member epoch. - * @throws FencedInstanceIdException If the member uses the classic protocol and the provided - * instance id is fenced by an existing member. * @throws IllegalGenerationException If the member uses the classic protocol and the provided * generation id is not equal to the member epoch. */ @@ -809,8 +807,7 @@ public void validateOffsetCommit( String groupInstanceId, int memberEpoch, boolean isTransactional - ) throws UnknownMemberIdException, StaleMemberEpochException, - FencedInstanceIdException, IllegalGenerationException { + ) throws UnknownMemberIdException, StaleMemberEpochException, IllegalGenerationException { // When the member epoch is -1, the request comes from either the admin client // or a consumer which does not use the group management facility. In this case, // the request can commit offsets if the group is empty. @@ -860,9 +857,20 @@ public void validateOffsetFetch( throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.", memberId, groupId)); } - validateMemberEpoch(memberEpoch, member.memberEpoch()); - // If the member uses the old protocol and the member epoch doesn't match, return illegal_generation - // same as the + + if (member.useClassicProtocol()) { + try { + validateMemberEpoch(memberEpoch, member.memberEpoch()); + } catch (StaleMemberEpochException ex) { + // StaleMemberEpochException is not supported in the classic protocol. We throw + // IllegalGenerationException instead for compatibility. + throw new IllegalGenerationException(String.format("Invalid offset commit because the " + + "received generation id %d does not match the expected member epoch %d.", + memberEpoch, member.memberEpoch())); + } + } else { + validateMemberEpoch(memberEpoch, member.memberEpoch()); + } } /** From e2dbdb68ff696010de97763a3ba236d7f7655724 Mon Sep 17 00:00:00 2001 From: dongnuolyu Date: Fri, 31 May 2024 14:54:05 -0400 Subject: [PATCH 04/10] unit tests --- .../group/consumer/ConsumerGroup.java | 2 - .../group/OffsetMetadataManagerTest.java | 76 ++++++++++++------- 2 files changed, 49 insertions(+), 29 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index 98473a5f04ec3..bdb8572e6a927 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -21,9 +21,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.ApiException; -import org.apache.kafka.common.errors.FencedInstanceIdException; import org.apache.kafka.common.errors.IllegalGenerationException; -import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index e2684a7cab0ae..89d3483dcdb36 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.UnknownMemberIdException; -import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitResponseData; @@ -51,6 +50,7 @@ import org.apache.kafka.coordinator.group.assignor.RangeAssignor; import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; import org.apache.kafka.coordinator.group.classic.ClassicGroup; @@ -1142,14 +1142,8 @@ public void testConsumerGroupOffsetCommitWithStaleMemberEpoch() { assertThrows(StaleMemberEpochException.class, () -> context.commitOffset(request)); } - @ParameterizedTest - @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) - public void testConsumerGroupOffsetCommitWithVersionSmallerThanVersion9(short version) { - // All the newer versions are fine. - if (version >= 9) return; - // Version 0 does not support MemberId and GenerationIdOrMemberEpoch fields. - if (version == 0) return; - + @Test + public void testConsumerGroupOffsetCommitWithIllegalGenerationId() { OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); // Create an empty group. @@ -1162,27 +1156,30 @@ public void testConsumerGroupOffsetCommitWithVersionSmallerThanVersion9(short ve group.updateMember(new ConsumerGroupMember.Builder("member") .setMemberEpoch(10) .setPreviousMemberEpoch(10) + .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()) .build() ); - // Verify that the request is rejected with the correct exception. - assertThrows(UnsupportedVersionException.class, () -> context.commitOffset( - version, - new OffsetCommitRequestData() - .setGroupId("foo") - .setMemberId("member") - .setGenerationIdOrMemberEpoch(9) - .setTopics(Collections.singletonList( - new OffsetCommitRequestData.OffsetCommitRequestTopic() - .setName("bar") - .setPartitions(Collections.singletonList( - new OffsetCommitRequestData.OffsetCommitRequestPartition() - .setPartitionIndex(0) - .setCommittedOffset(100L) - )) - )) - ) - ); + OffsetCommitRequestData request = new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(9) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )); + + // Verify that a smaller epoch is rejected. + assertThrows(IllegalGenerationException.class, () -> context.commitOffset(request)); + + // Verify that a larger epoch is rejected. + request.setGenerationIdOrMemberEpoch(11); + assertThrows(IllegalGenerationException.class, () -> context.commitOffset(request)); } @Test @@ -2294,6 +2291,31 @@ public void testConsumerGroupOffsetFetchWithStaleMemberEpoch() { () -> context.fetchAllOffsets("group", "member", 10, Long.MAX_VALUE)); } + @Test + public void testConsumerGroupOffsetFetchWithIllegalGenerationId() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true); + group.updateMember(new ConsumerGroupMember.Builder("member") + .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()) + .build() + ); + + // Fetch offsets case. + List topics = Collections.singletonList( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(Collections.singletonList(0)) + ); + + // Fetch offsets case. + assertThrows(IllegalGenerationException.class, + () -> context.fetchOffsets("group", "member", 10, topics, Long.MAX_VALUE)); + + // Fetch all offsets case. + assertThrows(IllegalGenerationException.class, + () -> context.fetchAllOffsets("group", "member", 10, Long.MAX_VALUE)); + } + @Test public void testGenericGroupOffsetDelete() { OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); From f54e3b0d7cb6eb23ed04c4f9b133a1367b0f0510 Mon Sep 17 00:00:00 2001 From: dongnuolyu Date: Fri, 31 May 2024 16:21:02 -0400 Subject: [PATCH 05/10] validateMemberEpoch --- .../group/consumer/ConsumerGroup.java | 52 +++++++------------ 1 file changed, 19 insertions(+), 33 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index bdb8572e6a927..63ec5372ff8a2 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -812,19 +812,7 @@ public void validateOffsetCommit( if (memberEpoch < 0 && members().isEmpty()) return; final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false); - if (member.useClassicProtocol()) { - try { - validateMemberEpoch(memberEpoch, member.memberEpoch()); - } catch (StaleMemberEpochException ex) { - // StaleMemberEpochException is not supported in the classic protocol. We throw - // IllegalGenerationException instead for compatibility. - throw new IllegalGenerationException(String.format("Invalid offset commit because the " - + "received generation id %d does not match the expected member epoch %d.", - memberEpoch, member.memberEpoch())); - } - } else { - validateMemberEpoch(memberEpoch, member.memberEpoch()); - } + validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol()); } /** @@ -855,20 +843,7 @@ public void validateOffsetFetch( throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.", memberId, groupId)); } - - if (member.useClassicProtocol()) { - try { - validateMemberEpoch(memberEpoch, member.memberEpoch()); - } catch (StaleMemberEpochException ex) { - // StaleMemberEpochException is not supported in the classic protocol. We throw - // IllegalGenerationException instead for compatibility. - throw new IllegalGenerationException(String.format("Invalid offset commit because the " - + "received generation id %d does not match the expected member epoch %d.", - memberEpoch, member.memberEpoch())); - } - } else { - validateMemberEpoch(memberEpoch, member.memberEpoch()); - } + validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol()); } /** @@ -932,16 +907,27 @@ public boolean isInStates(Set statesFilter, long committedOffset) { } /** - * Throws a StaleMemberEpochException if the received member epoch does not match - * the expected member epoch. + * Throws an exception if the received member epoch does not match the expected member epoch. + * + * @param receivedMemberEpoch The received member epoch or generation id. + * @param expectedMemberEpoch The expected member epoch. + * @param useClassicProtocol The boolean indicating whether the checked member uses the classic protocol. + * @throws StaleMemberEpochException if the member with unmatched member epoch uses the consumer protocol. + * @throws IllegalGenerationException if the member with unmatched generation id uses the classic protocol. */ private void validateMemberEpoch( int receivedMemberEpoch, - int expectedMemberEpoch - ) throws StaleMemberEpochException { + int expectedMemberEpoch, + boolean useClassicProtocol + ) throws StaleMemberEpochException, IllegalGenerationException { if (receivedMemberEpoch != expectedMemberEpoch) { - throw new StaleMemberEpochException(String.format("The received member epoch %d does not match " - + "the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch)); + if (useClassicProtocol) { + throw new IllegalGenerationException(String.format("The received generation id %d does not match " + + "the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch)); + } else { + throw new StaleMemberEpochException(String.format("The received member epoch %d does not match " + + "the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch)); + } } } From 0c3bd53bf1192157a02dd3f3f95e60a18263faee Mon Sep 17 00:00:00 2001 From: dongnuolyu Date: Mon, 3 Jun 2024 16:52:25 -0400 Subject: [PATCH 06/10] fix OffsetCommitRequestTest --- .../group/OffsetMetadataManager.java | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index ed2928eb3c9f3..b6e23199ab8f8 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -325,12 +325,22 @@ private Group validateOffsetCommit( } } - group.validateOffsetCommit( - request.memberId(), - request.groupInstanceId(), - request.generationIdOrMemberEpoch(), - false - ); + try { + group.validateOffsetCommit( + request.memberId(), + request.groupInstanceId(), + request.generationIdOrMemberEpoch(), + false + ); + } catch (StaleMemberEpochException ex) { + // The STALE_MEMBER_EPOCH error is only returned for new consumer group (KIP-848). When + // it is, the member should be using the OffsetCommit API version >= 9. + if (context.header.apiVersion() >= 9) { + throw ex; + } else { + throw Errors.UNSUPPORTED_VERSION.exception(); + } + } return group; } From 90ede9c1a37697b1af34fa24094e5d06ccc80861 Mon Sep 17 00:00:00 2001 From: dongnuolyu Date: Tue, 4 Jun 2024 09:29:23 -0400 Subject: [PATCH 07/10] Revert "fix OffsetCommitRequestTest" This reverts commit 0c3bd53bf1192157a02dd3f3f95e60a18263faee. --- .../group/OffsetMetadataManager.java | 22 +++++-------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index b6e23199ab8f8..ed2928eb3c9f3 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -325,22 +325,12 @@ private Group validateOffsetCommit( } } - try { - group.validateOffsetCommit( - request.memberId(), - request.groupInstanceId(), - request.generationIdOrMemberEpoch(), - false - ); - } catch (StaleMemberEpochException ex) { - // The STALE_MEMBER_EPOCH error is only returned for new consumer group (KIP-848). When - // it is, the member should be using the OffsetCommit API version >= 9. - if (context.header.apiVersion() >= 9) { - throw ex; - } else { - throw Errors.UNSUPPORTED_VERSION.exception(); - } - } + group.validateOffsetCommit( + request.memberId(), + request.groupInstanceId(), + request.generationIdOrMemberEpoch(), + false + ); return group; } From f893a379ad6b64aa782145c23b48c085e1ca4db6 Mon Sep 17 00:00:00 2001 From: dongnuolyu Date: Tue, 4 Jun 2024 14:00:46 -0400 Subject: [PATCH 08/10] if useNewProtocol && version < 9 throw Errors.UNSUPPORTED_VERSION --- .../server/OffsetCommitRequestTest.scala | 2 +- .../apache/kafka/coordinator/group/Group.java | 5 +- .../group/OffsetMetadataManager.java | 10 +- .../group/classic/ClassicGroup.java | 4 +- .../group/consumer/ConsumerGroup.java | 12 +- .../group/classic/ClassicGroupTest.java | 110 +++++++++--------- .../group/consumer/ConsumerGroupTest.java | 109 ++++++++++++----- 7 files changed, 166 insertions(+), 86 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala index 7791c6c942086..2dab34f7d4f72 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala @@ -88,7 +88,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator topic = "foo", partition = 0, offset = 100L, - expectedError = Errors.NONE, + expectedError = if (useNewProtocol && version < 9) Errors.UNSUPPORTED_VERSION else Errors.NONE, version = version.toShort ) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java index f0d930648e72f..108f098d193f2 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java @@ -100,12 +100,15 @@ public static GroupType parse(String name) { * @param generationIdOrMemberEpoch The generation id for genetic groups or the member epoch * for consumer groups. * @param isTransactional Whether the offset commit is transactional or not. + * @param version The reqyest context api version. */ void validateOffsetCommit( String memberId, String groupInstanceId, int generationIdOrMemberEpoch, - boolean isTransactional + boolean isTransactional, + short version + ) throws KafkaException; /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index ed2928eb3c9f3..9e2bc6c62a7f0 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -329,7 +329,8 @@ private Group validateOffsetCommit( request.memberId(), request.groupInstanceId(), request.generationIdOrMemberEpoch(), - false + false, + context.apiVersion() ); return group; @@ -338,9 +339,11 @@ private Group validateOffsetCommit( /** * Validates an TxnOffsetCommit request. * + * @param context The request context. * @param request The actual request. */ private Group validateTransactionalOffsetCommit( + RequestContext context, TxnOffsetCommitRequestData request ) throws ApiException { Group group; @@ -363,7 +366,8 @@ private Group validateTransactionalOffsetCommit( request.memberId(), request.groupInstanceId(), request.generationId(), - true + true, + context.apiVersion() ); } catch (StaleMemberEpochException ex) { throw Errors.ILLEGAL_GENERATION.exception(); @@ -518,7 +522,7 @@ public CoordinatorResult commitT RequestContext context, TxnOffsetCommitRequestData request ) throws ApiException { - validateTransactionalOffsetCommit(request); + validateTransactionalOffsetCommit(context, request); final TxnOffsetCommitResponseData response = new TxnOffsetCommitResponseData(); final List records = new ArrayList<>(); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java index c9f5bc75ca98f..50d62553c777d 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java @@ -824,13 +824,15 @@ public void validateMember( * @param groupInstanceId The group instance id. * @param generationId The generation id. * @param isTransactional Whether the offset commit is transactional or not. + * @param version The request context api version. */ @Override public void validateOffsetCommit( String memberId, String groupInstanceId, int generationId, - boolean isTransactional + boolean isTransactional, + short version ) throws CoordinatorNotAvailableException, UnknownMemberIdException, IllegalGenerationException, FencedInstanceIdException { if (isInState(DEAD)) { throw Errors.COORDINATOR_NOT_AVAILABLE.exception(); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index 63ec5372ff8a2..25544245539f5 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.IllegalGenerationException; import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.protocol.Errors; @@ -793,6 +794,7 @@ public DeadlineAndEpoch metadataRefreshDeadline() { * @param memberEpoch The member epoch. * @param isTransactional Whether the offset commit is transactional or not. It has no * impact when a consumer group is used. + * @param version The request context api version. * @throws UnknownMemberIdException If the member is not found. * @throws StaleMemberEpochException If the member uses the consumer protocol and the provided * member epoch doesn't match the actual member epoch. @@ -804,7 +806,8 @@ public void validateOffsetCommit( String memberId, String groupInstanceId, int memberEpoch, - boolean isTransactional + boolean isTransactional, + short version ) throws UnknownMemberIdException, StaleMemberEpochException, IllegalGenerationException { // When the member epoch is -1, the request comes from either the admin client // or a consumer which does not use the group management facility. In this case, @@ -812,6 +815,13 @@ public void validateOffsetCommit( if (memberEpoch < 0 && members().isEmpty()) return; final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false); + + // If the commit is not transactional and the member uses the new consumer protocol (KIP-848), + // the member should be using the OffsetCommit API version >= 9. + if (!isTransactional && !member.useClassicProtocol() && version < 9) { + throw new UnsupportedVersionException(String.format("The OffsetCommit API version %d " + + "is smaller than the lowest version supporting new consumer protocol 9.", version)); + } validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol()); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java index 14327ae9672c1..9847f1d767d53 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection; import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; @@ -989,71 +990,76 @@ public void testMaybeElectNewJoinedLeaderChooseExisting() { @Test public void testValidateOffsetCommit() { - // A call from the admin client without any parameters should pass. - group.validateOffsetCommit("", "", -1, false); - - // Add a member. - group.add(new ClassicGroupMember( - "member-id", - Optional.of("instance-id"), - "", - "", - 100, - 100, - "consumer", - new JoinGroupRequestProtocolCollection(Collections.singletonList( - new JoinGroupRequestProtocol() - .setName("roundrobin") - .setMetadata(new byte[0])).iterator()) - )); + for (short v = ApiKeys.OFFSET_COMMIT.oldestVersion(); v <= ApiKeys.OFFSET_COMMIT.latestVersion(); v++) { + final short version = v; + initialize(); + + // A call from the admin client without any parameters should pass. + group.validateOffsetCommit("", "", -1, false, version); + + // Add a member. + group.add(new ClassicGroupMember( + "member-id", + Optional.of("instance-id"), + "", + "", + 100, + 100, + "consumer", + new JoinGroupRequestProtocolCollection(Collections.singletonList( + new JoinGroupRequestProtocol() + .setName("roundrobin") + .setMetadata(new byte[0])).iterator()) + )); - group.transitionTo(PREPARING_REBALANCE); - group.initNextGeneration(); + group.transitionTo(PREPARING_REBALANCE); + group.initNextGeneration(); - // No parameters and the group is not empty. - assertThrows(UnknownMemberIdException.class, - () -> group.validateOffsetCommit("", "", -1, false)); + // No parameters and the group is not empty. + assertThrows(UnknownMemberIdException.class, + () -> group.validateOffsetCommit("", "", -1, false, version)); - // A transactional offset commit without any parameters - // and a non-empty group is accepted. - group.validateOffsetCommit("", null, -1, true); + // A transactional offset commit without any parameters + // and a non-empty group is accepted. + group.validateOffsetCommit("", null, -1, true, version); - // The member id does not exist. - assertThrows(UnknownMemberIdException.class, - () -> group.validateOffsetCommit("unknown", "unknown", -1, false)); + // The member id does not exist. + assertThrows(UnknownMemberIdException.class, + () -> group.validateOffsetCommit("unknown", "unknown", -1, false, version)); - // The instance id does not exist. - assertThrows(UnknownMemberIdException.class, - () -> group.validateOffsetCommit("member-id", "unknown", -1, false)); + // The instance id does not exist. + assertThrows(UnknownMemberIdException.class, + () -> group.validateOffsetCommit("member-id", "unknown", -1, false, version)); - // The generation id is invalid. - assertThrows(IllegalGenerationException.class, - () -> group.validateOffsetCommit("member-id", "instance-id", 0, false)); + // The generation id is invalid. + assertThrows(IllegalGenerationException.class, + () -> group.validateOffsetCommit("member-id", "instance-id", 0, false, version)); - // Group is in prepare rebalance state. - assertThrows(RebalanceInProgressException.class, - () -> group.validateOffsetCommit("member-id", "instance-id", 1, false)); + // Group is in prepare rebalance state. + assertThrows(RebalanceInProgressException.class, + () -> group.validateOffsetCommit("member-id", "instance-id", 1, false, version)); - // Group transitions to stable. - group.transitionTo(STABLE); + // Group transitions to stable. + group.transitionTo(STABLE); - // This should work. - group.validateOffsetCommit("member-id", "instance-id", 1, false); + // This should work. + group.validateOffsetCommit("member-id", "instance-id", 1, false, version); - // Replace static member. - group.replaceStaticMember("instance-id", "member-id", "new-member-id"); + // Replace static member. + group.replaceStaticMember("instance-id", "member-id", "new-member-id"); - // The old instance id should be fenced. - assertThrows(FencedInstanceIdException.class, - () -> group.validateOffsetCommit("member-id", "instance-id", 1, false)); + // The old instance id should be fenced. + assertThrows(FencedInstanceIdException.class, + () -> group.validateOffsetCommit("member-id", "instance-id", 1, false, version)); - // Remove member and transitions to dead. - group.remove("new-instance-id"); - group.transitionTo(DEAD); + // Remove member and transitions to dead. + group.remove("new-instance-id"); + group.transitionTo(DEAD); - // This should fail with CoordinatorNotAvailableException. - assertThrows(CoordinatorNotAvailableException.class, - () -> group.validateOffsetCommit("member-id", "new-instance-id", 1, false)); + // This should fail with CoordinatorNotAvailableException. + assertThrows(CoordinatorNotAvailableException.class, + () -> group.validateOffsetCommit("member-id", "new-instance-id", 1, false, version)); + } } @Test diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java index a483fa3be022c..9421d18a7853c 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java @@ -20,9 +20,12 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.IllegalGenerationException; import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.coordinator.group.Group; @@ -35,8 +38,6 @@ import org.apache.kafka.image.MetadataImage; import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import java.util.ArrayList; import java.util.Arrays; @@ -1016,32 +1017,86 @@ public void testMetadataRefreshDeadline() { assertEquals(0, group.metadataRefreshDeadline().epoch); } - @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testValidateOffsetCommit(boolean isTransactional) { - ConsumerGroup group = createConsumerGroup("group-foo"); - - // Simulate a call from the admin client without member id and member epoch. - // This should pass only if the group is empty. - group.validateOffsetCommit("", "", -1, isTransactional); - - // The member does not exist. - assertThrows(UnknownMemberIdException.class, () -> - group.validateOffsetCommit("member-id", null, 0, isTransactional)); - - // Create a member. - group.updateMember(new ConsumerGroupMember.Builder("member-id").build()); - - // A call from the admin client should fail as the group is not empty. - assertThrows(UnknownMemberIdException.class, () -> - group.validateOffsetCommit("", "", -1, isTransactional)); - - // The member epoch is stale. - assertThrows(StaleMemberEpochException.class, () -> - group.validateOffsetCommit("member-id", "", 10, isTransactional)); + @Test + public void testValidateTransactionalOffsetCommit() { + boolean isTransactional = true; + for (short v = ApiKeys.OFFSET_COMMIT.oldestVersion(); v <= ApiKeys.OFFSET_COMMIT.latestVersion(); v++) { + final short version = v; + ConsumerGroup group = createConsumerGroup("group-foo"); + + // Simulate a call from the admin client without member id and member epoch. + // This should pass only if the group is empty. + group.validateOffsetCommit("", "", -1, isTransactional, version); + + // The member does not exist. + assertThrows(UnknownMemberIdException.class, () -> + group.validateOffsetCommit("member-id", null, 0, isTransactional, version)); + + // Create a member. + group.updateMember(new ConsumerGroupMember.Builder("member-id").build()); + + // A call from the admin client should fail as the group is not empty. + assertThrows(UnknownMemberIdException.class, () -> + group.validateOffsetCommit("", "", -1, isTransactional, version)); + + // The member epoch is stale. + assertThrows(StaleMemberEpochException.class, () -> + group.validateOffsetCommit("member-id", "", 10, isTransactional, version)); + + // This should succeed. + group.validateOffsetCommit("member-id", "", 0, isTransactional, version); + } + } - // This should succeed. - group.validateOffsetCommit("member-id", "", 0, isTransactional); + @Test + public void testNonTransactionalValidateOffsetCommit() { + boolean isTransactional = false; + for (short v = ApiKeys.OFFSET_COMMIT.oldestVersion(); v <= ApiKeys.OFFSET_COMMIT.latestVersion(); v++) { + final short version = v; + ConsumerGroup group = createConsumerGroup("group-foo"); + + // Simulate a call from the admin client without member id and member epoch. + // This should pass only if the group is empty. + group.validateOffsetCommit("", "", -1, isTransactional, version); + + // The member does not exist. + assertThrows(UnknownMemberIdException.class, () -> + group.validateOffsetCommit("member-id", null, 0, isTransactional, version)); + + // Create members. + group.updateMember( + new ConsumerGroupMember + .Builder("new-protocol-member-id").build() + ); + group.updateMember( + new ConsumerGroupMember.Builder("old-protocol-member-id") + .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()) + .build() + ); + + // A call from the admin client should fail as the group is not empty. + assertThrows(UnknownMemberIdException.class, () -> + group.validateOffsetCommit("", "", -1, isTransactional, version)); + + // The member epoch is stale. + if (version >= 9) { + assertThrows(StaleMemberEpochException.class, () -> + group.validateOffsetCommit("new-protocol-member-id", "", 10, isTransactional, version)); + } else { + assertThrows(UnsupportedVersionException.class, () -> + group.validateOffsetCommit("new-protocol-member-id", "", 10, isTransactional, version)); + } + assertThrows(IllegalGenerationException.class, () -> + group.validateOffsetCommit("old-protocol-member-id", "", 10, isTransactional, version)); + + // This should succeed. + if (version >= 9) { + group.validateOffsetCommit("new-protocol-member-id", "", 0, isTransactional, version); + } else { + assertThrows(UnsupportedVersionException.class, () -> + group.validateOffsetCommit("new-protocol-member-id", "", 0, isTransactional, version)); + } + } } @Test From f212fcf56cf90fa5f8d8462d2bc8626654a8f38f Mon Sep 17 00:00:00 2001 From: dongnuolyu Date: Tue, 4 Jun 2024 14:02:14 -0400 Subject: [PATCH 09/10] nit --- .../kafka/coordinator/group/OffsetMetadataManagerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index 89d3483dcdb36..6b328d66806d7 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -2300,7 +2300,6 @@ public void testConsumerGroupOffsetFetchWithIllegalGenerationId() { .build() ); - // Fetch offsets case. List topics = Collections.singletonList( new OffsetFetchRequestData.OffsetFetchRequestTopics() .setName("foo") From be093662631e504a150c77964c5472eac1601b85 Mon Sep 17 00:00:00 2001 From: dongnuolyu Date: Tue, 4 Jun 2024 15:58:50 -0400 Subject: [PATCH 10/10] comments --- .../apache/kafka/coordinator/group/Group.java | 4 +- .../group/classic/ClassicGroup.java | 4 +- .../group/consumer/ConsumerGroup.java | 11 +- .../group/classic/ClassicGroupTest.java | 116 ++++++++------- .../group/consumer/ConsumerGroupTest.java | 136 +++++++++--------- 5 files changed, 134 insertions(+), 137 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java index 108f098d193f2..acadf6d59259d 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java @@ -100,14 +100,14 @@ public static GroupType parse(String name) { * @param generationIdOrMemberEpoch The generation id for genetic groups or the member epoch * for consumer groups. * @param isTransactional Whether the offset commit is transactional or not. - * @param version The reqyest context api version. + * @param apiVersion The api version. */ void validateOffsetCommit( String memberId, String groupInstanceId, int generationIdOrMemberEpoch, boolean isTransactional, - short version + short apiVersion ) throws KafkaException; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java index 50d62553c777d..dd0d5c15fd62f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java @@ -824,7 +824,7 @@ public void validateMember( * @param groupInstanceId The group instance id. * @param generationId The generation id. * @param isTransactional Whether the offset commit is transactional or not. - * @param version The request context api version. + * @param apiVersion The api version. */ @Override public void validateOffsetCommit( @@ -832,7 +832,7 @@ public void validateOffsetCommit( String groupInstanceId, int generationId, boolean isTransactional, - short version + short apiVersion ) throws CoordinatorNotAvailableException, UnknownMemberIdException, IllegalGenerationException, FencedInstanceIdException { if (isInState(DEAD)) { throw Errors.COORDINATOR_NOT_AVAILABLE.exception(); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index 25544245539f5..98f37a7ed6cae 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -794,7 +794,7 @@ public DeadlineAndEpoch metadataRefreshDeadline() { * @param memberEpoch The member epoch. * @param isTransactional Whether the offset commit is transactional or not. It has no * impact when a consumer group is used. - * @param version The request context api version. + * @param apiVersion The api version. * @throws UnknownMemberIdException If the member is not found. * @throws StaleMemberEpochException If the member uses the consumer protocol and the provided * member epoch doesn't match the actual member epoch. @@ -807,7 +807,7 @@ public void validateOffsetCommit( String groupInstanceId, int memberEpoch, boolean isTransactional, - short version + short apiVersion ) throws UnknownMemberIdException, StaleMemberEpochException, IllegalGenerationException { // When the member epoch is -1, the request comes from either the admin client // or a consumer which does not use the group management facility. In this case, @@ -818,10 +818,11 @@ public void validateOffsetCommit( // If the commit is not transactional and the member uses the new consumer protocol (KIP-848), // the member should be using the OffsetCommit API version >= 9. - if (!isTransactional && !member.useClassicProtocol() && version < 9) { - throw new UnsupportedVersionException(String.format("The OffsetCommit API version %d " + - "is smaller than the lowest version supporting new consumer protocol 9.", version)); + if (!isTransactional && !member.useClassicProtocol() && apiVersion < 9) { + throw new UnsupportedVersionException("OffsetCommit version 9 or above must be used " + + "by members using the consumer group protocol"); } + validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol()); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java index 9847f1d767d53..76ab8467385b8 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; import org.apache.kafka.coordinator.group.OffsetAndMetadata; import org.apache.kafka.coordinator.group.OffsetExpirationCondition; import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; @@ -42,6 +43,7 @@ import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; import java.util.ArrayList; import java.util.Arrays; @@ -988,78 +990,74 @@ public void testMaybeElectNewJoinedLeaderChooseExisting() { assertTrue(group.isLeader(memberId)); } - @Test - public void testValidateOffsetCommit() { - for (short v = ApiKeys.OFFSET_COMMIT.oldestVersion(); v <= ApiKeys.OFFSET_COMMIT.latestVersion(); v++) { - final short version = v; - initialize(); - - // A call from the admin client without any parameters should pass. - group.validateOffsetCommit("", "", -1, false, version); - - // Add a member. - group.add(new ClassicGroupMember( - "member-id", - Optional.of("instance-id"), - "", - "", - 100, - 100, - "consumer", - new JoinGroupRequestProtocolCollection(Collections.singletonList( - new JoinGroupRequestProtocol() - .setName("roundrobin") - .setMetadata(new byte[0])).iterator()) - )); + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + public void testValidateOffsetCommit(short version) { + // A call from the admin client without any parameters should pass. + group.validateOffsetCommit("", "", -1, false, version); + + // Add a member. + group.add(new ClassicGroupMember( + "member-id", + Optional.of("instance-id"), + "", + "", + 100, + 100, + "consumer", + new JoinGroupRequestProtocolCollection(Collections.singletonList( + new JoinGroupRequestProtocol() + .setName("roundrobin") + .setMetadata(new byte[0])).iterator()) + )); - group.transitionTo(PREPARING_REBALANCE); - group.initNextGeneration(); + group.transitionTo(PREPARING_REBALANCE); + group.initNextGeneration(); - // No parameters and the group is not empty. - assertThrows(UnknownMemberIdException.class, - () -> group.validateOffsetCommit("", "", -1, false, version)); + // No parameters and the group is not empty. + assertThrows(UnknownMemberIdException.class, + () -> group.validateOffsetCommit("", "", -1, false, version)); - // A transactional offset commit without any parameters - // and a non-empty group is accepted. - group.validateOffsetCommit("", null, -1, true, version); + // A transactional offset commit without any parameters + // and a non-empty group is accepted. + group.validateOffsetCommit("", null, -1, true, version); - // The member id does not exist. - assertThrows(UnknownMemberIdException.class, - () -> group.validateOffsetCommit("unknown", "unknown", -1, false, version)); + // The member id does not exist. + assertThrows(UnknownMemberIdException.class, + () -> group.validateOffsetCommit("unknown", "unknown", -1, false, version)); - // The instance id does not exist. - assertThrows(UnknownMemberIdException.class, - () -> group.validateOffsetCommit("member-id", "unknown", -1, false, version)); + // The instance id does not exist. + assertThrows(UnknownMemberIdException.class, + () -> group.validateOffsetCommit("member-id", "unknown", -1, false, version)); - // The generation id is invalid. - assertThrows(IllegalGenerationException.class, - () -> group.validateOffsetCommit("member-id", "instance-id", 0, false, version)); + // The generation id is invalid. + assertThrows(IllegalGenerationException.class, + () -> group.validateOffsetCommit("member-id", "instance-id", 0, false, version)); - // Group is in prepare rebalance state. - assertThrows(RebalanceInProgressException.class, - () -> group.validateOffsetCommit("member-id", "instance-id", 1, false, version)); + // Group is in prepare rebalance state. + assertThrows(RebalanceInProgressException.class, + () -> group.validateOffsetCommit("member-id", "instance-id", 1, false, version)); - // Group transitions to stable. - group.transitionTo(STABLE); + // Group transitions to stable. + group.transitionTo(STABLE); - // This should work. - group.validateOffsetCommit("member-id", "instance-id", 1, false, version); + // This should work. + group.validateOffsetCommit("member-id", "instance-id", 1, false, version); - // Replace static member. - group.replaceStaticMember("instance-id", "member-id", "new-member-id"); + // Replace static member. + group.replaceStaticMember("instance-id", "member-id", "new-member-id"); - // The old instance id should be fenced. - assertThrows(FencedInstanceIdException.class, - () -> group.validateOffsetCommit("member-id", "instance-id", 1, false, version)); + // The old instance id should be fenced. + assertThrows(FencedInstanceIdException.class, + () -> group.validateOffsetCommit("member-id", "instance-id", 1, false, version)); - // Remove member and transitions to dead. - group.remove("new-instance-id"); - group.transitionTo(DEAD); + // Remove member and transitions to dead. + group.remove("new-instance-id"); + group.transitionTo(DEAD); - // This should fail with CoordinatorNotAvailableException. - assertThrows(CoordinatorNotAvailableException.class, - () -> group.validateOffsetCommit("member-id", "new-instance-id", 1, false, version)); - } + // This should fail with CoordinatorNotAvailableException. + assertThrows(CoordinatorNotAvailableException.class, + () -> group.validateOffsetCommit("member-id", "new-instance-id", 1, false, version)); } @Test diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java index 9421d18a7853c..a67a5b098b94d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; import org.apache.kafka.coordinator.group.Group; import org.apache.kafka.coordinator.group.MetadataImageBuilder; import org.apache.kafka.coordinator.group.OffsetAndMetadata; @@ -38,6 +39,7 @@ import org.apache.kafka.image.MetadataImage; import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; import java.util.ArrayList; import java.util.Arrays; @@ -1017,85 +1019,81 @@ public void testMetadataRefreshDeadline() { assertEquals(0, group.metadataRefreshDeadline().epoch); } - @Test - public void testValidateTransactionalOffsetCommit() { + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + public void testValidateTransactionalOffsetCommit(short version) { boolean isTransactional = true; - for (short v = ApiKeys.OFFSET_COMMIT.oldestVersion(); v <= ApiKeys.OFFSET_COMMIT.latestVersion(); v++) { - final short version = v; - ConsumerGroup group = createConsumerGroup("group-foo"); + ConsumerGroup group = createConsumerGroup("group-foo"); - // Simulate a call from the admin client without member id and member epoch. - // This should pass only if the group is empty. - group.validateOffsetCommit("", "", -1, isTransactional, version); + // Simulate a call from the admin client without member id and member epoch. + // This should pass only if the group is empty. + group.validateOffsetCommit("", "", -1, isTransactional, version); - // The member does not exist. - assertThrows(UnknownMemberIdException.class, () -> - group.validateOffsetCommit("member-id", null, 0, isTransactional, version)); + // The member does not exist. + assertThrows(UnknownMemberIdException.class, () -> + group.validateOffsetCommit("member-id", null, 0, isTransactional, version)); - // Create a member. - group.updateMember(new ConsumerGroupMember.Builder("member-id").build()); + // Create a member. + group.updateMember(new ConsumerGroupMember.Builder("member-id").build()); - // A call from the admin client should fail as the group is not empty. - assertThrows(UnknownMemberIdException.class, () -> - group.validateOffsetCommit("", "", -1, isTransactional, version)); + // A call from the admin client should fail as the group is not empty. + assertThrows(UnknownMemberIdException.class, () -> + group.validateOffsetCommit("", "", -1, isTransactional, version)); - // The member epoch is stale. - assertThrows(StaleMemberEpochException.class, () -> - group.validateOffsetCommit("member-id", "", 10, isTransactional, version)); + // The member epoch is stale. + assertThrows(StaleMemberEpochException.class, () -> + group.validateOffsetCommit("member-id", "", 10, isTransactional, version)); - // This should succeed. - group.validateOffsetCommit("member-id", "", 0, isTransactional, version); - } + // This should succeed. + group.validateOffsetCommit("member-id", "", 0, isTransactional, version); } - @Test - public void testNonTransactionalValidateOffsetCommit() { + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + public void testValidateOffsetCommit(short version) { boolean isTransactional = false; - for (short v = ApiKeys.OFFSET_COMMIT.oldestVersion(); v <= ApiKeys.OFFSET_COMMIT.latestVersion(); v++) { - final short version = v; - ConsumerGroup group = createConsumerGroup("group-foo"); - - // Simulate a call from the admin client without member id and member epoch. - // This should pass only if the group is empty. - group.validateOffsetCommit("", "", -1, isTransactional, version); - - // The member does not exist. - assertThrows(UnknownMemberIdException.class, () -> - group.validateOffsetCommit("member-id", null, 0, isTransactional, version)); - - // Create members. - group.updateMember( - new ConsumerGroupMember - .Builder("new-protocol-member-id").build() - ); - group.updateMember( - new ConsumerGroupMember.Builder("old-protocol-member-id") - .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()) - .build() - ); - - // A call from the admin client should fail as the group is not empty. - assertThrows(UnknownMemberIdException.class, () -> - group.validateOffsetCommit("", "", -1, isTransactional, version)); - - // The member epoch is stale. - if (version >= 9) { - assertThrows(StaleMemberEpochException.class, () -> - group.validateOffsetCommit("new-protocol-member-id", "", 10, isTransactional, version)); - } else { - assertThrows(UnsupportedVersionException.class, () -> - group.validateOffsetCommit("new-protocol-member-id", "", 10, isTransactional, version)); - } - assertThrows(IllegalGenerationException.class, () -> - group.validateOffsetCommit("old-protocol-member-id", "", 10, isTransactional, version)); - - // This should succeed. - if (version >= 9) { - group.validateOffsetCommit("new-protocol-member-id", "", 0, isTransactional, version); - } else { - assertThrows(UnsupportedVersionException.class, () -> - group.validateOffsetCommit("new-protocol-member-id", "", 0, isTransactional, version)); - } + ConsumerGroup group = createConsumerGroup("group-foo"); + + // Simulate a call from the admin client without member id and member epoch. + // This should pass only if the group is empty. + group.validateOffsetCommit("", "", -1, isTransactional, version); + + // The member does not exist. + assertThrows(UnknownMemberIdException.class, () -> + group.validateOffsetCommit("member-id", null, 0, isTransactional, version)); + + // Create members. + group.updateMember( + new ConsumerGroupMember + .Builder("new-protocol-member-id").build() + ); + group.updateMember( + new ConsumerGroupMember.Builder("old-protocol-member-id") + .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()) + .build() + ); + + // A call from the admin client should fail as the group is not empty. + assertThrows(UnknownMemberIdException.class, () -> + group.validateOffsetCommit("", "", -1, isTransactional, version)); + + // The member epoch is stale. + if (version >= 9) { + assertThrows(StaleMemberEpochException.class, () -> + group.validateOffsetCommit("new-protocol-member-id", "", 10, isTransactional, version)); + } else { + assertThrows(UnsupportedVersionException.class, () -> + group.validateOffsetCommit("new-protocol-member-id", "", 10, isTransactional, version)); + } + assertThrows(IllegalGenerationException.class, () -> + group.validateOffsetCommit("old-protocol-member-id", "", 10, isTransactional, version)); + + // This should succeed. + if (version >= 9) { + group.validateOffsetCommit("new-protocol-member-id", "", 0, isTransactional, version); + } else { + assertThrows(UnsupportedVersionException.class, () -> + group.validateOffsetCommit("new-protocol-member-id", "", 0, isTransactional, version)); } }