Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -794,18 +794,16 @@ public ClassicGroup classicGroup(
* @return A boolean indicating whether it's valid to online downgrade the consumer group.
*/
private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
consumerGroup.groupId());
return false;
} else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
consumerGroup.groupId());
if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
return false;
} else if (consumerGroup.numMembers() <= 1) {
log.debug("Skip downgrading the consumer group {} to classic group because it's empty.",
consumerGroup.groupId());
return false;
} else if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
consumerGroup.groupId());
return false;
} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
consumerGroup.groupId());
Expand Down Expand Up @@ -927,8 +925,8 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Coordinator

// Create the session timeouts for the new members. If the conversion fails, the group will remain a
// classic group, thus these timers will fail the group type check and do nothing.
consumerGroup.members().forEach((memberId, __) ->
scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), memberId)
consumerGroup.members().forEach((memberId, member) ->
scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), memberId, member.classicProtocolSessionTimeout().get())
);

return consumerGroup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9630,7 +9630,7 @@ public void testConsumerGroupHeartbeatWithStableClassicGroup() {

assertRecordsEquals(expectedRecords, result.records());

context.assertSessionTimeout(groupId, memberId1, 45000);
context.assertSessionTimeout(groupId, memberId1, expectedMember1.classicProtocolSessionTimeout().get());
context.assertSessionTimeout(groupId, memberId2, 45000);

// Simulate a failed replay. The context is rolled back and the group is converted back to the classic group.
Expand Down Expand Up @@ -9876,8 +9876,8 @@ public void testConsumerGroupHeartbeatWithPreparingRebalanceClassicGroup() throw
assertTrue(joinResult.joinFuture.isDone());
assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), joinResult.joinFuture.get().errorCode());

context.assertSessionTimeout(groupId, memberId1, 45000);
context.assertSessionTimeout(groupId, memberId2, 45000);
context.assertSessionTimeout(groupId, memberId1, expectedMember1.classicProtocolSessionTimeout().get());
context.assertSessionTimeout(groupId, memberId2, expectedMember2.classicProtocolSessionTimeout().get());
context.assertSessionTimeout(groupId, memberId3, 45000);

// Simulate a failed replay. The context is rolled back and the group is converted back to the classic group.
Expand Down Expand Up @@ -10141,8 +10141,8 @@ public void testConsumerGroupHeartbeatWithCompletingRebalanceClassicGroup() thro
assertTrue(syncResult.syncFuture.isDone());
assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), syncResult.syncFuture.get().errorCode());

context.assertSessionTimeout(groupId, memberId1, 45000);
context.assertSessionTimeout(groupId, memberId2, 45000);
context.assertSessionTimeout(groupId, memberId1, expectedMember1.classicProtocolSessionTimeout().get());
context.assertSessionTimeout(groupId, memberId2, expectedMember2.classicProtocolSessionTimeout().get());
context.assertSessionTimeout(groupId, memberId3, 45000);

// Simulate a failed replay. The context is rolled back and the group is converted back to the classic group.
Expand Down