diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index 25aa6c6a65..ee41cc7a01 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -15,8 +15,10 @@ import static com.google.common.base.Preconditions.checkState; +import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetAcker; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import java.net.InetSocketAddress; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -27,6 +29,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.TopicPartition; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -379,14 +382,25 @@ public void deReference(String topicName) { } public CompletableFuture getGroupConsumers(String groupId, TopicPartition kafkaPartition) { - // make sure internal consumer existed - CompletableFuture consumerFuture = new CompletableFuture<>(); - if (groupId == null || groupId.isEmpty() || !requestHandler.getGroupCoordinator() - .getOffsetAcker().getConsumer(groupId, kafkaPartition).isDone()) { - log.warn("not get consumer for group {} this time", groupId); - consumerFuture.complete(null); - return consumerFuture; + if (StringUtils.isEmpty(groupId)) { + if (log.isDebugEnabled()) { + log.debug("Try to get group consumers with an empty group id"); + } + return CompletableFuture.completedFuture(null); } + + // The future of the offset consumer should be created before in `GroupCoordinator#handleSyncGroup` + final OffsetAcker offsetAcker = requestHandler.getGroupCoordinator().getOffsetAcker(); + final CompletableFuture> offsetConsumerFuture = + offsetAcker.getConsumer(groupId, kafkaPartition); + if (offsetConsumerFuture == null) { + if (log.isDebugEnabled()) { + log.debug("No offset consumer for [group={}] [topic={}]", groupId, kafkaPartition); + } + return CompletableFuture.completedFuture(null); + } + + CompletableFuture consumerFuture = new CompletableFuture<>(); return CONSUMERS_CACHE.computeIfAbsent(groupId, group -> { try { TopicName topicName = TopicName.get(KopTopic.toString(kafkaPartition)); @@ -396,10 +410,28 @@ public CompletableFuture getGroupConsumers(String groupId, TopicPartit .getBrokerService().getMultiLayerTopicsMap() .get(topicName.getNamespace()).get(namespaceBundle.toString()) .get(topicName.toString()); - // only one consumer existed for internal subscription - Consumer consumer = persistentTopic.getSubscriptions() - .get(groupId).getDispatcher().getConsumers().get(0); - consumerFuture.complete(consumer); + // The `Consumer` in broker side won't be created until the `Consumer` in client side subscribes + // successfully, so we should wait until offset consumer's future is completed. + offsetConsumerFuture.whenComplete((ignored, e) -> { + if (e != null) { + log.warn("Failed to create offset consumer for [group={}] [topic={}]: {}", + groupId, kafkaPartition, e.getMessage()); + offsetAcker.removeConsumer(groupId, kafkaPartition); + // Here we don't return because the `Consumer` in broker side may be created already + } + // Double check for if the `Consumer` in broker side has been created + final List consumers = + persistentTopic.getSubscriptions().get(groupId).getDispatcher().getConsumers(); + if (consumers.isEmpty()) { + log.error("There's no internal consumer for [group={}]", groupId); + consumerFuture.complete(null); + return; + } + // only one consumer existed for internal subscription + final Consumer consumer = persistentTopic.getSubscriptions() + .get(groupId).getDispatcher().getConsumers().get(0); + consumerFuture.complete(consumer); + }); } catch (Exception e) { log.error("get topic error", e); consumerFuture.complete(null); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java index d898de0f3c..6feb11bc5e 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java @@ -18,12 +18,14 @@ import io.streamnative.pulsar.handlers.kop.utils.OffsetSearchPredicate; import java.io.Closeable; import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; @@ -43,9 +45,21 @@ @Slf4j public class OffsetAcker implements Closeable { + private static final Map>> EMPTY_CONSUMERS = new HashMap<>(); + private final ConsumerBuilder consumerBuilder; private final BrokerService brokerService; + // A map whose + // key is group id, + // value is a map whose + // key is the partition, + // value is the created future of consumer. + // The consumer, whose subscription is the group id, is used for acknowledging message id cumulatively. + // This behavior is equivalent to committing offsets in Kafka. + private final Map>>> + consumers = new ConcurrentHashMap<>(); + public OffsetAcker(PulsarClientImpl pulsarClient) { this.consumerBuilder = pulsarClient.newConsumer() .receiverQueueSize(0) @@ -60,16 +74,13 @@ public OffsetAcker(PulsarClientImpl pulsarClient, BrokerService brokerService) { this.brokerService = brokerService; } - // map off consumser: - Map>>> consumers = new ConcurrentHashMap<>(); - public void addOffsetsTracker(String groupId, byte[] assignment) { ByteBuffer assignBuffer = ByteBuffer.wrap(assignment); Assignment assign = ConsumerProtocol.deserializeAssignment(assignBuffer); if (log.isDebugEnabled()) { log.debug(" Add offsets after sync group: {}", assign.toString()); } - assign.partitions().forEach(topicPartition -> getConsumer(groupId, topicPartition)); + assign.partitions().forEach(topicPartition -> getOrCreateConsumer(groupId, topicPartition)); } public void ackOffsets(String groupId, Map offsetMetadata) { @@ -81,11 +92,13 @@ public void ackOffsets(String groupId, Map of } offsetMetadata.forEach(((topicPartition, offsetAndMetadata) -> { // 1. get consumer, then do ackCumulative - CompletableFuture> consumerFuture = getConsumer(groupId, topicPartition); + CompletableFuture> consumerFuture = getOrCreateConsumer(groupId, topicPartition); consumerFuture.whenComplete((consumer, throwable) -> { if (throwable != null) { - log.warn("Error when get consumer for offset ack:", throwable); + log.warn("Failed to create offset consumer for [group={}] [topic={}]: {}", + groupId, topicPartition, throwable.getMessage()); + removeConsumer(groupId, topicPartition); return; } KopTopic kopTopic = new KopTopic(topicPartition.topic()); @@ -125,7 +138,12 @@ public void ackOffsets(String groupId, Map of public void close(Set groupIds) { for (String groupId : groupIds) { - consumers.remove(groupId).forEach((topicPartition, consumerFuture) -> { + final Map>> + consumersToRemove = consumers.remove(groupId); + if (consumersToRemove == null) { + continue; + } + consumersToRemove.forEach((topicPartition, consumerFuture) -> { if (!consumerFuture.isDone()) { log.warn("Consumer of [group={}] [topic={}] is not done while being closed", groupId, topicPartition); @@ -148,7 +166,8 @@ public void close() { close(consumers.keySet()); } - public CompletableFuture> getConsumer(String groupId, TopicPartition topicPartition) { + @NonNull + public CompletableFuture> getOrCreateConsumer(String groupId, TopicPartition topicPartition) { Map>> group = consumers .computeIfAbsent(groupId, gid -> new ConcurrentHashMap<>()); return group.computeIfAbsent( @@ -156,6 +175,7 @@ public CompletableFuture> getConsumer(String groupId, TopicPart partition -> createConsumer(groupId, partition)); } + @NonNull private CompletableFuture> createConsumer(String groupId, TopicPartition topicPartition) { KopTopic kopTopic = new KopTopic(topicPartition.topic()); return consumerBuilder.clone() @@ -164,4 +184,22 @@ private CompletableFuture> createConsumer(String groupId, Topic .subscribeAsync(); } + public CompletableFuture> getConsumer(String groupId, TopicPartition topicPartition) { + return consumers.getOrDefault(groupId, EMPTY_CONSUMERS).get(topicPartition); + } + + public void removeConsumer(String groupId, TopicPartition topicPartition) { + final CompletableFuture> consumerFuture = + consumers.getOrDefault(groupId, EMPTY_CONSUMERS).remove(topicPartition); + if (consumerFuture != null) { + consumerFuture.whenComplete((consumer, e) -> { + if (e == null) { + consumer.closeAsync(); + } else { + log.error("Failed to create consumer for [group={}] [topic={}]: {}", + groupId, topicPartition, e.getMessage()); + } + }); + } + } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java index c6639b06a1..0e67acac88 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java @@ -17,14 +17,21 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Properties; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.TopicPartition; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ConsumerStats; import org.testng.annotations.Test; /** @@ -59,6 +66,7 @@ public void testDeleteClosedTopics() throws Exception { final String topic = "test-delete-closed-topics"; final List expectedMessages = Collections.singletonList("msg"); + admin.topics().createPartitionedTopic(topic, 1); final KafkaProducer kafkaProducer = newKafkaProducer(); sendSingleMessages(kafkaProducer, topic, expectedMessages); @@ -97,6 +105,39 @@ public void testDeleteClosedTopics() throws Exception { } kafkaConsumer2.close(); + Thread.sleep(500); // Wait for consumers closed admin.topics().deletePartitionedTopic(topic); } + + @Test(timeOut = 20000) + public void testKafkaConsumerMetrics() throws Exception { + final String topic = "test-kafka-consumer-metrics"; + final String group = "group-test-kafka-consumer-metrics"; + final List expectedMessages = Arrays.asList("A", "B", "C"); + + @Cleanup + final KafkaProducer kafkaProducer = newKafkaProducer(); + sendSingleMessages(kafkaProducer, topic, expectedMessages); + + final Properties consumerProps = newKafkaConsumerProperties(); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, group); + @Cleanup + final KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerProps); + kafkaConsumer.subscribe(Collections.singleton(topic)); + List kafkaReceives = receiveMessages(kafkaConsumer, expectedMessages.size()); + assertEquals(kafkaReceives, expectedMessages); + + // Check stats + final TopicName topicName = TopicName.get(KopTopic.toString(new TopicPartition(topic, 0))); + final PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getMultiLayerTopicsMap() + .get(topicName.getNamespace()) + .get(pulsar.getNamespaceService().getBundle(topicName).toString()) + .get(topicName.toString()); + final ConsumerStats stats = + persistentTopic.getSubscriptions().get(group).getDispatcher().getConsumers().get(0).getStats(); + log.info("Consumer stats: [msgOutCounter={}] [bytesOutCounter={}]", + stats.msgOutCounter, stats.bytesOutCounter); + assertEquals(stats.msgOutCounter, expectedMessages.size()); + assertTrue(stats.bytesOutCounter > 0); + } }