From 1f1dc015149c23c6ba4c794bd22b4737b005fd25 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 6 Apr 2021 00:13:01 +0800 Subject: [PATCH 01/11] Avoid creating offset consumer before updating consumer metrics --- .../handlers/kop/KafkaTopicManager.java | 50 ++++++++++++++---- .../kop/coordinator/group/OffsetAcker.java | 51 ++++++++++++++++--- .../handlers/kop/BasicEndToEndKafkaTest.java | 42 +++++++++++++++ 3 files changed, 125 insertions(+), 18 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index 25aa6c6a65..bb44aa1296 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -15,8 +15,10 @@ import static com.google.common.base.Preconditions.checkState; +import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetAcker; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import java.net.InetSocketAddress; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -27,6 +29,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.TopicPartition; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -379,14 +382,21 @@ public void deReference(String topicName) { } public CompletableFuture getGroupConsumers(String groupId, TopicPartition kafkaPartition) { - // make sure internal consumer existed - CompletableFuture consumerFuture = new CompletableFuture<>(); - if (groupId == null || groupId.isEmpty() || !requestHandler.getGroupCoordinator() - .getOffsetAcker().getConsumer(groupId, kafkaPartition).isDone()) { - log.warn("not get consumer for group {} this time", groupId); - consumerFuture.complete(null); - return consumerFuture; + if (StringUtils.isEmpty(groupId)) { + log.warn("Try to get group consumers with an empty group id"); + return CompletableFuture.completedFuture(null); } + + // The future of the offset consumer should be created before in `GroupCoordinator#handleSyncGroup` + final OffsetAcker offsetAcker = requestHandler.getGroupCoordinator().getOffsetAcker(); + final CompletableFuture> offsetConsumerFuture = + offsetAcker.getConsumer(groupId, kafkaPartition); + if (offsetConsumerFuture == null) { + log.warn("No offset consumer for [group={}] [topic={}]", groupId, kafkaPartition); + return CompletableFuture.completedFuture(null); + } + + CompletableFuture consumerFuture = new CompletableFuture<>(); return CONSUMERS_CACHE.computeIfAbsent(groupId, group -> { try { TopicName topicName = TopicName.get(KopTopic.toString(kafkaPartition)); @@ -396,10 +406,28 @@ public CompletableFuture getGroupConsumers(String groupId, TopicPartit .getBrokerService().getMultiLayerTopicsMap() .get(topicName.getNamespace()).get(namespaceBundle.toString()) .get(topicName.toString()); - // only one consumer existed for internal subscription - Consumer consumer = persistentTopic.getSubscriptions() - .get(groupId).getDispatcher().getConsumers().get(0); - consumerFuture.complete(consumer); + // The `Consumer` in broker side won't be created until the `Consumer` in client side subscribes + // successfully, so we should wait until offset consumer's future is completed. + offsetConsumerFuture.whenComplete((ignored, e) -> { + if (e != null) { + log.warn("Failed to create offset consumer for [group={}] [topic={}]: {}", + groupId, kafkaPartition, e.getMessage()); + offsetAcker.removeConsumer(groupId, kafkaPartition); + // Here we don't return because the `Consumer` in broker side may be created already + } + // Double check for if the `Consumer` in broker side has been created + final List consumers = + persistentTopic.getSubscriptions().get(groupId).getDispatcher().getConsumers(); + if (consumers.isEmpty()) { + log.error("There's no internal consumer for [group={}]", groupId); + consumerFuture.complete(null); + return; + } + // only one consumer existed for internal subscription + final Consumer consumer = persistentTopic.getSubscriptions() + .get(groupId).getDispatcher().getConsumers().get(0); + consumerFuture.complete(consumer); + }); } catch (Exception e) { log.error("get topic error", e); consumerFuture.complete(null); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java index d898de0f3c..e1b8abc3be 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java @@ -18,12 +18,14 @@ import io.streamnative.pulsar.handlers.kop.utils.OffsetSearchPredicate; import java.io.Closeable; import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; @@ -33,6 +35,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -43,9 +46,21 @@ @Slf4j public class OffsetAcker implements Closeable { + private static final Map>> EMPTY_CONSUMERS = new HashMap<>(); + private final ConsumerBuilder consumerBuilder; private final BrokerService brokerService; + // A map whose + // key is group id, + // value is a map whose + // key is the partition, + // value is the created future of consumer. + // The consumer, whose subscription is the group id, is used for acknowledging message id cumulatively. + // This behavior is equivalent to committing offsets in Kafka. + private final Map>>> + consumers = new ConcurrentHashMap<>(); + public OffsetAcker(PulsarClientImpl pulsarClient) { this.consumerBuilder = pulsarClient.newConsumer() .receiverQueueSize(0) @@ -60,16 +75,13 @@ public OffsetAcker(PulsarClientImpl pulsarClient, BrokerService brokerService) { this.brokerService = brokerService; } - // map off consumser: - Map>>> consumers = new ConcurrentHashMap<>(); - public void addOffsetsTracker(String groupId, byte[] assignment) { ByteBuffer assignBuffer = ByteBuffer.wrap(assignment); Assignment assign = ConsumerProtocol.deserializeAssignment(assignBuffer); if (log.isDebugEnabled()) { log.debug(" Add offsets after sync group: {}", assign.toString()); } - assign.partitions().forEach(topicPartition -> getConsumer(groupId, topicPartition)); + assign.partitions().forEach(topicPartition -> getOrCreateConsumer(groupId, topicPartition)); } public void ackOffsets(String groupId, Map offsetMetadata) { @@ -81,11 +93,13 @@ public void ackOffsets(String groupId, Map of } offsetMetadata.forEach(((topicPartition, offsetAndMetadata) -> { // 1. get consumer, then do ackCumulative - CompletableFuture> consumerFuture = getConsumer(groupId, topicPartition); + CompletableFuture> consumerFuture = getOrCreateConsumer(groupId, topicPartition); consumerFuture.whenComplete((consumer, throwable) -> { if (throwable != null) { - log.warn("Error when get consumer for offset ack:", throwable); + log.warn("Failed to create offset consumer for [group={}] [topic={}]: {}", + groupId, topicPartition, throwable.getMessage()); + removeConsumer(groupId, topicPartition); return; } KopTopic kopTopic = new KopTopic(topicPartition.topic()); @@ -148,7 +162,8 @@ public void close() { close(consumers.keySet()); } - public CompletableFuture> getConsumer(String groupId, TopicPartition topicPartition) { + @NonNull + public CompletableFuture> getOrCreateConsumer(String groupId, TopicPartition topicPartition) { Map>> group = consumers .computeIfAbsent(groupId, gid -> new ConcurrentHashMap<>()); return group.computeIfAbsent( @@ -156,6 +171,7 @@ public CompletableFuture> getConsumer(String groupId, TopicPart partition -> createConsumer(groupId, partition)); } + @NonNull private CompletableFuture> createConsumer(String groupId, TopicPartition topicPartition) { KopTopic kopTopic = new KopTopic(topicPartition.topic()); return consumerBuilder.clone() @@ -164,4 +180,25 @@ private CompletableFuture> createConsumer(String groupId, Topic .subscribeAsync(); } + public CompletableFuture> getConsumer(String groupId, TopicPartition topicPartition) { + return consumers.getOrDefault(groupId, EMPTY_CONSUMERS).get(topicPartition); + } + + public void removeConsumer(String groupId, TopicPartition topicPartition) { + final CompletableFuture> consumerFuture = + consumers.getOrDefault(groupId, EMPTY_CONSUMERS).remove(topicPartition); + if (consumerFuture != null) { + consumerFuture.whenComplete((consumer, e) -> { + if (e == null) { + try { + consumer.close(); + } catch (PulsarClientException ignored) { + } + } else { + log.error("Failed to create consumer for [group={}] [topic={}]: {}", + groupId, topicPartition, e.getMessage()); + } + }); + } + } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java index 1040305c3f..502c202a25 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java @@ -16,18 +16,28 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Properties; + import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.kafka.common.TopicPartition; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ConsumerStats; import org.testng.annotations.Test; /** * Basic end-to-end test with `entryFormat=kafka`. */ +@Slf4j public class BasicEndToEndKafkaTest extends BasicEndToEndTestBase { public BasicEndToEndKafkaTest() { @@ -87,4 +97,36 @@ public void testDeleteClosedTopics() throws Exception { kafkaConsumer2.close(); admin.topics().deletePartitionedTopic(topic); } + + @Test(timeOut = 20000) + public void testKafkaConsumerMetrics() throws Exception { + final String topic = "test-kafka-consumer-metrics"; + final String group = "group-test-kafka-consumer-metrics"; + final List expectedMessages = Arrays.asList("A", "B", "C"); + + @Cleanup + final KafkaProducer kafkaProducer = newKafkaProducer(); + sendSingleMessages(kafkaProducer, topic, expectedMessages); + + final Properties consumerProps = newKafkaConsumerProperties(); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, group); + @Cleanup + final KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerProps); + kafkaConsumer.subscribe(Collections.singleton(topic)); + List kafkaReceives = receiveMessages(kafkaConsumer, expectedMessages.size()); + assertEquals(kafkaReceives, expectedMessages); + + // Check stats + final TopicName topicName = TopicName.get(KopTopic.toString(new TopicPartition(topic, 0))); + final PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getMultiLayerTopicsMap() + .get(topicName.getNamespace()) + .get(pulsar.getNamespaceService().getBundle(topicName).toString()) + .get(topicName.toString()); + final ConsumerStats stats = + persistentTopic.getSubscriptions().get(group).getDispatcher().getConsumers().get(0).getStats(); + log.info("Consumer stats: [msgOutCounter={}] [bytesOutCounter={}]", + stats.msgOutCounter, stats.bytesOutCounter); + assertEquals(stats.msgOutCounter, expectedMessages.size()); + assertTrue(stats.bytesOutCounter > 0); + } } From d6599a967ece0e1717c76a83952a7625956d3b9a Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 6 Apr 2021 12:22:47 +0800 Subject: [PATCH 02/11] Fix checkstyle --- .../pulsar/handlers/kop/BasicEndToEndKafkaTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java index 502c202a25..198702e6c7 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java @@ -21,15 +21,14 @@ import java.util.Collections; import java.util.List; import java.util.Properties; - import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.kafka.common.TopicPartition; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ConsumerStats; import org.testng.annotations.Test; From 399a6c1f80ad63248890e4e966d53b29d80af3f2 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 6 Apr 2021 12:24:42 +0800 Subject: [PATCH 03/11] Use closeAsync to avoid blocking until consumer closed --- .../pulsar/handlers/kop/coordinator/group/OffsetAcker.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java index e1b8abc3be..7c49ca63fd 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java @@ -35,7 +35,6 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -190,10 +189,7 @@ public void removeConsumer(String groupId, TopicPartition topicPartition) { if (consumerFuture != null) { consumerFuture.whenComplete((consumer, e) -> { if (e == null) { - try { - consumer.close(); - } catch (PulsarClientException ignored) { - } + consumer.closeAsync(); } else { log.error("Failed to create consumer for [group={}] [topic={}]: {}", groupId, topicPartition, e.getMessage()); From 52aaed94df72f125d980c68819beae2f557183c8 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 6 Apr 2021 17:35:57 +0800 Subject: [PATCH 04/11] Avoid potential NPE --- .../pulsar/handlers/kop/coordinator/group/OffsetAcker.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java index 7c49ca63fd..6feb11bc5e 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java @@ -138,7 +138,12 @@ public void ackOffsets(String groupId, Map of public void close(Set groupIds) { for (String groupId : groupIds) { - consumers.remove(groupId).forEach((topicPartition, consumerFuture) -> { + final Map>> + consumersToRemove = consumers.remove(groupId); + if (consumersToRemove == null) { + continue; + } + consumersToRemove.forEach((topicPartition, consumerFuture) -> { if (!consumerFuture.isDone()) { log.warn("Consumer of [group={}] [topic={}] is not done while being closed", groupId, topicPartition); From 5691ab227067456e7f6d4f2dead353c15af58990 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 6 Apr 2021 19:15:50 +0800 Subject: [PATCH 05/11] Add logs for flaky testDeleteClosedTopics --- .../pulsar/handlers/kop/BasicEndToEndKafkaTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java index 198702e6c7..bf18e21489 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java @@ -71,6 +71,7 @@ public void testDeleteClosedTopics() throws Exception { try { admin.topics().deletePartitionedTopic(topic); } catch (PulsarAdminException e) { + log.info("Failed to delete topic: {}", e.getMessage()); assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); } @@ -79,6 +80,7 @@ public void testDeleteClosedTopics() throws Exception { try { admin.topics().deletePartitionedTopic(topic); } catch (PulsarAdminException e) { + log.info("Failed to delete topic: {}", e.getMessage()); assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); } @@ -90,6 +92,7 @@ public void testDeleteClosedTopics() throws Exception { try { admin.topics().deletePartitionedTopic(topic); } catch (PulsarAdminException e) { + log.info("Failed to delete topic: {}", e.getMessage()); assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); } From 89ba4f54c334397893ba5e0a1d04da8912bf6182 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 6 Apr 2021 20:20:00 +0800 Subject: [PATCH 06/11] Fix flaky testDeleteClosedTopics --- .../pulsar/handlers/kop/BasicEndToEndKafkaTest.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java index bf18e21489..d3b309f473 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java @@ -15,6 +15,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import java.util.Arrays; @@ -70,18 +71,22 @@ public void testDeleteClosedTopics() throws Exception { try { admin.topics().deletePartitionedTopic(topic); + fail(); } catch (PulsarAdminException e) { log.info("Failed to delete topic: {}", e.getMessage()); - assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); + assertTrue(e.getMessage().contains("Topic has active producers/subscriptions") + || e.getMessage().contains("Partitioned topic does not exist")); } final KafkaConsumer kafkaConsumer1 = newKafkaConsumer(topic, "sub-1"); assertEquals(receiveMessages(kafkaConsumer1, expectedMessages.size()), expectedMessages); try { admin.topics().deletePartitionedTopic(topic); + fail(); } catch (PulsarAdminException e) { log.info("Failed to delete topic: {}", e.getMessage()); - assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); + assertTrue(e.getMessage().contains("Topic has active producers/subscriptions") + || e.getMessage().contains("Partitioned topic does not exist")); } final KafkaConsumer kafkaConsumer2 = newKafkaConsumer(topic, "sub-2"); @@ -91,9 +96,11 @@ public void testDeleteClosedTopics() throws Exception { kafkaConsumer1.close(); try { admin.topics().deletePartitionedTopic(topic); + fail(); } catch (PulsarAdminException e) { log.info("Failed to delete topic: {}", e.getMessage()); - assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); + assertTrue(e.getMessage().contains("Topic has active producers/subscriptions") + || e.getMessage().contains("Partitioned topic does not exist")); } kafkaConsumer2.close(); From 7aee7cd49dad3a122dcd2547ef8b7287f94c1880 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 6 Apr 2021 22:33:47 +0800 Subject: [PATCH 07/11] Change log level to debug --- .../pulsar/handlers/kop/KafkaTopicManager.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index bb44aa1296..ee41cc7a01 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -383,7 +383,9 @@ public void deReference(String topicName) { public CompletableFuture getGroupConsumers(String groupId, TopicPartition kafkaPartition) { if (StringUtils.isEmpty(groupId)) { - log.warn("Try to get group consumers with an empty group id"); + if (log.isDebugEnabled()) { + log.debug("Try to get group consumers with an empty group id"); + } return CompletableFuture.completedFuture(null); } @@ -392,7 +394,9 @@ public CompletableFuture getGroupConsumers(String groupId, TopicPartit final CompletableFuture> offsetConsumerFuture = offsetAcker.getConsumer(groupId, kafkaPartition); if (offsetConsumerFuture == null) { - log.warn("No offset consumer for [group={}] [topic={}]", groupId, kafkaPartition); + if (log.isDebugEnabled()) { + log.debug("No offset consumer for [group={}] [topic={}]", groupId, kafkaPartition); + } return CompletableFuture.completedFuture(null); } From 25274e518ab3eaefcaa54900ce0514d318fbd926 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 8 Apr 2021 11:46:26 +0800 Subject: [PATCH 08/11] Revert "Fix flaky testDeleteClosedTopics" This reverts commit 89ba4f54c334397893ba5e0a1d04da8912bf6182. --- .../pulsar/handlers/kop/BasicEndToEndKafkaTest.java | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java index d3b309f473..bf18e21489 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java @@ -15,7 +15,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import java.util.Arrays; @@ -71,22 +70,18 @@ public void testDeleteClosedTopics() throws Exception { try { admin.topics().deletePartitionedTopic(topic); - fail(); } catch (PulsarAdminException e) { log.info("Failed to delete topic: {}", e.getMessage()); - assertTrue(e.getMessage().contains("Topic has active producers/subscriptions") - || e.getMessage().contains("Partitioned topic does not exist")); + assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); } final KafkaConsumer kafkaConsumer1 = newKafkaConsumer(topic, "sub-1"); assertEquals(receiveMessages(kafkaConsumer1, expectedMessages.size()), expectedMessages); try { admin.topics().deletePartitionedTopic(topic); - fail(); } catch (PulsarAdminException e) { log.info("Failed to delete topic: {}", e.getMessage()); - assertTrue(e.getMessage().contains("Topic has active producers/subscriptions") - || e.getMessage().contains("Partitioned topic does not exist")); + assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); } final KafkaConsumer kafkaConsumer2 = newKafkaConsumer(topic, "sub-2"); @@ -96,11 +91,9 @@ public void testDeleteClosedTopics() throws Exception { kafkaConsumer1.close(); try { admin.topics().deletePartitionedTopic(topic); - fail(); } catch (PulsarAdminException e) { log.info("Failed to delete topic: {}", e.getMessage()); - assertTrue(e.getMessage().contains("Topic has active producers/subscriptions") - || e.getMessage().contains("Partitioned topic does not exist")); + assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); } kafkaConsumer2.close(); From 08866d34300848b8dbca35da762ad0c7b47d4483 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 8 Apr 2021 11:46:42 +0800 Subject: [PATCH 09/11] Revert "Add logs for flaky testDeleteClosedTopics" This reverts commit 5691ab227067456e7f6d4f2dead353c15af58990. --- .../pulsar/handlers/kop/BasicEndToEndKafkaTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java index bf18e21489..198702e6c7 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java @@ -71,7 +71,6 @@ public void testDeleteClosedTopics() throws Exception { try { admin.topics().deletePartitionedTopic(topic); } catch (PulsarAdminException e) { - log.info("Failed to delete topic: {}", e.getMessage()); assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); } @@ -80,7 +79,6 @@ public void testDeleteClosedTopics() throws Exception { try { admin.topics().deletePartitionedTopic(topic); } catch (PulsarAdminException e) { - log.info("Failed to delete topic: {}", e.getMessage()); assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); } @@ -92,7 +90,6 @@ public void testDeleteClosedTopics() throws Exception { try { admin.topics().deletePartitionedTopic(topic); } catch (PulsarAdminException e) { - log.info("Failed to delete topic: {}", e.getMessage()); assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); } From c145b32987c4dcc110611f32339ee8fd5857e85f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 8 Apr 2021 15:29:17 +0800 Subject: [PATCH 10/11] Fix CI failure --- .../streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java index 6b8867d48a..9225b3a5c6 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java @@ -66,6 +66,7 @@ public void testDeleteClosedTopics() throws Exception { final String topic = "test-delete-closed-topics"; final List expectedMessages = Collections.singletonList("msg"); + admin.topics().createPartitionedTopic(topic, 1); final KafkaProducer kafkaProducer = newKafkaProducer(); sendSingleMessages(kafkaProducer, topic, expectedMessages); From cb310d6c496e51f57a62006151f86973665b1c25 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 8 Apr 2021 15:51:24 +0800 Subject: [PATCH 11/11] Fix CI failure --- .../streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java index 9225b3a5c6..0e67acac88 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java @@ -105,6 +105,7 @@ public void testDeleteClosedTopics() throws Exception { } kafkaConsumer2.close(); + Thread.sleep(500); // Wait for consumers closed admin.topics().deletePartitionedTopic(topic); }