From 954740231800fe0ca937da8a200882880f07eeaf Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 22 Jul 2021 21:28:46 +0800 Subject: [PATCH] Fix topic unload bug --- .github/workflows/pr-test.yml | 2 +- .../handlers/kop/KafkaProtocolHandler.java | 7 ++++ .../handlers/kop/KafkaTopicManager.java | 31 +++++++++-------- .../handlers/kop/MessageFetchContext.java | 6 ++-- .../kop/coordinator/group/OffsetAcker.java | 34 +++++++++++++------ .../kop/KopProtocolHandlerTestBase.java | 2 +- 6 files changed, 52 insertions(+), 30 deletions(-) diff --git a/.github/workflows/pr-test.yml b/.github/workflows/pr-test.yml index f64c139b95..4a5e514fb8 100644 --- a/.github/workflows/pr-test.yml +++ b/.github/workflows/pr-test.yml @@ -35,7 +35,7 @@ jobs: run: mvn spotbugs:check - name: test after build - run: mvn test -DfailIfNoTests=false '-Dtest=!KafkaIntegrationTest,!DistributedClusterTest' + run: mvn test -DfailIfNoTests=false '-Dtest=!KafkaIntegrationTest' - name: KafkaIntegrationTest run: mvn test '-Dtest=KafkaIntegrationTest' -pl tests diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java index 44b5e43970..6a3ed488b3 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java @@ -22,6 +22,7 @@ import io.netty.channel.socket.SocketChannel; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupConfig; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator; +import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetAcker; import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig; import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; @@ -158,6 +159,8 @@ public void unLoad(NamespaceBundle bundle) { } // remove cache when unload KafkaTopicManager.removeTopicManagerCache(name.toString()); + // deReference topic when unload + KafkaTopicManager.deReference(name.toString()); } } else { log.error("Failed to get owned topic list for " @@ -314,6 +317,10 @@ public void close() { groupCoordinator.shutdown(); } KafkaTopicManager.LOOKUP_CACHE.clear(); + KafkaTopicManager.getConsumerTopicManagers().clear(); + KafkaTopicManager.getReferences().clear(); + KafkaTopicManager.getTopics().clear(); + OffsetAcker.CONSUMERS.clear(); } public void initGroupCoordinator(BrokerService service) throws Exception { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index 511778dea8..9bfc1e4a90 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -15,6 +15,7 @@ import static com.google.common.base.Preconditions.checkState; +import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetAcker; import java.net.InetSocketAddress; import java.util.Map; import java.util.Optional; @@ -49,12 +50,16 @@ public class KafkaTopicManager { // consumerTopicManagers for consumers cache. @Getter - private final ConcurrentHashMap> consumerTopicManagers; + private static final ConcurrentHashMap> + consumerTopicManagers = new ConcurrentHashMap<>(); // cache for topics: , for removing producer - private final ConcurrentHashMap> topics; + @Getter + private static final ConcurrentHashMap> + topics = new ConcurrentHashMap<>(); // cache for references in PersistentTopic: - private final ConcurrentHashMap references; + @Getter + private static final ConcurrentHashMap references = new ConcurrentHashMap<>(); private InternalServerCnx internalServerCnx; @@ -80,10 +85,6 @@ public class KafkaTopicManager { this.brokerService = pulsarService.getBrokerService(); this.internalServerCnx = new InternalServerCnx(requestHandler); - consumerTopicManagers = new ConcurrentHashMap<>(); - topics = new ConcurrentHashMap<>(); - references = new ConcurrentHashMap<>(); - this.rwLock = new ReentrantReadWriteLock(); this.closed = false; @@ -363,27 +364,27 @@ public Producer getReferenceProducer(String topicName) { return references.get(topicName); } - public void deReference(String topicName) { + public static void deReference(String topicName) { try { removeTopicManagerCache(topicName); if (consumerTopicManagers.containsKey(topicName)) { - CompletableFuture manager = consumerTopicManagers.get(topicName); - manager.get().close(); - consumerTopicManagers.remove(topicName); + consumerTopicManagers.remove(topicName).get().close(); } if (!topics.containsKey(topicName)) { return; } PersistentTopic persistentTopic = topics.get(topicName).get(); - if (persistentTopic != null) { - persistentTopic.removeProducer(references.get(topicName)); + Producer producer = references.get(topicName); + if (persistentTopic != null && producer != null) { + persistentTopic.removeProducer(producer); } topics.remove(topicName); + + OffsetAcker.removeOffsetAcker(topicName); } catch (Exception e) { - log.error("[{}] Failed to close reference for individual topic {}. exception:", - requestHandler.ctx.channel(), topicName, e); + log.error("Failed to close reference for individual topic {}. exception:", topicName, e); } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java index f65d8d8757..6e177be3ba 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java @@ -116,7 +116,7 @@ public CompletableFuture handleFetch( tcm = pair.getValue().get(); if (tcm == null) { // remove null future cache from consumerTopicManagers - requestHandler.getTopicManager().getConsumerTopicManagers() + KafkaTopicManager.getConsumerTopicManagers() .remove(KopTopic.toString(pair.getKey())); throw new NullPointerException("topic not owned, and return null TCM in fetch."); } @@ -230,7 +230,7 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch, "cursor.readEntry fail. deleteCursor"); } else { // remove null future cache from consumerTopicManagers - requestHandler.getTopicManager().getConsumerTopicManagers() + KafkaTopicManager.getConsumerTopicManagers() .remove(KopTopic.toString(kafkaTopic)); log.warn("Cursor deleted while TCM close."); } @@ -292,7 +292,7 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch, cm.add(pair.getRight(), pair); } else { // remove null future cache from consumerTopicManagers - requestHandler.getTopicManager().getConsumerTopicManagers() + KafkaTopicManager.getConsumerTopicManagers() .remove(KopTopic.toString(kafkaPartition)); log.warn("Cursor deleted while TCM close, failed to add cursor back to TCM."); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java index 9207567660..767d5842ce 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java @@ -47,7 +47,8 @@ public OffsetAcker(PulsarClientImpl pulsarClient) { } // map off consumser: - Map>>> consumers = new ConcurrentHashMap<>(); + public static final Map>>> + CONSUMERS = new ConcurrentHashMap<>(); public void addOffsetsTracker(String groupId, byte[] assignment) { ByteBuffer assignBuffer = ByteBuffer.wrap(assignment); @@ -81,7 +82,7 @@ public void ackOffsets(String groupId, Map of } public void close(Set groupIds) { - groupIds.forEach(groupId -> consumers.get(groupId).values().forEach(consumerFuture -> { + groupIds.forEach(groupId -> CONSUMERS.get(groupId).values().forEach(consumerFuture -> { consumerFuture.whenComplete((consumer, throwable) -> { if (throwable != null) { log.warn("Error when get consumer for consumer group close:", throwable); @@ -99,24 +100,37 @@ public void close(Set groupIds) { @Override public void close() { - log.info("close OffsetAcker with {} groupIds", consumers.size()); - close(consumers.keySet()); + log.info("close OffsetAcker with {} groupIds", CONSUMERS.size()); + close(CONSUMERS.keySet()); } private CompletableFuture> getConsumer(String groupId, TopicPartition topicPartition) { - Map>> group = consumers + String topicName = new KopTopic(topicPartition.topic()).getPartitionName(topicPartition.partition()); + Map>> group = CONSUMERS .computeIfAbsent(groupId, gid -> new ConcurrentHashMap<>()); return group.computeIfAbsent( - topicPartition, - partition -> createConsumer(groupId, partition)); + topicName, + name -> createConsumer(groupId, name)); } - private CompletableFuture> createConsumer(String groupId, TopicPartition topicPartition) { - KopTopic kopTopic = new KopTopic(topicPartition.topic()); + private CompletableFuture> createConsumer(String groupId, String topicName) { return consumerBuilder.clone() - .topic(kopTopic.getPartitionName(topicPartition.partition())) + .topic(topicName) .subscriptionName(groupId) .subscribeAsync(); } + public static void removeOffsetAcker(String topicName) { + CONSUMERS.forEach((groupId, group) -> { + CompletableFuture > consumerCompletableFuture = group.remove(topicName); + if (consumerCompletableFuture != null) { + consumerCompletableFuture.thenApply(Consumer::closeAsync).whenCompleteAsync((ignore, t) -> { + if (t != null) { + log.error("Failed to close offsetAcker consumer when remove partition {}.", + topicName); + } + }); + } + }); + } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java index c12f1abe9c..e6f30a7a93 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java @@ -448,7 +448,7 @@ public KProducer(String topic, Boolean isAsync, String host, props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoKafkaOnPulsarProducer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySer); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSer); - props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000); + props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000); if (retry) { props.put(ProducerConfig.RETRIES_CONFIG, 3);