From 25319411a07e1e1483d2c0991dcf1761019bf13d Mon Sep 17 00:00:00 2001 From: Gordeev Boris Date: Sun, 24 Jun 2018 22:37:43 +0300 Subject: [PATCH 1/7] Allowed non-pesistent topics to be retrieved along with persistent ones. --- .../broker/namespace/NamespaceService.java | 16 +++ .../impl/PatternTopicsConsumerImplTest.java | 101 +++++++++++++++++- 2 files changed, 115 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index bf44519e9b2ee..4eb93ecf7e857 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -53,6 +53,8 @@ import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.TopicName; @@ -825,6 +827,20 @@ public List getListOfTopics(NamespaceName namespaceName) throws Exceptio // NoNode means there are no persistent topics for this namespace } + // Non-persistent topics don't have managed ledges so we have to retrieve them from local cache. + synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) { + if (pulsar.getBrokerService().getMultiLayerTopicMap().containsKey(namespaceName.toString())) { + pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString()).values() + .forEach(bundle -> { + bundle.forEach((topicName, topic) -> { + if (topic instanceof NonPersistentTopic && ((NonPersistentTopic)topic).isActive()) { + topics.add(topicName); + } + }); + }); + } + } + topics.sort(null); return topics; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index a12c83c70085d..d88b4627ff98a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -74,7 +74,8 @@ public void testPatternTopicsSubscribeWithBuilderFail() throws Exception { final String topicName1 = "persistent://my-property/my-ns/topic-1-" + key; final String topicName2 = "persistent://my-property/my-ns/topic-2-" + key; final String topicName3 = "persistent://my-property/my-ns/topic-3-" + key; - List topicNames = Lists.newArrayList(topicName1, topicName2, topicName3); + final String topicName4 = "non-persistent://my-property/my-ns/topic-4-" + key; + List topicNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4); final String patternString = "persistent://my-property/my-ns/pattern-topic.*"; Pattern pattern = Pattern.compile(patternString); @@ -133,6 +134,7 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception { String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key; String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key; String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key; + final String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key; Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*"); // 1. create partition @@ -156,6 +158,9 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception { .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); + Producer producer4 = pulsarClient.newProducer().topic(topicName4) + .enableBatching(false) + .create(); Consumer consumer = pulsarClient.newConsumer() .topicsPattern(pattern) @@ -188,6 +193,7 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception { producer1.send((messagePredicate + "producer1-" + i).getBytes()); producer2.send((messagePredicate + "producer2-" + i).getBytes()); producer3.send((messagePredicate + "producer3-" + i).getBytes()); + producer4.send((messagePredicate + "producer4-" + i).getBytes()); } // 6. should receive all the message @@ -207,15 +213,106 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception { producer1.close(); producer2.close(); producer3.close(); + producer4.close(); } + // verify consumer create success, and works well. + @Test(timeOut = testTimeout) + public void testBinaryProtoToGetTopicsOfNamespaceNonPersistent() throws Exception { + String key = "BinaryProtoToGetTopics"; + String subscriptionName = "my-ex-subscription-" + key; + String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key; + String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key; + String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key; + final String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key; + Pattern pattern = Pattern.compile("non-persistent://my-property/my-ns/pattern-topic.*"); + + // 1. create partition + admin.tenants().createTenant("prop", new TenantInfo()); + admin.topics().createPartitionedTopic(topicName2, 2); + admin.topics().createPartitionedTopic(topicName3, 3); + + // 2. create producer + String messagePredicate = "my-message-" + key + "-"; + int totalMessages = 40; + + Producer producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer4 = pulsarClient.newProducer().topic(topicName4) + .enableBatching(false) + .create(); + + Consumer consumer = pulsarClient.newConsumer() + .topicsPattern(pattern) + .patternAutoDiscoveryPeriod(2) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .subscribe(); + + // 4. verify consumer get methods, to get right number of partitions and topics. + assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); + List topics = ((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics(); + List> consumers = ((PatternMultiTopicsConsumerImpl) consumer).getConsumers(); + + assertEquals(topics.size(), 1); + assertEquals(consumers.size(), 1); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getTopics().size(), 1); + + topics.forEach(topic -> log.debug("topic: {}", topic)); + consumers.forEach(c -> log.debug("consumer: {}", c.getTopic())); + + IntStream.range(0, topics.size()).forEach(index -> + assertTrue(topics.get(index).equals(consumers.get(index).getTopic()))); + + ((PatternMultiTopicsConsumerImpl) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic)); + + // 5. produce data + for (int i = 0; i < totalMessages / 4; i++) { + producer1.send((messagePredicate + "producer1-" + i).getBytes()); + producer2.send((messagePredicate + "producer2-" + i).getBytes()); + producer3.send((messagePredicate + "producer3-" + i).getBytes()); + producer4.send((messagePredicate + "producer4-" + i).getBytes()); + } + + // 6. should receive all the message + int messageSet = 0; + Message message = consumer.receive(); + do { + assertTrue(message instanceof TopicMessageImpl); + messageSet ++; + consumer.acknowledge(message); + log.debug("Consumer acknowledged : " + new String(message.getData())); + message = consumer.receive(500, TimeUnit.MILLISECONDS); + } while (message != null); + assertEquals(messageSet, totalMessages / 4); + + consumer.unsubscribe(); + consumer.close(); + producer1.close(); + producer2.close(); + producer3.close(); + producer4.close(); + } + @Test(timeOut = testTimeout) public void testTopicsPatternFilter() throws Exception { String topicName1 = "persistent://my-property/my-ns/pattern-topic-1"; String topicName2 = "persistent://my-property/my-ns/pattern-topic-2"; String topicName3 = "persistent://my-property/my-ns/hello-3"; + String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4"; - List topicsNames = Lists.newArrayList(topicName1, topicName2, topicName3); + List topicsNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4); Pattern pattern1 = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*"); List result1 = PulsarClientImpl.topicsPatternFilter(topicsNames, pattern1); From 6a2a446cbdfdc1cd3422aec0d822466c0ada1a28 Mon Sep 17 00:00:00 2001 From: Gordeev Boris Date: Thu, 28 Jun 2018 23:09:00 +0300 Subject: [PATCH 2/7] Updated Protobuf API to include optional "mode" field for namespaces request. --- .../pulsar/common/api/proto/PulsarApi.java | 108 ++++++++++++++++++ pulsar-common/src/main/proto/PulsarApi.proto | 6 + 2 files changed, 114 insertions(+) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 5a6329be7b184..abbb255625dbb 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -22620,6 +22620,10 @@ public interface CommandGetTopicsOfNamespaceOrBuilder // required string namespace = 2; boolean hasNamespace(); String getNamespace(); + + // optional .pulsar.proto.CommandGetTopicsOfNamespace.Mode mode = 3 [default = PERSISTENT]; + boolean hasMode(); + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode getMode(); } public static final class CommandGetTopicsOfNamespace extends org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite @@ -22655,6 +22659,50 @@ public CommandGetTopicsOfNamespace getDefaultInstanceForType() { return defaultInstance; } + public enum Mode + implements org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLite { + PERSISTENT(0, 0), + NON_PERSISTENT(1, 1), + ALL(2, 2), + ; + + public static final int PERSISTENT_VALUE = 0; + public static final int NON_PERSISTENT_VALUE = 1; + public static final int ALL_VALUE = 2; + + + public final int getNumber() { return value; } + + public static Mode valueOf(int value) { + switch (value) { + case 0: return PERSISTENT; + case 1: return NON_PERSISTENT; + case 2: return ALL; + default: return null; + } + } + + public static org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLiteMap + internalGetValueMap() { + return internalValueMap; + } + private static org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLiteMap + internalValueMap = + new org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLiteMap() { + public Mode findValueByNumber(int number) { + return Mode.valueOf(number); + } + }; + + private final int value; + + private Mode(int index, int value) { + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:pulsar.proto.CommandGetTopicsOfNamespace.Mode) + } + private int bitField0_; // required uint64 request_id = 1; public static final int REQUEST_ID_FIELD_NUMBER = 1; @@ -22698,9 +22746,20 @@ private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString getNamespac } } + // optional .pulsar.proto.CommandGetTopicsOfNamespace.Mode mode = 3 [default = PERSISTENT]; + public static final int MODE_FIELD_NUMBER = 3; + private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode mode_; + public boolean hasMode() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode getMode() { + return mode_; + } + private void initFields() { requestId_ = 0L; namespace_ = ""; + mode_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode.PERSISTENT; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -22733,6 +22792,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeBytes(2, getNamespaceBytes()); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeEnum(3, mode_.getNumber()); + } } private int memoizedSerializedSize = -1; @@ -22749,6 +22811,10 @@ public int getSerializedSize() { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream .computeBytesSize(2, getNamespaceBytes()); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeEnumSize(3, mode_.getNumber()); + } memoizedSerializedSize = size; return size; } @@ -22866,6 +22932,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000001); namespace_ = ""; bitField0_ = (bitField0_ & ~0x00000002); + mode_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode.PERSISTENT; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -22907,6 +22975,10 @@ public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace to_bitField0_ |= 0x00000002; } result.namespace_ = namespace_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.mode_ = mode_; result.bitField0_ = to_bitField0_; return result; } @@ -22919,6 +22991,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandGet if (other.hasNamespace()) { setNamespace(other.getNamespace()); } + if (other.hasMode()) { + setMode(other.getMode()); + } return this; } @@ -22966,6 +23041,15 @@ public Builder mergeFrom( namespace_ = input.readBytes(); break; } + case 24: { + int rawValue = input.readEnum(); + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode value = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode.valueOf(rawValue); + if (value != null) { + bitField0_ |= 0x00000004; + mode_ = value; + } + break; + } } } } @@ -23029,6 +23113,30 @@ void setNamespace(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString v } + // optional .pulsar.proto.CommandGetTopicsOfNamespace.Mode mode = 3 [default = PERSISTENT]; + private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode mode_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode.PERSISTENT; + public boolean hasMode() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode getMode() { + return mode_; + } + public Builder setMode(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + mode_ = value; + + return this; + } + public Builder clearMode() { + bitField0_ = (bitField0_ & ~0x00000004); + mode_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode.PERSISTENT; + + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetTopicsOfNamespace) } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index d2ac4fd512496..bb6c7fb3fb248 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -502,8 +502,14 @@ message CommandGetLastMessageIdResponse { } message CommandGetTopicsOfNamespace { + enum Mode { + PERSISTENT = 0; + NON_PERSISTENT = 1; + ALL = 2; + } required uint64 request_id = 1; required string namespace = 2; + optional Mode mode = 3 [default = PERSISTENT]; } message CommandGetTopicsOfNamespaceResponse { From 8ac2dbff2f56cc77dc41466ff22aa9bf71279537 Mon Sep 17 00:00:00 2001 From: Gordeev Boris Date: Sat, 30 Jun 2018 18:50:53 +0300 Subject: [PATCH 3/7] Separated topics retrieval into two methods and fixed formatting in tests. --- .../apache/pulsar/broker/PulsarService.java | 2 +- .../broker/admin/impl/NamespacesBase.java | 4 +- .../pulsar/broker/admin/v1/Namespaces.java | 8 +- .../pulsar/broker/admin/v2/Namespaces.java | 2 +- .../broker/namespace/NamespaceService.java | 42 ++-- .../pulsar/broker/service/ServerCnx.java | 22 +- .../impl/PatternTopicsConsumerImplTest.java | 188 +++++++++--------- 7 files changed, 146 insertions(+), 122 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 484235fe275d1..62576965fb8e6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -559,7 +559,7 @@ public void loadNamespaceTopics(NamespaceBundle bundle) { List> persistentTopics = Lists.newArrayList(); long topicLoadStart = System.nanoTime(); - for (String topic : getNamespaceService().getListOfTopics(nsName)) { + for (String topic : getNamespaceService().getListOfPersistentTopics(nsName)) { try { TopicName topicName = TopicName.get(topic); if (bundle.includes(topicName)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index b8a4c53a38435..7fe16c0e98323 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -174,7 +174,7 @@ protected void internalDeleteNamespace(boolean authoritative) { boolean isEmpty; try { - isEmpty = pulsar().getNamespaceService().getListOfTopics(namespaceName).isEmpty(); + isEmpty = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).isEmpty(); } catch (Exception e) { throw new RestException(e); } @@ -273,7 +273,7 @@ protected void internalDeleteNamespaceBundle(String bundleRange, boolean authori NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); try { - List topics = pulsar().getNamespaceService().getListOfTopics(namespaceName); + List topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName); for (String topic : topics) { NamespaceBundle topicBundle = (NamespaceBundle) pulsar().getNamespaceService() .getBundle(TopicName.get(topic)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index bbc44386261c1..6879508914f24 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -18,13 +18,10 @@ */ package org.apache.pulsar.broker.admin.v1; -import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; -import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import javax.ws.rs.Consumes; @@ -40,10 +37,8 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response.Status; -import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.admin.impl.NamespacesBase; import org.apache.pulsar.broker.web.RestException; -import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; @@ -54,7 +49,6 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,7 +119,7 @@ public List getTopics(@PathParam("property") String property, getNamespacePolicies(namespaceName); try { - return pulsar().getNamespaceService().getListOfTopics(namespaceName); + return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName); } catch (Exception e) { log.error("Failed to get topics list for namespace {}/{}/{}", property, cluster, namespace, e); throw new RestException(e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 0fcd4aa6f9d72..205219aef8d63 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -82,7 +82,7 @@ public List getTopics(@PathParam("tenant") String tenant, getNamespacePolicies(namespaceName); try { - return pulsar().getNamespaceService().getListOfTopics(namespaceName); + return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName); } catch (Exception e) { log.error("Failed to get topics list for namespace {}", namespaceName, e); throw new RestException(e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 4eb93ecf7e857..5542788b5a44e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -24,6 +24,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT; import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace; import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData; import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec; @@ -810,7 +811,13 @@ public ServiceUnitId getServiceUnitId(TopicName topicName) throws Exception { return getBundle(topicName); } - public List getListOfTopics(NamespaceName namespaceName) throws Exception { + public List getFullListOfTopics(NamespaceName namespaceName) throws Exception { + List topics = getListOfPersistentTopics(namespaceName); + topics.addAll(getListOfNonPersistentTopics(namespaceName)); + return topics; + } + + public List getListOfPersistentTopics(NamespaceName namespaceName) throws Exception { List topics = Lists.newArrayList(); // For every topic there will be a managed ledger created. @@ -827,19 +834,26 @@ public List getListOfTopics(NamespaceName namespaceName) throws Exceptio // NoNode means there are no persistent topics for this namespace } - // Non-persistent topics don't have managed ledges so we have to retrieve them from local cache. - synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) { - if (pulsar.getBrokerService().getMultiLayerTopicMap().containsKey(namespaceName.toString())) { - pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString()).values() - .forEach(bundle -> { - bundle.forEach((topicName, topic) -> { - if (topic instanceof NonPersistentTopic && ((NonPersistentTopic)topic).isActive()) { - topics.add(topicName); - } - }); - }); - } - } + topics.sort(null); + return topics; + } + + public List getListOfNonPersistentTopics(NamespaceName namespaceName) { + List topics = Lists.newArrayList(); + + // Non-persistent topics don't have managed ledgers so we have to retrieve them from local cache. + synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) { + if (pulsar.getBrokerService().getMultiLayerTopicMap().containsKey(namespaceName.toString())) { + pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString()).values() + .forEach(bundle -> { + bundle.forEach((topicName, topic) -> { + if (topic instanceof NonPersistentTopic && ((NonPersistentTopic)topic).isActive()) { + topics.add(topicName); + } + }); + }); + } + } topics.sort(null); return topics; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index fcf40453f080e..2c876553dc7c3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -52,6 +52,7 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; @@ -1176,11 +1177,26 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) { final long requestId = commandGetTopicsOfNamespace.getRequestId(); final String namespace = commandGetTopicsOfNamespace.getNamespace(); + final CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode(); try { - List topics = getBrokerService().pulsar() - .getNamespaceService() - .getListOfTopics(NamespaceName.get(namespace)); + final List topics; + final NamespaceService service = getBrokerService().pulsar().getNamespaceService(); + final NamespaceName namespaceName = NamespaceName.get(namespace); + + + switch (mode) { + case ALL: + topics = service.getFullListOfTopics(namespaceName); + break; + case NON_PERSISTENT: + topics = service.getListOfNonPersistentTopics(namespaceName); + break; + case PERSISTENT: + default: + topics = service.getListOfPersistentTopics(namespaceName); + break; + } if (log.isDebugEnabled()) { log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index d88b4627ff98a..e4bb47b61939d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -74,7 +74,7 @@ public void testPatternTopicsSubscribeWithBuilderFail() throws Exception { final String topicName1 = "persistent://my-property/my-ns/topic-1-" + key; final String topicName2 = "persistent://my-property/my-ns/topic-2-" + key; final String topicName3 = "persistent://my-property/my-ns/topic-3-" + key; - final String topicName4 = "non-persistent://my-property/my-ns/topic-4-" + key; + final String topicName4 = "non-persistent://my-property/my-ns/topic-4-" + key; List topicNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4); final String patternString = "persistent://my-property/my-ns/pattern-topic.*"; Pattern pattern = Pattern.compile(patternString); @@ -134,7 +134,7 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception { String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key; String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key; String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key; - final String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key; + String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key; Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*"); // 1. create partition @@ -158,9 +158,9 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception { .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); - Producer producer4 = pulsarClient.newProducer().topic(topicName4) - .enableBatching(false) - .create(); + Producer producer4 = pulsarClient.newProducer().topic(topicName4) + .enableBatching(false) + .create(); Consumer consumer = pulsarClient.newConsumer() .topicsPattern(pattern) @@ -193,7 +193,7 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception { producer1.send((messagePredicate + "producer1-" + i).getBytes()); producer2.send((messagePredicate + "producer2-" + i).getBytes()); producer3.send((messagePredicate + "producer3-" + i).getBytes()); - producer4.send((messagePredicate + "producer4-" + i).getBytes()); + producer4.send((messagePredicate + "producer4-" + i).getBytes()); } // 6. should receive all the message @@ -217,93 +217,93 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception { } // verify consumer create success, and works well. - @Test(timeOut = testTimeout) - public void testBinaryProtoToGetTopicsOfNamespaceNonPersistent() throws Exception { - String key = "BinaryProtoToGetTopics"; - String subscriptionName = "my-ex-subscription-" + key; - String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key; - String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key; - String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key; - final String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key; - Pattern pattern = Pattern.compile("non-persistent://my-property/my-ns/pattern-topic.*"); - - // 1. create partition - admin.tenants().createTenant("prop", new TenantInfo()); - admin.topics().createPartitionedTopic(topicName2, 2); - admin.topics().createPartitionedTopic(topicName3, 3); - - // 2. create producer - String messagePredicate = "my-message-" + key + "-"; - int totalMessages = 40; - - Producer producer1 = pulsarClient.newProducer().topic(topicName1) - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); - Producer producer2 = pulsarClient.newProducer().topic(topicName2) - .enableBatching(false) - .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) - .create(); - Producer producer3 = pulsarClient.newProducer().topic(topicName3) - .enableBatching(false) - .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) - .create(); - Producer producer4 = pulsarClient.newProducer().topic(topicName4) - .enableBatching(false) - .create(); - - Consumer consumer = pulsarClient.newConsumer() - .topicsPattern(pattern) - .patternAutoDiscoveryPeriod(2) - .subscriptionName(subscriptionName) - .subscriptionType(SubscriptionType.Shared) - .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) - .subscribe(); - - // 4. verify consumer get methods, to get right number of partitions and topics. - assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); - List topics = ((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics(); - List> consumers = ((PatternMultiTopicsConsumerImpl) consumer).getConsumers(); - - assertEquals(topics.size(), 1); - assertEquals(consumers.size(), 1); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getTopics().size(), 1); - - topics.forEach(topic -> log.debug("topic: {}", topic)); - consumers.forEach(c -> log.debug("consumer: {}", c.getTopic())); - - IntStream.range(0, topics.size()).forEach(index -> - assertTrue(topics.get(index).equals(consumers.get(index).getTopic()))); - - ((PatternMultiTopicsConsumerImpl) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic)); - - // 5. produce data - for (int i = 0; i < totalMessages / 4; i++) { - producer1.send((messagePredicate + "producer1-" + i).getBytes()); - producer2.send((messagePredicate + "producer2-" + i).getBytes()); - producer3.send((messagePredicate + "producer3-" + i).getBytes()); - producer4.send((messagePredicate + "producer4-" + i).getBytes()); - } - - // 6. should receive all the message - int messageSet = 0; - Message message = consumer.receive(); - do { - assertTrue(message instanceof TopicMessageImpl); - messageSet ++; - consumer.acknowledge(message); - log.debug("Consumer acknowledged : " + new String(message.getData())); - message = consumer.receive(500, TimeUnit.MILLISECONDS); - } while (message != null); - assertEquals(messageSet, totalMessages / 4); - - consumer.unsubscribe(); - consumer.close(); - producer1.close(); - producer2.close(); - producer3.close(); - producer4.close(); - } + @Test(timeOut = testTimeout) + public void testBinaryProtoToGetTopicsOfNamespaceNonPersistent() throws Exception { + String key = "BinaryProtoToGetTopics"; + String subscriptionName = "my-ex-subscription-" + key; + String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key; + String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key; + String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key; + String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key; + Pattern pattern = Pattern.compile("non-persistent://my-property/my-ns/pattern-topic.*"); + + // 1. create partition + admin.tenants().createTenant("prop", new TenantInfo()); + admin.topics().createPartitionedTopic(topicName2, 2); + admin.topics().createPartitionedTopic(topicName3, 3); + + // 2. create producer + String messagePredicate = "my-message-" + key + "-"; + int totalMessages = 40; + + Producer producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer4 = pulsarClient.newProducer().topic(topicName4) + .enableBatching(false) + .create(); + + Consumer consumer = pulsarClient.newConsumer() + .topicsPattern(pattern) + .patternAutoDiscoveryPeriod(2) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .subscribe(); + + // 4. verify consumer get methods, to get right number of partitions and topics. + assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); + List topics = ((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics(); + List> consumers = ((PatternMultiTopicsConsumerImpl) consumer).getConsumers(); + + assertEquals(topics.size(), 1); + assertEquals(consumers.size(), 1); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getTopics().size(), 1); + + topics.forEach(topic -> log.debug("topic: {}", topic)); + consumers.forEach(c -> log.debug("consumer: {}", c.getTopic())); + + IntStream.range(0, topics.size()).forEach(index -> + assertTrue(topics.get(index).equals(consumers.get(index).getTopic()))); + + ((PatternMultiTopicsConsumerImpl) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic)); + + // 5. produce data + for (int i = 0; i < totalMessages / 4; i++) { + producer1.send((messagePredicate + "producer1-" + i).getBytes()); + producer2.send((messagePredicate + "producer2-" + i).getBytes()); + producer3.send((messagePredicate + "producer3-" + i).getBytes()); + producer4.send((messagePredicate + "producer4-" + i).getBytes()); + } + + // 6. should receive all the message + int messageSet = 0; + Message message = consumer.receive(); + do { + assertTrue(message instanceof TopicMessageImpl); + messageSet ++; + consumer.acknowledge(message); + log.debug("Consumer acknowledged : " + new String(message.getData())); + message = consumer.receive(500, TimeUnit.MILLISECONDS); + } while (message != null); + assertEquals(messageSet, totalMessages / 4); + + consumer.unsubscribe(); + consumer.close(); + producer1.close(); + producer2.close(); + producer3.close(); + producer4.close(); + } @Test(timeOut = testTimeout) public void testTopicsPatternFilter() throws Exception { @@ -633,7 +633,7 @@ public void testAutoUnbubscribePatternConsumer() throws Exception { // seems no direct way to verify auto-unsubscribe, because this patternConsumer also referenced the topic. List topicNames = Lists.newArrayList(topicName2); NamespaceService nss = pulsar.getNamespaceService(); - doReturn(topicNames).when(nss).getListOfTopics(NamespaceName.get("my-property/my-ns")); + doReturn(topicNames).when(nss).getListOfPersistentTopics(NamespaceName.get("my-property/my-ns")); // 7. call recheckTopics to unsubscribe topic 1,3 , verify topics number: 2=6-1-3 log.debug("recheck topics change"); From 0964211885ab5fd01a26a0b225a0645c355e455f Mon Sep 17 00:00:00 2001 From: Gordeev Boris Date: Sun, 8 Jul 2018 02:36:28 +0300 Subject: [PATCH 4/7] Implemented additional functionality to support requesting non-persistent topics. --- .../pulsar/broker/admin/v1/Namespaces.java | 51 ++++------ .../broker/namespace/NamespaceService.java | 96 +++++++++++++++++-- .../pulsar/broker/service/ServerCnx.java | 17 +--- .../pulsar/broker/web/PulsarWebResource.java | 2 +- .../pulsar/client/api/ConsumerBuilder.java | 10 ++ .../client/impl/BinaryProtoLookupService.java | 13 ++- .../client/impl/ConsumerBuilderImpl.java | 10 +- .../pulsar/client/impl/HttpLookupService.java | 8 +- .../pulsar/client/impl/LookupService.java | 5 +- .../impl/PatternMultiTopicsConsumerImpl.java | 8 +- .../pulsar/client/impl/PulsarClientImpl.java | 7 +- .../impl/conf/ConsumerConfigurationData.java | 3 + .../apache/pulsar/common/api/Commands.java | 5 +- 13 files changed, 161 insertions(+), 74 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 6879508914f24..20b0a7ad25417 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -18,46 +18,28 @@ */ package org.apache.pulsar.broker.admin.v1; -import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; - -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response.Status; - +import com.google.common.collect.Lists; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; import org.apache.pulsar.broker.admin.impl.NamespacesBase; import org.apache.pulsar.broker.web.RestException; -import org.apache.pulsar.common.policies.data.AuthAction; -import org.apache.pulsar.common.policies.data.BacklogQuota; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; +import org.apache.pulsar.common.policies.data.*; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; -import org.apache.pulsar.common.policies.data.BundlesData; -import org.apache.pulsar.common.policies.data.DispatchRate; -import org.apache.pulsar.common.policies.data.PersistencePolicies; -import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.policies.data.RetentionPolicies; -import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; +import javax.ws.rs.*; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response.Status; +import java.util.List; +import java.util.Map; +import java.util.Set; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; @Path("/namespaces") @Produces(MediaType.APPLICATION_JSON) @@ -111,7 +93,8 @@ public List getNamespacesForCluster(@PathParam("property") String proper @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") }) public List getTopics(@PathParam("property") String property, - @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, + @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) { validateAdminAccessForTenant(property); validateNamespaceName(property, cluster, namespace); @@ -119,7 +102,7 @@ public List getTopics(@PathParam("property") String property, getNamespacePolicies(namespaceName); try { - return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName); + return pulsar().getNamespaceService().getListOfTopics(namespaceName, mode); } catch (Exception e) { log.error("Failed to get topics list for namespace {}/{}/{}", property, cluster, namespace, e); throw new RestException(e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 5542788b5a44e..8c1db184357d6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -22,9 +22,17 @@ import static com.google.common.base.Preconditions.checkNotNull; import static java.lang.String.format; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT; import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath; -import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace; + +import io.netty.channel.EventLoopGroup; +import org.apache.pulsar.broker.web.PulsarWebResource; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData; import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec; @@ -36,7 +44,9 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; @@ -54,9 +64,9 @@ import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; -import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -65,10 +75,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.policies.NamespaceIsolationPolicy; -import org.apache.pulsar.common.policies.data.BrokerAssignment; -import org.apache.pulsar.common.policies.data.BundlesData; -import org.apache.pulsar.common.policies.data.LocalPolicies; -import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; +import org.apache.pulsar.common.policies.data.*; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -126,6 +133,8 @@ public enum AddressType { public static final String NAMESPACE_ISOLATION_POLICIES = "namespaceIsolationPolicies"; + private final ConcurrentOpenHashMap namespaceClients; + /** * Default constructor. * @@ -139,6 +148,7 @@ public NamespaceService(PulsarService pulsar) { ServiceUnitZkUtils.initZK(pulsar.getLocalZkCache().getZooKeeper(), pulsar.getBrokerServiceUrl()); this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32()); this.ownershipCache = new OwnershipCache(pulsar, bundleFactory); + this.namespaceClients = new ConcurrentOpenHashMap<>(); } public CompletableFuture> getBrokerServiceUrlAsync(TopicName topic, @@ -817,6 +827,19 @@ public List getFullListOfTopics(NamespaceName namespaceName) throws Exce return topics; } + public List getListOfTopics(NamespaceName namespaceName, Mode mode) + throws Exception { + switch (mode) { + case ALL: + return getFullListOfTopics(namespaceName); + case NON_PERSISTENT: + return getListOfNonPersistentTopics(namespaceName); + case PERSISTENT: + default: + return getListOfPersistentTopics(namespaceName); + } + } + public List getListOfPersistentTopics(NamespaceName namespaceName) throws Exception { List topics = Lists.newArrayList(); @@ -838,9 +861,23 @@ public List getListOfPersistentTopics(NamespaceName namespaceName) throw return topics; } - public List getListOfNonPersistentTopics(NamespaceName namespaceName) { + public List getListOfNonPersistentTopics(NamespaceName namespaceName) throws Exception { List topics = Lists.newArrayList(); + ClusterData peerClusterData; + try { + peerClusterData = PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName) + .get(cacheTimeOutInSec, SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException("Failed to contact peer replication cluster.", e); + } + + // if peer-cluster-data is present it means namespace is owned by that peer-cluster and request should be + // redirect to the peer-cluster + if (peerClusterData != null) { + return getNonPersistentTopicsFromPeerCluster(peerClusterData, namespaceName); + } + // Non-persistent topics don't have managed ledgers so we have to retrieve them from local cache. synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) { if (pulsar.getBrokerService().getMultiLayerTopicMap().containsKey(namespaceName.toString())) { @@ -859,6 +896,51 @@ public List getListOfNonPersistentTopics(NamespaceName namespaceName) { return topics; } + private List getNonPersistentTopicsFromPeerCluster(ClusterData peerClusterData, + NamespaceName namespace) throws Exception { + PulsarClientImpl client = getNamespaceClient(peerClusterData); + return client.getLookup().getTopicsUnderNamespace(namespace, Mode.NON_PERSISTENT).get(); + } + + + public PulsarClientImpl getNamespaceClient(ClusterData cluster) { + PulsarClientImpl client = namespaceClients.get(cluster); + if (client != null) { + return client; + } + + return namespaceClients.computeIfAbsent(cluster, key -> { + try { + ClientBuilder clientBuilder = PulsarClient.builder() + .enableTcpNoDelay(false) + .statsInterval(0, TimeUnit.SECONDS); + + if (pulsar.getConfiguration().isAuthenticationEnabled()) { + clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(), + pulsar.getConfiguration().getBrokerClientAuthenticationParameters()); + } + + if (pulsar.getConfiguration().isTlsEnabled()) { + clientBuilder + .serviceUrl(isNotBlank(cluster.getBrokerServiceUrlTls()) + ? cluster.getBrokerServiceUrlTls() : cluster.getServiceUrlTls()) + .enableTls(true) + .tlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath()) + .allowTlsInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection()); + } else { + clientBuilder.serviceUrl(isNotBlank(cluster.getBrokerServiceUrl()) + ? cluster.getBrokerServiceUrl() : cluster.getServiceUrl()); + } + + // Share all the IO threads across broker and client connections + ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData(); + return new PulsarClientImpl(conf, (EventLoopGroup)pulsar.getBrokerService().executor()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + public Optional getOwner(NamespaceBundle bundle) throws Exception { // if there is no znode for the service unit, it is not owned by any broker return getOwnerAsync(bundle).get(cacheTimeOutInSec, SECONDS); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 2c876553dc7c3..04fcbe23ed8b9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1180,23 +1180,10 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet final CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode(); try { - final List topics; - final NamespaceService service = getBrokerService().pulsar().getNamespaceService(); final NamespaceName namespaceName = NamespaceName.get(namespace); - - switch (mode) { - case ALL: - topics = service.getFullListOfTopics(namespaceName); - break; - case NON_PERSISTENT: - topics = service.getListOfNonPersistentTopics(namespaceName); - break; - case PERSISTENT: - default: - topics = service.getListOfPersistentTopics(namespaceName); - break; - } + final List topics = getBrokerService().pulsar().getNamespaceService() + .getListOfTopics(namespaceName, mode); if (log.isDebugEnabled()) { log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 01dd7f91d42a6..d0b3bf4452c45 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -559,7 +559,7 @@ protected void validateGlobalNamespaceOwnership(NamespaceName namespace) { } } - protected static CompletableFuture checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, + public static CompletableFuture checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, NamespaceName namespace) { if (!namespace.isGlobal()) { return CompletableFuture.completedFuture(null); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index f0014a58f27ca..d96ef804dfe16 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -25,6 +25,8 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; + /** * {@link ConsumerBuilder} is used to configure and create instances of {@link Consumer}. * @@ -311,4 +313,12 @@ public interface ConsumerBuilder extends Serializable, Cloneable { * Set subscriptionInitialPosition for the consumer */ ConsumerBuilder subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition); + + /** + * Determines to which topics this consumer should be subscribed to - Persistent, Non-Persistent, or both. + * Only used with pattern subscriptions. + * + * @param mode Pattern subscription mode + */ + ConsumerBuilder subscriptionTopicsMode(Mode mode); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 6d8f67850c0d7..0e3f39ab39aa8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -34,6 +34,8 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.api.Commands; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType; import org.apache.pulsar.common.naming.TopicName; @@ -186,14 +188,14 @@ public String getServiceUrl() { } @Override - public CompletableFuture> getTopicsUnderNamespace(NamespaceName namespace) { + public CompletableFuture> getTopicsUnderNamespace(NamespaceName namespace, Mode mode) { CompletableFuture> topicsFuture = new CompletableFuture>(); AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs()); Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS, opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS, 0 , TimeUnit.MILLISECONDS); - getTopicsUnderNamespace(serviceAddress, namespace, backoff, opTimeoutMs, topicsFuture); + getTopicsUnderNamespace(serviceAddress, namespace, backoff, opTimeoutMs, topicsFuture, mode); return topicsFuture; } @@ -201,11 +203,12 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, NamespaceName namespace, Backoff backoff, AtomicLong remainingTime, - CompletableFuture> topicsFuture) { + CompletableFuture> topicsFuture, + Mode mode) { client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newGetTopicsOfNamespaceRequest( - namespace.toString(), requestId); + namespace.toString(), requestId, mode); clientCnx.newGetTopicsOfNamespace(request, requestId).thenAccept(topicsList -> { if (log.isDebugEnabled()) { @@ -238,7 +241,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in {} ms", namespace, nextDelay); remainingTime.addAndGet(-nextDelay); - getTopicsUnderNamespace(socketAddress, namespace, backoff, remainingTime, topicsFuture); + getTopicsUnderNamespace(socketAddress, namespace, backoff, remainingTime, topicsFuture, mode); }, nextDelay, TimeUnit.MILLISECONDS); return null; }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index af11ef5945a0a..a2763164ff338 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -40,6 +40,8 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.util.FutureUtil; import com.google.common.collect.Lists; @@ -237,7 +239,13 @@ public ConsumerBuilder subscriptionInitialPosition(SubscriptionInitialPositio return this; } - public ConsumerConfigurationData getConf() { + @Override + public ConsumerBuilder subscriptionTopicsMode(Mode mode) { + conf.setSubscriptionTopicsMode(mode); + return this; + } + + public ConsumerConfigurationData getConf() { return conf; } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index a5b045571ccc4..367239750d9d1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.lookup.data.LookupData; @@ -97,12 +98,13 @@ public String getServiceUrl() { } @Override - public CompletableFuture> getTopicsUnderNamespace(NamespaceName namespace) { + public CompletableFuture> getTopicsUnderNamespace(NamespaceName namespace, Mode mode) { CompletableFuture> future = new CompletableFuture<>(); - String format = namespace.isV2() ? "admin/v2/namespaces/%s/destinations" : "admin/namespaces/%s/destinations"; + String format = namespace.isV2() + ? "admin/v2/namespaces/%s/destinations" : "admin/namespaces/%s/destinations?mode=%s"; httpClient - .get(String.format(format, namespace), String[].class) + .get(String.format(format, namespace, mode.toString()), String[].class) .thenAccept(topics -> { List result = Lists.newArrayList(); // do not keep partition part of topic name diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java index aa00e54b3ce86..45c6e2fe6851f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java @@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; @@ -38,7 +39,7 @@ * * */ -interface LookupService extends AutoCloseable { +public interface LookupService extends AutoCloseable { /** * Calls broker lookup-api to get broker {@link InetSocketAddress} which serves namespace bundle that contains given @@ -71,6 +72,6 @@ interface LookupService extends AutoCloseable { * @param namespace : namespace-name * @return */ - public CompletableFuture> getTopicsUnderNamespace(NamespaceName namespace); + public CompletableFuture> getTopicsUnderNamespace(NamespaceName namespace, Mode mode); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index d0b0c606c56e1..362985faef1fd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; @@ -44,6 +45,7 @@ public class PatternMultiTopicsConsumerImpl extends MultiTopicsConsumerImpl implements TimerTask { private final Pattern topicsPattern; private final TopicsChangedListener topicsChangeListener; + private final Mode subscriptionMode; private volatile Timeout recheckPatternTimeout = null; public PatternMultiTopicsConsumerImpl(Pattern topicsPattern, @@ -51,9 +53,11 @@ public PatternMultiTopicsConsumerImpl(Pattern topicsPattern, ConsumerConfigurationData conf, ExecutorService listenerExecutor, CompletableFuture> subscribeFuture, - Schema schema) { + Schema schema, + Mode subscriptionMode) { super(client, conf, listenerExecutor, subscribeFuture, schema); this.topicsPattern = topicsPattern; + this.subscriptionMode = subscriptionMode; if (this.namespaceName == null) { this.namespaceName = getNameSpaceFromPattern(topicsPattern); @@ -78,7 +82,7 @@ public void run(Timeout timeout) throws Exception { CompletableFuture recheckFuture = new CompletableFuture<>(); List> futures = Lists.newArrayListWithExpectedSize(2); - client.getLookup().getTopicsUnderNamespace(namespaceName).thenAccept(topics -> { + client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode).thenAccept(topics -> { if (log.isDebugEnabled()) { log.debug("Get topics under namespace {}, topics.size: {}", namespaceName.toString(), topics.size()); topics.forEach(topicName -> diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index c504aceaf81ce..c26a507e07435 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -59,6 +59,7 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @@ -423,11 +424,12 @@ public CompletableFuture> patternTopicSubscribeAsync(ConsumerCo private CompletableFuture> patternTopicSubscribeAsync(ConsumerConfigurationData conf, Schema schema) { String regex = conf.getTopicsPattern().pattern(); + Mode subscriptionMode = conf.getSubscriptionTopicsMode(); TopicName destination = TopicName.get(regex); NamespaceName namespaceName = destination.getNamespaceObject(); CompletableFuture> consumerSubscribedFuture = new CompletableFuture<>(); - lookup.getTopicsUnderNamespace(namespaceName) + lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode) .thenAccept(topics -> { if (log.isDebugEnabled()) { log.debug("Get topics under namespace {}, topics.size: {}", namespaceName.toString(), topics.size()); @@ -442,7 +444,8 @@ private CompletableFuture> patternTopicSubscribeAsync(ConsumerCo conf, externalExecutorProvider.getExecutor(), consumerSubscribedFuture, - schema); + schema, + subscriptionMode); synchronized (consumers) { consumers.put(consumer, Boolean.TRUE); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 47ede41de6000..fcf20fc1de4e9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; @Data public class ConsumerConfigurationData implements Serializable, Cloneable { @@ -82,6 +83,8 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { private int patternAutoDiscoveryPeriod = 1; + private Mode subscriptionTopicsMode = Mode.PERSISTENT; + @JsonIgnore public String getSingleTopic() { checkArgument(topicNames.size() == 1); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index 1fd38a0c1d08e..a3d402acfadfe 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -53,6 +53,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse; @@ -716,9 +717,9 @@ public static ByteBuf newConsumerStatsResponse(CommandConsumerStatsResponse.Buil return res; } - public static ByteBuf newGetTopicsOfNamespaceRequest(String namespace, long requestId) { + public static ByteBuf newGetTopicsOfNamespaceRequest(String namespace, long requestId, Mode mode) { CommandGetTopicsOfNamespace.Builder topicsBuilder = CommandGetTopicsOfNamespace.newBuilder(); - topicsBuilder.setNamespace(namespace).setRequestId(requestId); + topicsBuilder.setNamespace(namespace).setRequestId(requestId).setMode(mode); CommandGetTopicsOfNamespace topicsCommand = topicsBuilder.build(); ByteBuf res = serializeWithSize( From 17cbbecd2ef29a1017b2af48ae1116f4c6278ac6 Mon Sep 17 00:00:00 2001 From: Gordeev Boris Date: Wed, 8 Aug 2018 23:16:49 +0300 Subject: [PATCH 5/7] Changed code style concerning imports to adhere to recommended code style. --- .../pulsar/broker/admin/v1/Namespaces.java | 35 ++++++--- .../broker/namespace/NamespaceService.java | 78 ++++++++++--------- 2 files changed, 64 insertions(+), 49 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 3228d08c7a563..013d6a2b413b9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -23,25 +23,38 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; -import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; - -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.ws.rs.*; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response.Status; - import org.apache.pulsar.broker.admin.impl.NamespacesBase; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; -import org.apache.pulsar.common.policies.data.*; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; +import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.PersistencePolicies; +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response.Status; +import java.util.List; +import java.util.Map; +import java.util.Set; + import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; @Path("/namespaces") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 8c1db184357d6..ff4a0543b2374 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -18,41 +18,9 @@ */ package org.apache.pulsar.broker.namespace; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static java.lang.String.format; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.commons.lang3.StringUtils.isNotBlank; -import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT; -import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath; - +import com.google.common.collect.Lists; +import com.google.common.hash.Hashing; import io.netty.channel.EventLoopGroup; -import org.apache.pulsar.broker.web.PulsarWebResource; -import org.apache.pulsar.client.api.ClientBuilder; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.impl.ClientBuilderImpl; -import org.apache.pulsar.client.impl.PulsarClientImpl; -import org.apache.pulsar.client.impl.conf.ClientConfigurationData; -import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData; -import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec; - -import java.net.URI; -import java.net.URL; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; @@ -65,17 +33,27 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; +import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.lookup.data.LookupData; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.NamespaceIsolationPolicy; -import org.apache.pulsar.common.policies.data.*; +import org.apache.pulsar.common.policies.data.BrokerAssignment; +import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.LocalPolicies; +import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -87,8 +65,32 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; -import com.google.common.hash.Hashing; +import java.net.URI; +import java.net.URL; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static java.lang.String.format; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT; +import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath; +import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData; +import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec; /** * The NamespaceService provides resource ownership lookup as well as resource ownership claiming services From a4cdf4bc3bb085e0c49f9de694e9b3e07dc55b9c Mon Sep 17 00:00:00 2001 From: Gordeev Boris Date: Sun, 26 Aug 2018 21:39:05 +0300 Subject: [PATCH 6/7] Fixed old broken tests. Fixed an issue with client impl. --- .../impl/PatternTopicsConsumerImplTest.java | 115 ++++++++++++++++-- .../pulsar/client/impl/PulsarClientImpl.java | 10 +- .../proxy/server/LookupProxyHandler.java | 8 +- 3 files changed, 114 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index e4bb47b61939d..6d56279fe0ff8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.client.api.Consumer; @@ -37,6 +38,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.TenantInfo; import org.slf4j.Logger; @@ -128,14 +130,14 @@ public void testPatternTopicsSubscribeWithBuilderFail() throws Exception { // verify consumer create success, and works well. @Test(timeOut = testTimeout) - public void testBinaryProtoToGetTopicsOfNamespace() throws Exception { + public void testBinaryProtoToGetTopicsOfNamespacePersistent() throws Exception { String key = "BinaryProtoToGetTopics"; String subscriptionName = "my-ex-subscription-" + key; String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key; String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key; String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key; String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key; - Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*"); + Pattern pattern = Pattern.compile("my-property/my-ns/pattern-topic.*"); // 1. create partition admin.tenants().createTenant("prop", new TenantInfo()); @@ -221,11 +223,11 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception { public void testBinaryProtoToGetTopicsOfNamespaceNonPersistent() throws Exception { String key = "BinaryProtoToGetTopics"; String subscriptionName = "my-ex-subscription-" + key; - String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key; - String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key; - String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key; - String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key; - Pattern pattern = Pattern.compile("non-persistent://my-property/my-ns/pattern-topic.*"); + String topicName1 = "persistent://my-property/my-ns/np-pattern-topic-1-" + key; + String topicName2 = "persistent://my-property/my-ns/np-pattern-topic-2-" + key; + String topicName3 = "persistent://my-property/my-ns/np-pattern-topic-3-" + key; + String topicName4 = "non-persistent://my-property/my-ns/np-pattern-topic-4-" + key; + Pattern pattern = Pattern.compile("my-property/my-ns/np-pattern-topic.*"); // 1. create partition admin.tenants().createTenant("prop", new TenantInfo()); @@ -258,6 +260,7 @@ public void testBinaryProtoToGetTopicsOfNamespaceNonPersistent() throws Exceptio .subscriptionName(subscriptionName) .subscriptionType(SubscriptionType.Shared) .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .subscriptionTopicsMode(PulsarApi.CommandGetTopicsOfNamespace.Mode.NON_PERSISTENT) .subscribe(); // 4. verify consumer get methods, to get right number of partitions and topics. @@ -305,12 +308,102 @@ public void testBinaryProtoToGetTopicsOfNamespaceNonPersistent() throws Exceptio producer4.close(); } + // verify consumer create success, and works well. + @Test(timeOut = testTimeout) + public void testBinaryProtoToGetTopicsOfNamespaceAll() throws Exception { + String key = "BinaryProtoToGetTopics"; + String subscriptionName = "my-ex-subscription-" + key; + String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key; + String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key; + String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key; + String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key; + Pattern pattern = Pattern.compile("my-property/my-ns/pattern-topic.*"); + + // 1. create partition + admin.tenants().createTenant("prop", new TenantInfo()); + admin.topics().createPartitionedTopic(topicName2, 2); + admin.topics().createPartitionedTopic(topicName3, 3); + + // 2. create producer + String messagePredicate = "my-message-" + key + "-"; + int totalMessages = 40; + + Producer producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer4 = pulsarClient.newProducer().topic(topicName4) + .enableBatching(false) + .create(); + + Consumer consumer = pulsarClient.newConsumer() + .topicsPattern(pattern) + .patternAutoDiscoveryPeriod(2) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .subscriptionTopicsMode(PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .subscribe(); + + // 4. verify consumer get methods, to get right number of partitions and topics. + assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); + List topics = ((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics(); + List> consumers = ((PatternMultiTopicsConsumerImpl) consumer).getConsumers(); + + assertEquals(topics.size(), 7); + assertEquals(consumers.size(), 7); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getTopics().size(), 4); + + topics.forEach(topic -> log.debug("topic: {}", topic)); + consumers.forEach(c -> log.debug("consumer: {}", c.getTopic())); + + IntStream.range(0, topics.size()).forEach(index -> + assertTrue(topics.get(index).equals(consumers.get(index).getTopic()))); + + ((PatternMultiTopicsConsumerImpl) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic)); + + // 5. produce data + for (int i = 0; i < totalMessages / 4; i++) { + producer1.send((messagePredicate + "producer1-" + i).getBytes()); + producer2.send((messagePredicate + "producer2-" + i).getBytes()); + producer3.send((messagePredicate + "producer3-" + i).getBytes()); + producer4.send((messagePredicate + "producer4-" + i).getBytes()); + } + + // 6. should receive all the message + int messageSet = 0; + Message message = consumer.receive(); + do { + assertTrue(message instanceof TopicMessageImpl); + messageSet++; + consumer.acknowledge(message); + log.debug("Consumer acknowledged : " + new String(message.getData())); + message = consumer.receive(500, TimeUnit.MILLISECONDS); + } while (message != null); + assertEquals(messageSet, totalMessages); + + consumer.unsubscribe(); + consumer.close(); + producer1.close(); + producer2.close(); + producer3.close(); + producer4.close(); + } + @Test(timeOut = testTimeout) public void testTopicsPatternFilter() throws Exception { String topicName1 = "persistent://my-property/my-ns/pattern-topic-1"; String topicName2 = "persistent://my-property/my-ns/pattern-topic-2"; String topicName3 = "persistent://my-property/my-ns/hello-3"; - String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4"; + String topicName4 = "non-persistent://my-property/my-ns/hello-4"; List topicsNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4); @@ -320,10 +413,8 @@ public void testTopicsPatternFilter() throws Exception { Pattern pattern2 = Pattern.compile("persistent://my-property/my-ns/.*"); List result2 = PulsarClientImpl.topicsPatternFilter(topicsNames, pattern2); - assertTrue(result2.size() == 3 && - result2.contains(topicName1) && - result2.contains(topicName2) && - result2.contains(topicName3)); + assertTrue(result2.size() == 4 + && Stream.of(topicName1, topicName2, topicName3, topicName4).allMatch(result2::contains)); } @Test(timeOut = testTimeout) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 469faf43117f8..ce6f69b0fa0ec 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -468,11 +468,13 @@ private CompletableFuture> patternTopicSubscribeAsync(ConsumerCo // get topics that match 'topicsPattern' from original topics list // return result should contain only topic names, without partition part public static List topicsPatternFilter(List original, Pattern topicsPattern) { + final Pattern shortenedTopicsPattern = topicsPattern.toString().contains("://") + ? Pattern.compile(topicsPattern.toString().split("\\:\\/\\/")[1]) : topicsPattern; + return original.stream() - .filter(topic -> { - TopicName destinationName = TopicName.get(topic); - return topicsPattern.matcher(destinationName.toString()).matches(); - }) + .map(TopicName::get) + .map(TopicName::toString) + .filter(topic -> shortenedTopicsPattern.matcher(topic.split("\\:\\/\\/")[1]).matches()) .collect(Collectors.toList()); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index 957cff8d93253..0072dc5162e8b 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -301,13 +301,15 @@ private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTo serviceUrl = this.connectWithTLS ? service.getConfiguration().getBrokerServiceURLTLS() : service.getConfiguration().getBrokerServiceURL(); } - performGetTopicsOfNamespace(clientRequestId, commandGetTopicsOfNamespace.getNamespace(), serviceUrl, 10); + performGetTopicsOfNamespace(clientRequestId, commandGetTopicsOfNamespace.getNamespace(), serviceUrl, 10, + commandGetTopicsOfNamespace.getMode()); } private void performGetTopicsOfNamespace(long clientRequestId, String namespaceName, String brokerServiceUrl, - int numberOfRetries) { + int numberOfRetries, + CommandGetTopicsOfNamespace.Mode mode) { if (numberOfRetries == 0) { proxyConnection.ctx().writeAndFlush(Commands.newError(clientRequestId, ServerError.ServiceNotReady, "Reached max number of redirections")); @@ -332,7 +334,7 @@ private void performGetTopicsOfNamespace(long clientRequestId, // Connected to backend broker long requestId = proxyConnection.newRequestId(); ByteBuf command; - command = Commands.newGetTopicsOfNamespaceRequest(namespaceName, requestId); + command = Commands.newGetTopicsOfNamespaceRequest(namespaceName, requestId, mode); clientCnx.newGetTopicsOfNamespace(command, requestId).thenAccept(topicList -> proxyConnection.ctx().writeAndFlush( Commands.newGetTopicsOfNamespaceResponse(topicList, clientRequestId)) From b406f6f499fdeb891854be863f4b3472127bbcbe Mon Sep 17 00:00:00 2001 From: Gordeev Boris Date: Wed, 5 Sep 2018 00:35:07 +0300 Subject: [PATCH 7/7] Enabled http pattern lookup with v2 API. --- .../java/org/apache/pulsar/broker/admin/v2/Namespaces.java | 6 ++++-- .../org/apache/pulsar/client/impl/HttpLookupService.java | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 1acecb94d2648..7e5b4e0663316 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -36,6 +36,7 @@ import org.apache.pulsar.broker.admin.impl.NamespacesBase; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; @@ -74,7 +75,8 @@ public List getTenantNamespaces(@PathParam("tenant") String tenant) { @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) public List getTopics(@PathParam("tenant") String tenant, - @PathParam("namespace") String namespace) { + @PathParam("namespace") String namespace, + @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) { validateAdminAccessForTenant(tenant); validateNamespaceName(tenant, namespace); @@ -82,7 +84,7 @@ public List getTopics(@PathParam("tenant") String tenant, getNamespacePolicies(namespaceName); try { - return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName); + return pulsar().getNamespaceService().getListOfTopics(namespaceName, mode); } catch (Exception e) { log.error("Failed to get topics list for namespace {}", namespaceName, e); throw new RestException(e); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index 2aa1f8816d09f..3abdf2531cd0f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -107,7 +107,7 @@ public CompletableFuture> getTopicsUnderNamespace(NamespaceName nam CompletableFuture> future = new CompletableFuture<>(); String format = namespace.isV2() - ? "admin/v2/namespaces/%s/topics" : "admin/namespaces/%s/destinations?mode=%s"; + ? "admin/v2/namespaces/%s/topics?mode=%s" : "admin/namespaces/%s/destinations?mode=%s"; httpClient .get(String.format(format, namespace, mode.toString()), String[].class) .thenAccept(topics -> {