diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index df7e42df80bd4..832e814e4eed9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -19,6 +19,15 @@ package org.apache.pulsar.client.api; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; @@ -48,7 +57,11 @@ import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.util.RetryMessageUtil; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.awaitility.Awaitility; import org.slf4j.Logger; @@ -1608,4 +1621,81 @@ public void sendDeadLetterTopicWithMismatchSchemaProducer() throws Exception { consumer.close(); deadLetterConsumer.close(); } + + @Test + public void testCheckUnnecessaryGetPartitionedTopicMetadataWhenUseRetryAndDQL() { + PulsarClientImpl client = mock(PulsarClientImpl.class); + ClientConfigurationData clientConf = new ClientConfigurationData(); + when(client.getConfiguration()).thenReturn(clientConf); + when(client.getPartitionedTopicMetadata(anyString(), anyBoolean(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata(0))); + when(client.subscribeAsync(any(ConsumerConfigurationData.class), any(), any())) + .thenReturn(CompletableFuture.completedFuture(mock(Consumer.class))); + + // Case 1: DeadLetterPolicy is null + ConsumerBuilderImpl consumerBuilder1 = new ConsumerBuilderImpl<>(client, Schema.BYTES); + consumerBuilder1.topic("persistent://public/default/test"); + consumerBuilder1.subscriptionName("sub"); + consumerBuilder1.enableRetry(true); + consumerBuilder1.subscribeAsync(); + + verify(client, times(1)).getPartitionedTopicMetadata( + eq("persistent://public/default/sub-RETRY"), anyBoolean(), anyBoolean()); + verify(client, times(1)).getPartitionedTopicMetadata( + eq("persistent://public/default/sub-DLQ"), anyBoolean(), anyBoolean()); + + clearInvocations(client); + + // Case 2: DeadLetterPolicy with custom Retry topic + ConsumerBuilderImpl consumerBuilder2 = new ConsumerBuilderImpl<>(client, Schema.BYTES); + consumerBuilder2.topic("persistent://public/default/test"); + consumerBuilder2.subscriptionName("sub"); + consumerBuilder2.enableRetry(true); + consumerBuilder2.deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(10) + .retryLetterTopic("persistent://public/default/topic-retry") + .build()); + consumerBuilder2.subscribeAsync(); + + verify(client, times(0)).getPartitionedTopicMetadata( + eq("persistent://public/default/sub-RETRY"), anyBoolean(), anyBoolean()); + verify(client, times(1)).getPartitionedTopicMetadata( + eq("persistent://public/default/sub-DLQ"), anyBoolean(), anyBoolean()); + + clearInvocations(client); + + // Case 3: DeadLetterPolicy with custom DLQ topic + ConsumerBuilderImpl consumerBuilder3 = new ConsumerBuilderImpl<>(client, Schema.BYTES); + consumerBuilder3.topic("persistent://public/default/test"); + consumerBuilder3.subscriptionName("sub"); + consumerBuilder3.enableRetry(true); + consumerBuilder3.deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(10) + .deadLetterTopic("persistent://public/default/topic-dlq") + .build()); + consumerBuilder3.subscribeAsync(); + + verify(client, times(1)).getPartitionedTopicMetadata( + eq("persistent://public/default/sub-RETRY"), anyBoolean(), anyBoolean()); + verify(client, times(0)).getPartitionedTopicMetadata( + eq("persistent://public/default/sub-DLQ"), anyBoolean(), anyBoolean()); + + clearInvocations(client); + + // Case 4: DeadLetterPolicy with both custom topics + ConsumerBuilderImpl consumerBuilder4 = new ConsumerBuilderImpl<>(client, Schema.BYTES); + consumerBuilder4.topic("persistent://public/default/test"); + consumerBuilder4.subscriptionName("sub"); + consumerBuilder4.enableRetry(true); + consumerBuilder4.deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(10) + .retryLetterTopic("custom-retry") + .deadLetterTopic("custom-dlq") + .build()); + consumerBuilder4.subscribeAsync(); + + verify(client, times(0)).getPartitionedTopicMetadata(anyString(), anyBoolean(), anyBoolean()); + } + + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 11bf617e2fdc0..008a9b554e984 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -175,8 +175,18 @@ public CompletableFuture> subscribeAsync() { DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy(); if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic()) || StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) { - CompletableFuture retryLetterTopicMetadata = checkDlqAlreadyExists(oldRetryLetterTopic); - CompletableFuture deadLetterTopicMetadata = checkDlqAlreadyExists(oldDeadLetterTopic); + CompletableFuture retryLetterTopicMetadata; + if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())) { + retryLetterTopicMetadata = checkDlqAlreadyExists(oldRetryLetterTopic); + } else { + retryLetterTopicMetadata = CompletableFuture.completedFuture(false); + } + CompletableFuture deadLetterTopicMetadata; + if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) { + deadLetterTopicMetadata = checkDlqAlreadyExists(oldDeadLetterTopic); + } else { + deadLetterTopicMetadata = CompletableFuture.completedFuture(false); + } applyDLQConfig = CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata) .thenAccept(__ -> { String retryLetterTopic = RetryMessageUtil.getRetryTopic(topicFirst.toString(),