From da3f93d6f77d78f80b03c89fa6cdfc493e93e583 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 7 Dec 2020 11:41:58 +0800 Subject: [PATCH 1/4] Add a method to remove the default namespace prefix --- .../pulsar/handlers/kop/utils/KopTopic.java | 9 +++++++++ .../pulsar/handlers/kop/utils/KopTopicTest.java | 10 ++++++++++ 2 files changed, 19 insertions(+) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KopTopic.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KopTopic.java index ca2b6b8a3b..c5dd625539 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KopTopic.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KopTopic.java @@ -29,6 +29,15 @@ public class KopTopic { private static final String persistentDomain = "persistent://"; private static volatile String namespacePrefix; // the full namespace prefix, e.g. "public/default" + public static String removeDefaultNamespacePrefix(String fullTopicName) { + final String topicPrefix = persistentDomain + namespacePrefix + "/"; + if (fullTopicName.startsWith(topicPrefix)) { + return fullTopicName.substring(topicPrefix.length()); + } else { + return fullTopicName; + } + } + public static void initialize(String namespace) { if (namespace.split("/").length != 2) { throw new IllegalArgumentException("Invalid namespace: " + namespace); diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/KopTopicTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/KopTopicTest.java index 46c5a19b1f..ea70a83360 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/KopTopicTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/KopTopicTest.java @@ -74,4 +74,14 @@ public void testGetPartitionName() { assertTrue(e.getMessage().contains("Invalid partition")); } } + + @Test + public void testRemoveDefaultNamespacePrefix() { + KopTopic.initialize("my-tenant/my-ns"); + + final String topic1 = "persistent://my-tenant/my-ns/my-topic"; + final String topic2 = "persistent://my-tenant/another-ns/my-topic"; + assertEquals(KopTopic.removeDefaultNamespacePrefix(topic1), "my-topic"); + assertEquals(KopTopic.removeDefaultNamespacePrefix(topic2), topic2); + } } From 3d584364e74fba2102e0ddb2b8b0d76624515791 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 7 Dec 2020 11:59:48 +0800 Subject: [PATCH 2/4] Use short topic name if the topic is in the default namespace --- .../handlers/kop/KafkaRequestHandler.java | 2 +- .../handlers/kop/DifferentNamespaceTest.java | 20 +++++++++++++++---- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 95016e53cc..992c28d006 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -334,7 +334,7 @@ private void getAllTopicsAsync(CompletableFuture>> t if (key.equals(offsetsTopicName)) { continue; } - topicMap.computeIfAbsent(key, ignored -> + topicMap.computeIfAbsent(KopTopic.removeDefaultNamespacePrefix(key), ignored -> Collections.synchronizedList(new ArrayList<>()) ).add(topicName); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DifferentNamespaceTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DifferentNamespaceTest.java index 5382ebd1cc..94c35bcb80 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DifferentNamespaceTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DifferentNamespaceTest.java @@ -26,11 +26,15 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Properties; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerRecord; @@ -122,7 +126,8 @@ void testSimpleProduceAndConsume(String topic) { @Test(timeOut = 20000) void testListTopics() throws Exception { - final String topic1 = "list-topics-1"; + final String topic1ShortName = "list-topics-1"; + final String topic1 = DEFAULT_TENANT + "/" + DEFAULT_NAMESPACE + "/" + topic1ShortName; final int numPartitions1 = 3; final String topic2 = ANOTHER_TENANT + "/" + ANOTHER_NAMESPACE + "/list-topics-2"; final int numPartitions2 = 5; @@ -135,13 +140,20 @@ void testListTopics() throws Exception { Map> topicMap = kConsumer.getConsumer().listTopics(Duration.ofSeconds(5)); log.info("topicMap: {}", topicMap); - final String key1 = new KopTopic(topic1).getFullName(); - assertTrue(topicMap.containsKey(key1)); - assertEquals(topicMap.get(key1).size(), numPartitions1); + assertTrue(topicMap.containsKey(topic1ShortName)); + assertEquals(topicMap.get(topic1ShortName).size(), numPartitions1); final String key2 = new KopTopic(topic2).getFullName(); assertTrue(topicMap.containsKey(key2)); assertEquals(topicMap.get(key2).size(), numPartitions2); + + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); + AdminClient admin = AdminClient.create(props); + Set topicSet = admin.listTopics().names().get(); + log.info("topicSet: {}", topicSet); + assertTrue(topicSet.contains(topic1ShortName)); + assertTrue(topicSet.contains(key2)); } @Test(timeOut = 30000) From b939b0bb5e70c98b4c8612b1dc89da0282fda7ff Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 7 Dec 2020 12:02:16 +0800 Subject: [PATCH 3/4] Change variable name to avoid conflict --- .../pulsar/handlers/kop/DifferentNamespaceTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DifferentNamespaceTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DifferentNamespaceTest.java index 94c35bcb80..8642f2ea00 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DifferentNamespaceTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DifferentNamespaceTest.java @@ -149,8 +149,8 @@ void testListTopics() throws Exception { Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); - AdminClient admin = AdminClient.create(props); - Set topicSet = admin.listTopics().names().get(); + AdminClient kafkaAdmin = AdminClient.create(props); + Set topicSet = kafkaAdmin.listTopics().names().get(); log.info("topicSet: {}", topicSet); assertTrue(topicSet.contains(topic1ShortName)); assertTrue(topicSet.contains(key2)); From b24c649db9a29086153f6b73ca98460f775747c8 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 7 Dec 2020 14:14:46 +0800 Subject: [PATCH 4/4] Fix simpleProduceAndConsumeWithPulsarAuthed test error --- .../pulsar/handlers/kop/PulsarAuthEnabledTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/PulsarAuthEnabledTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/PulsarAuthEnabledTest.java index 06a497ed26..ca865895bf 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/PulsarAuthEnabledTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/PulsarAuthEnabledTest.java @@ -53,7 +53,8 @@ public class PulsarAuthEnabledTest extends KopProtocolHandlerTestBase { private static final String TENANT = "PulsarAuthEnabledTest"; private static final String ADMIN_USER = "admin_user"; private static final String NAMESPACE = "ns2"; - private static final String TOPIC = "persistent://" + TENANT + "/" + NAMESPACE + "/topic2"; + private static final String SHORT_TOPIC = "topic2"; + private static final String TOPIC = "persistent://" + TENANT + "/" + NAMESPACE + "/" + SHORT_TOPIC; private String adminToken; @BeforeClass @@ -149,8 +150,8 @@ void simpleProduceAndConsumeWithPulsarAuthed() throws Exception { Map> result = kConsumer .getConsumer().listTopics(Duration.ofSeconds(1)); assertEquals(result.size(), 1); - assertTrue(result.containsKey(TOPIC), - "list of topics " + result.keySet().toString() + " does not contains " + TOPIC); + assertTrue(result.containsKey(SHORT_TOPIC), + "list of topics " + result.keySet().toString() + " does not contains " + SHORT_TOPIC); } }