Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ private void getAllTopicsAsync(CompletableFuture<Map<String, List<TopicName>>> t
if (key.equals(offsetsTopicName)) {
continue;
}
topicMap.computeIfAbsent(key, ignored ->
topicMap.computeIfAbsent(KopTopic.removeDefaultNamespacePrefix(key), ignored ->
Collections.synchronizedList(new ArrayList<>())
).add(topicName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ public class KopTopic {
private static final String persistentDomain = "persistent://";
private static volatile String namespacePrefix; // the full namespace prefix, e.g. "public/default"

public static String removeDefaultNamespacePrefix(String fullTopicName) {
final String topicPrefix = persistentDomain + namespacePrefix + "/";
if (fullTopicName.startsWith(topicPrefix)) {
return fullTopicName.substring(topicPrefix.length());
} else {
return fullTopicName;
}
}

public static void initialize(String namespace) {
if (namespace.split("/").length != 2) {
throw new IllegalArgumentException("Invalid namespace: " + namespace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,14 @@ public void testGetPartitionName() {
assertTrue(e.getMessage().contains("Invalid partition"));
}
}

@Test
public void testRemoveDefaultNamespacePrefix() {
KopTopic.initialize("my-tenant/my-ns");

final String topic1 = "persistent://my-tenant/my-ns/my-topic";
final String topic2 = "persistent://my-tenant/another-ns/my-topic";
assertEquals(KopTopic.removeDefaultNamespacePrefix(topic1), "my-topic");
assertEquals(KopTopic.removeDefaultNamespacePrefix(topic2), topic2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand Down Expand Up @@ -122,7 +126,8 @@ void testSimpleProduceAndConsume(String topic) {

@Test(timeOut = 20000)
void testListTopics() throws Exception {
final String topic1 = "list-topics-1";
final String topic1ShortName = "list-topics-1";
final String topic1 = DEFAULT_TENANT + "/" + DEFAULT_NAMESPACE + "/" + topic1ShortName;
final int numPartitions1 = 3;
final String topic2 = ANOTHER_TENANT + "/" + ANOTHER_NAMESPACE + "/list-topics-2";
final int numPartitions2 = 5;
Expand All @@ -135,13 +140,20 @@ void testListTopics() throws Exception {
Map<String, List<PartitionInfo>> topicMap = kConsumer.getConsumer().listTopics(Duration.ofSeconds(5));
log.info("topicMap: {}", topicMap);

final String key1 = new KopTopic(topic1).getFullName();
assertTrue(topicMap.containsKey(key1));
assertEquals(topicMap.get(key1).size(), numPartitions1);
assertTrue(topicMap.containsKey(topic1ShortName));
assertEquals(topicMap.get(topic1ShortName).size(), numPartitions1);

final String key2 = new KopTopic(topic2).getFullName();
assertTrue(topicMap.containsKey(key2));
assertEquals(topicMap.get(key2).size(), numPartitions2);

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort());
AdminClient kafkaAdmin = AdminClient.create(props);
Set<String> topicSet = kafkaAdmin.listTopics().names().get();
log.info("topicSet: {}", topicSet);
assertTrue(topicSet.contains(topic1ShortName));
assertTrue(topicSet.contains(key2));
}

@Test(timeOut = 30000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public class PulsarAuthEnabledTest extends KopProtocolHandlerTestBase {
private static final String TENANT = "PulsarAuthEnabledTest";
private static final String ADMIN_USER = "admin_user";
private static final String NAMESPACE = "ns2";
private static final String TOPIC = "persistent://" + TENANT + "/" + NAMESPACE + "/topic2";
private static final String SHORT_TOPIC = "topic2";
private static final String TOPIC = "persistent://" + TENANT + "/" + NAMESPACE + "/" + SHORT_TOPIC;
private String adminToken;

@BeforeClass
Expand Down Expand Up @@ -149,8 +150,8 @@ void simpleProduceAndConsumeWithPulsarAuthed() throws Exception {
Map<String, List<PartitionInfo>> result = kConsumer
.getConsumer().listTopics(Duration.ofSeconds(1));
assertEquals(result.size(), 1);
assertTrue(result.containsKey(TOPIC),
"list of topics " + result.keySet().toString() + " does not contains " + TOPIC);
assertTrue(result.containsKey(SHORT_TOPIC),
"list of topics " + result.keySet().toString() + " does not contains " + SHORT_TOPIC);
}

}