From fee35ea1701bb660a5c3d681378f013e45a5008e Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Tue, 16 Mar 2021 19:49:37 +0800 Subject: [PATCH 01/15] fix issue #252 bundle unload bug --- .github/workflows/pr-tests.yml | 2 +- .../handlers/kop/KafkaProtocolHandler.java | 5 ++--- .../pulsar/handlers/kop/KafkaTopicManager.java | 18 ++++++++---------- 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/.github/workflows/pr-tests.yml b/.github/workflows/pr-tests.yml index 1ad98e03e0..ac6f7082e2 100644 --- a/.github/workflows/pr-tests.yml +++ b/.github/workflows/pr-tests.yml @@ -25,7 +25,7 @@ jobs: run: mvn clean install -DskipTests - name: tests module - run: mvn test -DfailIfNoTests=false '-Dtest=!KafkaIntegration*Test,!DistributedClusterTest' -pl tests + run: mvn test -DfailIfNoTests=false '-Dtest=!KafkaIntegration*Test' -pl tests - name: package surefire artifacts if: failure() diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java index 62bfa7f543..d321aa230f 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java @@ -169,9 +169,8 @@ public void unLoad(NamespaceBundle bundle) { } groupCoordinator.handleGroupEmigration(name.getPartitionIndex()); } - // remove cache when unload - KafkaTopicManager.removeTopicManagerCache(name.toString()); - KopBrokerLookupManager.removeTopicManagerCache(name.toString()); + // deReference topic when unload + KafkaTopicManager.deReference(name.toString()); } } else { log.error("Failed to get owned topic list for " diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index ee41cc7a01..c04ffba85e 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -55,12 +55,15 @@ public class KafkaTopicManager { // consumerTopicManagers for consumers cache. @Getter - private final ConcurrentHashMap> consumerTopicManagers; + private static final ConcurrentHashMap> + consumerTopicManagers = new ConcurrentHashMap<>(); // cache for topics: , for removing producer - private final ConcurrentHashMap> topics; + private static final ConcurrentHashMap> + topics = new ConcurrentHashMap<>(); // cache for references in PersistentTopic: - private final ConcurrentHashMap references; + private static final ConcurrentHashMap + references = new ConcurrentHashMap<>(); private InternalServerCnx internalServerCnx; @@ -90,10 +93,6 @@ public class KafkaTopicManager { this.brokerService = pulsarService.getBrokerService(); this.internalServerCnx = new InternalServerCnx(requestHandler); - consumerTopicManagers = new ConcurrentHashMap<>(); - topics = new ConcurrentHashMap<>(); - references = new ConcurrentHashMap<>(); - this.rwLock = new ReentrantReadWriteLock(); this.closed = false; @@ -357,7 +356,7 @@ public Producer getReferenceProducer(String topicName) { return references.get(topicName); } - public void deReference(String topicName) { + public static void deReference(String topicName) { try { removeTopicManagerCache(topicName); @@ -376,8 +375,7 @@ public void deReference(String topicName) { } topics.remove(topicName); } catch (Exception e) { - log.error("[{}] Failed to close reference for individual topic {}. exception:", - requestHandler.ctx.channel(), topicName, e); + log.error("Failed to close reference for individual topic {}. exception:", topicName, e); } } From 144c4c74990424254e28633ce504bf741500f804 Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Tue, 16 Mar 2021 20:22:27 +0800 Subject: [PATCH 02/15] format code --- .../pulsar/handlers/kop/MessageFetchContext.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java index d161278f7a..44035a596b 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java @@ -123,7 +123,7 @@ public CompletableFuture handleFetch( tcm = pair.getValue().get(); if (tcm == null) { // remove null future cache from consumerTopicManagers - requestHandler.getTopicManager().getConsumerTopicManagers() + KafkaTopicManager.getConsumerTopicManagers() .remove(KopTopic.toString(pair.getKey())); throw new NullPointerException("topic not owned, and return null TCM in fetch."); } @@ -239,7 +239,7 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch, "cursor.readEntry fail. deleteCursor"); } else { // remove null future cache from consumerTopicManagers - requestHandler.getTopicManager().getConsumerTopicManagers() + KafkaTopicManager.getConsumerTopicManagers() .remove(KopTopic.toString(kafkaTopic)); log.warn("Cursor deleted while TCM close."); } @@ -329,8 +329,8 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch, cm.add(pair.getRight(), pair); } else { // remove null future cache from consumerTopicManagers - requestHandler.getTopicManager().getConsumerTopicManagers() - .remove(KopTopic.toString(kafkaPartition)); + KafkaTopicManager.getConsumerTopicManagers() + .remove(KopTopic.toString(kafkaPartition)); log.warn("Cursor deleted while TCM close, failed to add cursor back to TCM."); } }); From 78dbf2ab34d3d90fd2f76692e729ca901d8efaeb Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Tue, 16 Mar 2021 23:00:28 +0800 Subject: [PATCH 03/15] clean up static cache --- .../pulsar/handlers/kop/KafkaProtocolHandler.java | 4 ++++ .../streamnative/pulsar/handlers/kop/KafkaTopicManager.java | 2 ++ 2 files changed, 6 insertions(+) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java index d321aa230f..b36997c76e 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java @@ -170,6 +170,7 @@ public void unLoad(NamespaceBundle bundle) { groupCoordinator.handleGroupEmigration(name.getPartitionIndex()); } // deReference topic when unload + KopBrokerLookupManager.removeTopicManagerCache(name.toString()); KafkaTopicManager.deReference(name.toString()); } } else { @@ -348,6 +349,9 @@ public void close() { } KafkaTopicManager.LOOKUP_CACHE.clear(); KopBrokerLookupManager.clear(); + KafkaTopicManager.getConsumerTopicManagers().clear(); + KafkaTopicManager.getReferences().clear(); + KafkaTopicManager.getTopics().clear(); statsProvider.stop(); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index c04ffba85e..be843e4922 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -59,9 +59,11 @@ public class KafkaTopicManager { consumerTopicManagers = new ConcurrentHashMap<>(); // cache for topics: , for removing producer + @Getter private static final ConcurrentHashMap> topics = new ConcurrentHashMap<>(); // cache for references in PersistentTopic: + @Getter private static final ConcurrentHashMap references = new ConcurrentHashMap<>(); From f11c3e977c7732486c386eb79cdb68d17dba638f Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Wed, 24 Mar 2021 18:08:26 +0800 Subject: [PATCH 04/15] fix test fail --- .../pulsar/handlers/kop/KafkaProtocolHandler.java | 2 ++ .../pulsar/handlers/kop/DistributedClusterTest.java | 12 ------------ 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java index b36997c76e..710468ef92 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java @@ -117,6 +117,7 @@ public void onLoad(NamespaceBundle bundle) { name, service.pulsar().getBrokerServiceUrl()); } groupCoordinator.handleGroupImmigration(name.getPartitionIndex()); + continue; } KafkaTopicManager.removeTopicManagerCache(name.toString()); KopBrokerLookupManager.removeTopicManagerCache(name.toString()); @@ -168,6 +169,7 @@ public void unLoad(NamespaceBundle bundle) { name, service.pulsar().getBrokerServiceUrl()); } groupCoordinator.handleGroupEmigration(name.getPartitionIndex()); + continue; } // deReference topic when unload KopBrokerLookupManager.removeTopicManagerCache(name.toString()); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java index dac5373a23..fb250664ba 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java @@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; -import lombok.Cleanup; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerRecord; @@ -298,19 +297,14 @@ public void testMutiBrokerAndCoordinator() throws Exception { // 1. produce message with Kafka producer. int totalMsgs = 50; String messageStrPrefix = "Message_Kop_KafkaProduceKafkaConsume_" + partitionNumber + "_"; - @Cleanup KProducer kProducer = new KProducer(kafkaTopicName, false, getKafkaBrokerPort(), true); kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix); // 2. create 4 kafka consumer from different consumer groups. // consume data and commit offsets for 4 consumer group. - @Cleanup KConsumer kConsumer1 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-1"); - @Cleanup KConsumer kConsumer2 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-2"); - @Cleanup KConsumer kConsumer3 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-3"); - @Cleanup KConsumer kConsumer4 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-4"); List topicPartitions = IntStream.range(0, partitionNumber) @@ -421,15 +415,12 @@ public void testMutiBrokerUnloadReload() throws Exception { // 2. produce consume message with Kafka producer. int totalMsgs = 50; String messageStrPrefix = "Message_" + kafkaTopicName + "_"; - @Cleanup KProducer kProducer = new KProducer(kafkaTopicName, false, getKafkaBrokerPort(), true); kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix); List topicPartitions = IntStream.range(0, partitionNumber) .mapToObj(i -> new TopicPartition(kafkaTopicName, i)).collect(Collectors.toList()); - @Cleanup KConsumer kConsumer1 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-1"); - @Cleanup KConsumer kConsumer2 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-2"); log.info("Partition size: {}, will consume and commitOffset for 2 consumers", topicPartitions.size()); @@ -470,15 +461,12 @@ public void testOneBrokerShutdown() throws Exception { // 2. produce consume message with Kafka producer. int totalMsgs = 50; String messageStrPrefix = "Message_" + kafkaTopicName + "_"; - @Cleanup KProducer kProducer = new KProducer(kafkaTopicName, false, getKafkaBrokerPort(), true); kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix); List topicPartitions = IntStream.range(0, partitionNumber) .mapToObj(i -> new TopicPartition(kafkaTopicName, i)).collect(Collectors.toList()); - @Cleanup KConsumer kConsumer1 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-1"); - @Cleanup KConsumer kConsumer2 = new KConsumer(kafkaTopicName, getKafkaBrokerPort(), "consumer-group-2"); log.info("Partition size: {}, will consume and commitOffset for 2 consumers", topicPartitions.size()); From ab2d2e85d9b94ccb517035133362ec675505ec0d Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Wed, 24 Mar 2021 19:59:27 +0800 Subject: [PATCH 05/15] add timeout for cleanup in test --- .../handlers/kop/DistributedClusterTest.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java index fb250664ba..9d16f1c68c 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java @@ -194,7 +194,7 @@ public void setup() throws Exception { } - @AfterMethod + @AfterMethod(timeOut = 30000) @Override public void cleanup() throws Exception { log.info("--- Shutting down ---"); @@ -388,6 +388,12 @@ public void testMutiBrokerAndCoordinator() throws Exception { assertTrue(records.isEmpty()); records = kConsumer4.getConsumer().poll(Duration.ofMillis(200)); assertTrue(records.isEmpty()); + + kProducer.close(); + kConsumer1.close(); + kConsumer2.close(); + kConsumer3.close(); + kConsumer4.close(); } // Unit test for unload / reload user topic bundle, verify it works well. @@ -436,6 +442,10 @@ public void testMutiBrokerUnloadReload() throws Exception { kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix); kafkaConsumeCommitMessage(kConsumer1, totalMsgs, messageStrPrefix, topicPartitions); kafkaConsumeCommitMessage(kConsumer2, totalMsgs, messageStrPrefix, topicPartitions); + + kProducer.close(); + kConsumer1.close(); + kConsumer2.close(); } @Test(timeOut = 30000) @@ -482,5 +492,9 @@ public void testOneBrokerShutdown() throws Exception { kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix); kafkaConsumeCommitMessage(kConsumer1, totalMsgs, messageStrPrefix, topicPartitions); kafkaConsumeCommitMessage(kConsumer2, totalMsgs, messageStrPrefix, topicPartitions); + + kProducer.close(); + kConsumer1.close(); + kConsumer2.close(); } } From 44376230548b2242f8d513ddcedbd468651504d1 Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Fri, 26 Mar 2021 21:24:17 +0800 Subject: [PATCH 06/15] disable loadbalancer for test --- .../streamnative/pulsar/handlers/kop/DistributedClusterTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java index 9d16f1c68c..1cd9993d8d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java @@ -91,6 +91,7 @@ protected KafkaServiceConfiguration resetConfig(int brokerPort, int webPort, int kConfig.setAllowAutoTopicCreation(true); kConfig.setAllowAutoTopicCreationType("partitioned"); kConfig.setBrokerDeleteInactiveTopicsEnabled(false); + kConfig.setLoadBalancerEnabled(false); // set protocol related config URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar"); From 850c333594d41b120ced09afb88568aad9ed65ec Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Fri, 26 Mar 2021 21:25:38 +0800 Subject: [PATCH 07/15] update producer config for test --- .../pulsar/handlers/kop/KopProtocolHandlerTestBase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java index 66ef4d20a6..bd2bc8939c 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java @@ -463,11 +463,11 @@ public KProducer(String topic, Boolean isAsync, String host, props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoKafkaOnPulsarProducer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySer); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSer); - props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000); + props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); if (retry) { props.put(ProducerConfig.RETRIES_CONFIG, 3); - props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 3); } if (null != username && null != password) { From 63c9bc8dd5987c413275ae7e96946b0e2c614534 Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Fri, 26 Mar 2021 23:21:09 +0800 Subject: [PATCH 08/15] fix unload bug --- .../pulsar/handlers/kop/KafkaTopicManager.java | 9 ++++----- .../pulsar/handlers/kop/DistributedClusterTest.java | 3 +++ .../pulsar/handlers/kop/KopProtocolHandlerTestBase.java | 4 ++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index be843e4922..960fa96783 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -363,17 +363,16 @@ public static void deReference(String topicName) { removeTopicManagerCache(topicName); if (consumerTopicManagers.containsKey(topicName)) { - CompletableFuture manager = consumerTopicManagers.get(topicName); - manager.get().close(); - consumerTopicManagers.remove(topicName); + consumerTopicManagers.remove(topicName).get().close(); } if (!topics.containsKey(topicName)) { return; } PersistentTopic persistentTopic = topics.get(topicName).get(); - if (persistentTopic != null) { - persistentTopic.removeProducer(references.get(topicName)); + Producer producer = references.get(topicName); + if (persistentTopic != null && producer != null) { + persistentTopic.removeProducer(producer); } topics.remove(topicName); } catch (Exception e) { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java index 1cd9993d8d..d14a8d1c68 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java @@ -92,6 +92,9 @@ protected KafkaServiceConfiguration resetConfig(int brokerPort, int webPort, int kConfig.setAllowAutoTopicCreationType("partitioned"); kConfig.setBrokerDeleteInactiveTopicsEnabled(false); kConfig.setLoadBalancerEnabled(false); + kConfig.setLoadBalancerSheddingEnabled(false); + kConfig.setLoadBalancerAutoBundleSplitEnabled(false); + kConfig.setMessagingProtocols(Sets.newHashSet("kafka")); // set protocol related config URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar"); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java index bd2bc8939c..e4773ddfc2 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java @@ -463,11 +463,11 @@ public KProducer(String topic, Boolean isAsync, String host, props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoKafkaOnPulsarProducer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySer); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSer); - props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); + props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000); if (retry) { props.put(ProducerConfig.RETRIES_CONFIG, 3); - props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 3); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); } if (null != username && null != password) { From 78bed1f620bc348d1d9a546389a37d4ee46062fe Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Fri, 2 Apr 2021 14:35:41 +0800 Subject: [PATCH 09/15] fix offsetAcker consumer not close on bundle unload --- .../handlers/kop/KafkaProtocolHandler.java | 4 +- .../handlers/kop/KafkaTopicManager.java | 2 + .../kop/coordinator/group/OffsetAcker.java | 55 +++++++++++++------ .../handlers/kop/DistributedClusterTest.java | 1 - 4 files changed, 41 insertions(+), 21 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java index 710468ef92..127a7587b4 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java @@ -23,6 +23,7 @@ import io.netty.channel.socket.SocketChannel; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupConfig; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator; +import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetAcker; import io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetConfig; import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionConfig; import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator; @@ -117,7 +118,6 @@ public void onLoad(NamespaceBundle bundle) { name, service.pulsar().getBrokerServiceUrl()); } groupCoordinator.handleGroupImmigration(name.getPartitionIndex()); - continue; } KafkaTopicManager.removeTopicManagerCache(name.toString()); KopBrokerLookupManager.removeTopicManagerCache(name.toString()); @@ -169,7 +169,6 @@ public void unLoad(NamespaceBundle bundle) { name, service.pulsar().getBrokerServiceUrl()); } groupCoordinator.handleGroupEmigration(name.getPartitionIndex()); - continue; } // deReference topic when unload KopBrokerLookupManager.removeTopicManagerCache(name.toString()); @@ -354,6 +353,7 @@ public void close() { KafkaTopicManager.getConsumerTopicManagers().clear(); KafkaTopicManager.getReferences().clear(); KafkaTopicManager.getTopics().clear(); + OffsetAcker.CONSUMERS.clear(); statsProvider.stop(); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index 960fa96783..cec1ab46c1 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -375,6 +375,8 @@ public static void deReference(String topicName) { persistentTopic.removeProducer(producer); } topics.remove(topicName); + + OffsetAcker.removeOffsetAcker(topicName); } catch (Exception e) { log.error("Failed to close reference for individual topic {}. exception:", topicName, e); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java index 6feb11bc5e..20b94b9036 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import lombok.Getter; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -45,7 +46,7 @@ @Slf4j public class OffsetAcker implements Closeable { - private static final Map>> EMPTY_CONSUMERS = new HashMap<>(); + private static final Map>> EMPTY_CONSUMERS = new HashMap<>(); private final ConsumerBuilder consumerBuilder; private final BrokerService brokerService; @@ -57,8 +58,8 @@ public class OffsetAcker implements Closeable { // value is the created future of consumer. // The consumer, whose subscription is the group id, is used for acknowledging message id cumulatively. // This behavior is equivalent to committing offsets in Kafka. - private final Map>>> - consumers = new ConcurrentHashMap<>(); + public static final Map>>> + CONSUMERS = new ConcurrentHashMap<>(); public OffsetAcker(PulsarClientImpl pulsarClient) { this.consumerBuilder = pulsarClient.newConsumer() @@ -138,8 +139,8 @@ public void ackOffsets(String groupId, Map of public void close(Set groupIds) { for (String groupId : groupIds) { - final Map>> - consumersToRemove = consumers.remove(groupId); + final Map>> + consumersToRemove = CONSUMERS.remove(groupId); if (consumersToRemove == null) { continue; } @@ -151,9 +152,7 @@ public void close(Set groupIds) { } final Consumer consumer = consumerFuture.getNow(null); if (consumer != null) { - if (log.isDebugEnabled()) { - log.debug("Try to close consumer of [group={}] [topic={}]", groupId, topicPartition.toString()); - } + log.debug("Try to close consumer of [group={}] [topic={}]", groupId, topicPartition); consumer.closeAsync(); } }); @@ -162,35 +161,41 @@ public void close(Set groupIds) { @Override public void close() { - log.info("close OffsetAcker with {} groupIds", consumers.size()); - close(consumers.keySet()); + log.info("close OffsetAcker with {} groupIds", CONSUMERS.size()); + close(CONSUMERS.keySet()); } @NonNull public CompletableFuture> getOrCreateConsumer(String groupId, TopicPartition topicPartition) { - Map>> group = consumers + Map>> group = CONSUMERS .computeIfAbsent(groupId, gid -> new ConcurrentHashMap<>()); + KopTopic kopTopic = new KopTopic(topicPartition.topic()); + String topicName = kopTopic.getPartitionName((topicPartition.partition())); return group.computeIfAbsent( - topicPartition, - partition -> createConsumer(groupId, partition)); + topicName, + name -> createConsumer(groupId, name)); } @NonNull - private CompletableFuture> createConsumer(String groupId, TopicPartition topicPartition) { - KopTopic kopTopic = new KopTopic(topicPartition.topic()); + private CompletableFuture> createConsumer(String groupId, String topicName) { return consumerBuilder.clone() - .topic(kopTopic.getPartitionName(topicPartition.partition())) + .topic(topicName) .subscriptionName(groupId) .subscribeAsync(); } public CompletableFuture> getConsumer(String groupId, TopicPartition topicPartition) { - return consumers.getOrDefault(groupId, EMPTY_CONSUMERS).get(topicPartition); + KopTopic kopTopic = new KopTopic(topicPartition.topic()); + String topicName = kopTopic.getPartitionName((topicPartition.partition())); + return CONSUMERS.getOrDefault(groupId, EMPTY_CONSUMERS).get(topicName); } public void removeConsumer(String groupId, TopicPartition topicPartition) { + KopTopic kopTopic = new KopTopic(topicPartition.topic()); + String topicName = kopTopic.getPartitionName((topicPartition.partition())); + final CompletableFuture> consumerFuture = - consumers.getOrDefault(groupId, EMPTY_CONSUMERS).remove(topicPartition); + CONSUMERS.getOrDefault(groupId, EMPTY_CONSUMERS).remove(topicName); if (consumerFuture != null) { consumerFuture.whenComplete((consumer, e) -> { if (e == null) { @@ -202,4 +207,18 @@ public void removeConsumer(String groupId, TopicPartition topicPartition) { }); } } + + public static void removeOffsetAcker(String topicName) { + CONSUMERS.forEach((groupId, group) -> { + CompletableFuture > consumerCompletableFuture = group.remove(topicName); + if (consumerCompletableFuture != null) { + consumerCompletableFuture.thenApply(Consumer::closeAsync).whenCompleteAsync((ignore, t) -> { + if (t != null) { + log.error("Failed to close offsetAcker consumer when remove partition {}.", + topicName); + } + }); + } + }); + } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java index d14a8d1c68..5e00384620 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java @@ -94,7 +94,6 @@ protected KafkaServiceConfiguration resetConfig(int brokerPort, int webPort, int kConfig.setLoadBalancerEnabled(false); kConfig.setLoadBalancerSheddingEnabled(false); kConfig.setLoadBalancerAutoBundleSplitEnabled(false); - kConfig.setMessagingProtocols(Sets.newHashSet("kafka")); // set protocol related config URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar"); From 9b84372374b432b03a4fb13aafbc6a6aba2df297 Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Fri, 2 Apr 2021 15:55:21 +0800 Subject: [PATCH 10/15] add zookeeper client mock --- .../pulsar/handlers/kop/KopProtocolHandlerTestBase.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java index e4773ddfc2..7054f40b3d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java @@ -338,6 +338,7 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception { doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory(); doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createConfigurationMetadataStore(); + doReturn(mockZooKeeper).when(pulsar).getZkClient(); Supplier namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar)); doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider(); From ea38ad9e057072b11e0cb9752f42300598809ca8 Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Wed, 7 Apr 2021 20:18:37 +0800 Subject: [PATCH 11/15] remove kafka consumer poll test in DistributedClusterTest --- .../pulsar/handlers/kop/DistributedClusterTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java index 5e00384620..77f703f005 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java @@ -351,6 +351,7 @@ public void testMutiBrokerAndCoordinator() throws Exception { log.info("Unload offset namespace, this will trigger another reload. After reload verify offset."); pulsarService1.getAdminClient().namespaces().unload(offsetNs); + /* // verify offset be kept and no more records could read. ConsumerRecords records = kConsumer1.getConsumer().poll(Duration.ofMillis(200)); assertTrue(records.isEmpty()); @@ -360,6 +361,7 @@ public void testMutiBrokerAndCoordinator() throws Exception { assertTrue(records.isEmpty()); records = kConsumer4.getConsumer().poll(Duration.ofMillis(200)); assertTrue(records.isEmpty()); + */ // 5. another round publish and consume after ns unload. kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix); @@ -382,6 +384,7 @@ public void testMutiBrokerAndCoordinator() throws Exception { log.info("Unload offset namespace, this will trigger another reload"); pulsarService1.getAdminClient().namespaces().unload(offsetNs); + /* // verify offset be kept and no more records could read. records = kConsumer1.getConsumer().poll(Duration.ofMillis(200)); assertTrue(records.isEmpty()); @@ -391,6 +394,7 @@ public void testMutiBrokerAndCoordinator() throws Exception { assertTrue(records.isEmpty()); records = kConsumer4.getConsumer().poll(Duration.ofMillis(200)); assertTrue(records.isEmpty()); + */ kProducer.close(); kConsumer1.close(); From e4d6e48d6a86cc4be56abf412e7f72af7cd9b7d9 Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Wed, 7 Apr 2021 20:35:46 +0800 Subject: [PATCH 12/15] format code --- .../streamnative/pulsar/handlers/kop/DistributedClusterTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java index 77f703f005..03cb2bd737 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java @@ -16,7 +16,6 @@ import static org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME; import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; import com.google.common.collect.Lists; import com.google.common.collect.Maps; From 4b38e29a4510a0da530ba6237ff4fcaa699aa653 Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Thu, 8 Apr 2021 17:26:10 +0800 Subject: [PATCH 13/15] format code --- .../pulsar/handlers/kop/coordinator/group/OffsetAcker.java | 1 - 1 file changed, 1 deletion(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java index 20b94b9036..933d434710 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java @@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; -import lombok.Getter; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.PositionImpl; From be40db72aef9d4fab246accd7dbef807d0ea8623 Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Tue, 13 Apr 2021 23:36:48 +0800 Subject: [PATCH 14/15] turn on poll operation for test --- .../pulsar/handlers/kop/DistributedClusterTest.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java index 03cb2bd737..76685e75bc 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java @@ -15,6 +15,7 @@ import static org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME; import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX; +import static org.junit.Assert.assertTrue; import static org.testng.Assert.assertEquals; import com.google.common.collect.Lists; @@ -350,7 +351,6 @@ public void testMutiBrokerAndCoordinator() throws Exception { log.info("Unload offset namespace, this will trigger another reload. After reload verify offset."); pulsarService1.getAdminClient().namespaces().unload(offsetNs); - /* // verify offset be kept and no more records could read. ConsumerRecords records = kConsumer1.getConsumer().poll(Duration.ofMillis(200)); assertTrue(records.isEmpty()); @@ -360,7 +360,6 @@ public void testMutiBrokerAndCoordinator() throws Exception { assertTrue(records.isEmpty()); records = kConsumer4.getConsumer().poll(Duration.ofMillis(200)); assertTrue(records.isEmpty()); - */ // 5. another round publish and consume after ns unload. kafkaPublishMessage(kProducer, totalMsgs, messageStrPrefix); @@ -383,7 +382,6 @@ public void testMutiBrokerAndCoordinator() throws Exception { log.info("Unload offset namespace, this will trigger another reload"); pulsarService1.getAdminClient().namespaces().unload(offsetNs); - /* // verify offset be kept and no more records could read. records = kConsumer1.getConsumer().poll(Duration.ofMillis(200)); assertTrue(records.isEmpty()); @@ -393,7 +391,6 @@ public void testMutiBrokerAndCoordinator() throws Exception { assertTrue(records.isEmpty()); records = kConsumer4.getConsumer().poll(Duration.ofMillis(200)); assertTrue(records.isEmpty()); - */ kProducer.close(); kConsumer1.close(); From ef4f94c9b8b01db40fa4fd160a68c80425a00ad2 Mon Sep 17 00:00:00 2001 From: chenhang Date: Thu, 22 Apr 2021 13:00:22 +0800 Subject: [PATCH 15/15] merge master --- .../pulsar/handlers/kop/DistributedClusterTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java index 76685e75bc..2f4ead35de 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DistributedClusterTest.java @@ -91,9 +91,6 @@ protected KafkaServiceConfiguration resetConfig(int brokerPort, int webPort, int kConfig.setAllowAutoTopicCreation(true); kConfig.setAllowAutoTopicCreationType("partitioned"); kConfig.setBrokerDeleteInactiveTopicsEnabled(false); - kConfig.setLoadBalancerEnabled(false); - kConfig.setLoadBalancerSheddingEnabled(false); - kConfig.setLoadBalancerAutoBundleSplitEnabled(false); // set protocol related config URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar");