diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 2a1584df961f7..ec4c907234ab6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -40,6 +40,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -55,6 +56,7 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.ListUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -104,6 +106,7 @@ import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; import org.apache.pulsar.common.stats.MetricsUtil; +import org.apache.pulsar.common.topics.TopicList; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.MetadataCache; @@ -187,6 +190,9 @@ public class NamespaceService implements AutoCloseable { .register(); private final DoubleHistogram lookupLatencyHistogram; + private ConcurrentHashMap>> inProgressQueryUserTopics = + new ConcurrentHashMap<>(); + /** * Default constructor. */ @@ -1509,6 +1515,23 @@ public CompletableFuture> getListOfTopics(NamespaceName namespaceNa } } + public CompletableFuture> getListOfUserTopics(NamespaceName namespaceName, Mode mode) { + String key = String.format("%s://%s", mode, namespaceName); + final MutableBoolean initializedByCurrentThread = new MutableBoolean(); + CompletableFuture> queryRes = inProgressQueryUserTopics.computeIfAbsent(key, k -> { + initializedByCurrentThread.setTrue(); + return getListOfTopics(namespaceName, mode).thenApplyAsync(list -> { + return TopicList.filterSystemTopic(list); + }, pulsar.getExecutor()); + }); + if (initializedByCurrentThread.getValue()) { + queryRes.whenComplete((ignore, ex) -> { + inProgressQueryUserTopics.remove(key, queryRes); + }); + } + return queryRes; + } + public CompletableFuture> getAllPartitions(NamespaceName namespaceName) { return getPartitions(namespaceName, TopicDomain.persistent) .thenCombine(getPartitions(namespaceName, TopicDomain.non_persistent), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4933aee974d08..9bca80c41bb49 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2459,11 +2459,11 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet if (lookupSemaphore.tryAcquire()) { isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> { if (isAuthorized) { - getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode) + getBrokerService().pulsar().getNamespaceService().getListOfUserTopics(namespaceName, mode) .thenAccept(topics -> { boolean filterTopics = false; // filter system topic - List filteredTopics = TopicList.filterSystemTopic(topics); + List filteredTopics = topics; if (enableSubscriptionPatternEvaluation && topicsPattern.isPresent()) { if (topicsPattern.get().length() <= maxSubscriptionPatternLength) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 27afedd6b101e..58c6b96a0f346 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -229,6 +229,8 @@ public void setup() throws Exception { doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any()); doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics( NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL); + doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfUserTopics( + NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL); doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfPersistentTopics( NamespaceName.get("use", "ns-abc"));