diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index 9b3ff40113d78..38f2ca366bb98 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -469,11 +469,6 @@ public void testStartEmptyPatternConsumer() throws Exception { .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); - List topicNames = Lists.newArrayList(topicName1, topicName2, topicName3); - NamespaceService nss = pulsar.getNamespaceService(); - doReturn(CompletableFuture.completedFuture(topicNames)).when(nss) - .getListOfPersistentTopics(NamespaceName.get("my-property/my-ns")); - // 5. call recheckTopics to subscribe each added topics above log.debug("recheck topics change"); PatternMultiTopicsConsumerImpl consumer1 = ((PatternMultiTopicsConsumerImpl) consumer); @@ -514,6 +509,40 @@ public void testStartEmptyPatternConsumer() throws Exception { producer3.close(); } + @Test(timeOut = testTimeout) + public void testAutoSubscribePatterConsumerFromBrokerWatcher() throws Exception { + String key = "AutoSubscribePatternConsumer"; + String subscriptionName = "my-ex-subscription-" + key; + + Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*"); + Consumer consumer = pulsarClient.newConsumer() + .topicsPattern(pattern) + // Disable automatic discovery. + .patternAutoDiscoveryPeriod(1000) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .subscribe(); + + // 1. create partition + String topicName = "persistent://my-property/my-ns/pattern-topic-1-" + key; + TenantInfoImpl tenantInfo = createDefaultTenantInfo(); + admin.tenants().createTenant("prop", tenantInfo); + admin.topics().createPartitionedTopic(topicName, 4); + + // 2. verify consumer get methods. There is no need to trigger discovery, because the broker will push the + // changes to update(CommandWatchTopicUpdate). + assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); + Awaitility.await().untilAsserted(() -> { + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitions().size(), 4); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getConsumers().size(), 4); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 1); + }); + + consumer.close(); + } + // simulate subscribe a pattern which has 3 topics, but then matched topic added in. @Test(timeOut = testTimeout) public void testAutoSubscribePatternConsumer() throws Exception { @@ -590,11 +619,6 @@ public void testAutoSubscribePatternConsumer() throws Exception { .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); - List topicNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4); - NamespaceService nss = pulsar.getNamespaceService(); - doReturn(CompletableFuture.completedFuture(topicNames)).when(nss) - .getListOfPersistentTopics(NamespaceName.get("my-property/my-ns")); - // 7. call recheckTopics to subscribe each added topics above, verify topics number: 10=1+2+3+4 log.debug("recheck topics change"); PatternMultiTopicsConsumerImpl consumer1 = ((PatternMultiTopicsConsumerImpl) consumer); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index 79a7c6a3ae62b..4433bba15d548 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -28,9 +28,11 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; @@ -203,9 +205,14 @@ public CompletableFuture onTopicsAdded(Collection addedTopics) { return addFuture; } + Set addTopicPartitionedName = addedTopics.stream() + .map(addTopicName -> TopicName.get(addTopicName).getPartitionedTopicName()) + .collect(Collectors.toSet()); + List> futures = Lists.newArrayListWithExpectedSize(partitionedTopics.size()); - addedTopics.stream().forEach(topic -> futures.add( - subscribeAsync(topic, false /* createTopicIfDoesNotExist */))); + addTopicPartitionedName.forEach(partitionedTopic -> futures.add( + subscribeAsync(partitionedTopic, + false /* createTopicIfDoesNotExist */))); FutureUtil.waitForAll(futures) .thenAccept(finalFuture -> addFuture.complete(null)) .exceptionally(ex -> {