Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ public void loadNamespaceTopics(NamespaceBundle bundle) {
List<CompletableFuture<Topic>> persistentTopics = Lists.newArrayList();
long topicLoadStart = System.nanoTime();

for (String topic : getNamespaceService().getListOfTopics(nsName)) {
for (String topic : getNamespaceService().getListOfPersistentTopics(nsName)) {
try {
TopicName topicName = TopicName.get(topic);
if (bundle.includes(topicName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ protected void internalDeleteNamespace(boolean authoritative) {

boolean isEmpty;
try {
isEmpty = pulsar().getNamespaceService().getListOfTopics(namespaceName).isEmpty();
isEmpty = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).isEmpty();
} catch (Exception e) {
throw new RestException(e);
}
Expand Down Expand Up @@ -274,7 +274,7 @@ protected void internalDeleteNamespaceBundle(String bundleRange, boolean authori
NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
authoritative, true);
try {
List<String> topics = pulsar().getNamespaceService().getListOfTopics(namespaceName);
List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
for (String topic : topics) {
NamespaceBundle topicBundle = (NamespaceBundle) pulsar().getNamespaceService()
.getBundle(TopicName.get(topic));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,14 @@
*/
package org.apache.pulsar.broker.admin.v1;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;

import com.google.common.collect.Lists;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
Expand All @@ -52,12 +39,23 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;
import java.util.List;
import java.util.Map;
import java.util.Set;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

@Path("/namespaces")
@Produces(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -111,15 +109,16 @@ public List<String> getNamespacesForCluster(@PathParam("property") String proper
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public List<String> getTopics(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) {
validateAdminAccessForTenant(property);
validateNamespaceName(property, cluster, namespace);

// Validate that namespace exists, throws 404 if it doesn't exist
getNamespacePolicies(namespaceName);

try {
return pulsar().getNamespaceService().getListOfTopics(namespaceName);
return pulsar().getNamespaceService().getListOfTopics(namespaceName, mode);
} catch (Exception e) {
log.error("Failed to get topics list for namespace {}/{}/{}", property, cluster, namespace, e);
throw new RestException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
Expand Down Expand Up @@ -74,15 +75,16 @@ public List<String> getTenantNamespaces(@PathParam("tenant") String tenant) {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public List<String> getTopics(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
@PathParam("namespace") String namespace,
@QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) {
validateAdminAccessForTenant(tenant);
validateNamespaceName(tenant, namespace);

// Validate that namespace exists, throws 404 if it doesn't exist
getNamespacePolicies(namespaceName);

try {
return pulsar().getNamespaceService().getListOfTopics(namespaceName);
return pulsar().getNamespaceService().getListOfTopics(namespaceName, mode);
} catch (Exception e) {
log.error("Failed to get topics list for namespace {}", namespaceName, e);
throw new RestException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,9 @@
*/
package org.apache.pulsar.broker.namespace;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;

import java.net.URI;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import io.netty.channel.EventLoopGroup;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
Expand All @@ -53,17 +32,26 @@
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.NamespaceIsolationPolicy;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
Expand All @@ -77,8 +65,32 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import java.net.URI;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;

/**
* The <code>NamespaceService</code> provides resource ownership lookup as well as resource ownership claiming services
Expand Down Expand Up @@ -123,6 +135,8 @@ public enum AddressType {

public static final String NAMESPACE_ISOLATION_POLICIES = "namespaceIsolationPolicies";

private final ConcurrentOpenHashMap<ClusterData, PulsarClientImpl> namespaceClients;

/**
* Default constructor.
*
Expand All @@ -136,6 +150,7 @@ public NamespaceService(PulsarService pulsar) {
ServiceUnitZkUtils.initZK(pulsar.getLocalZkCache().getZooKeeper(), pulsar.getBrokerServiceUrl());
this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
this.ownershipCache = new OwnershipCache(pulsar, bundleFactory);
this.namespaceClients = new ConcurrentOpenHashMap<>();
}

public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic,
Expand Down Expand Up @@ -808,7 +823,26 @@ public ServiceUnitId getServiceUnitId(TopicName topicName) throws Exception {
return getBundle(topicName);
}

public List<String> getListOfTopics(NamespaceName namespaceName) throws Exception {
public List<String> getFullListOfTopics(NamespaceName namespaceName) throws Exception {
List<String> topics = getListOfPersistentTopics(namespaceName);
topics.addAll(getListOfNonPersistentTopics(namespaceName));
return topics;
}

public List<String> getListOfTopics(NamespaceName namespaceName, Mode mode)
throws Exception {
switch (mode) {
case ALL:
return getFullListOfTopics(namespaceName);
case NON_PERSISTENT:
return getListOfNonPersistentTopics(namespaceName);
case PERSISTENT:
default:
return getListOfPersistentTopics(namespaceName);
}
}

public List<String> getListOfPersistentTopics(NamespaceName namespaceName) throws Exception {
List<String> topics = Lists.newArrayList();

// For every topic there will be a managed ledger created.
Expand All @@ -829,6 +863,86 @@ public List<String> getListOfTopics(NamespaceName namespaceName) throws Exceptio
return topics;
}

public List<String> getListOfNonPersistentTopics(NamespaceName namespaceName) throws Exception {
List<String> topics = Lists.newArrayList();

ClusterData peerClusterData;
try {
peerClusterData = PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName)
.get(cacheTimeOutInSec, SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException("Failed to contact peer replication cluster.", e);
}

// if peer-cluster-data is present it means namespace is owned by that peer-cluster and request should be
// redirect to the peer-cluster
if (peerClusterData != null) {
return getNonPersistentTopicsFromPeerCluster(peerClusterData, namespaceName);
}

// Non-persistent topics don't have managed ledgers so we have to retrieve them from local cache.
synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) {
if (pulsar.getBrokerService().getMultiLayerTopicMap().containsKey(namespaceName.toString())) {
pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString()).values()
.forEach(bundle -> {
bundle.forEach((topicName, topic) -> {
if (topic instanceof NonPersistentTopic && ((NonPersistentTopic)topic).isActive()) {
topics.add(topicName);
}
});
});
}
}

topics.sort(null);
return topics;
}

private List<String> getNonPersistentTopicsFromPeerCluster(ClusterData peerClusterData,
NamespaceName namespace) throws Exception {
PulsarClientImpl client = getNamespaceClient(peerClusterData);
return client.getLookup().getTopicsUnderNamespace(namespace, Mode.NON_PERSISTENT).get();
}


public PulsarClientImpl getNamespaceClient(ClusterData cluster) {
PulsarClientImpl client = namespaceClients.get(cluster);
if (client != null) {
return client;
}

return namespaceClients.computeIfAbsent(cluster, key -> {
try {
ClientBuilder clientBuilder = PulsarClient.builder()
.enableTcpNoDelay(false)
.statsInterval(0, TimeUnit.SECONDS);

if (pulsar.getConfiguration().isAuthenticationEnabled()) {
clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
}

if (pulsar.getConfiguration().isTlsEnabled()) {
clientBuilder
.serviceUrl(isNotBlank(cluster.getBrokerServiceUrlTls())
? cluster.getBrokerServiceUrlTls() : cluster.getServiceUrlTls())
.enableTls(true)
.tlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath())
.allowTlsInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection());
} else {
clientBuilder.serviceUrl(isNotBlank(cluster.getBrokerServiceUrl())
? cluster.getBrokerServiceUrl() : cluster.getServiceUrl());
}

// Share all the IO threads across broker and client connections
ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
return new PulsarClientImpl(conf, (EventLoopGroup)pulsar.getBrokerService().executor());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

public Optional<NamespaceEphemeralData> getOwner(NamespaceBundle bundle) throws Exception {
// if there is no znode for the service unit, it is not owned by any broker
return getOwnerAsync(bundle).get(cacheTimeOutInSec, SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
Expand Down Expand Up @@ -1182,11 +1183,13 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
final long requestId = commandGetTopicsOfNamespace.getRequestId();
final String namespace = commandGetTopicsOfNamespace.getNamespace();
final CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode();

try {
List<String> topics = getBrokerService().pulsar()
.getNamespaceService()
.getListOfTopics(NamespaceName.get(namespace));
final NamespaceName namespaceName = NamespaceName.get(namespace);

final List<String> topics = getBrokerService().pulsar().getNamespaceService()
.getListOfTopics(namespaceName, mode);

if (log.isDebugEnabled()) {
log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ protected void validateGlobalNamespaceOwnership(NamespaceName namespace) {
}
}

protected static CompletableFuture<ClusterData> checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService,
public static CompletableFuture<ClusterData> checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService,
NamespaceName namespace) {
if (!namespace.isGlobal()) {
return CompletableFuture.completedFuture(null);
Expand Down
Loading