2 changes: 1 addition & 1 deletion .github/workflows/pr-test.yml

@@ -35,7 +35,7 @@ jobs:
         run: mvn spotbugs:check

       - name: test after build
-        run: mvn test -DfailIfNoTests=false '-Dtest=!KafkaIntegrationTest,!DistributedClusterTest'
+        run: mvn test -DfailIfNoTests=false '-Dtest=!KafkaIntegrationTest'

       - name: KafkaIntegrationTest
         run: mvn test '-Dtest=KafkaIntegrationTest' -pl tests
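Note: dropping !DistributedClusterTest from the exclusion filter means the "test after build" step now runs DistributedClusterTest together with the main suite; only KafkaIntegrationTest keeps a dedicated step. The leading exclamation mark is Maven Surefire's exclusion syntax for the -Dtest filter, single-quoted so the shell does not interpret it.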
KafkaProtocolHandler.java

@@ -22,6 +22,7 @@
 import io.netty.channel.socket.SocketChannel;
 import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupConfig;
 import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
+import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetAcker;
 import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig;
 import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils;
 import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
@@ -158,6 +159,8 @@ public void unLoad(NamespaceBundle bundle) {
                 }
                 // remove cache when unload
                 KafkaTopicManager.removeTopicManagerCache(name.toString());
+                // deReference topic when unload
+                KafkaTopicManager.deReference(name.toString());
             }
         } else {
             log.error("Failed to get owned topic list for "
@@ -314,6 +317,10 @@ public void close() {
             groupCoordinator.shutdown();
         }
         KafkaTopicManager.LOOKUP_CACHE.clear();
+        KafkaTopicManager.getConsumerTopicManagers().clear();
+        KafkaTopicManager.getReferences().clear();
+        KafkaTopicManager.getTopics().clear();
+        OffsetAcker.CONSUMERS.clear();
     }

     public void initGroupCoordinator(BrokerService service) throws Exception {
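Once the KafkaTopicManager caches are static, shutting down a handler instance no longer discards them for free, so close() has to empty each one explicitly or entries survive into the next start/stop cycle in the same JVM. A minimal sketch of the pattern, using hypothetical names (CacheHolder, LOOKUPS, shutdown), not the real KoP API:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a static cache lives at class level, shared by all
// connections, so an explicit clear on shutdown is the only cleanup.
final class CacheHolder {
    static final ConcurrentHashMap<String, CompletableFuture<String>> LOOKUPS =
            new ConcurrentHashMap<>();

    static void shutdown() {
        // A static map outlives every handler instance; without this clear,
        // stale entries would leak into the next run of the handler.
        LOOKUPS.clear();
    }
}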
KafkaTopicManager.java

@@ -15,6 +15,7 @@

 import static com.google.common.base.Preconditions.checkState;

+import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetAcker;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.Optional;
@@ -49,12 +50,16 @@ public class KafkaTopicManager {

     // consumerTopicManagers for consumers cache.
     @Getter
-    private final ConcurrentHashMap<String, CompletableFuture<KafkaTopicConsumerManager>> consumerTopicManagers;
+    private static final ConcurrentHashMap<String, CompletableFuture<KafkaTopicConsumerManager>>
+        consumerTopicManagers = new ConcurrentHashMap<>();

     // cache for topics: <topicName, persistentTopic>, for removing producer
-    private final ConcurrentHashMap<String, CompletableFuture<PersistentTopic>> topics;
+    @Getter
+    private static final ConcurrentHashMap<String, CompletableFuture<PersistentTopic>>
+        topics = new ConcurrentHashMap<>();
     // cache for references in PersistentTopic: <topicName, producer>
-    private final ConcurrentHashMap<String, Producer> references;
+    @Getter
+    private static final ConcurrentHashMap<String, Producer> references = new ConcurrentHashMap<>();

     private InternalServerCnx internalServerCnx;

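The three per-connection maps become process-wide statics, with Lombok @Getter now on all three. KafkaTopicManager is created per client connection, so instance-level maps were unreachable from broker-side events such as a namespace-bundle unload; a static map keyed by topic name can be populated by one connection and evicted from any other code path. A self-contained sketch of the idea, with hypothetical names (TopicCache, evict):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the instance-to-static move: one cache, keyed by
// topic name, shared by every connection in the broker process.
final class TopicCache {
    private static final ConcurrentHashMap<String, CompletableFuture<String>> TOPICS =
            new ConcurrentHashMap<>();

    // Any connection can populate the shared cache...
    static CompletableFuture<String> get(String topicName) {
        return TOPICS.computeIfAbsent(topicName, name ->
                CompletableFuture.completedFuture("resolved:" + name));
    }

    // ...and any path (for example a bundle-unload callback) can evict it,
    // without holding a reference to the connection that created the entry.
    static void evict(String topicName) {
        TOPICS.remove(topicName);
    }
}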
@@ -80,10 +85,6 @@ public class KafkaTopicManager {
         this.brokerService = pulsarService.getBrokerService();
         this.internalServerCnx = new InternalServerCnx(requestHandler);

-        consumerTopicManagers = new ConcurrentHashMap<>();
-        topics = new ConcurrentHashMap<>();
-        references = new ConcurrentHashMap<>();
-
         this.rwLock = new ReentrantReadWriteLock();
         this.closed = false;

@@ -363,27 +364,27 @@ public Producer getReferenceProducer(String topicName) {
         return references.get(topicName);
     }

-    public void deReference(String topicName) {
+    public static void deReference(String topicName) {
         try {
             removeTopicManagerCache(topicName);

             if (consumerTopicManagers.containsKey(topicName)) {
                 CompletableFuture<KafkaTopicConsumerManager> manager = consumerTopicManagers.get(topicName);
-                manager.get().close();
-                consumerTopicManagers.remove(topicName);
+                consumerTopicManagers.remove(topicName).get().close();
             }

             if (!topics.containsKey(topicName)) {
                 return;
             }
             PersistentTopic persistentTopic = topics.get(topicName).get();
-            if (persistentTopic != null) {
-                persistentTopic.removeProducer(references.get(topicName));
+            Producer producer = references.get(topicName);
+            if (persistentTopic != null && producer != null) {
+                persistentTopic.removeProducer(producer);
             }
             topics.remove(topicName);

+            OffsetAcker.removeOffsetAcker(topicName);
         } catch (Exception e) {
-            log.error("[{}] Failed to close reference for individual topic {}. exception:",
-                requestHandler.ctx.channel(), topicName, e);
+            log.error("Failed to close reference for individual topic {}. exception:", topicName, e);
         }
     }

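Two details of the reworked deReference: consumerTopicManagers.remove(topicName).get().close() removes the mapping and closes the removed manager in one expression, and the producer reference is now null-checked before being handed to removeProducer. ConcurrentHashMap.remove returns the previous value to exactly one caller, which is what makes remove-then-close safer under concurrency than get-then-remove. A minimal sketch with placeholder names (RemoveThenClose, CACHE):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class RemoveThenClose {
    private static final ConcurrentHashMap<String, CompletableFuture<AutoCloseable>> CACHE =
            new ConcurrentHashMap<>();

    static void deReference(String key) throws Exception {
        // remove() hands the old value to at most one thread, so two racing
        // callers cannot both close the same resource.
        CompletableFuture<AutoCloseable> prev = CACHE.remove(key);
        if (prev != null) {
            prev.get().close();
        }
    }
}

The containsKey test kept in the patch still leaves a small window in which remove could return null; the explicit null check in the sketch is what closes that window.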
MessageFetchContext.java

@@ -116,7 +116,7 @@ public CompletableFuture<AbstractResponse> handleFetch(
             tcm = pair.getValue().get();
             if (tcm == null) {
                 // remove null future cache from consumerTopicManagers
-                requestHandler.getTopicManager().getConsumerTopicManagers()
+                KafkaTopicManager.getConsumerTopicManagers()
                     .remove(KopTopic.toString(pair.getKey()));
                 throw new NullPointerException("topic not owned, and return null TCM in fetch.");
             }
@@ -230,7 +230,7 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
                 "cursor.readEntry fail. deleteCursor");
         } else {
             // remove null future cache from consumerTopicManagers
-            requestHandler.getTopicManager().getConsumerTopicManagers()
+            KafkaTopicManager.getConsumerTopicManagers()
                 .remove(KopTopic.toString(kafkaTopic));
             log.warn("Cursor deleted while TCM close.");
         }
@@ -292,7 +292,7 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
             cm.add(pair.getRight(), pair);
         } else {
             // remove null future cache from consumerTopicManagers
-            requestHandler.getTopicManager().getConsumerTopicManagers()
+            KafkaTopicManager.getConsumerTopicManagers()
                 .remove(KopTopic.toString(kafkaPartition));
             log.warn("Cursor deleted while TCM close, failed to add cursor back to TCM.");
        }
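All three fetch-path call sites change the same way: consumerTopicManagers is now a static field, and Lombok generates a static accessor for a static field annotated with @Getter, so the code reaches the cache through the class rather than through requestHandler.getTopicManager(). A minimal sketch of that Lombok behavior (StaticGetterDemo is a hypothetical name):

import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;

final class StaticGetterDemo {
    // @Getter on a static field generates a static getConsumerTopicManagers().
    @Getter
    private static final ConcurrentHashMap<String, String> consumerTopicManagers =
            new ConcurrentHashMap<>();
}

// Callers then go through the class, not an instance:
// StaticGetterDemo.getConsumerTopicManagers().remove("topic-name");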
OffsetAcker.java

@@ -47,7 +47,8 @@ public OffsetAcker(PulsarClientImpl pulsarClient) {
     }

     // map of consumers: <groupId, consumers>
-    Map<String, Map<TopicPartition, CompletableFuture<Consumer<byte[]>>>> consumers = new ConcurrentHashMap<>();
+    public static final Map<String, Map<String, CompletableFuture<Consumer<byte[]>>>>
+        CONSUMERS = new ConcurrentHashMap<>();

     public void addOffsetsTracker(String groupId, byte[] assignment) {
         ByteBuffer assignBuffer = ByteBuffer.wrap(assignment);
@@ -81,7 +82,7 @@ public void ackOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) {
     }

     public void close(Set<String> groupIds) {
-        groupIds.forEach(groupId -> consumers.get(groupId).values().forEach(consumerFuture -> {
+        groupIds.forEach(groupId -> CONSUMERS.get(groupId).values().forEach(consumerFuture -> {
             consumerFuture.whenComplete((consumer, throwable) -> {
                 if (throwable != null) {
                     log.warn("Error when get consumer for consumer group close:", throwable);
@@ -99,24 +100,37 @@ public void close(Set<String> groupIds) {

     @Override
     public void close() {
-        log.info("close OffsetAcker with {} groupIds", consumers.size());
-        close(consumers.keySet());
+        log.info("close OffsetAcker with {} groupIds", CONSUMERS.size());
+        close(CONSUMERS.keySet());
     }

     private CompletableFuture<Consumer<byte[]>> getConsumer(String groupId, TopicPartition topicPartition) {
-        Map<TopicPartition, CompletableFuture<Consumer<byte[]>>> group = consumers
+        String topicName = new KopTopic(topicPartition.topic()).getPartitionName(topicPartition.partition());
+        Map<String, CompletableFuture<Consumer<byte[]>>> group = CONSUMERS
             .computeIfAbsent(groupId, gid -> new ConcurrentHashMap<>());
         return group.computeIfAbsent(
-            topicPartition,
-            partition -> createConsumer(groupId, partition));
+            topicName,
+            name -> createConsumer(groupId, name));
     }

-    private CompletableFuture<Consumer<byte[]>> createConsumer(String groupId, TopicPartition topicPartition) {
-        KopTopic kopTopic = new KopTopic(topicPartition.topic());
+    private CompletableFuture<Consumer<byte[]>> createConsumer(String groupId, String topicName) {
         return consumerBuilder.clone()
-            .topic(kopTopic.getPartitionName(topicPartition.partition()))
+            .topic(topicName)
             .subscriptionName(groupId)
             .subscribeAsync();
     }

+    public static void removeOffsetAcker(String topicName) {
+        CONSUMERS.forEach((groupId, group) -> {
+            CompletableFuture<Consumer<byte[]>> consumerCompletableFuture = group.remove(topicName);
+            if (consumerCompletableFuture != null) {
+                consumerCompletableFuture.thenApply(Consumer::closeAsync).whenCompleteAsync((ignore, t) -> {
+                    if (t != null) {
+                        log.error("Failed to close offsetAcker consumer when remove partition {}.",
+                            topicName);
+                    }
+                });
+            }
+        });
+    }
 }
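CONSUMERS now keys the inner map by the fully qualified Pulsar partition name (a String) instead of Kafka's TopicPartition, so removeOffsetAcker can be driven with the same topic-name string that KafkaTopicManager.deReference receives. A rough illustration of the key translation; the real conversion is done by KopTopic.getPartitionName, and the format below is an assumption based on Pulsar's usual partition naming:

import org.apache.kafka.common.TopicPartition;

// Hypothetical illustration: a Kafka (topic, partition) pair is normalized
// to the persistent Pulsar partition-name string used as the map key.
final class KeyDemo {
    static String toPartitionName(TopicPartition tp, String tenantNamespace) {
        return "persistent://" + tenantNamespace + "/" + tp.topic()
                + "-partition-" + tp.partition();
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("my-topic", 2);
        // prints e.g. persistent://public/default/my-topic-partition-2
        System.out.println(toPartitionName(tp, "public/default"));
    }
}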
KProducer (test helper)

@@ -448,7 +448,7 @@ public KProducer(String topic, Boolean isAsync, String host,
         props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoKafkaOnPulsarProducer");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySer);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSer);
-        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000);
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);

         if (retry) {
             props.put(ProducerConfig.RETRIES_CONFIG, 3);
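The only change here raises the test producer's request.timeout.ms from 1000 to 10000, giving a briefly busy test broker up to ten seconds per produce request before the client fails it (and, on the retry path, possibly retries it). A self-contained sketch of the two interacting settings; the bootstrap address is a placeholder:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class TimeoutConfigDemo {
    public static void main(String[] args) {
        // Each send attempt may wait up to request.timeout.ms before the
        // client fails it; a failed attempt is retried up to `retries` times.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        System.out.println(props);
    }
}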