From c4d4322cce0668bd28d19747cc8e1b911fa7eb82 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Mon, 27 Jun 2022 18:26:18 +0800 Subject: [PATCH 1/2] Fix get non-persistent topics issue in Namespaces. (#16170) --- .../broker/admin/impl/NamespacesBase.java | 44 +++++++++++++++++++ .../pulsar/broker/admin/v1/Namespaces.java | 10 ++--- .../pulsar/broker/admin/v2/Namespaces.java | 10 ++--- 3 files changed, 52 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index caf75a99d1d3e..2be1e32ba3201 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -48,6 +48,7 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.UriBuilder; +import org.apache.commons.collections4.ListUtils; import org.apache.commons.lang.mutable.MutableObject; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; @@ -61,6 +62,7 @@ import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; @@ -163,6 +165,48 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth } } + protected CompletableFuture> internalGetListOfTopics(Policies policies, + CommandGetTopicsOfNamespace.Mode mode) { + switch (mode) { + case ALL: + return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName) + .thenCombine(internalGetNonPersistentTopics(policies), + (persistentTopics, nonPersistentTopics) -> + ListUtils.union(persistentTopics, nonPersistentTopics)); + case NON_PERSISTENT: + return internalGetNonPersistentTopics(policies); + case PERSISTENT: + default: + return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName); + } + } + + protected CompletableFuture> internalGetNonPersistentTopics(Policies policies) { + final List>> futures = Lists.newArrayList(); + final List boundaries = policies.bundles.getBoundaries(); + for (int i = 0; i < boundaries.size() - 1; i++) { + final String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1)); + try { + futures.add(pulsar().getAdminClient().topics() + .getListInBundleAsync(namespaceName.toString(), bundle)); + } catch (PulsarServerException e) { + throw new RestException(e); + } + } + return FutureUtil.waitForAll(futures) + .thenApply(__ -> { + final List topics = Lists.newArrayList(); + for (int i = 0; i < futures.size(); i++) { + List topicList = futures.get(i).join(); + if (topicList != null) { + topics.addAll(topicList); + } + } + return topics.stream().filter(name -> !TopicName.get(name).isPersistent()) + .collect(Collectors.toList()); + }); + } + @SuppressWarnings("deprecation") protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative) { validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index afb6174c5508e..ab8ca2d65f140 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -129,12 +129,10 @@ public void getTopics(@PathParam("property") String property, @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode, @Suspended AsyncResponse asyncResponse) { validateNamespaceName(property, cluster, namespace); - validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS); - - // Validate that namespace exists, throws 404 if it doesn't exist - getNamespacePolicies(namespaceName); - - pulsar().getNamespaceService().getListOfTopics(namespaceName, mode) + validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS) + // Validate that namespace exists, throws 404 if it doesn't exist + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenCompose(policies -> internalGetListOfTopics(policies, mode)) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { log.error("Failed to get topics list for namespace {}", namespaceName, ex); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 4fe4cf315607a..89efd8f83de8d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -99,12 +99,10 @@ public void getTopics(@PathParam("tenant") String tenant, @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode, @Suspended AsyncResponse asyncResponse) { validateNamespaceName(tenant, namespace); - validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS); - - // Validate that namespace exists, throws 404 if it doesn't exist - getNamespacePolicies(namespaceName); - - pulsar().getNamespaceService().getListOfTopics(namespaceName, mode) + validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS) + // Validate that namespace exists, throws 404 if it doesn't exist + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenCompose(policies -> internalGetListOfTopics(policies, mode)) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { log.error("Failed to get topics list for namespace {}", namespaceName, ex); From f8ce50fe124188fd3280f46a8f1daa1f6eabb7c8 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Mon, 11 Jul 2022 10:33:15 +0800 Subject: [PATCH 2/2] add validateNamespaceOperationAsync --- .../pulsar/broker/web/PulsarWebResource.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 20652d14a539c..da9a3f060abd2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -946,6 +946,29 @@ && pulsar().getBrokerService().isAuthorizationEnabled()) { } } + public CompletableFuture validateNamespaceOperationAsync(NamespaceName namespaceName, + NamespaceOperation operation) { + if (pulsar().getConfiguration().isAuthenticationEnabled() + && pulsar().getBrokerService().isAuthorizationEnabled()) { + if (!isClientAuthenticated(clientAppId())) { + return FutureUtil.failedFuture( + new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request")); + } + + return pulsar().getBrokerService().getAuthorizationService() + .allowNamespaceOperationAsync(namespaceName, operation, originalPrincipal(), + clientAppId(), clientAuthData()) + .thenAccept(isAuthorized -> { + if (!isAuthorized) { + throw new RestException(Status.FORBIDDEN, + String.format("Unauthorized to validateNamespaceOperation for" + + " operation [%s] on namespace [%s]", operation.toString(), namespaceName)); + } + }); + } + return CompletableFuture.completedFuture(null); + } + public void validateNamespacePolicyOperation(NamespaceName namespaceName, PolicyName policy, PolicyOperation operation) {