From c011e05cf53ec56e3a475b3ce32d5dd45e303eca Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 21 Jul 2022 19:20:03 +0800 Subject: [PATCH 1/6] Fix topic name subscribe error. --- .../client/impl/PatternTopicsConsumerImplTest.java | 10 ---------- .../pulsar/client/impl/MultiTopicsConsumerImpl.java | 7 ++++--- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index 9b3ff40113d78..299fe1070f480 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -469,11 +469,6 @@ public void testStartEmptyPatternConsumer() throws Exception { .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); - List topicNames = Lists.newArrayList(topicName1, topicName2, topicName3); - NamespaceService nss = pulsar.getNamespaceService(); - doReturn(CompletableFuture.completedFuture(topicNames)).when(nss) - .getListOfPersistentTopics(NamespaceName.get("my-property/my-ns")); - // 5. call recheckTopics to subscribe each added topics above log.debug("recheck topics change"); PatternMultiTopicsConsumerImpl consumer1 = ((PatternMultiTopicsConsumerImpl) consumer); @@ -590,11 +585,6 @@ public void testAutoSubscribePatternConsumer() throws Exception { .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); - List topicNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4); - NamespaceService nss = pulsar.getNamespaceService(); - doReturn(CompletableFuture.completedFuture(topicNames)).when(nss) - .getListOfPersistentTopics(NamespaceName.get("my-property/my-ns")); - // 7. call recheckTopics to subscribe each added topics above, verify topics number: 10=1+2+3+4 log.debug("recheck topics change"); PatternMultiTopicsConsumerImpl consumer1 = ((PatternMultiTopicsConsumerImpl) consumer); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 986a13e446a8c..d18f9347be162 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -932,9 +932,10 @@ public CompletableFuture subscribeAsync(String topicName, boolean createTo CompletableFuture subscribeResult = new CompletableFuture<>(); - client.getPartitionedTopicMetadata(topicName) - .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, fullTopicName, metadata.partitions, - createTopicIfDoesNotExist)) + client.getPartitionedTopicMetadata(topicNameInstance.getPartitionedTopicName()) + .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, + topicNameInstance.getPartitionedTopicName(), + metadata.partitions, createTopicIfDoesNotExist)) .exceptionally(ex1 -> { log.warn("[{}] Failed to get partitioned topic metadata: {}", fullTopicName, ex1.getMessage()); subscribeResult.completeExceptionally(ex1); From 2e689201e9350f9e8f8910cb6fcbb02b10136205 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 22 Jul 2022 15:44:46 +0800 Subject: [PATCH 2/6] Fix pattern topics change onTopicsAdded. --- .../impl/PatternTopicsConsumerImplTest.java | 34 +++++++++++++++++++ .../client/impl/MultiTopicsConsumerImpl.java | 7 ++-- .../impl/PatternMultiTopicsConsumerImpl.java | 11 ++++-- 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index 299fe1070f480..0839a30a0e301 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -509,6 +509,40 @@ public void testStartEmptyPatternConsumer() throws Exception { producer3.close(); } + @Test(timeOut = testTimeout) + public void testAutoSubscribePatterConsumerFromBrokerWatcher() throws Exception { + String key = "AutoSubscribePatternConsumer"; + String subscriptionName = "my-ex-subscription-" + key; + + Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*"); + Consumer consumer = pulsarClient.newConsumer() + .topicsPattern(pattern) + // Disable automatic discovery. + .patternAutoDiscoveryPeriod(1000) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .subscribe(); + + + + // 1. create partition + String topicName = "persistent://my-property/my-ns/pattern-topic-1-" + key; + TenantInfoImpl tenantInfo = createDefaultTenantInfo(); + admin.tenants().createTenant("prop", tenantInfo); + admin.topics().createPartitionedTopic(topicName, 4); + + // 2. verify consumer get methods. There is no need to trigger discovery, because the broker will push the + // changes to update(CommandWatchTopicUpdate). + assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); + Awaitility.await().untilAsserted(() -> { + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitions().size(), 4); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getConsumers().size(), 4); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 1); + }); + } + // simulate subscribe a pattern which has 3 topics, but then matched topic added in. @Test(timeOut = testTimeout) public void testAutoSubscribePatternConsumer() throws Exception { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index d18f9347be162..4f8342b56c8d0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -932,10 +932,9 @@ public CompletableFuture subscribeAsync(String topicName, boolean createTo CompletableFuture subscribeResult = new CompletableFuture<>(); - client.getPartitionedTopicMetadata(topicNameInstance.getPartitionedTopicName()) - .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, - topicNameInstance.getPartitionedTopicName(), - metadata.partitions, createTopicIfDoesNotExist)) + client.getPartitionedTopicMetadata(topicName) + .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, fullTopicName, metadata.partitions, + createTopicIfDoesNotExist)) .exceptionally(ex1 -> { log.warn("[{}] Failed to get partitioned topic metadata: {}", fullTopicName, ex1.getMessage()); subscribeResult.completeExceptionally(ex1); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index 79a7c6a3ae62b..34cc8ee6caafa 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -28,9 +28,11 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; @@ -203,9 +205,14 @@ public CompletableFuture onTopicsAdded(Collection addedTopics) { return addFuture; } + Set addTopicPartitionedName = addedTopics.stream() + .map(addTopicName -> TopicName.get(addTopicName).getPartitionedTopicName()) + .collect(Collectors.toSet()); + List> futures = Lists.newArrayListWithExpectedSize(partitionedTopics.size()); - addedTopics.stream().forEach(topic -> futures.add( - subscribeAsync(topic, false /* createTopicIfDoesNotExist */))); + addTopicPartitionedName.stream().forEach(partitionedTopic -> futures.add( + subscribeAsync(partitionedTopic, + false /* createTopicIfDoesNotExist */))); FutureUtil.waitForAll(futures) .thenAccept(finalFuture -> addFuture.complete(null)) .exceptionally(ex -> { From 278f9b9db3bd3bd0d7ea9a90f4fa498f7ec94483 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 22 Jul 2022 15:46:21 +0800 Subject: [PATCH 3/6] code format --- .../org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 4f8342b56c8d0..986a13e446a8c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -934,7 +934,7 @@ public CompletableFuture subscribeAsync(String topicName, boolean createTo client.getPartitionedTopicMetadata(topicName) .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, fullTopicName, metadata.partitions, - createTopicIfDoesNotExist)) + createTopicIfDoesNotExist)) .exceptionally(ex1 -> { log.warn("[{}] Failed to get partitioned topic metadata: {}", fullTopicName, ex1.getMessage()); subscribeResult.completeExceptionally(ex1); From bc3260d68585e72cdab68faf4d859b3accd3a262 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 22 Jul 2022 15:47:03 +0800 Subject: [PATCH 4/6] format code --- .../pulsar/client/impl/PatternTopicsConsumerImplTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index 0839a30a0e301..989516a3e0640 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -525,8 +525,6 @@ public void testAutoSubscribePatterConsumerFromBrokerWatcher() throws Exception .receiverQueueSize(4) .subscribe(); - - // 1. create partition String topicName = "persistent://my-property/my-ns/pattern-topic-1-" + key; TenantInfoImpl tenantInfo = createDefaultTenantInfo(); From 2d4d36a920be5eae0ead65403b336b85b2f1999f Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 22 Jul 2022 16:53:48 +0800 Subject: [PATCH 5/6] Close consumer after the test. --- .../pulsar/client/impl/PatternTopicsConsumerImplTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index 989516a3e0640..38f2ca366bb98 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -539,6 +539,8 @@ public void testAutoSubscribePatterConsumerFromBrokerWatcher() throws Exception assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getConsumers().size(), 4); assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 1); }); + + consumer.close(); } // simulate subscribe a pattern which has 3 topics, but then matched topic added in. From bead707b2ea49df1e0f2d981a4530cfa267ca6b4 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 22 Jul 2022 21:22:07 +0800 Subject: [PATCH 6/6] Update pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java Co-authored-by: Yunze Xu --- .../pulsar/client/impl/PatternMultiTopicsConsumerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index 34cc8ee6caafa..4433bba15d548 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -210,7 +210,7 @@ public CompletableFuture onTopicsAdded(Collection addedTopics) { .collect(Collectors.toSet()); List> futures = Lists.newArrayListWithExpectedSize(partitionedTopics.size()); - addTopicPartitionedName.stream().forEach(partitionedTopic -> futures.add( + addTopicPartitionedName.forEach(partitionedTopic -> futures.add( subscribeAsync(partitionedTopic, false /* createTopicIfDoesNotExist */))); FutureUtil.waitForAll(futures)