diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index a770e76fc4368..a7d52dcc0980b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -572,6 +572,9 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { // round-robin dispatch batch size for this consumer int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1; + if (c.getMaxUnackedMessages() > 0) { + availablePermits = Math.min(availablePermits, c.getMaxUnackedMessages() - c.getUnackedMessages()); + } if (log.isDebugEnabled() && !c.isWritable()) { log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {}; " + "availablePermits are {}", topic.getName(), name, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 8205dbe935b79..d4c124d607a0e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -1591,6 +1591,46 @@ public void testConsumerBlockingWithUnAckedMessagesMultipleIteration(boolean ack } } + @Test(dataProvider = "ackReceiptEnabled") + public void testMaxUnAckMessagesLowerThanPermits(boolean ackReceiptEnabled) throws PulsarClientException { + final int maxUnacks = 10; + pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnacks); + final String topic = "persistent://my-property/my-ns/testMaxUnAckMessagesLowerThanPermits"; + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic).subscriptionName("sub") + .subscriptionType(SubscriptionType.Shared) + .isAckReceiptEnabled(ackReceiptEnabled) + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .enableBatching(false) + .topic(topic) + .create(); + + final int messages = 1000; + for (int i = 0; i < messages; i++) { + producer.sendAsync("Message - " + i); + } + producer.flush(); + List receives = new ArrayList<>(); + for (int i = 0; i < maxUnacks; i++) { + Message received = consumer.receive(); + log.info("Received message {} with message ID {}", received.getValue(), received.getMessageId()); + receives.add(received.getMessageId()); + } + assertNull(consumer.receive(3, TimeUnit.SECONDS)); + consumer.acknowledge(receives); + for (int i = 0; i < messages - maxUnacks; i++) { + Message received = consumer.receive(); + log.info("Received message {} with message ID {}", received.getValue(), received.getMessageId()); + consumer.acknowledge(received); + } + } + /** * Verify: Consumer1 which doesn't send ack will not impact Consumer2 which sends ack for consumed message. * diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java index 9679620c35724..c241b6e6cc76c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java @@ -1700,7 +1700,7 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile } // client should not receive all produced messages and should be blocked due to unack-messages - assertEquals(messages1.size(), receiverQueueSize); + assertEquals(messages1.size(), unAckedMessagesBufferSize); Set redeliveryMessages = messages1.stream().map(m -> { return (MessageIdImpl) m.getMessageId(); }).collect(Collectors.toSet());