From 449e8874fee31d9e4aa8274271531bd716d8a3ee Mon Sep 17 00:00:00 2001 From: WangZhenjiang Date: Wed, 21 Jan 2026 21:10:55 +0800 Subject: [PATCH 1/6] [improve][broker]Reduce unnecessary creation of retry topics and DLQ topics --- .../pulsar/client/impl/ConsumerBuilderImpl.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 11bf617e2fdc0..1a145e9af7b30 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -175,8 +175,18 @@ public CompletableFuture> subscribeAsync() { DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy(); if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic()) || StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) { - CompletableFuture retryLetterTopicMetadata = checkDlqAlreadyExists(oldRetryLetterTopic); - CompletableFuture deadLetterTopicMetadata = checkDlqAlreadyExists(oldDeadLetterTopic); + CompletableFuture retryLetterTopicMetadata; + if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())) { + retryLetterTopicMetadata = checkDlqAlreadyExists(oldRetryLetterTopic); + } else { + retryLetterTopicMetadata = CompletableFuture.completedFuture(false); + } + CompletableFuture deadLetterTopicMetadata; + if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) { + deadLetterTopicMetadata = checkDlqAlreadyExists(oldDeadLetterTopic); + } else { + deadLetterTopicMetadata = CompletableFuture.completedFuture(false); + } applyDLQConfig = CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata) .thenAccept(__ -> { String retryLetterTopic = RetryMessageUtil.getRetryTopic(topicFirst.toString(), @@ -197,6 +207,7 @@ public CompletableFuture> subscribeAsync() { .build()); } else { if (StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())) { + conf.getDeadLetterPolicy().setRetryLetterTopic(retryLetterTopic); } if (StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) { From cd87b2d1e5969a36bd13e714c568fcc4a231ba63 Mon Sep 17 00:00:00 2001 From: WangZhenjiang Date: Wed, 21 Jan 2026 21:25:31 +0800 Subject: [PATCH 2/6] [improve][broker]Reduce unnecessary creation of retry topics and DLQ topics --- .../java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 1a145e9af7b30..008a9b554e984 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -207,7 +207,6 @@ public CompletableFuture> subscribeAsync() { .build()); } else { if (StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())) { - conf.getDeadLetterPolicy().setRetryLetterTopic(retryLetterTopic); } if (StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) { From 5783a0878dc2e2f59e412e5b7d9a49fa7d420125 Mon Sep 17 00:00:00 2001 From: WangZhenjiang Date: Thu, 22 Jan 2026 21:03:27 +0800 Subject: [PATCH 3/6] [improve][broker]Reduce unnecessary creation of retry topics and DLQ topics --- conf/standalone.conf | 2 +- .../client/api/DeadLetterTopicTest.java | 60 +++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/conf/standalone.conf b/conf/standalone.conf index 7f640a94f3d4b..f3d0697478f77 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -1306,7 +1306,7 @@ gcWaitTime=300000 allowAutoTopicCreation=true # The type of topic that is allowed to be automatically created.(partitioned/non-partitioned) -allowAutoTopicCreationType=non-partitioned +allowAutoTopicCreationType=partitioned # If 'allowAutoTopicCreation' is true and the name of the topic contains 'cluster', # the topic cannot be automatically created. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index df7e42df80bd4..39b54e2dda00a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -49,6 +50,7 @@ import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.util.RetryMessageUtil; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.awaitility.Awaitility; import org.slf4j.Logger; @@ -1608,4 +1610,62 @@ public void sendDeadLetterTopicWithMismatchSchemaProducer() throws Exception { consumer.close(); deadLetterConsumer.close(); } + @Test + public void testLimitAutoCreateDefaultRetryAndDLQTopic() throws Exception { + pulsar.getConfiguration().setAllowAutoTopicCreation(true); + pulsar.getConfiguration().setDefaultNumPartitions(2); + + String topic = "persistent://my-property/my-ns/my-topic"; + String myRetryTopic = "persistent://my-property/my-ns/my-retry-topic"; + String myDLQTopic = "persistent://my-property/my-ns/my-dlq-topic"; + + String defaultRetryTopic = "persistent://my-property/my-ns/sub-RETRY"; + String defaultDLQTopic = "persistent://my-property/my-ns/sub-DLQ"; + + + Consumer consumer1 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub") + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .enableRetry(true) + .deadLetterPolicy( + DeadLetterPolicy.builder() + .retryLetterTopic(myRetryTopic) + .maxRedeliverCount(1).build()) + .subscribe(); + + assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .partitionedTopicExists(TopicName.get(defaultRetryTopic))); + assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .partitionedTopicExists(TopicName.get(defaultDLQTopic))); + admin.topics().delete(defaultDLQTopic); + assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .partitionedTopicExists(TopicName.get(defaultDLQTopic))); + consumer1.close(); + + + Consumer consumer2 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub") + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .enableRetry(true) + .deadLetterPolicy( + DeadLetterPolicy.builder() + .deadLetterTopic(myDLQTopic) + .maxRedeliverCount(1).build()) + .subscribe(); + + assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .partitionedTopicExists(TopicName.get(defaultDLQTopic))); + assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .partitionedTopicExists(TopicName.get(defaultRetryTopic))); + + consumer2.close(); + + + } + + } From f19246e43eec5b3377f11b84e0196b000df7893d Mon Sep 17 00:00:00 2001 From: WangZhenjiang Date: Thu, 22 Jan 2026 22:25:35 +0800 Subject: [PATCH 4/6] [improve][broker]Reduce unnecessary creation of retry topics and DLQ topics --- .../client/api/DeadLetterTopicTest.java | 142 +++++++++++------- 1 file changed, 86 insertions(+), 56 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index 39b54e2dda00a..8cbc88e681166 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -19,8 +19,16 @@ package org.apache.pulsar.client.api; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -49,8 +57,11 @@ import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.util.RetryMessageUtil; -import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.awaitility.Awaitility; import org.slf4j.Logger; @@ -1610,61 +1621,80 @@ public void sendDeadLetterTopicWithMismatchSchemaProducer() throws Exception { consumer.close(); deadLetterConsumer.close(); } - @Test - public void testLimitAutoCreateDefaultRetryAndDLQTopic() throws Exception { - pulsar.getConfiguration().setAllowAutoTopicCreation(true); - pulsar.getConfiguration().setDefaultNumPartitions(2); - - String topic = "persistent://my-property/my-ns/my-topic"; - String myRetryTopic = "persistent://my-property/my-ns/my-retry-topic"; - String myDLQTopic = "persistent://my-property/my-ns/my-dlq-topic"; - - String defaultRetryTopic = "persistent://my-property/my-ns/sub-RETRY"; - String defaultDLQTopic = "persistent://my-property/my-ns/sub-DLQ"; - - - Consumer consumer1 = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName("sub") - .subscriptionType(SubscriptionType.Shared) - .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .enableRetry(true) - .deadLetterPolicy( - DeadLetterPolicy.builder() - .retryLetterTopic(myRetryTopic) - .maxRedeliverCount(1).build()) - .subscribe(); - - assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() - .partitionedTopicExists(TopicName.get(defaultRetryTopic))); - assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() - .partitionedTopicExists(TopicName.get(defaultDLQTopic))); - admin.topics().delete(defaultDLQTopic); - assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() - .partitionedTopicExists(TopicName.get(defaultDLQTopic))); - consumer1.close(); - - - Consumer consumer2 = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName("sub") - .subscriptionType(SubscriptionType.Shared) - .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .enableRetry(true) - .deadLetterPolicy( - DeadLetterPolicy.builder() - .deadLetterTopic(myDLQTopic) - .maxRedeliverCount(1).build()) - .subscribe(); - - assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() - .partitionedTopicExists(TopicName.get(defaultDLQTopic))); - assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() - .partitionedTopicExists(TopicName.get(defaultRetryTopic))); - - consumer2.close(); - + @Test + public void testCheckUnnecessaryGetPartitionedTopicMetadataWhenUseRetryAndDQL() { + PulsarClientImpl client = mock(PulsarClientImpl.class); + ClientConfigurationData clientConf = new ClientConfigurationData(); + when(client.getConfiguration()).thenReturn(clientConf); + when(client.getPartitionedTopicMetadata(anyString(), anyBoolean(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata(0))); + when(client.subscribeAsync(any(ConsumerConfigurationData.class), any(), any())) + .thenReturn(CompletableFuture.completedFuture(mock(Consumer.class))); + + // Case 1: DeadLetterPolicy is null + ConsumerBuilderImpl ConsumerBuilder1 = new ConsumerBuilderImpl<>(client, Schema.BYTES); + ConsumerBuilder1.topic("persistent://public/default/test"); + ConsumerBuilder1.subscriptionName("sub"); + ConsumerBuilder1.enableRetry(true); + ConsumerBuilder1.subscribeAsync(); + + verify(client, times(1)).getPartitionedTopicMetadata( + eq("persistent://public/default/sub-RETRY"), anyBoolean(), anyBoolean()); + verify(client, times(1)).getPartitionedTopicMetadata( + eq("persistent://public/default/sub-DLQ"), anyBoolean(), anyBoolean()); + + clearInvocations(client); + + // Case 2: DeadLetterPolicy with custom Retry topic + ConsumerBuilderImpl ConsumerBuilder2 = new ConsumerBuilderImpl<>(client, Schema.BYTES); + ConsumerBuilder2.topic("persistent://public/default/test"); + ConsumerBuilder2.subscriptionName("sub"); + ConsumerBuilder2.enableRetry(true); + ConsumerBuilder2.deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(10) + .retryLetterTopic("persistent://public/default/topic-retry") + .build()); + ConsumerBuilder2.subscribeAsync(); + + verify(client, times(0)).getPartitionedTopicMetadata( + eq("persistent://public/default/sub-RETRY"), anyBoolean(), anyBoolean()); + verify(client, times(1)).getPartitionedTopicMetadata( + eq("persistent://public/default/sub-DLQ"), anyBoolean(), anyBoolean()); + + clearInvocations(client); + + // Case 3: DeadLetterPolicy with custom DLQ topic + ConsumerBuilderImpl ConsumerBuilder3 = new ConsumerBuilderImpl<>(client, Schema.BYTES); + ConsumerBuilder3.topic("persistent://public/default/test"); + ConsumerBuilder3.subscriptionName("sub"); + ConsumerBuilder3.enableRetry(true); + ConsumerBuilder3.deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(10) + .deadLetterTopic("persistent://public/default/topic-dlq") + .build()); + ConsumerBuilder3.subscribeAsync(); + + verify(client, times(1)).getPartitionedTopicMetadata( + eq("persistent://public/default/sub-RETRY"), anyBoolean(), anyBoolean()); + verify(client, times(0)).getPartitionedTopicMetadata( + eq("persistent://public/default/sub-DLQ"), anyBoolean(), anyBoolean()); + + clearInvocations(client); + + // Case 4: DeadLetterPolicy with both custom topics + ConsumerBuilderImpl ConsumerBuilder4 = new ConsumerBuilderImpl<>(client, Schema.BYTES); + ConsumerBuilder4.topic("persistent://public/default/test"); + ConsumerBuilder4.subscriptionName("sub"); + ConsumerBuilder4.enableRetry(true); + ConsumerBuilder4.deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(10) + .retryLetterTopic("custom-retry") + .deadLetterTopic("custom-dlq") + .build()); + ConsumerBuilder4.subscribeAsync(); + + verify(client, times(0)).getPartitionedTopicMetadata(anyString(), anyBoolean(), anyBoolean()); } From 7f8cdf54eed7c43fcc30d1f0b584f63e43287155 Mon Sep 17 00:00:00 2001 From: WangZhenjiang Date: Thu, 22 Jan 2026 22:26:40 +0800 Subject: [PATCH 5/6] [improve][broker]Reduce unnecessary creation of retry topics and DLQ topics --- conf/standalone.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conf/standalone.conf b/conf/standalone.conf index f3d0697478f77..7f640a94f3d4b 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -1306,7 +1306,7 @@ gcWaitTime=300000 allowAutoTopicCreation=true # The type of topic that is allowed to be automatically created.(partitioned/non-partitioned) -allowAutoTopicCreationType=partitioned +allowAutoTopicCreationType=non-partitioned # If 'allowAutoTopicCreation' is true and the name of the topic contains 'cluster', # the topic cannot be automatically created. From 770385c026505d0dc389cce282890fca62da31b2 Mon Sep 17 00:00:00 2001 From: WangZhenjiang Date: Mon, 26 Jan 2026 16:21:16 +0800 Subject: [PATCH 6/6] [improve][broker]Reduce unnecessary creation of retry topics and DLQ topics --- .../client/api/DeadLetterTopicTest.java | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index 8cbc88e681166..832e814e4eed9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -1633,11 +1633,11 @@ public void testCheckUnnecessaryGetPartitionedTopicMetadataWhenUseRetryAndDQL() .thenReturn(CompletableFuture.completedFuture(mock(Consumer.class))); // Case 1: DeadLetterPolicy is null - ConsumerBuilderImpl ConsumerBuilder1 = new ConsumerBuilderImpl<>(client, Schema.BYTES); - ConsumerBuilder1.topic("persistent://public/default/test"); - ConsumerBuilder1.subscriptionName("sub"); - ConsumerBuilder1.enableRetry(true); - ConsumerBuilder1.subscribeAsync(); + ConsumerBuilderImpl consumerBuilder1 = new ConsumerBuilderImpl<>(client, Schema.BYTES); + consumerBuilder1.topic("persistent://public/default/test"); + consumerBuilder1.subscriptionName("sub"); + consumerBuilder1.enableRetry(true); + consumerBuilder1.subscribeAsync(); verify(client, times(1)).getPartitionedTopicMetadata( eq("persistent://public/default/sub-RETRY"), anyBoolean(), anyBoolean()); @@ -1647,15 +1647,15 @@ public void testCheckUnnecessaryGetPartitionedTopicMetadataWhenUseRetryAndDQL() clearInvocations(client); // Case 2: DeadLetterPolicy with custom Retry topic - ConsumerBuilderImpl ConsumerBuilder2 = new ConsumerBuilderImpl<>(client, Schema.BYTES); - ConsumerBuilder2.topic("persistent://public/default/test"); - ConsumerBuilder2.subscriptionName("sub"); - ConsumerBuilder2.enableRetry(true); - ConsumerBuilder2.deadLetterPolicy(DeadLetterPolicy.builder() + ConsumerBuilderImpl consumerBuilder2 = new ConsumerBuilderImpl<>(client, Schema.BYTES); + consumerBuilder2.topic("persistent://public/default/test"); + consumerBuilder2.subscriptionName("sub"); + consumerBuilder2.enableRetry(true); + consumerBuilder2.deadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(10) .retryLetterTopic("persistent://public/default/topic-retry") .build()); - ConsumerBuilder2.subscribeAsync(); + consumerBuilder2.subscribeAsync(); verify(client, times(0)).getPartitionedTopicMetadata( eq("persistent://public/default/sub-RETRY"), anyBoolean(), anyBoolean()); @@ -1665,15 +1665,15 @@ public void testCheckUnnecessaryGetPartitionedTopicMetadataWhenUseRetryAndDQL() clearInvocations(client); // Case 3: DeadLetterPolicy with custom DLQ topic - ConsumerBuilderImpl ConsumerBuilder3 = new ConsumerBuilderImpl<>(client, Schema.BYTES); - ConsumerBuilder3.topic("persistent://public/default/test"); - ConsumerBuilder3.subscriptionName("sub"); - ConsumerBuilder3.enableRetry(true); - ConsumerBuilder3.deadLetterPolicy(DeadLetterPolicy.builder() + ConsumerBuilderImpl consumerBuilder3 = new ConsumerBuilderImpl<>(client, Schema.BYTES); + consumerBuilder3.topic("persistent://public/default/test"); + consumerBuilder3.subscriptionName("sub"); + consumerBuilder3.enableRetry(true); + consumerBuilder3.deadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(10) .deadLetterTopic("persistent://public/default/topic-dlq") .build()); - ConsumerBuilder3.subscribeAsync(); + consumerBuilder3.subscribeAsync(); verify(client, times(1)).getPartitionedTopicMetadata( eq("persistent://public/default/sub-RETRY"), anyBoolean(), anyBoolean()); @@ -1683,16 +1683,16 @@ public void testCheckUnnecessaryGetPartitionedTopicMetadataWhenUseRetryAndDQL() clearInvocations(client); // Case 4: DeadLetterPolicy with both custom topics - ConsumerBuilderImpl ConsumerBuilder4 = new ConsumerBuilderImpl<>(client, Schema.BYTES); - ConsumerBuilder4.topic("persistent://public/default/test"); - ConsumerBuilder4.subscriptionName("sub"); - ConsumerBuilder4.enableRetry(true); - ConsumerBuilder4.deadLetterPolicy(DeadLetterPolicy.builder() + ConsumerBuilderImpl consumerBuilder4 = new ConsumerBuilderImpl<>(client, Schema.BYTES); + consumerBuilder4.topic("persistent://public/default/test"); + consumerBuilder4.subscriptionName("sub"); + consumerBuilder4.enableRetry(true); + consumerBuilder4.deadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(10) .retryLetterTopic("custom-retry") .deadLetterTopic("custom-dlq") .build()); - ConsumerBuilder4.subscribeAsync(); + consumerBuilder4.subscribeAsync(); verify(client, times(0)).getPartitionedTopicMetadata(anyString(), anyBoolean(), anyBoolean()); }