Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

import static com.google.common.base.Preconditions.checkState;

import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetAcker;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -27,6 +29,7 @@

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -379,14 +382,25 @@ public void deReference(String topicName) {
}

public CompletableFuture<Consumer> getGroupConsumers(String groupId, TopicPartition kafkaPartition) {
// make sure internal consumer existed
CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
if (groupId == null || groupId.isEmpty() || !requestHandler.getGroupCoordinator()
.getOffsetAcker().getConsumer(groupId, kafkaPartition).isDone()) {
log.warn("not get consumer for group {} this time", groupId);
consumerFuture.complete(null);
return consumerFuture;
if (StringUtils.isEmpty(groupId)) {
if (log.isDebugEnabled()) {
log.debug("Try to get group consumers with an empty group id");
}
return CompletableFuture.completedFuture(null);
}

// The future of the offset consumer should be created before in `GroupCoordinator#handleSyncGroup`
final OffsetAcker offsetAcker = requestHandler.getGroupCoordinator().getOffsetAcker();
final CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> offsetConsumerFuture =
offsetAcker.getConsumer(groupId, kafkaPartition);
if (offsetConsumerFuture == null) {
if (log.isDebugEnabled()) {
log.debug("No offset consumer for [group={}] [topic={}]", groupId, kafkaPartition);
}
return CompletableFuture.completedFuture(null);
}

CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
return CONSUMERS_CACHE.computeIfAbsent(groupId, group -> {
try {
TopicName topicName = TopicName.get(KopTopic.toString(kafkaPartition));
Expand All @@ -396,10 +410,28 @@ public CompletableFuture<Consumer> getGroupConsumers(String groupId, TopicPartit
.getBrokerService().getMultiLayerTopicsMap()
.get(topicName.getNamespace()).get(namespaceBundle.toString())
.get(topicName.toString());
// only one consumer existed for internal subscription
Consumer consumer = persistentTopic.getSubscriptions()
.get(groupId).getDispatcher().getConsumers().get(0);
consumerFuture.complete(consumer);
// The `Consumer` in broker side won't be created until the `Consumer` in client side subscribes
// successfully, so we should wait until offset consumer's future is completed.
offsetConsumerFuture.whenComplete((ignored, e) -> {
if (e != null) {
log.warn("Failed to create offset consumer for [group={}] [topic={}]: {}",
groupId, kafkaPartition, e.getMessage());
offsetAcker.removeConsumer(groupId, kafkaPartition);
// Here we don't return because the `Consumer` in broker side may be created already
}
// Double check for if the `Consumer` in broker side has been created
final List<Consumer> consumers =
persistentTopic.getSubscriptions().get(groupId).getDispatcher().getConsumers();
if (consumers.isEmpty()) {
log.error("There's no internal consumer for [group={}]", groupId);
consumerFuture.complete(null);
return;
}
// only one consumer existed for internal subscription
final Consumer consumer = persistentTopic.getSubscriptions()
.get(groupId).getDispatcher().getConsumers().get(0);
consumerFuture.complete(consumer);
});
} catch (Exception e) {
log.error("get topic error", e);
consumerFuture.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import io.streamnative.pulsar.handlers.kop.utils.OffsetSearchPredicate;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
Expand All @@ -43,9 +45,21 @@
@Slf4j
public class OffsetAcker implements Closeable {

private static final Map<TopicPartition, CompletableFuture<Consumer<byte[]>>> EMPTY_CONSUMERS = new HashMap<>();

private final ConsumerBuilder<byte[]> consumerBuilder;
private final BrokerService brokerService;

// A map whose
// key is group id,
// value is a map whose
// key is the partition,
// value is the created future of consumer.
// The consumer, whose subscription is the group id, is used for acknowledging message id cumulatively.
// This behavior is equivalent to committing offsets in Kafka.
private final Map<String, Map<TopicPartition, CompletableFuture<Consumer<byte[]>>>>
consumers = new ConcurrentHashMap<>();

public OffsetAcker(PulsarClientImpl pulsarClient) {
this.consumerBuilder = pulsarClient.newConsumer()
.receiverQueueSize(0)
Expand All @@ -60,16 +74,13 @@ public OffsetAcker(PulsarClientImpl pulsarClient, BrokerService brokerService) {
this.brokerService = brokerService;
}

// map off consumser: <groupId, consumers>
Map<String, Map<TopicPartition, CompletableFuture<Consumer<byte[]>>>> consumers = new ConcurrentHashMap<>();

public void addOffsetsTracker(String groupId, byte[] assignment) {
ByteBuffer assignBuffer = ByteBuffer.wrap(assignment);
Assignment assign = ConsumerProtocol.deserializeAssignment(assignBuffer);
if (log.isDebugEnabled()) {
log.debug(" Add offsets after sync group: {}", assign.toString());
}
assign.partitions().forEach(topicPartition -> getConsumer(groupId, topicPartition));
assign.partitions().forEach(topicPartition -> getOrCreateConsumer(groupId, topicPartition));
}

public void ackOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsetMetadata) {
Expand All @@ -81,11 +92,13 @@ public void ackOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> of
}
offsetMetadata.forEach(((topicPartition, offsetAndMetadata) -> {
// 1. get consumer, then do ackCumulative
CompletableFuture<Consumer<byte[]>> consumerFuture = getConsumer(groupId, topicPartition);
CompletableFuture<Consumer<byte[]>> consumerFuture = getOrCreateConsumer(groupId, topicPartition);

consumerFuture.whenComplete((consumer, throwable) -> {
if (throwable != null) {
log.warn("Error when get consumer for offset ack:", throwable);
log.warn("Failed to create offset consumer for [group={}] [topic={}]: {}",
groupId, topicPartition, throwable.getMessage());
removeConsumer(groupId, topicPartition);
return;
}
KopTopic kopTopic = new KopTopic(topicPartition.topic());
Expand Down Expand Up @@ -125,7 +138,12 @@ public void ackOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> of

public void close(Set<String> groupIds) {
for (String groupId : groupIds) {
consumers.remove(groupId).forEach((topicPartition, consumerFuture) -> {
final Map<TopicPartition, CompletableFuture<Consumer<byte[]>>>
consumersToRemove = consumers.remove(groupId);
if (consumersToRemove == null) {
continue;
}
consumersToRemove.forEach((topicPartition, consumerFuture) -> {
if (!consumerFuture.isDone()) {
log.warn("Consumer of [group={}] [topic={}] is not done while being closed",
groupId, topicPartition);
Expand All @@ -148,14 +166,16 @@ public void close() {
close(consumers.keySet());
}

public CompletableFuture<Consumer<byte[]>> getConsumer(String groupId, TopicPartition topicPartition) {
@NonNull
public CompletableFuture<Consumer<byte[]>> getOrCreateConsumer(String groupId, TopicPartition topicPartition) {
Map<TopicPartition, CompletableFuture<Consumer<byte[]>>> group = consumers
.computeIfAbsent(groupId, gid -> new ConcurrentHashMap<>());
return group.computeIfAbsent(
topicPartition,
partition -> createConsumer(groupId, partition));
}

@NonNull
private CompletableFuture<Consumer<byte[]>> createConsumer(String groupId, TopicPartition topicPartition) {
KopTopic kopTopic = new KopTopic(topicPartition.topic());
return consumerBuilder.clone()
Expand All @@ -164,4 +184,22 @@ private CompletableFuture<Consumer<byte[]>> createConsumer(String groupId, Topic
.subscribeAsync();
}

public CompletableFuture<Consumer<byte[]>> getConsumer(String groupId, TopicPartition topicPartition) {
return consumers.getOrDefault(groupId, EMPTY_CONSUMERS).get(topicPartition);
}

public void removeConsumer(String groupId, TopicPartition topicPartition) {
final CompletableFuture<Consumer<byte[]>> consumerFuture =
consumers.getOrDefault(groupId, EMPTY_CONSUMERS).remove(topicPartition);
if (consumerFuture != null) {
consumerFuture.whenComplete((consumer, e) -> {
if (e == null) {
consumer.closeAsync();
} else {
log.error("Failed to create consumer for [group={}] [topic={}]: {}",
groupId, topicPartition, e.getMessage());
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.testng.annotations.Test;

/**
Expand Down Expand Up @@ -59,6 +66,7 @@ public void testDeleteClosedTopics() throws Exception {
final String topic = "test-delete-closed-topics";
final List<String> expectedMessages = Collections.singletonList("msg");

admin.topics().createPartitionedTopic(topic, 1);
final KafkaProducer<String, String> kafkaProducer = newKafkaProducer();
sendSingleMessages(kafkaProducer, topic, expectedMessages);

Expand Down Expand Up @@ -97,6 +105,39 @@ public void testDeleteClosedTopics() throws Exception {
}

kafkaConsumer2.close();
Thread.sleep(500); // Wait for consumers closed
admin.topics().deletePartitionedTopic(topic);
}

@Test(timeOut = 20000)
public void testKafkaConsumerMetrics() throws Exception {
final String topic = "test-kafka-consumer-metrics";
final String group = "group-test-kafka-consumer-metrics";
final List<String> expectedMessages = Arrays.asList("A", "B", "C");

@Cleanup
final KafkaProducer<String, String> kafkaProducer = newKafkaProducer();
sendSingleMessages(kafkaProducer, topic, expectedMessages);

final Properties consumerProps = newKafkaConsumerProperties();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, group);
@Cleanup
final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
kafkaConsumer.subscribe(Collections.singleton(topic));
List<String> kafkaReceives = receiveMessages(kafkaConsumer, expectedMessages.size());
assertEquals(kafkaReceives, expectedMessages);

// Check stats
final TopicName topicName = TopicName.get(KopTopic.toString(new TopicPartition(topic, 0)));
final PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getMultiLayerTopicsMap()
.get(topicName.getNamespace())
.get(pulsar.getNamespaceService().getBundle(topicName).toString())
.get(topicName.toString());
final ConsumerStats stats =
persistentTopic.getSubscriptions().get(group).getDispatcher().getConsumers().get(0).getStats();
log.info("Consumer stats: [msgOutCounter={}] [bytesOutCounter={}]",
stats.msgOutCounter, stats.bytesOutCounter);
assertEquals(stats.msgOutCounter, expectedMessages.size());
assertTrue(stats.bytesOutCounter > 0);
}
}