From 533e1a54609cb379cf3d2970ae9f2d5b8fad8be3 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 7 Apr 2021 14:05:21 +0800 Subject: [PATCH 1/2] Fix flaky testDeleteClosedTopics --- .../handlers/kop/coordinator/group/GroupCoordinator.java | 8 ++------ .../pulsar/handlers/kop/BasicEndToEndKafkaTest.java | 9 +++++++++ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java index 66306d663c..4ac833533a 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java @@ -810,12 +810,6 @@ public CompletableFuture> handleCommitOffsets( }); }); - result.whenCompleteAsync((ignore, e) ->{ - if (e == null){ - offsetAcker.ackOffsets(groupId, offsetMetadata); - } - }); - return result; } @@ -850,6 +844,7 @@ private CompletableFuture> doCommitOffsets( // The group is only using Kafka to store offsets. // Also, for transactional offset commits we don't need to validate group membership // and the generation. + offsetAcker.ackOffsets(group.groupId(), offsetMetadata); return groupManager.storeOffsets(group, memberId, offsetMetadata, producerId, producerEpoch); } else if (group.is(CompletingRebalance)) { return CompletableFuture.completedFuture( @@ -866,6 +861,7 @@ private CompletableFuture> doCommitOffsets( } else { MemberMetadata member = group.get(memberId); completeAndScheduleNextHeartbeatExpiration(group, member); + offsetAcker.ackOffsets(group.groupId(), offsetMetadata); return groupManager.storeOffsets( group, memberId, offsetMetadata ); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java index 1040305c3f..15963f85ed 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java @@ -15,11 +15,13 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import java.util.Arrays; import java.util.Collections; import java.util.List; import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -28,6 +30,7 @@ /** * Basic end-to-end test with `entryFormat=kafka`. */ +@Slf4j public class BasicEndToEndKafkaTest extends BasicEndToEndTestBase { public BasicEndToEndKafkaTest() { @@ -61,7 +64,9 @@ public void testDeleteClosedTopics() throws Exception { try { admin.topics().deletePartitionedTopic(topic); + fail(); } catch (PulsarAdminException e) { + log.info("Failed to delete partitioned topic \"{}\": {}", topic, e.getMessage()); assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); } @@ -69,7 +74,9 @@ public void testDeleteClosedTopics() throws Exception { assertEquals(receiveMessages(kafkaConsumer1, expectedMessages.size()), expectedMessages); try { admin.topics().deletePartitionedTopic(topic); + fail(); } catch (PulsarAdminException e) { + log.info("Failed to delete partitioned topic \"{}\": {}", topic, e.getMessage()); assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); } @@ -80,7 +87,9 @@ public void testDeleteClosedTopics() throws Exception { kafkaConsumer1.close(); try { admin.topics().deletePartitionedTopic(topic); + fail(); } catch (PulsarAdminException e) { + log.info("Failed to delete partitioned topic \"{}\": {}", topic, e.getMessage()); assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); } From c0dafd2a18427e1790dcb6b63d2707cda2732f17 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 7 Apr 2021 14:43:03 +0800 Subject: [PATCH 2/2] Fix failure in CI --- .../pulsar/handlers/kop/BasicEndToEndKafkaTest.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java index 15963f85ed..c6639b06a1 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java @@ -67,7 +67,8 @@ public void testDeleteClosedTopics() throws Exception { fail(); } catch (PulsarAdminException e) { log.info("Failed to delete partitioned topic \"{}\": {}", topic, e.getMessage()); - assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); + assertTrue(e.getMessage().contains("Topic has active producers/subscriptions") + || e.getMessage().contains("Partitioned topic does not exist")); } final KafkaConsumer kafkaConsumer1 = newKafkaConsumer(topic, "sub-1"); @@ -77,7 +78,8 @@ public void testDeleteClosedTopics() throws Exception { fail(); } catch (PulsarAdminException e) { log.info("Failed to delete partitioned topic \"{}\": {}", topic, e.getMessage()); - assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); + assertTrue(e.getMessage().contains("Topic has active producers/subscriptions") + || e.getMessage().contains("Partitioned topic does not exist")); } final KafkaConsumer kafkaConsumer2 = newKafkaConsumer(topic, "sub-2"); @@ -90,7 +92,8 @@ public void testDeleteClosedTopics() throws Exception { fail(); } catch (PulsarAdminException e) { log.info("Failed to delete partitioned topic \"{}\": {}", topic, e.getMessage()); - assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); + assertTrue(e.getMessage().contains("Topic has active producers/subscriptions") + || e.getMessage().contains("Partitioned topic does not exist")); } kafkaConsumer2.close();