From 3c6539dae618c8cdb72a02fb75ff493f9e88376d Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Mon, 27 Jun 2022 18:26:18 +0800 Subject: [PATCH] Fix get non-persistent topics issue in Namespaces. (#16170) --- .../broker/admin/impl/NamespacesBase.java | 44 +++++++++++++++++++ .../pulsar/broker/admin/v1/Namespaces.java | 10 ++--- .../pulsar/broker/admin/v2/Namespaces.java | 10 ++--- 3 files changed, 52 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 5cdbdc9930ab1..06e54831461e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -47,6 +47,7 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.UriBuilder; +import org.apache.commons.collections4.ListUtils; import org.apache.commons.lang.mutable.MutableObject; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; @@ -60,6 +61,7 @@ import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; @@ -164,6 +166,48 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth } } + protected CompletableFuture> internalGetListOfTopics(Policies policies, + CommandGetTopicsOfNamespace.Mode mode) { + switch (mode) { + case ALL: + return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName) + .thenCombine(internalGetNonPersistentTopics(policies), + (persistentTopics, nonPersistentTopics) -> + ListUtils.union(persistentTopics, nonPersistentTopics)); + case NON_PERSISTENT: + return internalGetNonPersistentTopics(policies); + case PERSISTENT: + default: + return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName); + } + } + + protected CompletableFuture> internalGetNonPersistentTopics(Policies policies) { + final List>> futures = Lists.newArrayList(); + final List boundaries = policies.bundles.getBoundaries(); + for (int i = 0; i < boundaries.size() - 1; i++) { + final String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1)); + try { + futures.add(pulsar().getAdminClient().topics() + .getListInBundleAsync(namespaceName.toString(), bundle)); + } catch (PulsarServerException e) { + throw new RestException(e); + } + } + return FutureUtil.waitForAll(futures) + .thenApply(__ -> { + final List topics = Lists.newArrayList(); + for (int i = 0; i < futures.size(); i++) { + List topicList = futures.get(i).join(); + if (topicList != null) { + topics.addAll(topicList); + } + } + return topics.stream().filter(name -> !TopicName.get(name).isPersistent()) + .collect(Collectors.toList()); + }); + } + @SuppressWarnings("deprecation") protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative) { validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 276ed51a1ab73..e978f9050fa57 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -130,12 +130,10 @@ public void getTopics(@PathParam("property") String property, @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode, @Suspended AsyncResponse asyncResponse) { validateNamespaceName(property, cluster, namespace); - validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS); - - // Validate that namespace exists, throws 404 if it doesn't exist - getNamespacePolicies(namespaceName); - - pulsar().getNamespaceService().getListOfTopics(namespaceName, mode) + validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS) + // Validate that namespace exists, throws 404 if it doesn't exist + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenCompose(policies -> internalGetListOfTopics(policies, mode)) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { log.error("Failed to get topics list for namespace {}", namespaceName, ex); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 18a8eff4a9a40..fbe0918b8a836 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -101,12 +101,10 @@ public void getTopics(@PathParam("tenant") String tenant, @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode, @Suspended AsyncResponse asyncResponse) { validateNamespaceName(tenant, namespace); - validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS); - - // Validate that namespace exists, throws 404 if it doesn't exist - getNamespacePolicies(namespaceName); - - pulsar().getNamespaceService().getListOfTopics(namespaceName, mode) + validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS) + // Validate that namespace exists, throws 404 if it doesn't exist + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenCompose(policies -> internalGetListOfTopics(policies, mode)) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { log.error("Failed to get topics list for namespace {}", namespaceName, ex);