diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index b1f1ddd6fd849..ddeb05249b22c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -18,16 +18,23 @@ */ package org.apache.pulsar.client.api; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.concurrent.TimeUnit; - -import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.fail; public class DeadLetterTopicTest extends ProducerConsumerBase { @@ -116,6 +123,77 @@ public void testDeadLetterTopic() throws Exception { newPulsarClient.close(); } + @Test(timeOut = 30000) + public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception { + final String topic = "persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage"; + final int maxRedeliveryCount = 1; + final int messageCount = 10; + final int consumerCount = 3; + //1 start 3 parallel consumers + List> consumers = new ArrayList<>(); + final AtomicInteger totalReceived = new AtomicInteger(0); + ExecutorService executor = Executors.newFixedThreadPool(consumerCount); + for (int i = 0; i < consumerCount; i++) { + executor.execute(() -> { + try { + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("my-subscription-DuplicatedMessage") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1001, TimeUnit.MILLISECONDS) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).deadLetterTopic(topic + "-DLQ").build()) + .negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .messageListener((MessageListener) (consumer1, msg) -> { + totalReceived.getAndIncrement(); + //never ack + }) + .subscribe(); + consumers.add(consumer); + } catch (PulsarClientException e) { + fail(); + } + }); + } + + //2 send messages + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + for (int i = 0; i < messageCount; i++) { + producer.send(String.format("Message [%d]", i)); + } + + //3 start a DLQ consumer + Consumer deadLetterConsumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic + "-DLQ") + .subscriptionName("my-subscription-DuplicatedMessage-DLQ") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + int totalInDeadLetter = 0; + while (true) { + Message message = deadLetterConsumer.receive(10, TimeUnit.SECONDS); + if (message == null) { + break; + } + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } + + //4 The number of messages that consumers can consume should be equal to messageCount * (maxRedeliveryCount + 1) + assertEquals(totalReceived.get(), messageCount * (maxRedeliveryCount + 1)); + + //5 The message in DLQ should be equal to messageCount + assertEquals(totalInDeadLetter, messageCount); + + //6 clean up + executor.shutdownNow(); + producer.close(); + deadLetterConsumer.close(); + for (Consumer consumer : consumers) { + consumer.close(); + } + } + /** * The test is disabled {@link https://github.com/apache/pulsar/issues/2647}. * @throws Exception diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index e36ce0c88573c..e0f1bcb0ecb56 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1676,10 +1676,11 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { builder.setEntryId(messageId.getEntryId()); return builder.build(); }).collect(Collectors.toList()); - - ByteBuf cmd = Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdDatas); - cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise()); - messageIdDatas.forEach(MessageIdData::recycle); + if (!messageIdDatas.isEmpty()) { + ByteBuf cmd = Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdDatas); + cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise()); + messageIdDatas.forEach(MessageIdData::recycle); + } }); if (messagesFromQueue > 0) { increaseAvailablePermits(cnx, messagesFromQueue);