2 changes: 1 addition & 1 deletion .github/workflows/pr-tests.yml
@@ -25,7 +25,7 @@ jobs:
         run: mvn clean install -DskipTests
 
       - name: tests module
-        run: mvn test -DfailIfNoTests=false '-Dtest=!KafkaIntegration*Test,!DistributedClusterTest' -pl tests
+        run: mvn test -DfailIfNoTests=false '-Dtest=!KafkaIntegration*Test' -pl tests
 
       - name: package surefire artifacts
         if: failure()
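With DistributedClusterTest taken out of the exclusion list, only the Kafka integration tests remain skipped in the per-PR run. The rest of this PR is what makes the distributed-cluster suite stable enough to re-enable: process-wide caches that can be cleaned up on unload, explicit consumer teardown in the tests, and a longer producer request timeout.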
KafkaProtocolHandler.java
@@ -23,6 +23,7 @@
 import io.netty.channel.socket.SocketChannel;
 import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupConfig;
 import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
+import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetAcker;
 import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig;
 import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionConfig;
 import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator;
@@ -169,9 +170,9 @@ public void unLoad(NamespaceBundle bundle) {
                     }
                     groupCoordinator.handleGroupEmigration(name.getPartitionIndex());
                 }
-                // remove cache when unload
-                KafkaTopicManager.removeTopicManagerCache(name.toString());
+                // deReference topic when unload
+                KopBrokerLookupManager.removeTopicManagerCache(name.toString());
+                KafkaTopicManager.deReference(name.toString());
             }
         } else {
             log.error("Failed to get owned topic list for "
@@ -349,6 +350,10 @@ public void close() {
         }
         KafkaTopicManager.LOOKUP_CACHE.clear();
         KopBrokerLookupManager.clear();
+        KafkaTopicManager.getConsumerTopicManagers().clear();
+        KafkaTopicManager.getReferences().clear();
+        KafkaTopicManager.getTopics().clear();
+        OffsetAcker.CONSUMERS.clear();
         statsProvider.stop();
     }
 
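Because the KafkaTopicManager maps and OffsetAcker.CONSUMERS become static in this PR, close() can flush every process-wide cache directly instead of having to track live KafkaTopicManager instances per connection.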
KafkaTopicManager.java
@@ -55,12 +55,17 @@ public class KafkaTopicManager {
 
     // consumerTopicManagers for consumers cache.
     @Getter
-    private final ConcurrentHashMap<String, CompletableFuture<KafkaTopicConsumerManager>> consumerTopicManagers;
+    private static final ConcurrentHashMap<String, CompletableFuture<KafkaTopicConsumerManager>>
+            consumerTopicManagers = new ConcurrentHashMap<>();
 
     // cache for topics: <topicName, persistentTopic>, for removing producer
-    private final ConcurrentHashMap<String, CompletableFuture<PersistentTopic>> topics;
+    @Getter
+    private static final ConcurrentHashMap<String, CompletableFuture<PersistentTopic>>
+            topics = new ConcurrentHashMap<>();
     // cache for references in PersistentTopic: <topicName, producer>
-    private final ConcurrentHashMap<String, Producer> references;
+    @Getter
+    private static final ConcurrentHashMap<String, Producer>
+            references = new ConcurrentHashMap<>();
 
     private InternalServerCnx internalServerCnx;
 
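The three per-connection caches become process-wide statics. That is what lets both the protocol handler's close() above and the static deReference() below reach them without holding a KafkaTopicManager instance. A minimal sketch of the resulting pattern (illustrative names, not code from this PR):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: a process-wide cache in the style of the new static fields.
// TopicStub stands in for Pulsar's PersistentTopic.
class StaticCacheSketch {
    static class TopicStub { }

    // Shared by every connection; keys are fully qualified topic names,
    // which remain unique across the whole broker process.
    static final ConcurrentHashMap<String, CompletableFuture<TopicStub>> TOPICS =
            new ConcurrentHashMap<>();

    // Reachable from any code path, e.g. a shutdown hook or a bundle unload.
    static void clearOnShutdown() {
        TOPICS.clear();
    }
}

The trade-off is that entries no longer die with their connection, so every removal path (unload, shutdown, deReference) has to clean up explicitly, which is exactly what the rest of the diff wires up.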
@@ -90,10 +95,6 @@ public class KafkaTopicManager {
         this.brokerService = pulsarService.getBrokerService();
         this.internalServerCnx = new InternalServerCnx(requestHandler);
 
-        consumerTopicManagers = new ConcurrentHashMap<>();
-        topics = new ConcurrentHashMap<>();
-        references = new ConcurrentHashMap<>();
-
         this.rwLock = new ReentrantReadWriteLock();
         this.closed = false;
 
@@ -357,27 +358,27 @@ public Producer getReferenceProducer(String topicName) {
         return references.get(topicName);
     }
 
-    public void deReference(String topicName) {
+    public static void deReference(String topicName) {
         try {
             removeTopicManagerCache(topicName);
 
             if (consumerTopicManagers.containsKey(topicName)) {
-                CompletableFuture<KafkaTopicConsumerManager> manager = consumerTopicManagers.get(topicName);
-                manager.get().close();
-                consumerTopicManagers.remove(topicName);
+                consumerTopicManagers.remove(topicName).get().close();
             }
 
             if (!topics.containsKey(topicName)) {
                 return;
             }
             PersistentTopic persistentTopic = topics.get(topicName).get();
-            if (persistentTopic != null) {
-                persistentTopic.removeProducer(references.get(topicName));
+            Producer producer = references.get(topicName);
+            if (persistentTopic != null && producer != null) {
+                persistentTopic.removeProducer(producer);
             }
             topics.remove(topicName);
+
+            OffsetAcker.removeOffsetAcker(topicName);
         } catch (Exception e) {
-            log.error("[{}] Failed to close reference for individual topic {}. exception:",
-                requestHandler.ctx.channel(), topicName, e);
+            log.error("Failed to close reference for individual topic {}. exception:", topicName, e);
         }
     }
 
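One caveat on the hunk above: consumerTopicManagers.remove(topicName).get().close() is still guarded only by a prior containsKey check, so two threads calling deReference for the same topic could race between the check and the remove and hit a NullPointerException. A null-safe variant would look like this sketch (not part of the PR; AutoCloseable stands in for KafkaTopicConsumerManager):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: remove first, then close whatever this thread actually removed.
class RemoveThenCloseSketch {
    static final ConcurrentHashMap<String, CompletableFuture<AutoCloseable>> MANAGERS =
            new ConcurrentHashMap<>();

    static void deReference(String topicName) throws Exception {
        CompletableFuture<AutoCloseable> manager = MANAGERS.remove(topicName);
        if (manager != null) {
            // Only the thread that won the removal closes the manager.
            manager.get().close();
        }
    }
}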
MessageFetchContext.java
@@ -123,7 +123,7 @@ public CompletableFuture<AbstractResponse> handleFetch(
                 tcm = pair.getValue().get();
                 if (tcm == null) {
                     // remove null future cache from consumerTopicManagers
-                    requestHandler.getTopicManager().getConsumerTopicManagers()
+                    KafkaTopicManager.getConsumerTopicManagers()
                             .remove(KopTopic.toString(pair.getKey()));
                     throw new NullPointerException("topic not owned, and return null TCM in fetch.");
                 }
@@ -239,7 +239,7 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
                                 "cursor.readEntry fail. deleteCursor");
                     } else {
                         // remove null future cache from consumerTopicManagers
-                        requestHandler.getTopicManager().getConsumerTopicManagers()
+                        KafkaTopicManager.getConsumerTopicManagers()
                                 .remove(KopTopic.toString(kafkaTopic));
                         log.warn("Cursor deleted while TCM close.");
                     }
@@ -329,8 +329,8 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
                         cm.add(pair.getRight(), pair);
                     } else {
                         // remove null future cache from consumerTopicManagers
-                        requestHandler.getTopicManager().getConsumerTopicManagers()
-                                .remove(KopTopic.toString(kafkaPartition));
+                        KafkaTopicManager.getConsumerTopicManagers()
+                                .remove(KopTopic.toString(kafkaPartition));
                         log.warn("Cursor deleted while TCM close, failed to add cursor back to TCM.");
                     }
                 });
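All three fetch-path call sites swap the per-connection accessor requestHandler.getTopicManager().getConsumerTopicManagers() for the static KafkaTopicManager.getConsumerTopicManagers(), matching the cache's move to a process-wide map.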
OffsetAcker.java
@@ -45,7 +45,7 @@
 @Slf4j
 public class OffsetAcker implements Closeable {
 
-    private static final Map<TopicPartition, CompletableFuture<Consumer<byte[]>>> EMPTY_CONSUMERS = new HashMap<>();
+    private static final Map<String, CompletableFuture<Consumer<byte[]>>> EMPTY_CONSUMERS = new HashMap<>();
 
     private final ConsumerBuilder<byte[]> consumerBuilder;
     private final BrokerService brokerService;
@@ -57,8 +57,8 @@ public class OffsetAcker implements Closeable {
     // value is the created future of consumer.
     // The consumer, whose subscription is the group id, is used for acknowledging message id cumulatively.
     // This behavior is equivalent to committing offsets in Kafka.
-    private final Map<String, Map<TopicPartition, CompletableFuture<Consumer<byte[]>>>>
-            consumers = new ConcurrentHashMap<>();
+    public static final Map<String, Map<String, CompletableFuture<Consumer<byte[]>>>>
+            CONSUMERS = new ConcurrentHashMap<>();
 
     public OffsetAcker(PulsarClientImpl pulsarClient) {
         this.consumerBuilder = pulsarClient.newConsumer()
@@ -138,8 +138,8 @@ public void ackOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> of
 
     public void close(Set<String> groupIds) {
         for (String groupId : groupIds) {
-            final Map<TopicPartition, CompletableFuture<Consumer<byte[]>>>
-                    consumersToRemove = consumers.remove(groupId);
+            final Map<String, CompletableFuture<Consumer<byte[]>>>
+                    consumersToRemove = CONSUMERS.remove(groupId);
             if (consumersToRemove == null) {
                 continue;
             }
@@ -151,9 +151,7 @@ public void close(Set<String> groupIds) {
             }
             final Consumer<byte[]> consumer = consumerFuture.getNow(null);
             if (consumer != null) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Try to close consumer of [group={}] [topic={}]", groupId, topicPartition.toString());
-                }
+                log.debug("Try to close consumer of [group={}] [topic={}]", groupId, topicPartition);
                 consumer.closeAsync();
             }
         });
@@ -162,35 +160,41 @@
 
     @Override
     public void close() {
-        log.info("close OffsetAcker with {} groupIds", consumers.size());
-        close(consumers.keySet());
+        log.info("close OffsetAcker with {} groupIds", CONSUMERS.size());
+        close(CONSUMERS.keySet());
     }
 
     @NonNull
     public CompletableFuture<Consumer<byte[]>> getOrCreateConsumer(String groupId, TopicPartition topicPartition) {
-        Map<TopicPartition, CompletableFuture<Consumer<byte[]>>> group = consumers
+        Map<String, CompletableFuture<Consumer<byte[]>>> group = CONSUMERS
                 .computeIfAbsent(groupId, gid -> new ConcurrentHashMap<>());
+        KopTopic kopTopic = new KopTopic(topicPartition.topic());
+        String topicName = kopTopic.getPartitionName((topicPartition.partition()));
         return group.computeIfAbsent(
-                topicPartition,
-                partition -> createConsumer(groupId, partition));
+                topicName,
+                name -> createConsumer(groupId, name));
     }
 
     @NonNull
-    private CompletableFuture<Consumer<byte[]>> createConsumer(String groupId, TopicPartition topicPartition) {
-        KopTopic kopTopic = new KopTopic(topicPartition.topic());
+    private CompletableFuture<Consumer<byte[]>> createConsumer(String groupId, String topicName) {
         return consumerBuilder.clone()
-                .topic(kopTopic.getPartitionName(topicPartition.partition()))
+                .topic(topicName)
                 .subscriptionName(groupId)
                 .subscribeAsync();
     }
 
     public CompletableFuture<Consumer<byte[]>> getConsumer(String groupId, TopicPartition topicPartition) {
-        return consumers.getOrDefault(groupId, EMPTY_CONSUMERS).get(topicPartition);
+        KopTopic kopTopic = new KopTopic(topicPartition.topic());
+        String topicName = kopTopic.getPartitionName((topicPartition.partition()));
+        return CONSUMERS.getOrDefault(groupId, EMPTY_CONSUMERS).get(topicName);
     }
 
     public void removeConsumer(String groupId, TopicPartition topicPartition) {
+        KopTopic kopTopic = new KopTopic(topicPartition.topic());
+        String topicName = kopTopic.getPartitionName((topicPartition.partition()));
+
         final CompletableFuture<Consumer<byte[]>> consumerFuture =
-                consumers.getOrDefault(groupId, EMPTY_CONSUMERS).remove(topicPartition);
+                CONSUMERS.getOrDefault(groupId, EMPTY_CONSUMERS).remove(topicName);
         if (consumerFuture != null) {
             consumerFuture.whenComplete((consumer, e) -> {
                 if (e == null) {
@@ -202,4 +206,18 @@ public void removeConsumer(String groupId, TopicPartition topicPartition) {
             });
         }
     }
+
+    public static void removeOffsetAcker(String topicName) {
+        CONSUMERS.forEach((groupId, group) -> {
+            CompletableFuture<Consumer<byte[]> > consumerCompletableFuture = group.remove(topicName);
+            if (consumerCompletableFuture != null) {
+                consumerCompletableFuture.thenApply(Consumer::closeAsync).whenCompleteAsync((ignore, t) -> {
+                    if (t != null) {
+                        log.error("Failed to close offsetAcker consumer when remove partition {}.",
+                                topicName);
+                    }
+                });
+            }
+        });
+    }
 }
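Re-keying CONSUMERS from TopicPartition to the fully qualified Pulsar partition name is what makes the new removeOffsetAcker(String) hook workable: KafkaTopicManager.deReference() only has a topic-name string, so a String key lets it locate and close each group's consumer for that partition. A self-contained sketch of the idea (the stub types and the fullPartitionName helper are illustrative, not kop APIs):

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: group id -> (full partition name -> consumer future).
class OffsetAckerKeySketch {
    static class StubConsumer {
        CompletableFuture<Void> closeAsync() {
            return CompletableFuture.completedFuture(null);
        }
    }

    static final Map<String, Map<String, CompletableFuture<StubConsumer>>> CONSUMERS =
            new ConcurrentHashMap<>();

    // Stand-in for KopTopic.getPartitionName(partition).
    static String fullPartitionName(String topic, int partition) {
        return "persistent://public/default/" + topic + "-partition-" + partition;
    }

    // Same shape as the PR's removeOffsetAcker(String topicName): every group
    // drops (and closes) its consumer for this partition, if it has one.
    static void removeOffsetAcker(String topicName) {
        CONSUMERS.forEach((groupId, group) -> {
            CompletableFuture<StubConsumer> future = group.remove(topicName);
            if (future != null) {
                future.thenAccept(StubConsumer::closeAsync);
            }
        });
    }

    public static void main(String[] args) {
        String name = fullPartitionName("my-topic", 0);
        CONSUMERS.computeIfAbsent("group-1", g -> new ConcurrentHashMap<>())
                .put(name, CompletableFuture.completedFuture(new StubConsumer()));
        removeOffsetAcker(name);
        System.out.println(CONSUMERS.get("group-1").isEmpty()); // prints: true
    }
}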
DistributedClusterTest.java
@@ -15,8 +15,8 @@
 
 import static org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME;
 import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
-import static org.junit.Assert.assertTrue;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -31,7 +31,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import lombok.Cleanup;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -195,7 +194,7 @@ public void setup() throws Exception {
     }
 
 
-    @AfterMethod
+    @AfterMethod(timeOut = 30000)
     @Override
     public void cleanup() throws Exception {
         log.info("--- Shutting down ---");
@@ -298,19 +297,14 @@ public void testMutiBrokerAndCoordinator() throws Exception {
         // 1. produce message with Kafka producer.
         int totalMsgs = 50;
         String messageStrPrefix = "Message_Kop_KafkaProduceKafkaConsume_" + partitionNumber + "_";
-        @Cleanup
         KProducer kProducer = new KProducer(kafkaTopicName, false, getKafkaBrokerPort(), true);
         kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix);
 
         // 2. create 4 kafka consumer from different consumer groups.
         // consume data and commit offsets for 4 consumer group.
-        @Cleanup
         KConsumer kConsumer1 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-1");
-        @Cleanup
         KConsumer kConsumer2 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-2");
-        @Cleanup
         KConsumer kConsumer3 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-3");
-        @Cleanup
         KConsumer kConsumer4 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-4");
 
         List<TopicPartition> topicPartitions = IntStream.range(0, partitionNumber)
@@ -394,6 +388,12 @@ public void testMutiBrokerAndCoordinator() throws Exception {
         assertTrue(records.isEmpty());
         records = kConsumer4.getConsumer().poll(Duration.ofMillis(200));
         assertTrue(records.isEmpty());
+
+        kProducer.close();
+        kConsumer1.close();
+        kConsumer2.close();
+        kConsumer3.close();
+        kConsumer4.close();
     }
 
     // Unit test for unload / reload user topic bundle, verify it works well.
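Here and in the two tests below, Lombok's @Cleanup annotations give way to explicit close() calls at the end of the test body (the imports hunk above drops lombok.Cleanup accordingly). One trade-off: if an assertion fails mid-test, the explicit calls never run, so teardown then falls to cleanup(), which now carries its own timeOut = 30000 guard.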
@@ -421,15 +421,12 @@ public void testMutiBrokerUnloadReload() throws Exception {
         // 2. produce consume message with Kafka producer.
         int totalMsgs = 50;
         String messageStrPrefix = "Message_" + kafkaTopicName + "_";
-        @Cleanup
         KProducer kProducer = new KProducer(kafkaTopicName, false, getKafkaBrokerPort(), true);
         kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix);
 
         List<TopicPartition> topicPartitions = IntStream.range(0, partitionNumber)
                 .mapToObj(i -> new TopicPartition(kafkaTopicName, i)).collect(Collectors.toList());
-        @Cleanup
         KConsumer kConsumer1 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-1");
-        @Cleanup
         KConsumer kConsumer2 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-2");
         log.info("Partition size: {}, will consume and commitOffset for 2 consumers",
                 topicPartitions.size());
@@ -445,6 +442,10 @@
         kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix);
         kafkaConsumeCommitMessage(kConsumer1, totalMsgs, messageStrPrefix, topicPartitions);
         kafkaConsumeCommitMessage(kConsumer2, totalMsgs, messageStrPrefix, topicPartitions);
+
+        kProducer.close();
+        kConsumer1.close();
+        kConsumer2.close();
     }
 
     @Test(timeOut = 30000)
@@ -470,15 +471,12 @@ public void testOneBrokerShutdown() throws Exception {
         // 2. produce consume message with Kafka producer.
         int totalMsgs = 50;
         String messageStrPrefix = "Message_" + kafkaTopicName + "_";
-        @Cleanup
         KProducer kProducer = new KProducer(kafkaTopicName, false, getKafkaBrokerPort(), true);
         kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix);
 
         List<TopicPartition> topicPartitions = IntStream.range(0, partitionNumber)
                 .mapToObj(i -> new TopicPartition(kafkaTopicName, i)).collect(Collectors.toList());
-        @Cleanup
         KConsumer kConsumer1 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-1");
-        @Cleanup
         KConsumer kConsumer2 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-2");
         log.info("Partition size: {}, will consume and commitOffset for 2 consumers",
                 topicPartitions.size());
@@ -494,5 +492,9 @@
         kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix);
         kafkaConsumeCommitMessage(kConsumer1, totalMsgs, messageStrPrefix, topicPartitions);
         kafkaConsumeCommitMessage(kConsumer2, totalMsgs, messageStrPrefix, topicPartitions);
+
+        kProducer.close();
+        kConsumer1.close();
+        kConsumer2.close();
     }
 }
KopProtocolHandlerTestBase.java
@@ -338,6 +338,7 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
         doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
         doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore();
         doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createConfigurationMetadataStore();
+        doReturn(mockZooKeeper).when(pulsar).getZkClient();
 
         Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar));
         doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
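The added stub for pulsar.getZkClient() suggests that some code path exercised by these tests now reaches the ZooKeeper client directly rather than only through the ZKMetadataStore wrappers; without it the spied PulsarService would have no usable ZooKeeper handle.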
@@ -463,7 +464,7 @@ public KProducer(String topic, Boolean isAsync, String host,
             props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoKafkaOnPulsarProducer");
             props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySer);
             props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSer);
-            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000);
+            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
 
             if (retry) {
                 props.put(ProducerConfig.RETRIES_CONFIG, 3);
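Raising the test producer's request.timeout.ms from 1000 to 10000 gives the multi-broker scenarios room for leader lookups and bundle unloads to complete; with a one-second budget those moves could surface as send timeouts even with RETRIES_CONFIG set.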