Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import lombok.Cleanup;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class ConsumerAckListTest extends ProducerConsumerBase {

@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test(timeOut = 30000)
public void testBatchListAck() throws Exception {
ackListMessage(true,true);
ackListMessage(true,false);
ackListMessage(false,false);
ackListMessage(false,true);
}

public void ackListMessage(boolean isBatch, boolean isPartitioned) throws Exception {
final String topic = "persistent://my-property/my-ns/batch-ack-" + UUID.randomUUID();
final String subName = "testBatchAck-sub" + UUID.randomUUID();
final int messageNum = ThreadLocalRandom.current().nextInt(50, 100);
if (isPartitioned) {
admin.topics().createPartitionedTopic(topic, 3);
}
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(isBatch)
.batchingMaxPublishDelay(50, TimeUnit.MILLISECONDS)
.topic(topic).create();
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.subscriptionType(SubscriptionType.Shared)
.topic(topic)
.negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS)
.subscriptionName(subName)
.subscribe();
sendMessagesAsyncAndWait(producer, messageNum);
List<MessageId> messages = new ArrayList<>();
for (int i = 0; i < messageNum; i++) {
messages.add(consumer.receive().getMessageId());
}
consumer.acknowledge(messages);
//Wait ack send.
Thread.sleep(1000);
consumer.redeliverUnacknowledgedMessages();
Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNull(msg);
}

