From c20e71c843d5a73a8afa232f2588fb75831e3aca Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Sat, 23 May 2020 14:49:07 +0800 Subject: [PATCH 1/2] fix Duplicated messages are sent to dead letter topic #6960 --- .../client/api/DeadLetterTopicTest.java | 84 ++++++++++++++++++- .../pulsar/client/impl/ConsumerImpl.java | 9 +- 2 files changed, 86 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index b1f1ddd6fd849..3a64ed9569a91 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -18,16 +18,23 @@ */ package org.apache.pulsar.client.api; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.concurrent.TimeUnit; - -import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.fail; public class DeadLetterTopicTest extends ProducerConsumerBase { @@ -116,6 +123,77 @@ public void testDeadLetterTopic() throws Exception { newPulsarClient.close(); } + @Test(timeOut = 30000) + public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception { + final String topic = "persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage"; + final int maxRedeliveryCount = 1; + final int messageCount = 10; + final int consumerCount = 3; + //1 start 3 parallel consumers + List> consumers = new ArrayList<>(); + final AtomicInteger totalReceived = new AtomicInteger(0); + Executor executor = Executors.newFixedThreadPool(consumerCount); + for (int i = 0; i < consumerCount; i++) { + executor.execute(() -> { + Consumer consumer = null; + try { + consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("my-subscription-DuplicatedMessage") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1001, TimeUnit.MILLISECONDS) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).deadLetterTopic(topic + "-DLQ").build()) + .negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .messageListener((MessageListener) (consumer1, msg) -> { + totalReceived.getAndIncrement(); + //never ack + }) + .subscribe(); + } catch (PulsarClientException e) { + fail(); + } + consumers.add(consumer); + }); + } + + //2 send messages + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + for (int i = 0; i < messageCount; i++) { + producer.send(String.format("Message [%d]", i)); + } + + //3 start a DLQ consumer + Consumer deadLetterConsumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic + "-DLQ") + .subscriptionName("my-subscription-DuplicatedMessage-DLQ") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + int totalInDeadLetter = 0; + while (true) { + Message message = deadLetterConsumer.receive(5, TimeUnit.SECONDS); + if (message == null) { + break; + } + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } + + //4 The number of messages that consumers can consume should be equal to messageCount * (maxRedeliveryCount + 1) + assertEquals(totalReceived.get(), messageCount * (maxRedeliveryCount + 1)); + + //5 The message in DLQ should be equal to messageCount + assertEquals(totalInDeadLetter, messageCount); + + //6 clean up + producer.close(); + deadLetterConsumer.close(); + for (Consumer consumer : consumers) { + consumer.close(); + } + } + /** * The test is disabled {@link https://github.com/apache/pulsar/issues/2647}. * @throws Exception diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 04983d9055406..37db0f319e6d0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1539,10 +1539,11 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { builder.setEntryId(messageId.getEntryId()); return builder.build(); }).collect(Collectors.toList()); - - ByteBuf cmd = Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdDatas); - cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise()); - messageIdDatas.forEach(MessageIdData::recycle); + if (!messageIdDatas.isEmpty()) { + ByteBuf cmd = Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdDatas); + cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise()); + messageIdDatas.forEach(MessageIdData::recycle); + } }); if (messagesFromQueue > 0) { increaseAvailablePermits(cnx, messagesFromQueue); From 30c1c717bf83ee12258d54d9f5a54c77b52dc735 Mon Sep 17 00:00:00 2001 From: feynmanlin <315157973@qq.com> Date: Sun, 24 May 2020 02:31:51 +0800 Subject: [PATCH 2/2] add threadPool shutdown --- .../pulsar/client/api/DeadLetterTopicTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index 3a64ed9569a91..ddeb05249b22c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -20,7 +20,7 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -132,12 +132,11 @@ public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception { //1 start 3 parallel consumers List> consumers = new ArrayList<>(); final AtomicInteger totalReceived = new AtomicInteger(0); - Executor executor = Executors.newFixedThreadPool(consumerCount); + ExecutorService executor = Executors.newFixedThreadPool(consumerCount); for (int i = 0; i < consumerCount; i++) { executor.execute(() -> { - Consumer consumer = null; try { - consumer = pulsarClient.newConsumer(Schema.STRING) + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName("my-subscription-DuplicatedMessage") .subscriptionType(SubscriptionType.Shared) @@ -151,10 +150,10 @@ public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception { //never ack }) .subscribe(); + consumers.add(consumer); } catch (PulsarClientException e) { fail(); } - consumers.add(consumer); }); } @@ -172,7 +171,7 @@ public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception { .subscribe(); int totalInDeadLetter = 0; while (true) { - Message message = deadLetterConsumer.receive(5, TimeUnit.SECONDS); + Message message = deadLetterConsumer.receive(10, TimeUnit.SECONDS); if (message == null) { break; } @@ -187,6 +186,7 @@ public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception { assertEquals(totalInDeadLetter, messageCount); //6 clean up + executor.shutdownNow(); producer.close(); deadLetterConsumer.close(); for (Consumer consumer : consumers) {