Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang.mutable.MutableObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
Expand All @@ -60,6 +61,7 @@
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
Expand Down Expand Up @@ -164,6 +166,48 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth
}
}

protected CompletableFuture<List<String>> internalGetListOfTopics(Policies policies,
CommandGetTopicsOfNamespace.Mode mode) {
switch (mode) {
case ALL:
return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
.thenCombine(internalGetNonPersistentTopics(policies),
(persistentTopics, nonPersistentTopics) ->
ListUtils.union(persistentTopics, nonPersistentTopics));
case NON_PERSISTENT:
return internalGetNonPersistentTopics(policies);
case PERSISTENT:
default:
return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
}
}

protected CompletableFuture<List<String>> internalGetNonPersistentTopics(Policies policies) {
final List<CompletableFuture<List<String>>> futures = Lists.newArrayList();
final List<String> boundaries = policies.bundles.getBoundaries();
for (int i = 0; i < boundaries.size() - 1; i++) {
final String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
try {
futures.add(pulsar().getAdminClient().topics()
.getListInBundleAsync(namespaceName.toString(), bundle));
} catch (PulsarServerException e) {
throw new RestException(e);
}
}
return FutureUtil.waitForAll(futures)
.thenApply(__ -> {
final List<String> topics = Lists.newArrayList();
for (int i = 0; i < futures.size(); i++) {
List<String> topicList = futures.get(i).join();
if (topicList != null) {
topics.addAll(topicList);
}
}
return topics.stream().filter(name -> !TopicName.get(name).isPersistent())
.collect(Collectors.toList());
});
}

@SuppressWarnings("deprecation")
protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative) {
validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,10 @@ public void getTopics(@PathParam("property") String property,
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
@Suspended AsyncResponse asyncResponse) {
validateNamespaceName(property, cluster, namespace);
validateNamespaceOperation(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS);

// Validate that namespace exists, throws 404 if it doesn't exist
getNamespacePolicies(namespaceName);

pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_TOPICS)
// Validate that namespace exists, throws 404 if it doesn't exist
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(policies -> internalGetListOfTopics(policies, mode))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("Failed to get topics list for namespace {}", namespaceName, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,10 @@ public void getTopics(@PathParam("tenant") String tenant,
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
@Suspended AsyncResponse asyncResponse) {
validateNamespaceName(tenant, namespace);
validateNamespaceOperation(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS);

// Validate that namespace exists, throws 404 if it doesn't exist
getNamespacePolicies(namespaceName);

pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace), NamespaceOperation.GET_TOPICS)
// Validate that namespace exists, throws 404 if it doesn't exist
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(policies -> internalGetListOfTopics(policies, mode))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("Failed to get topics list for namespace {}", namespaceName, ex);
Expand Down