Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,9 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {

// round-robin dispatch batch size for this consumer
int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1;
if (c.getMaxUnackedMessages() > 0) {
availablePermits = Math.min(availablePermits, c.getMaxUnackedMessages() - c.getUnackedMessages());
}
if (log.isDebugEnabled() && !c.isWritable()) {
log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {}; "
+ "availablePermits are {}", topic.getName(), name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1591,6 +1591,46 @@ public void testConsumerBlockingWithUnAckedMessagesMultipleIteration(boolean ack
}
}

@Test(dataProvider = "ackReceiptEnabled")
public void testMaxUnAckMessagesLowerThanPermits(boolean ackReceiptEnabled) throws PulsarClientException {
final int maxUnacks = 10;
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnacks);
final String topic = "persistent://my-property/my-ns/testMaxUnAckMessagesLowerThanPermits";

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic).subscriptionName("sub")
.subscriptionType(SubscriptionType.Shared)
.isAckReceiptEnabled(ackReceiptEnabled)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false)
.topic(topic)
.create();

final int messages = 1000;
for (int i = 0; i < messages; i++) {
producer.sendAsync("Message - " + i);
}
producer.flush();
List<MessageId> receives = new ArrayList<>();
for (int i = 0; i < maxUnacks; i++) {
Message<String> received = consumer.receive();
log.info("Received message {} with message ID {}", received.getValue(), received.getMessageId());
receives.add(received.getMessageId());
}
assertNull(consumer.receive(3, TimeUnit.SECONDS));
consumer.acknowledge(receives);
for (int i = 0; i < messages - maxUnacks; i++) {
Message<String> received = consumer.receive();
log.info("Received message {} with message ID {}", received.getValue(), received.getMessageId());
consumer.acknowledge(received);
}
}

/**
* Verify: Consumer1 which doesn't send ack will not impact Consumer2 which sends ack for consumed message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1700,7 +1700,7 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile
}

// client should not receive all produced messages and should be blocked due to unack-messages
assertEquals(messages1.size(), receiverQueueSize);
assertEquals(messages1.size(), unAckedMessagesBufferSize);
Set<MessageIdImpl> redeliveryMessages = messages1.stream().map(m -> {
return (MessageIdImpl) m.getMessageId();
}).collect(Collectors.toSet());
Expand Down