diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index ca5ac5b968c9a..21451d6bbac0c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -567,7 +567,7 @@ public void loadNamespaceTopics(NamespaceBundle bundle) { List> persistentTopics = Lists.newArrayList(); long topicLoadStart = System.nanoTime(); - for (String topic : getNamespaceService().getListOfTopics(nsName)) { + for (String topic : getNamespaceService().getListOfPersistentTopics(nsName)) { try { TopicName topicName = TopicName.get(topic); if (bundle.includes(topicName)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 0d7e0eeab95ec..4beba60cf3085 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -175,7 +175,7 @@ protected void internalDeleteNamespace(boolean authoritative) { boolean isEmpty; try { - isEmpty = pulsar().getNamespaceService().getListOfTopics(namespaceName).isEmpty(); + isEmpty = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).isEmpty(); } catch (Exception e) { throw new RestException(e); } @@ -274,7 +274,7 @@ protected void internalDeleteNamespaceBundle(String bundleRange, boolean authori NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); try { - List topics = pulsar().getNamespaceService().getListOfTopics(namespaceName); + List topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName); for (String topic : topics) { NamespaceBundle topicBundle = (NamespaceBundle) pulsar().getNamespaceService() .getBundle(TopicName.get(topic)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index b08bfb5763ca3..42b89fcf15414 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -18,27 +18,14 @@ */ package org.apache.pulsar.broker.admin.v1; -import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; - -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response.Status; - +import com.google.common.collect.Lists; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; import org.apache.pulsar.broker.admin.impl.NamespacesBase; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; @@ -52,12 +39,23 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response.Status; +import java.util.List; +import java.util.Map; +import java.util.Set; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; @Path("/namespaces") @Produces(MediaType.APPLICATION_JSON) @@ -111,7 +109,8 @@ public List getNamespacesForCluster(@PathParam("property") String proper @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") }) public List getTopics(@PathParam("property") String property, - @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, + @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) { validateAdminAccessForTenant(property); validateNamespaceName(property, cluster, namespace); @@ -119,7 +118,7 @@ public List getTopics(@PathParam("property") String property, getNamespacePolicies(namespaceName); try { - return pulsar().getNamespaceService().getListOfTopics(namespaceName); + return pulsar().getNamespaceService().getListOfTopics(namespaceName, mode); } catch (Exception e) { log.error("Failed to get topics list for namespace {}/{}/{}", property, cluster, namespace, e); throw new RestException(e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index f58ec1916c1c8..1e6e8c2ab299a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -36,6 +36,7 @@ import org.apache.pulsar.broker.admin.impl.NamespacesBase; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; @@ -74,7 +75,8 @@ public List getTenantNamespaces(@PathParam("tenant") String tenant) { @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) public List getTopics(@PathParam("tenant") String tenant, - @PathParam("namespace") String namespace) { + @PathParam("namespace") String namespace, + @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) { validateAdminAccessForTenant(tenant); validateNamespaceName(tenant, namespace); @@ -82,7 +84,7 @@ public List getTopics(@PathParam("tenant") String tenant, getNamespacePolicies(namespaceName); try { - return pulsar().getNamespaceService().getListOfTopics(namespaceName); + return pulsar().getNamespaceService().getListOfTopics(namespaceName, mode); } catch (Exception e) { log.error("Failed to get topics list for namespace {}", namespaceName, e); throw new RestException(e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index d9a1ccf600177..ab8aef7368deb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -18,30 +18,9 @@ */ package org.apache.pulsar.broker.namespace; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static java.lang.String.format; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT; -import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath; -import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData; -import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec; - -import java.net.URI; -import java.net.URL; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - +import com.google.common.collect.Lists; +import com.google.common.hash.Hashing; +import io.netty.channel.EventLoopGroup; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; @@ -53,17 +32,26 @@ import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; +import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.lookup.data.LookupData; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.NamespaceIsolationPolicy; import org.apache.pulsar.common.policies.data.BrokerAssignment; import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; @@ -77,8 +65,32 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; -import com.google.common.hash.Hashing; +import java.net.URI; +import java.net.URL; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static java.lang.String.format; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT; +import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath; +import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData; +import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec; /** * The NamespaceService provides resource ownership lookup as well as resource ownership claiming services @@ -123,6 +135,8 @@ public enum AddressType { public static final String NAMESPACE_ISOLATION_POLICIES = "namespaceIsolationPolicies"; + private final ConcurrentOpenHashMap namespaceClients; + /** * Default constructor. * @@ -136,6 +150,7 @@ public NamespaceService(PulsarService pulsar) { ServiceUnitZkUtils.initZK(pulsar.getLocalZkCache().getZooKeeper(), pulsar.getBrokerServiceUrl()); this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32()); this.ownershipCache = new OwnershipCache(pulsar, bundleFactory); + this.namespaceClients = new ConcurrentOpenHashMap<>(); } public CompletableFuture> getBrokerServiceUrlAsync(TopicName topic, @@ -808,7 +823,26 @@ public ServiceUnitId getServiceUnitId(TopicName topicName) throws Exception { return getBundle(topicName); } - public List getListOfTopics(NamespaceName namespaceName) throws Exception { + public List getFullListOfTopics(NamespaceName namespaceName) throws Exception { + List topics = getListOfPersistentTopics(namespaceName); + topics.addAll(getListOfNonPersistentTopics(namespaceName)); + return topics; + } + + public List getListOfTopics(NamespaceName namespaceName, Mode mode) + throws Exception { + switch (mode) { + case ALL: + return getFullListOfTopics(namespaceName); + case NON_PERSISTENT: + return getListOfNonPersistentTopics(namespaceName); + case PERSISTENT: + default: + return getListOfPersistentTopics(namespaceName); + } + } + + public List getListOfPersistentTopics(NamespaceName namespaceName) throws Exception { List topics = Lists.newArrayList(); // For every topic there will be a managed ledger created. @@ -829,6 +863,86 @@ public List getListOfTopics(NamespaceName namespaceName) throws Exceptio return topics; } + public List getListOfNonPersistentTopics(NamespaceName namespaceName) throws Exception { + List topics = Lists.newArrayList(); + + ClusterData peerClusterData; + try { + peerClusterData = PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName) + .get(cacheTimeOutInSec, SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException("Failed to contact peer replication cluster.", e); + } + + // if peer-cluster-data is present it means namespace is owned by that peer-cluster and request should be + // redirect to the peer-cluster + if (peerClusterData != null) { + return getNonPersistentTopicsFromPeerCluster(peerClusterData, namespaceName); + } + + // Non-persistent topics don't have managed ledgers so we have to retrieve them from local cache. + synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) { + if (pulsar.getBrokerService().getMultiLayerTopicMap().containsKey(namespaceName.toString())) { + pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString()).values() + .forEach(bundle -> { + bundle.forEach((topicName, topic) -> { + if (topic instanceof NonPersistentTopic && ((NonPersistentTopic)topic).isActive()) { + topics.add(topicName); + } + }); + }); + } + } + + topics.sort(null); + return topics; + } + + private List getNonPersistentTopicsFromPeerCluster(ClusterData peerClusterData, + NamespaceName namespace) throws Exception { + PulsarClientImpl client = getNamespaceClient(peerClusterData); + return client.getLookup().getTopicsUnderNamespace(namespace, Mode.NON_PERSISTENT).get(); + } + + + public PulsarClientImpl getNamespaceClient(ClusterData cluster) { + PulsarClientImpl client = namespaceClients.get(cluster); + if (client != null) { + return client; + } + + return namespaceClients.computeIfAbsent(cluster, key -> { + try { + ClientBuilder clientBuilder = PulsarClient.builder() + .enableTcpNoDelay(false) + .statsInterval(0, TimeUnit.SECONDS); + + if (pulsar.getConfiguration().isAuthenticationEnabled()) { + clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(), + pulsar.getConfiguration().getBrokerClientAuthenticationParameters()); + } + + if (pulsar.getConfiguration().isTlsEnabled()) { + clientBuilder + .serviceUrl(isNotBlank(cluster.getBrokerServiceUrlTls()) + ? cluster.getBrokerServiceUrlTls() : cluster.getServiceUrlTls()) + .enableTls(true) + .tlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath()) + .allowTlsInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection()); + } else { + clientBuilder.serviceUrl(isNotBlank(cluster.getBrokerServiceUrl()) + ? cluster.getBrokerServiceUrl() : cluster.getServiceUrl()); + } + + // Share all the IO threads across broker and client connections + ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData(); + return new PulsarClientImpl(conf, (EventLoopGroup)pulsar.getBrokerService().executor()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + public Optional getOwner(NamespaceBundle bundle) throws Exception { // if there is no znode for the service unit, it is not owned by any broker return getOwnerAsync(bundle).get(cacheTimeOutInSec, SECONDS); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index aacd7bcbf6f5d..51c3ee01a462c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -53,6 +53,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; @@ -1182,11 +1183,13 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) { final long requestId = commandGetTopicsOfNamespace.getRequestId(); final String namespace = commandGetTopicsOfNamespace.getNamespace(); + final CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode(); try { - List topics = getBrokerService().pulsar() - .getNamespaceService() - .getListOfTopics(NamespaceName.get(namespace)); + final NamespaceName namespaceName = NamespaceName.get(namespace); + + final List topics = getBrokerService().pulsar().getNamespaceService() + .getListOfTopics(namespaceName, mode); if (log.isDebugEnabled()) { log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index c13b12fd168ee..4b465b7b605a1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -616,7 +616,7 @@ protected void validateGlobalNamespaceOwnership(NamespaceName namespace) { } } - protected static CompletableFuture checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, + public static CompletableFuture checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, NamespaceName namespace) { if (!namespace.isGlobal()) { return CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index a12c83c70085d..6d56279fe0ff8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.client.api.Consumer; @@ -37,6 +38,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.TenantInfo; import org.slf4j.Logger; @@ -74,7 +76,8 @@ public void testPatternTopicsSubscribeWithBuilderFail() throws Exception { final String topicName1 = "persistent://my-property/my-ns/topic-1-" + key; final String topicName2 = "persistent://my-property/my-ns/topic-2-" + key; final String topicName3 = "persistent://my-property/my-ns/topic-3-" + key; - List topicNames = Lists.newArrayList(topicName1, topicName2, topicName3); + final String topicName4 = "non-persistent://my-property/my-ns/topic-4-" + key; + List topicNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4); final String patternString = "persistent://my-property/my-ns/pattern-topic.*"; Pattern pattern = Pattern.compile(patternString); @@ -127,13 +130,14 @@ public void testPatternTopicsSubscribeWithBuilderFail() throws Exception { // verify consumer create success, and works well. @Test(timeOut = testTimeout) - public void testBinaryProtoToGetTopicsOfNamespace() throws Exception { + public void testBinaryProtoToGetTopicsOfNamespacePersistent() throws Exception { String key = "BinaryProtoToGetTopics"; String subscriptionName = "my-ex-subscription-" + key; String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key; String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key; String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key; - Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*"); + String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key; + Pattern pattern = Pattern.compile("my-property/my-ns/pattern-topic.*"); // 1. create partition admin.tenants().createTenant("prop", new TenantInfo()); @@ -156,6 +160,9 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception { .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); + Producer producer4 = pulsarClient.newProducer().topic(topicName4) + .enableBatching(false) + .create(); Consumer consumer = pulsarClient.newConsumer() .topicsPattern(pattern) @@ -188,6 +195,97 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception { producer1.send((messagePredicate + "producer1-" + i).getBytes()); producer2.send((messagePredicate + "producer2-" + i).getBytes()); producer3.send((messagePredicate + "producer3-" + i).getBytes()); + producer4.send((messagePredicate + "producer4-" + i).getBytes()); + } + + // 6. should receive all the message + int messageSet = 0; + Message message = consumer.receive(); + do { + assertTrue(message instanceof TopicMessageImpl); + messageSet ++; + consumer.acknowledge(message); + log.debug("Consumer acknowledged : " + new String(message.getData())); + message = consumer.receive(500, TimeUnit.MILLISECONDS); + } while (message != null); + assertEquals(messageSet, totalMessages); + + consumer.unsubscribe(); + consumer.close(); + producer1.close(); + producer2.close(); + producer3.close(); + producer4.close(); + } + + // verify consumer create success, and works well. + @Test(timeOut = testTimeout) + public void testBinaryProtoToGetTopicsOfNamespaceNonPersistent() throws Exception { + String key = "BinaryProtoToGetTopics"; + String subscriptionName = "my-ex-subscription-" + key; + String topicName1 = "persistent://my-property/my-ns/np-pattern-topic-1-" + key; + String topicName2 = "persistent://my-property/my-ns/np-pattern-topic-2-" + key; + String topicName3 = "persistent://my-property/my-ns/np-pattern-topic-3-" + key; + String topicName4 = "non-persistent://my-property/my-ns/np-pattern-topic-4-" + key; + Pattern pattern = Pattern.compile("my-property/my-ns/np-pattern-topic.*"); + + // 1. create partition + admin.tenants().createTenant("prop", new TenantInfo()); + admin.topics().createPartitionedTopic(topicName2, 2); + admin.topics().createPartitionedTopic(topicName3, 3); + + // 2. create producer + String messagePredicate = "my-message-" + key + "-"; + int totalMessages = 40; + + Producer producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer4 = pulsarClient.newProducer().topic(topicName4) + .enableBatching(false) + .create(); + + Consumer consumer = pulsarClient.newConsumer() + .topicsPattern(pattern) + .patternAutoDiscoveryPeriod(2) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .subscriptionTopicsMode(PulsarApi.CommandGetTopicsOfNamespace.Mode.NON_PERSISTENT) + .subscribe(); + + // 4. verify consumer get methods, to get right number of partitions and topics. + assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); + List topics = ((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics(); + List> consumers = ((PatternMultiTopicsConsumerImpl) consumer).getConsumers(); + + assertEquals(topics.size(), 1); + assertEquals(consumers.size(), 1); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getTopics().size(), 1); + + topics.forEach(topic -> log.debug("topic: {}", topic)); + consumers.forEach(c -> log.debug("consumer: {}", c.getTopic())); + + IntStream.range(0, topics.size()).forEach(index -> + assertTrue(topics.get(index).equals(consumers.get(index).getTopic()))); + + ((PatternMultiTopicsConsumerImpl) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic)); + + // 5. produce data + for (int i = 0; i < totalMessages / 4; i++) { + producer1.send((messagePredicate + "producer1-" + i).getBytes()); + producer2.send((messagePredicate + "producer2-" + i).getBytes()); + producer3.send((messagePredicate + "producer3-" + i).getBytes()); + producer4.send((messagePredicate + "producer4-" + i).getBytes()); } // 6. should receive all the message @@ -200,6 +298,96 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception { log.debug("Consumer acknowledged : " + new String(message.getData())); message = consumer.receive(500, TimeUnit.MILLISECONDS); } while (message != null); + assertEquals(messageSet, totalMessages / 4); + + consumer.unsubscribe(); + consumer.close(); + producer1.close(); + producer2.close(); + producer3.close(); + producer4.close(); + } + + // verify consumer create success, and works well. + @Test(timeOut = testTimeout) + public void testBinaryProtoToGetTopicsOfNamespaceAll() throws Exception { + String key = "BinaryProtoToGetTopics"; + String subscriptionName = "my-ex-subscription-" + key; + String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key; + String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key; + String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key; + String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key; + Pattern pattern = Pattern.compile("my-property/my-ns/pattern-topic.*"); + + // 1. create partition + admin.tenants().createTenant("prop", new TenantInfo()); + admin.topics().createPartitionedTopic(topicName2, 2); + admin.topics().createPartitionedTopic(topicName3, 3); + + // 2. create producer + String messagePredicate = "my-message-" + key + "-"; + int totalMessages = 40; + + Producer producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer4 = pulsarClient.newProducer().topic(topicName4) + .enableBatching(false) + .create(); + + Consumer consumer = pulsarClient.newConsumer() + .topicsPattern(pattern) + .patternAutoDiscoveryPeriod(2) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .subscriptionTopicsMode(PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .subscribe(); + + // 4. verify consumer get methods, to get right number of partitions and topics. + assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); + List topics = ((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics(); + List> consumers = ((PatternMultiTopicsConsumerImpl) consumer).getConsumers(); + + assertEquals(topics.size(), 7); + assertEquals(consumers.size(), 7); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getTopics().size(), 4); + + topics.forEach(topic -> log.debug("topic: {}", topic)); + consumers.forEach(c -> log.debug("consumer: {}", c.getTopic())); + + IntStream.range(0, topics.size()).forEach(index -> + assertTrue(topics.get(index).equals(consumers.get(index).getTopic()))); + + ((PatternMultiTopicsConsumerImpl) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic)); + + // 5. produce data + for (int i = 0; i < totalMessages / 4; i++) { + producer1.send((messagePredicate + "producer1-" + i).getBytes()); + producer2.send((messagePredicate + "producer2-" + i).getBytes()); + producer3.send((messagePredicate + "producer3-" + i).getBytes()); + producer4.send((messagePredicate + "producer4-" + i).getBytes()); + } + + // 6. should receive all the message + int messageSet = 0; + Message message = consumer.receive(); + do { + assertTrue(message instanceof TopicMessageImpl); + messageSet++; + consumer.acknowledge(message); + log.debug("Consumer acknowledged : " + new String(message.getData())); + message = consumer.receive(500, TimeUnit.MILLISECONDS); + } while (message != null); assertEquals(messageSet, totalMessages); consumer.unsubscribe(); @@ -207,6 +395,7 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception { producer1.close(); producer2.close(); producer3.close(); + producer4.close(); } @Test(timeOut = testTimeout) @@ -214,8 +403,9 @@ public void testTopicsPatternFilter() throws Exception { String topicName1 = "persistent://my-property/my-ns/pattern-topic-1"; String topicName2 = "persistent://my-property/my-ns/pattern-topic-2"; String topicName3 = "persistent://my-property/my-ns/hello-3"; + String topicName4 = "non-persistent://my-property/my-ns/hello-4"; - List topicsNames = Lists.newArrayList(topicName1, topicName2, topicName3); + List topicsNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4); Pattern pattern1 = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*"); List result1 = PulsarClientImpl.topicsPatternFilter(topicsNames, pattern1); @@ -223,10 +413,8 @@ public void testTopicsPatternFilter() throws Exception { Pattern pattern2 = Pattern.compile("persistent://my-property/my-ns/.*"); List result2 = PulsarClientImpl.topicsPatternFilter(topicsNames, pattern2); - assertTrue(result2.size() == 3 && - result2.contains(topicName1) && - result2.contains(topicName2) && - result2.contains(topicName3)); + assertTrue(result2.size() == 4 + && Stream.of(topicName1, topicName2, topicName3, topicName4).allMatch(result2::contains)); } @Test(timeOut = testTimeout) @@ -536,7 +724,7 @@ public void testAutoUnbubscribePatternConsumer() throws Exception { // seems no direct way to verify auto-unsubscribe, because this patternConsumer also referenced the topic. List topicNames = Lists.newArrayList(topicName2); NamespaceService nss = pulsar.getNamespaceService(); - doReturn(topicNames).when(nss).getListOfTopics(NamespaceName.get("my-property/my-ns")); + doReturn(topicNames).when(nss).getListOfPersistentTopics(NamespaceName.get("my-property/my-ns")); // 7. call recheckTopics to unsubscribe topic 1,3 , verify topics number: 2=6-1-3 log.debug("recheck topics change"); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index d94175846ad34..05d8f14c59139 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; + /** * {@link ConsumerBuilder} is used to configure and create instances of {@link Consumer}. * @@ -331,6 +333,14 @@ public interface ConsumerBuilder extends Cloneable { */ ConsumerBuilder subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition); + /** + * Determines to which topics this consumer should be subscribed to - Persistent, Non-Persistent, or both. + * Only used with pattern subscriptions. + * + * @param mode Pattern subscription mode + */ + ConsumerBuilder subscriptionTopicsMode(Mode mode); + /** * Intercept {@link Consumer}. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 29ae1f40f1c5e..e4cb7bdac6c69 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -38,6 +38,8 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.api.Commands; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType; import org.apache.pulsar.common.naming.NamespaceName; @@ -199,14 +201,14 @@ public String getServiceUrl() { } @Override - public CompletableFuture> getTopicsUnderNamespace(NamespaceName namespace) { + public CompletableFuture> getTopicsUnderNamespace(NamespaceName namespace, Mode mode) { CompletableFuture> topicsFuture = new CompletableFuture>(); AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs()); Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS, opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS, 0 , TimeUnit.MILLISECONDS); - getTopicsUnderNamespace(serviceAddress, namespace, backoff, opTimeoutMs, topicsFuture); + getTopicsUnderNamespace(serviceAddress, namespace, backoff, opTimeoutMs, topicsFuture, mode); return topicsFuture; } @@ -214,11 +216,12 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, NamespaceName namespace, Backoff backoff, AtomicLong remainingTime, - CompletableFuture> topicsFuture) { + CompletableFuture> topicsFuture, + Mode mode) { client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newGetTopicsOfNamespaceRequest( - namespace.toString(), requestId); + namespace.toString(), requestId, mode); clientCnx.newGetTopicsOfNamespace(request, requestId).thenAccept(topicsList -> { if (log.isDebugEnabled()) { @@ -251,7 +254,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in {} ms", namespace, nextDelay); remainingTime.addAndGet(-nextDelay); - getTopicsUnderNamespace(socketAddress, namespace, backoff, remainingTime, topicsFuture); + getTopicsUnderNamespace(socketAddress, namespace, backoff, remainingTime, topicsFuture, mode); }, nextDelay, TimeUnit.MILLISECONDS); return null; }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 103bb5e847aec..b9622642f0121 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.ConsumerEventListener; import org.apache.pulsar.client.api.ConsumerInterceptor; import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException; @@ -44,7 +45,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import org.apache.pulsar.client.api.DeadLetterPolicy; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.util.FutureUtil; import com.google.common.collect.Lists; @@ -248,6 +249,12 @@ public ConsumerBuilder subscriptionInitialPosition(SubscriptionInitialPositio return this; } + @Override + public ConsumerBuilder subscriptionTopicsMode(Mode mode) { + conf.setSubscriptionTopicsMode(mode); + return this; + } + @Override public ConsumerBuilder intercept(ConsumerInterceptor... interceptors) { if (interceptorList == null) { @@ -264,6 +271,6 @@ public ConsumerBuilder deadLetterPolicy(DeadLetterPolicy deadLetterPolicy) { } public ConsumerConfigurationData getConf() { - return conf; - } + return conf; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index a427fbdf38038..3abdf2531cd0f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.NotFoundException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; @@ -102,12 +103,13 @@ public String getServiceUrl() { } @Override - public CompletableFuture> getTopicsUnderNamespace(NamespaceName namespace) { + public CompletableFuture> getTopicsUnderNamespace(NamespaceName namespace, Mode mode) { CompletableFuture> future = new CompletableFuture<>(); - String format = namespace.isV2() ? "admin/v2/namespaces/%s/topics" : "admin/namespaces/%s/destinations"; + String format = namespace.isV2() + ? "admin/v2/namespaces/%s/topics?mode=%s" : "admin/namespaces/%s/destinations?mode=%s"; httpClient - .get(String.format(format, namespace), String[].class) + .get(String.format(format, namespace, mode.toString()), String[].class) .thenAccept(topics -> { List result = Lists.newArrayList(); // do not keep partition part of topic name diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java index 568c703309876..a2af17c3911b1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java @@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; @@ -75,6 +76,6 @@ public interface LookupService extends AutoCloseable { * @param namespace : namespace-name * @return */ - public CompletableFuture> getTopicsUnderNamespace(NamespaceName namespace); + public CompletableFuture> getTopicsUnderNamespace(NamespaceName namespace, Mode mode); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index 3f6dfecdb30d9..a33b79059e584 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; @@ -44,6 +45,7 @@ public class PatternMultiTopicsConsumerImpl extends MultiTopicsConsumerImpl implements TimerTask { private final Pattern topicsPattern; private final TopicsChangedListener topicsChangeListener; + private final Mode subscriptionMode; private volatile Timeout recheckPatternTimeout = null; public PatternMultiTopicsConsumerImpl(Pattern topicsPattern, @@ -51,9 +53,10 @@ public PatternMultiTopicsConsumerImpl(Pattern topicsPattern, ConsumerConfigurationData conf, ExecutorService listenerExecutor, CompletableFuture> subscribeFuture, - Schema schema, ConsumerInterceptors interceptors) { + Schema schema, Mode subscriptionMode, ConsumerInterceptors interceptors) { super(client, conf, listenerExecutor, subscribeFuture, schema, interceptors); this.topicsPattern = topicsPattern; + this.subscriptionMode = subscriptionMode; if (this.namespaceName == null) { this.namespaceName = getNameSpaceFromPattern(topicsPattern); @@ -78,7 +81,7 @@ public void run(Timeout timeout) throws Exception { CompletableFuture recheckFuture = new CompletableFuture<>(); List> futures = Lists.newArrayListWithExpectedSize(2); - client.getLookup().getTopicsUnderNamespace(namespaceName).thenAccept(topics -> { + client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode).thenAccept(topics -> { if (log.isDebugEnabled()) { log.debug("Get topics under namespace {}, topics.size: {}", namespaceName.toString(), topics.size()); topics.forEach(topicName -> diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 5dd0e3d6f4c71..5752486f1a4d0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -66,6 +66,7 @@ import org.apache.pulsar.client.impl.schema.AutoSchema; import org.apache.pulsar.client.impl.schema.generic.GenericSchema; import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @@ -466,11 +467,12 @@ public CompletableFuture> patternTopicSubscribeAsync(ConsumerCo private CompletableFuture> patternTopicSubscribeAsync(ConsumerConfigurationData conf, Schema schema, ConsumerInterceptors interceptors) { String regex = conf.getTopicsPattern().pattern(); + Mode subscriptionMode = conf.getSubscriptionTopicsMode(); TopicName destination = TopicName.get(regex); NamespaceName namespaceName = destination.getNamespaceObject(); CompletableFuture> consumerSubscribedFuture = new CompletableFuture<>(); - lookup.getTopicsUnderNamespace(namespaceName) + lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode) .thenAccept(topics -> { if (log.isDebugEnabled()) { log.debug("Get topics under namespace {}, topics.size: {}", namespaceName.toString(), topics.size()); @@ -485,7 +487,7 @@ private CompletableFuture> patternTopicSubscribeAsync(ConsumerCo conf, externalExecutorProvider.getExecutor(), consumerSubscribedFuture, - schema, interceptors); + schema, subscriptionMode, interceptors); synchronized (consumers) { consumers.put(consumer, Boolean.TRUE); @@ -503,11 +505,13 @@ private CompletableFuture> patternTopicSubscribeAsync(ConsumerCo // get topics that match 'topicsPattern' from original topics list // return result should contain only topic names, without partition part public static List topicsPatternFilter(List original, Pattern topicsPattern) { + final Pattern shortenedTopicsPattern = topicsPattern.toString().contains("://") + ? Pattern.compile(topicsPattern.toString().split("\\:\\/\\/")[1]) : topicsPattern; + return original.stream() - .filter(topic -> { - TopicName destinationName = TopicName.get(topic); - return topicsPattern.matcher(destinationName.toString()).matches(); - }) + .map(TopicName::get) + .map(TopicName::toString) + .filter(topic -> shortenedTopicsPattern.matcher(topic.split("\\:\\/\\/")[1]).matches()) .collect(Collectors.toList()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index a0fd493494c20..7e92d1245d69b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -35,10 +35,11 @@ import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.ConsumerEventListener; import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.SubscriptionInitialPosition; -import org.apache.pulsar.client.api.DeadLetterPolicy; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; @Data public class ConsumerConfigurationData implements Serializable, Cloneable { @@ -83,6 +84,8 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { private int patternAutoDiscoveryPeriod = 1; + private Mode subscriptionTopicsMode = Mode.PERSISTENT; + private DeadLetterPolicy deadLetterPolicy; @JsonIgnore diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index c94482d0227fa..3d62cd3cfc828 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -54,6 +54,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse; @@ -738,9 +739,9 @@ public static ByteBuf newConsumerStatsResponse(CommandConsumerStatsResponse.Buil return res; } - public static ByteBuf newGetTopicsOfNamespaceRequest(String namespace, long requestId) { + public static ByteBuf newGetTopicsOfNamespaceRequest(String namespace, long requestId, Mode mode) { CommandGetTopicsOfNamespace.Builder topicsBuilder = CommandGetTopicsOfNamespace.newBuilder(); - topicsBuilder.setNamespace(namespace).setRequestId(requestId); + topicsBuilder.setNamespace(namespace).setRequestId(requestId).setMode(mode); CommandGetTopicsOfNamespace topicsCommand = topicsBuilder.build(); ByteBuf res = serializeWithSize( diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index c6ff6c1668b75..539085358b5cb 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -22680,6 +22680,10 @@ public interface CommandGetTopicsOfNamespaceOrBuilder // required string namespace = 2; boolean hasNamespace(); String getNamespace(); + + // optional .pulsar.proto.CommandGetTopicsOfNamespace.Mode mode = 3 [default = PERSISTENT]; + boolean hasMode(); + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode getMode(); } public static final class CommandGetTopicsOfNamespace extends org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite @@ -22715,6 +22719,50 @@ public CommandGetTopicsOfNamespace getDefaultInstanceForType() { return defaultInstance; } + public enum Mode + implements org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLite { + PERSISTENT(0, 0), + NON_PERSISTENT(1, 1), + ALL(2, 2), + ; + + public static final int PERSISTENT_VALUE = 0; + public static final int NON_PERSISTENT_VALUE = 1; + public static final int ALL_VALUE = 2; + + + public final int getNumber() { return value; } + + public static Mode valueOf(int value) { + switch (value) { + case 0: return PERSISTENT; + case 1: return NON_PERSISTENT; + case 2: return ALL; + default: return null; + } + } + + public static org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLiteMap + internalGetValueMap() { + return internalValueMap; + } + private static org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLiteMap + internalValueMap = + new org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLiteMap() { + public Mode findValueByNumber(int number) { + return Mode.valueOf(number); + } + }; + + private final int value; + + private Mode(int index, int value) { + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:pulsar.proto.CommandGetTopicsOfNamespace.Mode) + } + private int bitField0_; // required uint64 request_id = 1; public static final int REQUEST_ID_FIELD_NUMBER = 1; @@ -22758,9 +22806,20 @@ private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString getNamespac } } + // optional .pulsar.proto.CommandGetTopicsOfNamespace.Mode mode = 3 [default = PERSISTENT]; + public static final int MODE_FIELD_NUMBER = 3; + private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode mode_; + public boolean hasMode() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode getMode() { + return mode_; + } + private void initFields() { requestId_ = 0L; namespace_ = ""; + mode_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode.PERSISTENT; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -22793,6 +22852,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeBytes(2, getNamespaceBytes()); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeEnum(3, mode_.getNumber()); + } } private int memoizedSerializedSize = -1; @@ -22809,6 +22871,10 @@ public int getSerializedSize() { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream .computeBytesSize(2, getNamespaceBytes()); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeEnumSize(3, mode_.getNumber()); + } memoizedSerializedSize = size; return size; } @@ -22926,6 +22992,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000001); namespace_ = ""; bitField0_ = (bitField0_ & ~0x00000002); + mode_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode.PERSISTENT; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -22967,6 +23035,10 @@ public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace to_bitField0_ |= 0x00000002; } result.namespace_ = namespace_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.mode_ = mode_; result.bitField0_ = to_bitField0_; return result; } @@ -22979,6 +23051,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandGet if (other.hasNamespace()) { setNamespace(other.getNamespace()); } + if (other.hasMode()) { + setMode(other.getMode()); + } return this; } @@ -23026,6 +23101,15 @@ public Builder mergeFrom( namespace_ = input.readBytes(); break; } + case 24: { + int rawValue = input.readEnum(); + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode value = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode.valueOf(rawValue); + if (value != null) { + bitField0_ |= 0x00000004; + mode_ = value; + } + break; + } } } } @@ -23089,6 +23173,30 @@ void setNamespace(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString v } + // optional .pulsar.proto.CommandGetTopicsOfNamespace.Mode mode = 3 [default = PERSISTENT]; + private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode mode_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode.PERSISTENT; + public boolean hasMode() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode getMode() { + return mode_; + } + public Builder setMode(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + mode_ = value; + + return this; + } + public Builder clearMode() { + bitField0_ = (bitField0_ & ~0x00000004); + mode_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode.PERSISTENT; + + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetTopicsOfNamespace) } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index c50f0728e4961..8753aaf545c3a 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -504,8 +504,14 @@ message CommandGetLastMessageIdResponse { } message CommandGetTopicsOfNamespace { + enum Mode { + PERSISTENT = 0; + NON_PERSISTENT = 1; + ALL = 2; + } required uint64 request_id = 1; required string namespace = 2; + optional Mode mode = 3 [default = PERSISTENT]; } message CommandGetTopicsOfNamespaceResponse { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index 957cff8d93253..0072dc5162e8b 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -301,13 +301,15 @@ private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTo serviceUrl = this.connectWithTLS ? service.getConfiguration().getBrokerServiceURLTLS() : service.getConfiguration().getBrokerServiceURL(); } - performGetTopicsOfNamespace(clientRequestId, commandGetTopicsOfNamespace.getNamespace(), serviceUrl, 10); + performGetTopicsOfNamespace(clientRequestId, commandGetTopicsOfNamespace.getNamespace(), serviceUrl, 10, + commandGetTopicsOfNamespace.getMode()); } private void performGetTopicsOfNamespace(long clientRequestId, String namespaceName, String brokerServiceUrl, - int numberOfRetries) { + int numberOfRetries, + CommandGetTopicsOfNamespace.Mode mode) { if (numberOfRetries == 0) { proxyConnection.ctx().writeAndFlush(Commands.newError(clientRequestId, ServerError.ServiceNotReady, "Reached max number of redirections")); @@ -332,7 +334,7 @@ private void performGetTopicsOfNamespace(long clientRequestId, // Connected to backend broker long requestId = proxyConnection.newRequestId(); ByteBuf command; - command = Commands.newGetTopicsOfNamespaceRequest(namespaceName, requestId); + command = Commands.newGetTopicsOfNamespaceRequest(namespaceName, requestId, mode); clientCnx.newGetTopicsOfNamespace(command, requestId).thenAccept(topicList -> proxyConnection.ctx().writeAndFlush( Commands.newGetTopicsOfNamespaceResponse(topicList, clientRequestId))