Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@
package org.apache.pulsar.client.api;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
Expand Down Expand Up @@ -48,7 +57,11 @@
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
Expand Down Expand Up @@ -1608,4 +1621,81 @@ public void sendDeadLetterTopicWithMismatchSchemaProducer() throws Exception {
consumer.close();
deadLetterConsumer.close();
}

@Test
public void testCheckUnnecessaryGetPartitionedTopicMetadataWhenUseRetryAndDQL() {
PulsarClientImpl client = mock(PulsarClientImpl.class);
ClientConfigurationData clientConf = new ClientConfigurationData();
when(client.getConfiguration()).thenReturn(clientConf);
when(client.getPartitionedTopicMetadata(anyString(), anyBoolean(), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)));
when(client.subscribeAsync(any(ConsumerConfigurationData.class), any(), any()))
.thenReturn(CompletableFuture.completedFuture(mock(Consumer.class)));

// Case 1: DeadLetterPolicy is null
ConsumerBuilderImpl<byte[]> consumerBuilder1 = new ConsumerBuilderImpl<>(client, Schema.BYTES);
consumerBuilder1.topic("persistent://public/default/test");
consumerBuilder1.subscriptionName("sub");
consumerBuilder1.enableRetry(true);
consumerBuilder1.subscribeAsync();

verify(client, times(1)).getPartitionedTopicMetadata(
eq("persistent://public/default/sub-RETRY"), anyBoolean(), anyBoolean());
verify(client, times(1)).getPartitionedTopicMetadata(
eq("persistent://public/default/sub-DLQ"), anyBoolean(), anyBoolean());

clearInvocations(client);

// Case 2: DeadLetterPolicy with custom Retry topic
ConsumerBuilderImpl<byte[]> consumerBuilder2 = new ConsumerBuilderImpl<>(client, Schema.BYTES);
consumerBuilder2.topic("persistent://public/default/test");
consumerBuilder2.subscriptionName("sub");
consumerBuilder2.enableRetry(true);
consumerBuilder2.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(10)
.retryLetterTopic("persistent://public/default/topic-retry")
.build());
consumerBuilder2.subscribeAsync();

verify(client, times(0)).getPartitionedTopicMetadata(
eq("persistent://public/default/sub-RETRY"), anyBoolean(), anyBoolean());
verify(client, times(1)).getPartitionedTopicMetadata(
eq("persistent://public/default/sub-DLQ"), anyBoolean(), anyBoolean());

clearInvocations(client);

// Case 3: DeadLetterPolicy with custom DLQ topic
ConsumerBuilderImpl<byte[]> consumerBuilder3 = new ConsumerBuilderImpl<>(client, Schema.BYTES);
consumerBuilder3.topic("persistent://public/default/test");
consumerBuilder3.subscriptionName("sub");
consumerBuilder3.enableRetry(true);
consumerBuilder3.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(10)
.deadLetterTopic("persistent://public/default/topic-dlq")
.build());
consumerBuilder3.subscribeAsync();

verify(client, times(1)).getPartitionedTopicMetadata(
eq("persistent://public/default/sub-RETRY"), anyBoolean(), anyBoolean());
verify(client, times(0)).getPartitionedTopicMetadata(
eq("persistent://public/default/sub-DLQ"), anyBoolean(), anyBoolean());

clearInvocations(client);

// Case 4: DeadLetterPolicy with both custom topics
ConsumerBuilderImpl<byte[]> consumerBuilder4 = new ConsumerBuilderImpl<>(client, Schema.BYTES);
consumerBuilder4.topic("persistent://public/default/test");
consumerBuilder4.subscriptionName("sub");
consumerBuilder4.enableRetry(true);
consumerBuilder4.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(10)
.retryLetterTopic("custom-retry")
.deadLetterTopic("custom-dlq")
.build());
consumerBuilder4.subscribeAsync();

verify(client, times(0)).getPartitionedTopicMetadata(anyString(), anyBoolean(), anyBoolean());
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,18 @@ public CompletableFuture<Consumer<T>> subscribeAsync() {
DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())
|| StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
CompletableFuture<Boolean> retryLetterTopicMetadata = checkDlqAlreadyExists(oldRetryLetterTopic);
CompletableFuture<Boolean> deadLetterTopicMetadata = checkDlqAlreadyExists(oldDeadLetterTopic);
CompletableFuture<Boolean> retryLetterTopicMetadata;
if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())) {
retryLetterTopicMetadata = checkDlqAlreadyExists(oldRetryLetterTopic);
} else {
retryLetterTopicMetadata = CompletableFuture.completedFuture(false);
}
CompletableFuture<Boolean> deadLetterTopicMetadata;
if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
deadLetterTopicMetadata = checkDlqAlreadyExists(oldDeadLetterTopic);
} else {
deadLetterTopicMetadata = CompletableFuture.completedFuture(false);
}
applyDLQConfig = CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata)
.thenAccept(__ -> {
String retryLetterTopic = RetryMessageUtil.getRetryTopic(topicFirst.toString(),
Expand Down
Loading