private void sendMessagesAsyncAndWait(Producer<String> producer, int messages) throws Exception {
CountDownLatch latch = new CountDownLatch(messages);
for (int i = 0; i < messages; i++) {
String message = "my-message-" + i;
producer.sendAsync(message).thenAccept(messageId -> {
if (messageId != null) {
latch.countDown();
}
});
}
latch.await();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;

import java.io.Closeable;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -163,6 +164,13 @@ public interface Consumer<T> extends Closeable {
*/
void acknowledge(Messages<?> messages) throws PulsarClientException;

/**
* Acknowledge the consumption of a list of message.
* @param messageIdList
* @throws PulsarClientException
*/
void acknowledge(List<MessageId> messageIdList) throws PulsarClientException;

/**
* Acknowledge the failure to process a single message.
*
Expand Down Expand Up @@ -365,6 +373,13 @@ public interface Consumer<T> extends Closeable {
*/
CompletableFuture<Void> acknowledgeAsync(Messages<?> messages);

/**
* Asynchronously acknowledge the consumption of a list of message.
* @param messageIdList
* @return
*/
CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList);

/**
* Asynchronously reconsumeLater the consumption of a single message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
Expand All @@ -31,6 +32,8 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable {

void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties);

void addListAcknowledgment(List<MessageIdImpl> messageIds, AckType ackType, Map<String, Long> properties);

void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType, Map<String, Long> properties);

void flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.common.collect.Queues;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -241,6 +242,15 @@ public void acknowledge(MessageId messageId) throws PulsarClientException {
}
}

@Override
public void acknowledge(List<MessageId> messageIdList) throws PulsarClientException {
try {
acknowledgeAsync(messageIdList).get();
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}

@Override
public void acknowledge(Messages<?> messages) throws PulsarClientException {
try {
Expand Down Expand Up @@ -322,6 +332,11 @@ public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages) {
}
}

@Override
public CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList) {
return doAcknowledgeWithTxn(messageIdList, AckType.Individual, Collections.emptyMap(), null);
}

@Override
public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit) {
if (!conf.isRetryEnable()) {
Expand Down Expand Up @@ -408,6 +423,18 @@ public void negativeAcknowledge(Message<?> message) {
negativeAcknowledge(message.getMessageId());
}

protected CompletableFuture<Void> doAcknowledgeWithTxn(List<MessageId> messageIdList, AckType ackType,
Map<String,Long> properties,
TransactionImpl txn) {
CompletableFuture<Void> ackFuture = doAcknowledge(messageIdList, ackType, properties, txn);
if (txn != null) {
txn.registerAckedTopic(getTopic());
return txn.registerAckOp(ackFuture);
} else {
return ackFuture;
}
}

protected CompletableFuture<Void> doAcknowledgeWithTxn(MessageId messageId, AckType ackType,
Map<String,Long> properties,
TransactionImpl txn) {
Expand All @@ -428,6 +455,10 @@ protected abstract CompletableFuture<Void> doAcknowledge(MessageId messageId, Ac
Map<String,Long> properties,
TransactionImpl txn);

protected abstract CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, AckType ackType,
Map<String, Long> properties,
TransactionImpl txn);

protected abstract CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType,
Map<String,Long> properties,
long delayTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,47 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
return sendAcknowledge(messageId, ackType, properties, txnImpl);
}

@Override
protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, AckType ackType, Map<String, Long> properties, TransactionImpl txn) {
if (AckType.Cumulative.equals(ackType)) {
List<CompletableFuture<Void>> completableFutures = new ArrayList<>();
messageIdList.forEach(messageId -> completableFutures.add(doAcknowledge(messageId, ackType, properties, txn)));
return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]));
}
if (getState() != State.Ready && getState() != State.Connecting) {
stats.incrementNumAcksFailed();
PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState());
messageIdList.forEach(messageId -> onAcknowledge(messageId, exception));
return FutureUtil.failedFuture(exception);
}
List<MessageIdImpl> nonBatchMessageIds = new ArrayList<>();
for (MessageId messageId : messageIdList) {
MessageIdImpl messageIdImpl;
if (messageId instanceof BatchMessageIdImpl
&& !markAckForBatchMessage((BatchMessageIdImpl) messageId, ackType, properties)) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
messageIdImpl = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId()
, batchMessageId.getPartitionIndex());
acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId, batchMessageId.getBatchIndex(),
batchMessageId.getBatchSize(), ackType, properties);
stats.incrementNumAcksSent(batchMessageId.getBatchSize());
} else {
messageIdImpl = (MessageIdImpl) messageId;
stats.incrementNumAcksSent(1);
nonBatchMessageIds.add(messageIdImpl);
}
unAckedMessageTracker.remove(messageIdImpl);
if (possibleSendToDeadLetterTopicMessages != null) {
possibleSendToDeadLetterTopicMessages.remove(messageIdImpl);
}
onAcknowledge(messageId, null);
}
if (nonBatchMessageIds.size() > 0) {
acknowledgmentsGroupingTracker.addListAcknowledgment(nonBatchMessageIds, ackType, properties);
}
return CompletableFuture.completedFuture(null);
}

@SuppressWarnings("unchecked")
@Override
protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -456,6 +457,34 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
}
}

@Override
protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, AckType ackType, Map<String, Long> properties, TransactionImpl txn) {
List<CompletableFuture<Void>> resultFutures = new ArrayList<>();
if (ackType == AckType.Cumulative) {
messageIdList.forEach(messageId -> resultFutures.add(doAcknowledge(messageId, ackType, properties, txn)));
return CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0]));
} else {
if (getState() != State.Ready) {
return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed"));
}
Map<String, List<MessageId>> topicToMessageIdMap = new HashMap<>();
for (MessageId messageId : messageIdList) {
if (!(messageId instanceof TopicMessageIdImpl)) {
return FutureUtil.failedFuture(new IllegalArgumentException("messageId is not instance of TopicMessageIdImpl"));
}
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
topicToMessageIdMap.putIfAbsent(topicMessageId.getTopicPartitionName(), new ArrayList<>());
topicToMessageIdMap.get(topicMessageId.getTopicPartitionName()).add(topicMessageId.getInnerMessageId());
}
topicToMessageIdMap.forEach((topicPartitionName, messageIds) -> {
ConsumerImpl<T> consumer = consumers.get(topicPartitionName);
resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, ackType, properties, txn)
.thenAccept((res) -> messageIdList.forEach(unAckedMessageTracker::remove)));
});
return CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0]));
}
}

@Override
protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType,
Map<String,Long> properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
Expand Down Expand Up @@ -45,6 +46,11 @@ public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String,
// no-op
}

@Override
public void addListAcknowledgment(List<MessageIdImpl> messageIds, AckType ackType, Map<String, Long> properties) {
// no-op
}

@Override
public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int BatchSize, AckType ackType, Map<String, Long> properties) {
// no-op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, Consum
* Since the ack are delayed, we need to do some best-effort duplicate check to discard messages that are being
* resent after a disconnection and for which the user has already sent an acknowledgement.
*/
@Override
public boolean isDuplicate(MessageId messageId) {
if (messageId.compareTo(lastCumulativeAck) <= 0) {
// Already included in a cumulative ack
Expand All @@ -108,6 +109,31 @@ public boolean isDuplicate(MessageId messageId) {
}
}

@Override
public void addListAcknowledgment(List<MessageIdImpl> messageIds, AckType ackType, Map<String, Long> properties) {
if (ackType == AckType.Cumulative) {
messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
return;
}
messageIds.forEach(messageId -> {
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
pendingIndividualAcks.add(new MessageIdImpl(batchMessageId.getLedgerId(),
batchMessageId.getEntryId(), batchMessageId.getPartitionIndex()));
} else {
pendingIndividualAcks.add(messageId);
}
pendingIndividualBatchIndexAcks.remove(messageId);
if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
flush();
}
});
if (acknowledgementGroupTimeMicros == 0) {
flush();
}
}

@Override
public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties) {
if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
// We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
Expand Down Expand Up @@ -213,6 +239,7 @@ private boolean doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int batchInde
/**
* Flush all the pending acks and send them to the broker
*/
@Override
public void flush() {
ClientCnx cnx = consumer.getClientCnx();

Expand Down
Loading