diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java index 66306d663c..4ac833533a 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java @@ -810,12 +810,6 @@ public CompletableFuture> handleCommitOffsets( }); }); - result.whenCompleteAsync((ignore, e) ->{ - if (e == null){ - offsetAcker.ackOffsets(groupId, offsetMetadata); - } - }); - return result; } @@ -850,6 +844,7 @@ private CompletableFuture> doCommitOffsets( // The group is only using Kafka to store offsets. // Also, for transactional offset commits we don't need to validate group membership // and the generation. + offsetAcker.ackOffsets(group.groupId(), offsetMetadata); return groupManager.storeOffsets(group, memberId, offsetMetadata, producerId, producerEpoch); } else if (group.is(CompletingRebalance)) { return CompletableFuture.completedFuture( @@ -866,6 +861,7 @@ private CompletableFuture> doCommitOffsets( } else { MemberMetadata member = group.get(memberId); completeAndScheduleNextHeartbeatExpiration(group, member); + offsetAcker.ackOffsets(group.groupId(), offsetMetadata); return groupManager.storeOffsets( group, memberId, offsetMetadata ); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java index 1040305c3f..c6639b06a1 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java @@ -15,11 +15,13 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import java.util.Arrays; import java.util.Collections; import java.util.List; import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -28,6 +30,7 @@ /** * Basic end-to-end test with `entryFormat=kafka`. */ +@Slf4j public class BasicEndToEndKafkaTest extends BasicEndToEndTestBase { public BasicEndToEndKafkaTest() { @@ -61,16 +64,22 @@ public void testDeleteClosedTopics() throws Exception { try { admin.topics().deletePartitionedTopic(topic); + fail(); } catch (PulsarAdminException e) { - assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); + log.info("Failed to delete partitioned topic \"{}\": {}", topic, e.getMessage()); + assertTrue(e.getMessage().contains("Topic has active producers/subscriptions") + || e.getMessage().contains("Partitioned topic does not exist")); } final KafkaConsumer kafkaConsumer1 = newKafkaConsumer(topic, "sub-1"); assertEquals(receiveMessages(kafkaConsumer1, expectedMessages.size()), expectedMessages); try { admin.topics().deletePartitionedTopic(topic); + fail(); } catch (PulsarAdminException e) { - assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); + log.info("Failed to delete partitioned topic \"{}\": {}", topic, e.getMessage()); + assertTrue(e.getMessage().contains("Topic has active producers/subscriptions") + || e.getMessage().contains("Partitioned topic does not exist")); } final KafkaConsumer kafkaConsumer2 = newKafkaConsumer(topic, "sub-2"); @@ -80,8 +89,11 @@ public void testDeleteClosedTopics() throws Exception { kafkaConsumer1.close(); try { admin.topics().deletePartitionedTopic(topic); + fail(); } catch (PulsarAdminException e) { - assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); + log.info("Failed to delete partitioned topic \"{}\": {}", topic, e.getMessage()); + assertTrue(e.getMessage().contains("Topic has active producers/subscriptions") + || e.getMessage().contains("Partitioned topic does not exist")); } kafkaConsumer2.close();