From 159259b6a7d1c155007511c4dc8b4f14acad3df9 Mon Sep 17 00:00:00 2001 From: dongnuolyu Date: Tue, 11 Jun 2024 15:25:34 -0400 Subject: [PATCH 1/7] switch checking allMembersUseClassicProtocolExcept and checking isDowngradeEnabled; remove log if allMembersUseClassicProtocolExcept is false --- .../kafka/coordinator/group/GroupMetadataManager.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 6463d78d9ce74..e564b4647bf37 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -794,12 +794,10 @@ public ClassicGroup classicGroup( * @return A boolean indicating whether it's valid to online downgrade the consumer group. */ private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) { - if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { - log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", - consumerGroup.groupId()); + if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { return false; - } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { - log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.", + } else if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { + log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", consumerGroup.groupId()); return false; } else if (consumerGroup.numMembers() <= 1) { From 4da8deda5244b41ed2434594321f256ecc1b600f Mon Sep 17 00:00:00 2001 From: dongnuolyu Date: Tue, 11 Jun 2024 15:29:58 -0400 Subject: [PATCH 2/7] switch checking numMembers and isDowngradeEnabled --- .../kafka/coordinator/group/GroupMetadataManager.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index e564b4647bf37..69355348e46e3 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -796,14 +796,14 @@ public ClassicGroup classicGroup( private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) { if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { return false; - } else if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { - log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", - consumerGroup.groupId()); - return false; } else if (consumerGroup.numMembers() <= 1) { log.debug("Skip downgrading the consumer group {} to classic group because it's empty.", consumerGroup.groupId()); return false; + } else if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { + log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", + consumerGroup.groupId()); + return false; } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) { log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.", consumerGroup.groupId()); From ff0a27d53865b5fd37996091741ab41733257ea2 Mon Sep 17 00:00:00 2001 From: dongnuolyu Date: Tue, 11 Jun 2024 18:22:32 -0400 Subject: [PATCH 3/7] update the timeouts scheduled in consumer group conversion --- .../coordinator/group/GroupMetadataManager.java | 7 ++++--- .../group/GroupMetadataManagerTest.java | 15 ++++++++++----- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 69355348e46e3..bb5ba02eb7fa8 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -925,9 +925,10 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List - scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), memberId) - ); + consumerGroup.members().forEach((memberId, member) -> { + scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), memberId, member.classicProtocolSessionTimeout().get()); + scheduleConsumerGroupJoinTimeoutIfAbsent(consumerGroup.groupId(), memberId, member.rebalanceTimeoutMs()); + }); return consumerGroup; } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index ad68c96aaabea..629349b57ad6d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -9630,7 +9630,8 @@ public void testConsumerGroupHeartbeatWithStableClassicGroup() { assertRecordsEquals(expectedRecords, result.records()); - context.assertSessionTimeout(groupId, memberId1, 45000); + context.assertSessionTimeout(groupId, memberId1, expectedMember1.classicProtocolSessionTimeout().get()); + context.assertJoinTimeout(groupId, memberId1, expectedMember1.rebalanceTimeoutMs()); context.assertSessionTimeout(groupId, memberId2, 45000); // Simulate a failed replay. The context is rolled back and the group is converted back to the classic group. @@ -9876,8 +9877,10 @@ public void testConsumerGroupHeartbeatWithPreparingRebalanceClassicGroup() throw assertTrue(joinResult.joinFuture.isDone()); assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), joinResult.joinFuture.get().errorCode()); - context.assertSessionTimeout(groupId, memberId1, 45000); - context.assertSessionTimeout(groupId, memberId2, 45000); + context.assertSessionTimeout(groupId, memberId1, expectedMember1.classicProtocolSessionTimeout().get()); + context.assertJoinTimeout(groupId, memberId1, expectedMember1.rebalanceTimeoutMs()); + context.assertSessionTimeout(groupId, memberId2, expectedMember2.classicProtocolSessionTimeout().get()); + context.assertJoinTimeout(groupId, memberId2, expectedMember2.rebalanceTimeoutMs()); context.assertSessionTimeout(groupId, memberId3, 45000); // Simulate a failed replay. The context is rolled back and the group is converted back to the classic group. @@ -10141,8 +10144,10 @@ public void testConsumerGroupHeartbeatWithCompletingRebalanceClassicGroup() thro assertTrue(syncResult.syncFuture.isDone()); assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), syncResult.syncFuture.get().errorCode()); - context.assertSessionTimeout(groupId, memberId1, 45000); - context.assertSessionTimeout(groupId, memberId2, 45000); + context.assertSessionTimeout(groupId, memberId1, expectedMember1.classicProtocolSessionTimeout().get()); + context.assertJoinTimeout(groupId, memberId1, expectedMember1.rebalanceTimeoutMs()); + context.assertSessionTimeout(groupId, memberId2, expectedMember2.classicProtocolSessionTimeout().get()); + context.assertJoinTimeout(groupId, memberId2, expectedMember2.rebalanceTimeoutMs()); context.assertSessionTimeout(groupId, memberId3, 45000); // Simulate a failed replay. The context is rolled back and the group is converted back to the classic group. From c1ef0cd57fddf5a70f0d4da60b0d911f0a7b80a0 Mon Sep 17 00:00:00 2001 From: dongnuolyu Date: Tue, 11 Jun 2024 20:37:02 -0400 Subject: [PATCH 4/7] enable rebalance timeout for classic protocol members in reconciliation --- .../group/GroupMetadataManager.java | 22 ++++++++----------- .../group/GroupMetadataManagerTest.java | 2 ++ 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index bb5ba02eb7fa8..a3f5b09768ebb 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1847,19 +1847,15 @@ private ConsumerGroupMember maybeReconcile( groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(), assignmentToString(updatedMember.assignedPartitions()), assignmentToString(updatedMember.partitionsPendingRevocation())); - // Schedule/cancel the rebalance timeout if the member uses the consumer protocol. - // The members using classic protocol only have join timer and sync timer. - if (!updatedMember.useClassicProtocol()) { - if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) { - scheduleConsumerGroupRebalanceTimeout( - groupId, - updatedMember.memberId(), - updatedMember.memberEpoch(), - updatedMember.rebalanceTimeoutMs() - ); - } else { - cancelConsumerGroupRebalanceTimeout(groupId, updatedMember.memberId()); - } + if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) { + scheduleConsumerGroupRebalanceTimeout( + groupId, + updatedMember.memberId(), + updatedMember.memberEpoch(), + updatedMember.rebalanceTimeoutMs() + ); + } else { + cancelConsumerGroupRebalanceTimeout(groupId, updatedMember.memberId()); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 629349b57ad6d..68184c9935018 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -11950,6 +11950,7 @@ public void testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() th .setProtocolName("range"), joinResponse1 ); + context.assertRebalanceTimeout(groupId, memberId1, request1.rebalanceTimeoutMs()); context.assertSessionTimeout(groupId, memberId1, request1.sessionTimeoutMs()); context.assertSyncTimeout(groupId, memberId1, request1.rebalanceTimeoutMs()); @@ -12015,6 +12016,7 @@ public void testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() th assertEquals(expectedMember2.state(), group.getOrMaybeCreateMember(memberId1, false).state()); joinResult2.appendFuture.complete(null); + context.assertNoRebalanceTimeout(groupId, memberId1); context.assertNoJoinTimeout(groupId, memberId1); JoinGroupResponseData joinResponse2 = joinResult2.joinFuture.get(); assertEquals( From 7216048e5ab1fa7ed9002e6f624dc104a74d8e4f Mon Sep 17 00:00:00 2001 From: dongnuolyu Date: Wed, 12 Jun 2024 12:01:17 -0400 Subject: [PATCH 5/7] Revert "update the timeouts scheduled in consumer group conversion" This reverts commit ff0a27d53865b5fd37996091741ab41733257ea2. --- .../coordinator/group/GroupMetadataManager.java | 7 +++---- .../group/GroupMetadataManagerTest.java | 15 +++++---------- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index a3f5b09768ebb..9811ac077da80 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -925,10 +925,9 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List { - scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), memberId, member.classicProtocolSessionTimeout().get()); - scheduleConsumerGroupJoinTimeoutIfAbsent(consumerGroup.groupId(), memberId, member.rebalanceTimeoutMs()); - }); + consumerGroup.members().forEach((memberId, __) -> + scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), memberId) + ); return consumerGroup; } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 68184c9935018..a01cbc43a2827 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -9630,8 +9630,7 @@ public void testConsumerGroupHeartbeatWithStableClassicGroup() { assertRecordsEquals(expectedRecords, result.records()); - context.assertSessionTimeout(groupId, memberId1, expectedMember1.classicProtocolSessionTimeout().get()); - context.assertJoinTimeout(groupId, memberId1, expectedMember1.rebalanceTimeoutMs()); + context.assertSessionTimeout(groupId, memberId1, 45000); context.assertSessionTimeout(groupId, memberId2, 45000); // Simulate a failed replay. The context is rolled back and the group is converted back to the classic group. @@ -9877,10 +9876,8 @@ public void testConsumerGroupHeartbeatWithPreparingRebalanceClassicGroup() throw assertTrue(joinResult.joinFuture.isDone()); assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), joinResult.joinFuture.get().errorCode()); - context.assertSessionTimeout(groupId, memberId1, expectedMember1.classicProtocolSessionTimeout().get()); - context.assertJoinTimeout(groupId, memberId1, expectedMember1.rebalanceTimeoutMs()); - context.assertSessionTimeout(groupId, memberId2, expectedMember2.classicProtocolSessionTimeout().get()); - context.assertJoinTimeout(groupId, memberId2, expectedMember2.rebalanceTimeoutMs()); + context.assertSessionTimeout(groupId, memberId1, 45000); + context.assertSessionTimeout(groupId, memberId2, 45000); context.assertSessionTimeout(groupId, memberId3, 45000); // Simulate a failed replay. The context is rolled back and the group is converted back to the classic group. @@ -10144,10 +10141,8 @@ public void testConsumerGroupHeartbeatWithCompletingRebalanceClassicGroup() thro assertTrue(syncResult.syncFuture.isDone()); assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), syncResult.syncFuture.get().errorCode()); - context.assertSessionTimeout(groupId, memberId1, expectedMember1.classicProtocolSessionTimeout().get()); - context.assertJoinTimeout(groupId, memberId1, expectedMember1.rebalanceTimeoutMs()); - context.assertSessionTimeout(groupId, memberId2, expectedMember2.classicProtocolSessionTimeout().get()); - context.assertJoinTimeout(groupId, memberId2, expectedMember2.rebalanceTimeoutMs()); + context.assertSessionTimeout(groupId, memberId1, 45000); + context.assertSessionTimeout(groupId, memberId2, 45000); context.assertSessionTimeout(groupId, memberId3, 45000); // Simulate a failed replay. The context is rolled back and the group is converted back to the classic group. From e90ccd084652170338bad325d337474f07943f60 Mon Sep 17 00:00:00 2001 From: dongnuolyu Date: Wed, 12 Jun 2024 12:04:44 -0400 Subject: [PATCH 6/7] update schedule session timeout in conversion --- .../kafka/coordinator/group/GroupMetadataManager.java | 4 ++-- .../coordinator/group/GroupMetadataManagerTest.java | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 9811ac077da80..8c7d20c05ef19 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -925,8 +925,8 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List - scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), memberId) + consumerGroup.members().forEach((memberId, member) -> + scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), memberId, member.classicProtocolSessionTimeout().get()) ); return consumerGroup; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index a01cbc43a2827..75c459d9baaf7 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -9630,7 +9630,7 @@ public void testConsumerGroupHeartbeatWithStableClassicGroup() { assertRecordsEquals(expectedRecords, result.records()); - context.assertSessionTimeout(groupId, memberId1, 45000); + context.assertSessionTimeout(groupId, memberId1, expectedMember1.classicProtocolSessionTimeout().get()); context.assertSessionTimeout(groupId, memberId2, 45000); // Simulate a failed replay. The context is rolled back and the group is converted back to the classic group. @@ -9876,8 +9876,8 @@ public void testConsumerGroupHeartbeatWithPreparingRebalanceClassicGroup() throw assertTrue(joinResult.joinFuture.isDone()); assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), joinResult.joinFuture.get().errorCode()); - context.assertSessionTimeout(groupId, memberId1, 45000); - context.assertSessionTimeout(groupId, memberId2, 45000); + context.assertSessionTimeout(groupId, memberId1, expectedMember1.classicProtocolSessionTimeout().get()); + context.assertSessionTimeout(groupId, memberId2, expectedMember2.classicProtocolSessionTimeout().get()); context.assertSessionTimeout(groupId, memberId3, 45000); // Simulate a failed replay. The context is rolled back and the group is converted back to the classic group. @@ -10141,8 +10141,8 @@ public void testConsumerGroupHeartbeatWithCompletingRebalanceClassicGroup() thro assertTrue(syncResult.syncFuture.isDone()); assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), syncResult.syncFuture.get().errorCode()); - context.assertSessionTimeout(groupId, memberId1, 45000); - context.assertSessionTimeout(groupId, memberId2, 45000); + context.assertSessionTimeout(groupId, memberId1, expectedMember1.classicProtocolSessionTimeout().get()); + context.assertSessionTimeout(groupId, memberId2, expectedMember2.classicProtocolSessionTimeout().get()); context.assertSessionTimeout(groupId, memberId3, 45000); // Simulate a failed replay. The context is rolled back and the group is converted back to the classic group. From f43fef7aa6d87b32b583489bb766d42a05730561 Mon Sep 17 00:00:00 2001 From: dongnuolyu Date: Wed, 12 Jun 2024 12:05:12 -0400 Subject: [PATCH 7/7] Revert "enable rebalance timeout for classic protocol members in reconciliation" This reverts commit c1ef0cd57fddf5a70f0d4da60b0d911f0a7b80a0. --- .../group/GroupMetadataManager.java | 22 +++++++++++-------- .../group/GroupMetadataManagerTest.java | 2 -- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 8c7d20c05ef19..9afb8d7791d2e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1846,15 +1846,19 @@ private ConsumerGroupMember maybeReconcile( groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(), assignmentToString(updatedMember.assignedPartitions()), assignmentToString(updatedMember.partitionsPendingRevocation())); - if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) { - scheduleConsumerGroupRebalanceTimeout( - groupId, - updatedMember.memberId(), - updatedMember.memberEpoch(), - updatedMember.rebalanceTimeoutMs() - ); - } else { - cancelConsumerGroupRebalanceTimeout(groupId, updatedMember.memberId()); + // Schedule/cancel the rebalance timeout if the member uses the consumer protocol. + // The members using classic protocol only have join timer and sync timer. + if (!updatedMember.useClassicProtocol()) { + if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) { + scheduleConsumerGroupRebalanceTimeout( + groupId, + updatedMember.memberId(), + updatedMember.memberEpoch(), + updatedMember.rebalanceTimeoutMs() + ); + } else { + cancelConsumerGroupRebalanceTimeout(groupId, updatedMember.memberId()); + } } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 75c459d9baaf7..248f5507a06da 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -11945,7 +11945,6 @@ public void testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() th .setProtocolName("range"), joinResponse1 ); - context.assertRebalanceTimeout(groupId, memberId1, request1.rebalanceTimeoutMs()); context.assertSessionTimeout(groupId, memberId1, request1.sessionTimeoutMs()); context.assertSyncTimeout(groupId, memberId1, request1.rebalanceTimeoutMs()); @@ -12011,7 +12010,6 @@ public void testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() th assertEquals(expectedMember2.state(), group.getOrMaybeCreateMember(memberId1, false).state()); joinResult2.appendFuture.complete(null); - context.assertNoRebalanceTimeout(groupId, memberId1); context.assertNoJoinTimeout(groupId, memberId1); JoinGroupResponseData joinResponse2 = joinResult2.joinFuture.get(); assertEquals(