From 2edb2e1f8c94a131f69826f234ca96ab5398b82f Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 25 May 2021 21:19:17 -0700 Subject: [PATCH 1/5] On multi-topic consumer, we shouldn't keep checking the partitioned metadata --- .../client/impl/MultiTopicsConsumerImpl.java | 7 ++- .../impl/MultiTopicsConsumerImplTest.java | 43 +++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index df6a830609532..351235f40f818 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -955,7 +955,7 @@ private void doSubscribeTopicPartitions(Schema schema, }) .collect(Collectors.toList()); } else { - boolean isTopicBeingSubscribedForInOtherThread = this.topics.putIfAbsent(topicName, 1) != null; + boolean isTopicBeingSubscribedForInOtherThread = this.topics.putIfAbsent(topicName, 0) != null; if (isTopicBeingSubscribedForInOtherThread) { String errorMessage = String.format("[%s] Failed to subscribe for topic [%s] in topics consumer. " + "Topic is already being subscribed for in other thread.", topic, topicName); @@ -1222,6 +1222,10 @@ public CompletableFuture onTopicsExtended(Collection topicsExtende // subscribe increased partitions for a given topic private CompletableFuture subscribeIncreasedTopicPartitions(String topicName) { CompletableFuture future = new CompletableFuture<>(); + int oldPartitionNumber = topics.get(topicName); + if (oldPartitionNumber == 0) { + return CompletableFuture.completedFuture(null); + } //Drop the disconnected consumers to allow the auto discovery consumers.entrySet().removeIf(e -> TopicName.get(e.getKey()).getPartitionedTopicName().equals(topicName) && !e.getValue().isConnected()); @@ -1230,7 +1234,6 @@ private CompletableFuture subscribeIncreasedTopicPartitions(String topicNa topics.put(topicName, connectedPartitions); client.getPartitionsForTopic(topicName).thenCompose(list -> { - int oldPartitionNumber = topics.get(topicName); int currentPartitionNumber = Long.valueOf(list.stream().filter(t -> TopicName.get(t).isPartitioned()).count()).intValue(); if (log.isDebugEnabled()) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java index dc70d151e9020..745aa1cbeafa2 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java @@ -20,7 +20,11 @@ import com.google.common.collect.Sets; import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; +import lombok.Cleanup; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Messages; @@ -31,6 +35,7 @@ import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.util.netty.EventLoopUtil; +import org.mockito.internal.verification.VerificationModeFactory; import org.testng.annotations.Test; import java.util.Arrays; @@ -63,9 +68,14 @@ public void testGetStats() throws Exception { conf.setStatsIntervalSeconds(100); ThreadFactory threadFactory = new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon()); + + @Cleanup("shutdown") EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory); + + @Cleanup("shutdownNow") ExecutorProvider executorProvider = new ExecutorProvider(1, "client-test-stats"); + @Cleanup PulsarClientImpl clientImpl = new PulsarClientImpl(conf, eventLoopGroup); ConsumerConfigurationData consumerConfData = new ConsumerConfigurationData(); @@ -169,4 +179,37 @@ public void testConsumerCleanupOnSubscribeFailure() { verify(clientMock, times(1)).cleanupConsumer(any()); } + @Test + public void testDontCheckForPartitionsUpdatesOnNonPartitionedTopics() throws Exception { + ExecutorProvider executorProvider = mock(ExecutorProvider.class); + ConsumerConfigurationData consumerConfData = new ConsumerConfigurationData<>(); + consumerConfData.setSubscriptionName("subscriptionName"); + consumerConfData.setTopicNames(new HashSet<>(Arrays.asList("a", "b", "c"))); + consumerConfData.setAutoUpdatePartitionsIntervalSeconds(1); + consumerConfData.setAutoUpdatePartitions(true); + + @Cleanup("stop") + Timer timer = new HashedWheelTimer(); + + PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx(); + when(clientMock.timer()).thenReturn(timer); + when(clientMock.preProcessSchemaBeforeSubscribe(any(), any(), any())) + .thenReturn(CompletableFuture.completedFuture(null)); + + // Simulate non partitioned topics + PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(0); + when(clientMock.getPartitionedTopicMetadata(any())).thenReturn(CompletableFuture.completedFuture(metadata)); + CompletableFuture> completeFuture = new CompletableFuture<>(); + + MultiTopicsConsumerImpl impl = new MultiTopicsConsumerImpl<>( + clientMock, consumerConfData, executorProvider, + completeFuture, Schema.BYTES, null, true); + impl.setState(HandlerState.State.Ready); + Thread.sleep(5000); + + // getPartitionedTopicMetadata should have been called only the first time, for each of the 3 topics, + // but not anymore since the topics are not partitioned. + verify(clientMock, times(3)).getPartitionedTopicMetadata(any()); + } + } From 4cf500c754de1005c8fb0f8f66bd6f4098673886 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 26 May 2021 08:54:36 -0700 Subject: [PATCH 2/5] Added NON_PARTITIONED constant --- .../pulsar/client/impl/MultiTopicsConsumerImpl.java | 8 +++++--- .../pulsar/common/partition/PartitionedTopicMetadata.java | 5 +++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 351235f40f818..8e646814c79e8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -41,6 +41,7 @@ import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.util.CompletableFutureCancellationHandler; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; @@ -916,7 +917,7 @@ private void doSubscribeTopicPartitions(Schema schema, } List>> futureList; - if (numPartitions > 0) { + if (numPartitions != PartitionedTopicMetadata.NON_PARTITIONED) { // Below condition is true if subscribeAsync() has been invoked second time with same // topicName before the first invocation had reached this point. boolean isTopicBeingSubscribedForInOtherThread = this.topics.putIfAbsent(topicName, numPartitions) != null; @@ -955,7 +956,8 @@ private void doSubscribeTopicPartitions(Schema schema, }) .collect(Collectors.toList()); } else { - boolean isTopicBeingSubscribedForInOtherThread = this.topics.putIfAbsent(topicName, 0) != null; + boolean isTopicBeingSubscribedForInOtherThread = this.topics.putIfAbsent(topicName, + PartitionedTopicMetadata.NON_PARTITIONED) != null; if (isTopicBeingSubscribedForInOtherThread) { String errorMessage = String.format("[%s] Failed to subscribe for topic [%s] in topics consumer. " + "Topic is already being subscribed for in other thread.", topic, topicName); @@ -1223,7 +1225,7 @@ public CompletableFuture onTopicsExtended(Collection topicsExtende private CompletableFuture subscribeIncreasedTopicPartitions(String topicName) { CompletableFuture future = new CompletableFuture<>(); int oldPartitionNumber = topics.get(topicName); - if (oldPartitionNumber == 0) { + if (oldPartitionNumber == PartitionedTopicMetadata.NON_PARTITIONED) { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java b/pulsar-common/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java index 1f023de460232..024ca0a32cef0 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java @@ -34,4 +34,9 @@ public PartitionedTopicMetadata(int partitions) { this.partitions = partitions; } + /** + * A topic with '0' partitions is treated like non-partitioned topic. + */ + public static final int NON_PARTITIONED = 0; + } From bb3c84a7b2880c6f1353f37b22e732589db4cd1a Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 26 May 2021 09:40:47 -0700 Subject: [PATCH 3/5] Removed assertion that is now invalid --- .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 8e646814c79e8..a11f8aa78f12d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -987,11 +987,6 @@ private void doSubscribeTopicPartitions(Schema schema, if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) { setMaxReceiverQueueSize(allTopicPartitionsNumber.get()); } - int numTopics = this.topics.values().stream().mapToInt(Integer::intValue).sum(); - int currentAllTopicsPartitionsNumber = allTopicPartitionsNumber.get(); - checkState(currentAllTopicsPartitionsNumber == numTopics, - "allTopicPartitionsNumber " + currentAllTopicsPartitionsNumber - + " not equals expected: " + numTopics); // We have successfully created new consumers, so we can start receiving messages for them startReceivingMessages( From 1634ad70e905b8e1c8db5890c4db4494ee546bd1 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 28 May 2021 12:30:58 -0700 Subject: [PATCH 4/5] Fixed handling of deleted partitioned topic --- .../impl/PatternTopicsConsumerImplTest.java | 44 ++++----- .../client/impl/TopicsConsumerImplTest.java | 18 ++-- .../client/impl/MultiTopicsConsumerImpl.java | 89 +++++++++---------- .../impl/PatternMultiTopicsConsumerImpl.java | 13 ++- 4 files changed, 82 insertions(+), 82 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index 55649aba13a8f..bf6edf434c693 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -183,12 +183,12 @@ public void testBinaryProtoToGetTopicsOfNamespacePersistent() throws Exception { // 4. verify consumer get methods, to get right number of partitions and topics. assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); - List topics = ((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics(); + List topics = ((PatternMultiTopicsConsumerImpl) consumer).getPartitions(); List> consumers = ((PatternMultiTopicsConsumerImpl) consumer).getConsumers(); assertEquals(topics.size(), 6); assertEquals(consumers.size(), 6); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getTopics().size(), 3); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 2); topics.forEach(topic -> log.debug("topic: {}", topic)); consumers.forEach(c -> log.debug("consumer: {}", c.getTopic())); @@ -196,7 +196,7 @@ public void testBinaryProtoToGetTopicsOfNamespacePersistent() throws Exception { IntStream.range(0, topics.size()).forEach(index -> assertEquals(consumers.get(index).getTopic(), topics.get(index))); - ((PatternMultiTopicsConsumerImpl) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic)); + ((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().forEach(topic -> log.debug("getTopics topic: {}", topic)); // 5. produce data for (int i = 0; i < totalMessages / 3; i++) { @@ -286,12 +286,12 @@ public void testBinaryProtoToGetTopicsOfNamespaceNonPersistent() throws Exceptio // 4. verify consumer get methods, to get right number of partitions and topics. assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); - List topics = ((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics(); + List topics = ((PatternMultiTopicsConsumerImpl) consumer).getPartitions(); List> consumers = ((PatternMultiTopicsConsumerImpl) consumer).getConsumers(); assertEquals(topics.size(), 1); assertEquals(consumers.size(), 1); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getTopics().size(), 1); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 0); topics.forEach(topic -> log.debug("topic: {}", topic)); consumers.forEach(c -> log.debug("consumer: {}", c.getTopic())); @@ -299,7 +299,7 @@ public void testBinaryProtoToGetTopicsOfNamespaceNonPersistent() throws Exceptio IntStream.range(0, topics.size()).forEach(index -> assertEquals(consumers.get(index).getTopic(), topics.get(index))); - ((PatternMultiTopicsConsumerImpl) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic)); + ((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().forEach(topic -> log.debug("getTopics topic: {}", topic)); // 5. produce data for (int i = 0; i < totalMessages / 4; i++) { @@ -377,12 +377,12 @@ public void testBinaryProtoToGetTopicsOfNamespaceAll() throws Exception { // 4. verify consumer get methods, to get right number of partitions and topics. assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); - List topics = ((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics(); + List topics = ((PatternMultiTopicsConsumerImpl) consumer).getPartitions(); List> consumers = ((PatternMultiTopicsConsumerImpl) consumer).getConsumers(); assertEquals(topics.size(), 7); assertEquals(consumers.size(), 7); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getTopics().size(), 4); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 2); topics.forEach(topic -> log.debug("topic: {}", topic)); consumers.forEach(c -> log.debug("consumer: {}", c.getTopic())); @@ -390,7 +390,7 @@ public void testBinaryProtoToGetTopicsOfNamespaceAll() throws Exception { IntStream.range(0, topics.size()).forEach(index -> assertEquals(consumers.get(index).getTopic(), topics.get(index))); - ((PatternMultiTopicsConsumerImpl) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic)); + ((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().forEach(topic -> log.debug("getTopics topic: {}", topic)); // 5. produce data for (int i = 0; i < totalMessages / 4; i++) { @@ -508,9 +508,9 @@ public void testStartEmptyPatternConsumer() throws Exception { // 3. verify consumer get methods, to get 5 number of partitions and topics. assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 5); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitions().size(), 5); assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getConsumers().size(), 5); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getTopics().size(), 2); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 2); // 4. create producer String messagePredicate = "my-message-" + key + "-"; @@ -537,9 +537,9 @@ public void testStartEmptyPatternConsumer() throws Exception { // 6. verify consumer get methods, to get number of partitions and topics, value 6=1+2+3. assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 6); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitions().size(), 6); assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getConsumers().size(), 6); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getTopics().size(), 3); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 2); // 7. produce data @@ -614,9 +614,9 @@ public void testAutoSubscribePatternConsumer() throws Exception { // 4. verify consumer get methods, to get 6 number of partitions and topics: 6=1+2+3 assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 6); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitions().size(), 6); assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getConsumers().size(), 6); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getTopics().size(), 3); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 2); // 5. produce data to topic 1,2,3; verify should receive all the message for (int i = 0; i < totalMessages / 3; i++) { @@ -649,9 +649,9 @@ public void testAutoSubscribePatternConsumer() throws Exception { PatternMultiTopicsConsumerImpl consumer1 = ((PatternMultiTopicsConsumerImpl) consumer); consumer1.run(consumer1.getRecheckPatternTimeout()); Thread.sleep(100); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 10); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitions().size(), 10); assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getConsumers().size(), 10); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getTopics().size(), 4); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 3); // 8. produce data to topic3 and topic4, verify should receive all the message for (int i = 0; i < totalMessages / 2; i++) { @@ -723,9 +723,9 @@ public void testAutoUnsubscribePatternConsumer() throws Exception { // 4. verify consumer get methods, to get 0 number of partitions and topics: 6=1+2+3 assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 6); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitions().size(), 6); assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getConsumers().size(), 6); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getTopics().size(), 3); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 2); // 5. produce data to topic 1,2,3; verify should receive all the message for (int i = 0; i < totalMessages / 3; i++) { @@ -757,9 +757,9 @@ public void testAutoUnsubscribePatternConsumer() throws Exception { PatternMultiTopicsConsumerImpl consumer1 = ((PatternMultiTopicsConsumerImpl) consumer); consumer1.run(consumer1.getRecheckPatternTimeout()); Thread.sleep(100); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 2); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitions().size(), 2); assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getConsumers().size(), 2); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getTopics().size(), 1); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 1); // 8. produce data to topic2, verify should receive all the message for (int i = 0; i < totalMessages; i++) { @@ -808,7 +808,7 @@ public void testTopicDeletion() throws Exception { // 4. verify consumer get methods assertSame(consumerImpl.getPattern(), pattern); - assertEquals(consumerImpl.getTopics().size(), 2); + assertEquals(consumerImpl.getPartitionedTopics().size(), 0); producer1.send("msg-1"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index cb89d33134dae..898e534ed870a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -148,7 +148,7 @@ public void testGetConsumersAndGetTopics() throws Exception { assertTrue(consumer instanceof MultiTopicsConsumerImpl); assertTrue(consumer.getTopic().startsWith(MultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX)); - List topics = ((MultiTopicsConsumerImpl) consumer).getPartitionedTopics(); + List topics = ((MultiTopicsConsumerImpl) consumer).getPartitions(); List> consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers(); topics.forEach(topic -> log.info("topic: {}", topic)); @@ -157,7 +157,7 @@ public void testGetConsumersAndGetTopics() throws Exception { IntStream.range(0, 6).forEach(index -> assertEquals(consumers.get(index).getTopic(), topics.get(index))); - assertEquals(((MultiTopicsConsumerImpl) consumer).getTopics().size(), 3); + assertEquals(((MultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 2); consumer.unsubscribe(); consumer.close(); @@ -563,12 +563,12 @@ public void testSubscribeUnsubscribeSingleTopic() throws Exception { assertEquals(messageSet, totalMessages * 2 / 3); // 7. use getter to verify internal topics number after un-subscribe topic3 - List topics = ((MultiTopicsConsumerImpl) consumer).getPartitionedTopics(); + List topics = ((MultiTopicsConsumerImpl) consumer).getPartitions(); List> consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers(); assertEquals(topics.size(), 3); assertEquals(consumers.size(), 3); - assertEquals(((MultiTopicsConsumerImpl) consumer).getTopics().size(), 2); + assertEquals(((MultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 1); // 8. re-subscribe topic3 CompletableFuture subFuture = ((MultiTopicsConsumerImpl)consumer).subscribeAsync(topicName3, true); @@ -594,12 +594,12 @@ public void testSubscribeUnsubscribeSingleTopic() throws Exception { assertEquals(messageSet, totalMessages); // 11. use getter to verify internal topics number after subscribe topic3 - topics = ((MultiTopicsConsumerImpl) consumer).getPartitionedTopics(); + topics = ((MultiTopicsConsumerImpl) consumer).getPartitions(); consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers(); assertEquals(topics.size(), 6); assertEquals(consumers.size(), 6); - assertEquals(((MultiTopicsConsumerImpl) consumer).getTopics().size(), 3); + assertEquals(((MultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 2); consumer.unsubscribe(); consumer.close(); @@ -1181,20 +1181,20 @@ public void testAutoDiscoverMultiTopicsPartitions() throws Exception { .subscribe(); Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 3); - Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 3); + Assert.assertEquals(consumer.getConsumers().size(), 3); admin.topics().deletePartitionedTopic(topicName, true); consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout()); Awaitility.await().untilAsserted(() -> { Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 0); - Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 0); + Assert.assertEquals(consumer.getConsumers().size(), 0); }); admin.topics().createPartitionedTopic(topicName, 7); consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout()); Awaitility.await().untilAsserted(() -> { Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 7); - Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 7); + Assert.assertEquals(consumer.getConsumers().size(), 7); }); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index a11f8aa78f12d..a91252801e5a7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -53,6 +53,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -81,7 +82,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { private final ConcurrentHashMap> consumers; // Map , store partition number for each topic - protected final ConcurrentHashMap topics; + protected final ConcurrentHashMap partitionedTopics; // Queue of partition consumers on which we have stopped calling receiveAsync() because the // shared incoming queue was full @@ -142,7 +143,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { checkArgument(conf.getReceiverQueueSize() > 0, "Receiver queue size needs to be greater than 0 for Topics Consumer"); - this.topics = new ConcurrentHashMap<>(); + this.partitionedTopics = new ConcurrentHashMap<>(); this.consumers = new ConcurrentHashMap<>(); this.pausedConsumers = new ConcurrentLinkedQueue<>(); this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; @@ -806,7 +807,7 @@ private String getFullTopicName(String topic) { private void removeTopic(String topic) { String fullTopicName = getFullTopicName(topic); if (fullTopicName != null) { - topics.remove(topic); + partitionedTopics.remove(topic); } } @@ -818,7 +819,8 @@ public CompletableFuture subscribeAsync(String topicName, boolean createTo new PulsarClientException.AlreadyClosedException("Topic name not valid")); } String fullTopicName = topicNameInstance.toString(); - if (topics.containsKey(fullTopicName) || topics.containsKey(topicNameInstance.getPartitionedTopicName())) { + if (consumers.containsKey(fullTopicName) + || partitionedTopics.containsKey(topicNameInstance.getPartitionedTopicName())) { return FutureUtil.failedFuture( new PulsarClientException.AlreadyClosedException("Already subscribed to " + topicName)); } @@ -882,7 +884,8 @@ CompletableFuture subscribeAsync(String topicName, int numberPartitions) { new PulsarClientException.AlreadyClosedException("Topic name not valid")); } String fullTopicName = topicNameInstance.toString(); - if (topics.containsKey(fullTopicName) || topics.containsKey(topicNameInstance.getPartitionedTopicName())) { + if (consumers.containsKey(fullTopicName) + || partitionedTopics.containsKey(topicNameInstance.getPartitionedTopicName())) { return FutureUtil.failedFuture( new PulsarClientException.AlreadyClosedException("Already subscribed to " + topicName)); } @@ -920,7 +923,8 @@ private void doSubscribeTopicPartitions(Schema schema, if (numPartitions != PartitionedTopicMetadata.NON_PARTITIONED) { // Below condition is true if subscribeAsync() has been invoked second time with same // topicName before the first invocation had reached this point. - boolean isTopicBeingSubscribedForInOtherThread = this.topics.putIfAbsent(topicName, numPartitions) != null; + boolean isTopicBeingSubscribedForInOtherThread = + partitionedTopics.putIfAbsent(topicName, numPartitions) != null; if (isTopicBeingSubscribedForInOtherThread) { String errorMessage = String.format("[%s] Failed to subscribe for topic [%s] in topics consumer. " + "Topic is already being subscribed for in other thread.", topic, topicName); @@ -956,15 +960,6 @@ private void doSubscribeTopicPartitions(Schema schema, }) .collect(Collectors.toList()); } else { - boolean isTopicBeingSubscribedForInOtherThread = this.topics.putIfAbsent(topicName, - PartitionedTopicMetadata.NON_PARTITIONED) != null; - if (isTopicBeingSubscribedForInOtherThread) { - String errorMessage = String.format("[%s] Failed to subscribe for topic [%s] in topics consumer. " - + "Topic is already being subscribed for in other thread.", topic, topicName); - log.warn(errorMessage); - subscribeResult.completeExceptionally(new PulsarClientException(errorMessage)); - return; - } allTopicPartitionsNumber.incrementAndGet(); CompletableFuture> subFuture = new CompletableFuture<>(); @@ -1141,12 +1136,12 @@ public CompletableFuture removeConsumerAsync(String topicName) { // get topics name - public List getTopics() { - return topics.keySet().stream().collect(Collectors.toList()); + public List getPartitionedTopics() { + return partitionedTopics.keySet().stream().collect(Collectors.toList()); } // get partitioned topics name - public List getPartitionedTopics() { + public List getPartitions() { return consumers.keySet().stream().collect(Collectors.toList()); } @@ -1157,7 +1152,7 @@ public List> getConsumers() { // get all partitions that in the topics map int getPartitionsOfTheTopicMap() { - return topics.values().stream().mapToInt(Integer::intValue).sum(); + return partitionedTopics.values().stream().mapToInt(Integer::intValue).sum(); } @Override @@ -1218,20 +1213,11 @@ public CompletableFuture onTopicsExtended(Collection topicsExtende // subscribe increased partitions for a given topic private CompletableFuture subscribeIncreasedTopicPartitions(String topicName) { - CompletableFuture future = new CompletableFuture<>(); - int oldPartitionNumber = topics.get(topicName); - if (oldPartitionNumber == PartitionedTopicMetadata.NON_PARTITIONED) { - return CompletableFuture.completedFuture(null); - } - - //Drop the disconnected consumers to allow the auto discovery - consumers.entrySet().removeIf(e -> TopicName.get(e.getKey()).getPartitionedTopicName().equals(topicName) && !e.getValue().isConnected()); - final int connectedPartitions = Long.valueOf(consumers.entrySet().stream().filter(e -> e.getKey().contains(topicName)).count()).intValue(); - allTopicPartitionsNumber.set(allTopicPartitionsNumber.get() - (topics.get(topicName) - connectedPartitions)); - topics.put(topicName, connectedPartitions); + int oldPartitionNumber = partitionedTopics.get(topicName); - client.getPartitionsForTopic(topicName).thenCompose(list -> { - int currentPartitionNumber = Long.valueOf(list.stream().filter(t -> TopicName.get(t).isPartitioned()).count()).intValue(); + return client.getPartitionsForTopic(topicName).thenCompose(list -> { + int currentPartitionNumber = Long.valueOf(list.stream() + .filter(t -> TopicName.get(t).isPartitioned()).count()).intValue(); if (log.isDebugEnabled()) { log.debug("[{}] partitions number. old: {}, new: {}", @@ -1240,11 +1226,28 @@ private CompletableFuture subscribeIncreasedTopicPartitions(String topicNa if (oldPartitionNumber == currentPartitionNumber) { // topic partition number not changed - future.complete(null); - return future; + return CompletableFuture.completedFuture(null); + } else if (currentPartitionNumber == PartitionedTopicMetadata.NON_PARTITIONED) { + // The topic was initially partitioned but then it was deleted. We keep it in the topics + partitionedTopics.put(topicName, 0); + + allTopicPartitionsNumber.addAndGet(-oldPartitionNumber); + List> futures = new ArrayList<>(); + for (Iterator>> it = consumers.entrySet().iterator(); it.hasNext();) { + Map.Entry> e = it.next(); + String partitionedTopicName = TopicName.get(e.getKey()).getPartitionedTopicName(); + + // Remove the consumers that belong to the deleted partitioned topic + if (partitionedTopicName.equals(topicName)) { + futures.add(e.getValue().closeAsync()); + consumers.remove(e.getKey()); + } + } + + return FutureUtil.waitForAll(futures); } else if (oldPartitionNumber < currentPartitionNumber) { allTopicPartitionsNumber.addAndGet(currentPartitionNumber - oldPartitionNumber); - topics.put(topicName, currentPartitionNumber); + partitionedTopics.put(topicName, currentPartitionNumber); List newPartitions = list.subList(oldPartitionNumber, currentPartitionNumber); // subscribe new added partitions List>> futureList = newPartitions @@ -1273,29 +1276,19 @@ private CompletableFuture subscribeIncreasedTopicPartitions(String topicNa .collect(Collectors.toList()); // wait for all partitions subscribe future complete, then startReceivingMessages - FutureUtil.waitForAll(futureList) + return FutureUtil.waitForAll(futureList) .thenAccept(finalFuture -> { List> newConsumerList = newPartitions.stream() .map(partitionTopic -> consumers.get(partitionTopic)) .collect(Collectors.toList()); startReceivingMessages(newConsumerList); - future.complete(null); - }) - .exceptionally(ex -> { - log.warn("[{}] Failed to subscribe {} partition: {} - {} : {}", - topic, topicName, oldPartitionNumber, currentPartitionNumber, ex); - future.completeExceptionally(ex); - return null; }); } else { log.error("[{}] not support shrink topic partitions. old: {}, new: {}", topicName, oldPartitionNumber, currentPartitionNumber); - future.completeExceptionally(new NotSupportedException("not support shrink topic partitions")); + return FutureUtil.failedFuture(new NotSupportedException("not support shrink topic partitions")); } - return future; }); - - return future; } private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() { @@ -1311,7 +1304,7 @@ public void run(Timeout timeout) throws Exception { // if last auto update not completed yet, do nothing. if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) { - partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(topics.keySet()); + partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(partitionedTopics.keySet()); } // schedule the next re-check task diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index 7769d707f1ff4..2f946af712bfb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -92,7 +92,14 @@ public void run(Timeout timeout) throws Exception { } List newTopics = PulsarClientImpl.topicsPatternFilter(topics, topicsPattern); - List oldTopics = PatternMultiTopicsConsumerImpl.this.getTopics(); + List oldTopics = Lists.newArrayList(); + oldTopics.addAll(getPartitionedTopics()); + getPartitions().forEach(p -> { + TopicName t = TopicName.get(p); + if (!t.isPartitioned() || !oldTopics.contains(t.getPartitionedTopicName())) { + oldTopics.add(p); + } + }); futures.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics, oldTopics))); futures.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics, newTopics))); @@ -131,7 +138,7 @@ public CompletableFuture onTopicsRemoved(Collection removedTopics) return removeFuture; } - List> futures = Lists.newArrayListWithExpectedSize(topics.size()); + List> futures = Lists.newArrayListWithExpectedSize(partitionedTopics.size()); removedTopics.stream().forEach(topic -> futures.add(removeConsumerAsync(topic))); FutureUtil.waitForAll(futures) .thenAccept(finalFuture -> removeFuture.complete(null)) @@ -152,7 +159,7 @@ public CompletableFuture onTopicsAdded(Collection addedTopics) { return addFuture; } - List> futures = Lists.newArrayListWithExpectedSize(topics.size()); + List> futures = Lists.newArrayListWithExpectedSize(partitionedTopics.size()); addedTopics.stream().forEach(topic -> futures.add(subscribeAsync(topic, false /* createTopicIfDoesNotExist */))); FutureUtil.waitForAll(futures) .thenAccept(finalFuture -> addFuture.complete(null)) From ac6dfa0c61b2f4ca4b681de5e9534594ae7c962d Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 28 May 2021 16:59:17 -0700 Subject: [PATCH 5/5] Fixed re-subscribing same topic --- .../client/impl/MultiTopicsConsumerImpl.java | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index a91252801e5a7..63027a584c2d8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -963,16 +963,28 @@ private void doSubscribeTopicPartitions(Schema schema, allTopicPartitionsNumber.incrementAndGet(); CompletableFuture> subFuture = new CompletableFuture<>(); - ConsumerImpl newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig, - client.externalExecutorProvider(), -1, - true, subFuture, null, schema, interceptors, - createIfDoesNotExist); - synchronized (pauseMutex) { - if (paused) { - newConsumer.pause(); + + consumers.compute(topicName, (key, existingValue) -> { + if (existingValue != null) { + String errorMessage = String.format("[%s] Failed to subscribe for topic [%s] in topics consumer. " + + "Topic is already being subscribed for in other thread.", topic, topicName); + log.warn(errorMessage); + subscribeResult.completeExceptionally(new PulsarClientException(errorMessage)); + return existingValue; + } else { + ConsumerImpl newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig, + client.externalExecutorProvider(), -1, + true, subFuture, null, schema, interceptors, + createIfDoesNotExist); + + synchronized (pauseMutex) { + if (paused) { + newConsumer.pause(); + } + } + return newConsumer; } - consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); - } + }); futureList = Collections.singletonList(subFuture); }