From 8f91da383e86fe5527d4b7c568b7754cb646aedd Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Wed, 29 Jul 2020 16:01:12 +0800 Subject: [PATCH 1/4] Expose messagesImpl --- .../client/api/ConsumerBatchReceiveTest.java | 26 +++++++++++++++++++ .../pulsar/client/impl/MessagesImpl.java | 6 ++--- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java index eb6519819ae6e..625d0ecf69909 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.api; import lombok.Cleanup; +import org.apache.pulsar.client.impl.MessagesImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -263,6 +264,31 @@ private void testBatchReceive(String topic, BatchReceivePolicy batchReceivePolic batchReceiveAndCheck(consumer, 100); } + @Test(timeOut = 10000) + public void testBatchAck() throws Exception { + final String topic = "persistent://my-property/my-ns/batch-ack-" + UUID.randomUUID(); + final String subName = "testBatchAck-sub"; + final int messageNum = 50; + ProducerBuilder producerBuilder = pulsarClient.newProducer(Schema.STRING).topic(topic); + @Cleanup + Producer producer = producerBuilder.create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .subscriptionType(SubscriptionType.Shared) + .topic(topic) + .ackTimeout(1,TimeUnit.SECONDS) + .subscriptionName(subName) + .subscribe(); + sendMessagesAsyncAndWait(producer, messageNum); + MessagesImpl messages = new MessagesImpl(100, 100000); + for (int i = 0; i < messageNum; i++) { + messages.add(consumer.receive()); + } + consumer.acknowledge(messages); + Message msg = consumer.receive(3, TimeUnit.SECONDS); + Assert.assertNull(msg); + } + private void testBatchReceiveAsync(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception { if (batchReceivePolicy.getTimeoutMs() <= 0) { return; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java index c56694e2759ea..19967c6143a7c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java @@ -38,13 +38,13 @@ public class MessagesImpl implements Messages { private int currentNumberOfMessages; private long currentSizeOfMessages; - protected MessagesImpl(int maxNumberOfMessages, long maxSizeOfMessages) { + public MessagesImpl(int maxNumberOfMessages, long maxSizeOfMessages) { this.maxNumberOfMessages = maxNumberOfMessages; this.maxSizeOfMessages = maxSizeOfMessages; messageList = maxNumberOfMessages > 0 ? new ArrayList<>(maxNumberOfMessages) : new ArrayList<>(); } - protected boolean canAdd(Message message) { + public boolean canAdd(Message message) { if (maxNumberOfMessages > 0 && currentNumberOfMessages + 1 > maxNumberOfMessages) { return false; } @@ -56,7 +56,7 @@ protected boolean canAdd(Message message) { return true; } - protected void add(Message message) { + public void add(Message message) { if (message == null) { return; } From b4fcfcee286e5488a954be105c0a9b68fd07c537 Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Wed, 29 Jul 2020 19:53:50 +0800 Subject: [PATCH 2/4] add batch ack --- .../client/api/ConsumerAckListTest.java | 127 +++++++++++++++ .../client/api/ConsumerBatchReceiveTest.java | 106 +++++-------- .../apache/pulsar/client/api/Consumer.java | 17 ++ .../impl/AcknowledgmentsGroupingTracker.java | 3 + .../pulsar/client/impl/ConsumerBase.java | 33 +++- .../pulsar/client/impl/ConsumerImpl.java | 61 ++++++-- .../pulsar/client/impl/MessagesImpl.java | 6 +- .../client/impl/MultiTopicsConsumerImpl.java | 30 ++++ ...rsistentAcknowledgmentGroupingTracker.java | 6 + ...sistentAcknowledgmentsGroupingTracker.java | 27 ++++ .../AcknowledgementsGroupingTrackerTest.java | 147 ++++++++++++++++++ 11 files changed, 481 insertions(+), 82 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java new file mode 100644 index 0000000000000..c357e4f70893f --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import lombok.Cleanup; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class ConsumerAckListTest extends ProducerConsumerBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = 20000) + public void testBatchListAck() throws Exception { + nonPartitionAckBatchMessage(true); + nonPartitionAckBatchMessage(false); + } + + public void nonPartitionAckBatchMessage(boolean isBatch) throws Exception { + final String topic = "persistent://my-property/my-ns/batch-ack-" + UUID.randomUUID(); + final String subName = "testBatchAck-sub" + UUID.randomUUID(); + final int messageNum = 100; + ProducerBuilder producerBuilder = pulsarClient.newProducer(Schema.STRING).enableBatching(isBatch).topic(topic); + @Cleanup + Producer producer = producerBuilder.create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .subscriptionType(SubscriptionType.Shared) + .topic(topic) + .ackTimeout(1001, TimeUnit.MILLISECONDS) + .negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS) + .subscriptionName(subName) + .subscribe(); + sendMessagesAsyncAndWait(producer, messageNum); + List messages = new ArrayList<>(); + for (int i = 0; i < messageNum; i++) { + messages.add(consumer.receive().getMessageId()); + } + consumer.acknowledge(messages); + consumer.redeliverUnacknowledgedMessages(); + Message msg = consumer.receive(3, TimeUnit.SECONDS); + Assert.assertNull(msg); + } + + @Test(timeOut = 20000) + public void testPartitionedTopicAckBatchMessage() throws Exception { + partitionedTopicAckBatchMessage(true); + partitionedTopicAckBatchMessage(false); + } + + public void partitionedTopicAckBatchMessage(boolean isBatch) throws Exception { + final String topic = "persistent://my-property/my-ns/batch-ack-" + UUID.randomUUID(); + final String subName = "testBatchAck-sub" + UUID.randomUUID(); + final int messageNum = 100; + admin.topics().createPartitionedTopic(topic, 3); + ProducerBuilder producerBuilder = pulsarClient.newProducer(Schema.STRING).enableBatching(isBatch).topic(topic); + @Cleanup + Producer producer = producerBuilder.create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .subscriptionType(SubscriptionType.Shared) + .topic(topic) + .ackTimeout(1001, TimeUnit.MILLISECONDS) + .negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS) + .subscriptionName(subName) + .subscribe(); + sendMessagesAsyncAndWait(producer, messageNum); + + List messages = new ArrayList<>(); + for (int i = 0; i < messageNum; i++) { + messages.add(consumer.receive().getMessageId()); + } + consumer.acknowledge(messages); + consumer.redeliverUnacknowledgedMessages(); + Message msg = consumer.receive(3, TimeUnit.SECONDS); + Assert.assertNull(msg); + } + + private void sendMessagesAsyncAndWait(Producer producer, int messages) throws Exception { + CountDownLatch latch = new CountDownLatch(messages); + for (int i = 0; i < messages; i++) { + String message = "my-message-" + i; + producer.sendAsync(message).thenAccept(messageId -> { + if (messageId != null) { + latch.countDown(); + } + }); + } + latch.await(); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java index 625d0ecf69909..06a87628f62d6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java @@ -19,7 +19,6 @@ package org.apache.pulsar.client.api; import lombok.Cleanup; -import org.apache.pulsar.client.impl.MessagesImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -105,73 +104,73 @@ public Object[][] batchReceivePolicyProvider() { }, // Number of message limitation exceed receiverQueue size { - BatchReceivePolicy.builder() - .maxNumMessages(70) - .build(), true, 50 + BatchReceivePolicy.builder() + .maxNumMessages(70) + .build(), true, 50 }, // Number of message limitation exceed receiverQueue size and timeout limitation { - BatchReceivePolicy.builder() - .maxNumMessages(50) - .timeout(10, TimeUnit.MILLISECONDS) - .build(), true, 30 + BatchReceivePolicy.builder() + .maxNumMessages(50) + .timeout(10, TimeUnit.MILLISECONDS) + .build(), true, 30 }, // Number of message limitation is negative and timeout limitation { - BatchReceivePolicy.builder() - .maxNumMessages(-10) - .timeout(10, TimeUnit.MILLISECONDS) - .build(), true, 10 + BatchReceivePolicy.builder() + .maxNumMessages(-10) + .timeout(10, TimeUnit.MILLISECONDS) + .build(), true, 10 }, // Size of message limitation is negative and timeout limitation { - BatchReceivePolicy.builder() - .maxNumBytes(-100) - .timeout(50, TimeUnit.MILLISECONDS) - .build(), true, 30 + BatchReceivePolicy.builder() + .maxNumBytes(-100) + .timeout(50, TimeUnit.MILLISECONDS) + .build(), true, 30 }, // Number of message limitation and size of message limitation are both negative and timeout limitation { - BatchReceivePolicy.builder() - .maxNumMessages(-10) - .maxNumBytes(-100) - .timeout(50, TimeUnit.MILLISECONDS) - .build(), true, 30 + BatchReceivePolicy.builder() + .maxNumMessages(-10) + .maxNumBytes(-100) + .timeout(50, TimeUnit.MILLISECONDS) + .build(), true, 30 }, // Number of message limitation exceed receiverQueue size { - BatchReceivePolicy.builder() - .maxNumMessages(70) - .build(), false, 50 + BatchReceivePolicy.builder() + .maxNumMessages(70) + .build(), false, 50 }, // Number of message limitation exceed receiverQueue size and timeout limitation { - BatchReceivePolicy.builder() - .maxNumMessages(50) - .timeout(50, TimeUnit.MILLISECONDS) - .build(), false, 30 + BatchReceivePolicy.builder() + .maxNumMessages(50) + .timeout(50, TimeUnit.MILLISECONDS) + .build(), false, 30 }, // Number of message limitation is negative and timeout limitation { - BatchReceivePolicy.builder() - .maxNumMessages(-10) - .timeout(50, TimeUnit.MILLISECONDS) - .build(), false, 30 + BatchReceivePolicy.builder() + .maxNumMessages(-10) + .timeout(50, TimeUnit.MILLISECONDS) + .build(), false, 30 }, // Size of message limitation is negative and timeout limitation { - BatchReceivePolicy.builder() - .maxNumBytes(-100) - .timeout(50, TimeUnit.MILLISECONDS) - .build(), false, 30 + BatchReceivePolicy.builder() + .maxNumBytes(-100) + .timeout(50, TimeUnit.MILLISECONDS) + .build(), false, 30 }, // Number of message limitation and size of message limitation are both negative and timeout limitation { - BatchReceivePolicy.builder() - .maxNumMessages(-10) - .maxNumBytes(-100) - .timeout(50, TimeUnit.MILLISECONDS) - .build(), false, 30 + BatchReceivePolicy.builder() + .maxNumMessages(-10) + .maxNumBytes(-100) + .timeout(50, TimeUnit.MILLISECONDS) + .build(), false, 30 } }; } @@ -264,31 +263,6 @@ private void testBatchReceive(String topic, BatchReceivePolicy batchReceivePolic batchReceiveAndCheck(consumer, 100); } - @Test(timeOut = 10000) - public void testBatchAck() throws Exception { - final String topic = "persistent://my-property/my-ns/batch-ack-" + UUID.randomUUID(); - final String subName = "testBatchAck-sub"; - final int messageNum = 50; - ProducerBuilder producerBuilder = pulsarClient.newProducer(Schema.STRING).topic(topic); - @Cleanup - Producer producer = producerBuilder.create(); - @Cleanup - Consumer consumer = pulsarClient.newConsumer(Schema.STRING) - .subscriptionType(SubscriptionType.Shared) - .topic(topic) - .ackTimeout(1,TimeUnit.SECONDS) - .subscriptionName(subName) - .subscribe(); - sendMessagesAsyncAndWait(producer, messageNum); - MessagesImpl messages = new MessagesImpl(100, 100000); - for (int i = 0; i < messageNum; i++) { - messages.add(consumer.receive()); - } - consumer.acknowledge(messages); - Message msg = consumer.receive(3, TimeUnit.SECONDS); - Assert.assertNull(msg); - } - private void testBatchReceiveAsync(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception { if (batchReceivePolicy.getTimeoutMs() <= 0) { return; diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java index 1ee5bfa23f43b..35b5712206784 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.api; import java.io.Closeable; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -163,6 +164,15 @@ public interface Consumer extends Closeable { */ void acknowledge(Messages messages) throws PulsarClientException; + /** + * Acknowledge the consumption of a list of message. + * + * @param messages messages + * @throws PulsarClientException.AlreadyClosedException + * if the consumer was already closed + */ + void acknowledge(List messageIdList) throws PulsarClientException; + /** * Acknowledge the failure to process a single message. * @@ -365,6 +375,13 @@ public interface Consumer extends Closeable { */ CompletableFuture acknowledgeAsync(Messages messages); + /** + * Asynchronously acknowledge the consumption of a list of message. + * @param messages + * @return + */ + CompletableFuture acknowledgeAsync(List messageIdList); + /** * Asynchronously reconsumeLater the consumption of a single message. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java index 1517f124bc599..a624e1536f607 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import java.util.List; import java.util.Map; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; @@ -31,6 +32,8 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable { void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map properties); + void addListAcknowledgment(List messageIds, AckType ackType, Map properties); + void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType, Map properties); void flush(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index cdc3f5fc4a391..6a909bc977fe7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -22,6 +22,7 @@ import com.google.common.collect.Queues; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -217,6 +218,15 @@ public void acknowledge(MessageId messageId) throws PulsarClientException { } } + @Override + public void acknowledge(List messageIdList) throws PulsarClientException { + try { + acknowledgeAsync(messageIdList).get(); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); + } + } + @Override public void acknowledge(Messages messages) throws PulsarClientException { try { @@ -298,6 +308,11 @@ public CompletableFuture acknowledgeAsync(Messages messages) { } } + @Override + public CompletableFuture acknowledgeAsync(List messageIdList) { + return doAcknowledgeWithTxn(messageIdList, AckType.Individual, Collections.emptyMap(), null); + } + @Override public CompletableFuture reconsumeLaterAsync(Message message, long delayTime, TimeUnit unit) { if (!conf.isRetryEnable()) { @@ -384,6 +399,18 @@ public void negativeAcknowledge(Message message) { negativeAcknowledge(message.getMessageId()); } + protected CompletableFuture doAcknowledgeWithTxn(List messageIdList, AckType ackType, + Map properties, + TransactionImpl txn) { + CompletableFuture ackFuture = doAcknowledge(messageIdList, ackType, properties, txn); + if (txn != null) { + txn.registerAckedTopic(getTopic()); + return txn.registerAckOp(ackFuture); + } else { + return ackFuture; + } + } + protected CompletableFuture doAcknowledgeWithTxn(MessageId messageId, AckType ackType, Map properties, TransactionImpl txn) { @@ -404,8 +431,12 @@ protected abstract CompletableFuture doAcknowledge(MessageId messageId, Ac Map properties, TransactionImpl txn); + protected abstract CompletableFuture doAcknowledge(List messageIdList, AckType ackType, + Map properties, + TransactionImpl txn); + protected abstract CompletableFuture doReconsumeLater(Message message, AckType ackType, - Map properties, + Map properties, long delayTime, TimeUnit unit); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index e0f1bcb0ecb56..f3127505fd4c1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -25,7 +25,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.google.common.collect.Queues; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -40,8 +39,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -84,7 +81,6 @@ import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.client.util.RetryMessageUtil; -import org.apache.pulsar.common.api.proto.PulsarApi.IntRange; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; @@ -172,9 +168,9 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private volatile Producer retryLetterProducer; private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock(); - + protected volatile boolean paused; - + protected ConcurrentOpenHashMap chunkedMessagesMap = new ConcurrentOpenHashMap<>(); private int pendingChunckedMessageCount = 0; protected long expireTimeOfIncompleteChunkedMessageMillis = 0; @@ -557,10 +553,51 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack return sendAcknowledge(messageId, ackType, properties, txnImpl); } + @Override + protected CompletableFuture doAcknowledge(List messageIdList, AckType ackType, Map properties, TransactionImpl txn) { + if (AckType.Cumulative.equals(ackType)) { + List> completableFutures = new ArrayList<>(); + messageIdList.forEach(messageId -> completableFutures.add(doAcknowledge(messageId, ackType, properties, txn))); + return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])); + } + if (getState() != State.Ready && getState() != State.Connecting) { + stats.incrementNumAcksFailed(); + PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState()); + messageIdList.forEach(messageId -> onAcknowledge(messageId, exception)); + return FutureUtil.failedFuture(exception); + } + List nonBatchMessageIds = new ArrayList<>(); + MessageIdImpl messageIdImpl; + for (MessageId messageId : messageIdList) { + if (messageId instanceof BatchMessageIdImpl + && !markAckForBatchMessage((BatchMessageIdImpl) messageId, ackType, properties)) { + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; + messageIdImpl = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId() + , batchMessageId.getPartitionIndex()); + acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId, batchMessageId.getBatchIndex(), + batchMessageId.getBatchSize(), ackType, properties); + stats.incrementNumAcksSent(batchMessageId.getBatchSize()); + } else { + messageIdImpl = (MessageIdImpl) messageId; + stats.incrementNumAcksSent(1); + nonBatchMessageIds.add((MessageIdImpl) messageId); + } + unAckedMessageTracker.remove(messageIdImpl); + if (possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.remove(messageIdImpl); + } + onAcknowledge(messageId, null); + } + if (nonBatchMessageIds.size() > 0) { + acknowledgmentsGroupingTracker.addListAcknowledgment(nonBatchMessageIds, ackType, properties); + } + return CompletableFuture.completedFuture(null); + } + @SuppressWarnings("unchecked") @Override protected CompletableFuture doReconsumeLater(Message message, AckType ackType, - Map properties, + Map properties, long delayTime, TimeUnit unit) { MessageId messageId = message.getMessageId(); @@ -620,7 +657,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a if (propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) { reconsumetimes = Integer.valueOf(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)); reconsumetimes = reconsumetimes + 1; - + } else { propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr); propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr); @@ -628,7 +665,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumetimes)); propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, String.valueOf(unit.toMillis(delayTime))); - + if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount()) { processPossibleToDLQ((MessageIdImpl)messageId); if (deadLetterProducer == null) { @@ -1075,7 +1112,7 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List ac // if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message // and return undecrypted payload if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) { - + // right now, chunked messages are only supported by non-shared subscription if (isChunkedMessage) { uncompressedPayload = processMessageChunk(uncompressedPayload, msgMetadata, msgId, messageId, cnx); @@ -1144,7 +1181,7 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m TimeUnit.MILLISECONDS); expireChunkMessageTaskScheduled = true; } - + if (msgMetadata.getChunkId() == 0) { ByteBuf chunkedMsgBuffer = Unpooled.directBuffer(msgMetadata.getTotalChunkMsgSize(), msgMetadata.getTotalChunkMsgSize()); @@ -1214,7 +1251,7 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m compressedPayload.release(); return uncompressedPayload; } - + protected void triggerListener(int numMessages) { // Trigger the notification on the message listener in a separate thread to avoid blocking the networking // thread while the message processing happens diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java index 19967c6143a7c..c56694e2759ea 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java @@ -38,13 +38,13 @@ public class MessagesImpl implements Messages { private int currentNumberOfMessages; private long currentSizeOfMessages; - public MessagesImpl(int maxNumberOfMessages, long maxSizeOfMessages) { + protected MessagesImpl(int maxNumberOfMessages, long maxSizeOfMessages) { this.maxNumberOfMessages = maxNumberOfMessages; this.maxSizeOfMessages = maxSizeOfMessages; messageList = maxNumberOfMessages > 0 ? new ArrayList<>(maxNumberOfMessages) : new ArrayList<>(); } - public boolean canAdd(Message message) { + protected boolean canAdd(Message message) { if (maxNumberOfMessages > 0 && currentNumberOfMessages + 1 > maxNumberOfMessages) { return false; } @@ -56,7 +56,7 @@ public boolean canAdd(Message message) { return true; } - public void add(Message message) { + protected void add(Message message) { if (message == null) { return; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 9841b27e90063..ebae323348f9f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -47,6 +47,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -456,6 +457,35 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack } } + @Override + protected CompletableFuture doAcknowledge(List messageIdList, AckType ackType, Map properties, TransactionImpl txn) { + if (ackType == AckType.Cumulative) { + List> completableFutures = new ArrayList<>(); + messageIdList.forEach(messageId -> completableFutures.add(doAcknowledge(messageId, ackType, properties, txn))); + return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])); + } else { + if (getState() != State.Ready) { + return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed")); + } + Map> topicToMessageIdMap = new HashMap<>(); + for (MessageId messageId : messageIdList) { + if (!(messageId instanceof TopicMessageIdImpl)) { + return FutureUtil.failedFuture(new IllegalArgumentException("messageId is not instance of TopicMessageIdImpl")); + } + TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId; + topicToMessageIdMap.putIfAbsent(topicMessageId.getTopicPartitionName(), new ArrayList<>()); + topicToMessageIdMap.get(topicMessageId.getTopicPartitionName()).add(topicMessageId.getInnerMessageId()); + } + List> resultFutures = new ArrayList<>(); + topicToMessageIdMap.forEach((topicPartitionName, messageIds) -> { + ConsumerImpl consumer = consumers.get(topicPartitionName); + resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, ackType, properties, txn) + .thenAccept((res) -> messageIdList.forEach(unAckedMessageTracker::remove))); + }); + return CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0])); + } + } + @Override protected CompletableFuture doReconsumeLater(Message message, AckType ackType, Map properties, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java index 9061ac30bd61a..d70c7ce86c3e1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import java.util.List; import java.util.Map; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; @@ -45,6 +46,11 @@ public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map messageIds, AckType ackType, Map properties) { + // no-op + } + @Override public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int BatchSize, AckType ackType, Map properties) { // no-op diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 690897998b83d..17e8c223ede33 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -99,6 +99,7 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl consumer, Consum * Since the ack are delayed, we need to do some best-effort duplicate check to discard messages that are being * resent after a disconnection and for which the user has already sent an acknowledgement. */ + @Override public boolean isDuplicate(MessageId messageId) { if (messageId.compareTo(lastCumulativeAck) <= 0) { // Already included in a cumulative ack @@ -108,6 +109,31 @@ public boolean isDuplicate(MessageId messageId) { } } + @Override + public void addListAcknowledgment(List messageIds, AckType ackType, Map properties) { + if (ackType == AckType.Cumulative) { + messageIds.forEach(messageId -> doCumulativeAck(messageId, null)); + return; + } + messageIds.forEach(messageId -> { + if (messageId instanceof BatchMessageIdImpl) { + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; + pendingIndividualAcks.add(new MessageIdImpl(batchMessageId.getLedgerId(), + batchMessageId.getEntryId(), batchMessageId.getPartitionIndex())); + } else { + pendingIndividualAcks.add(messageId); + } + pendingIndividualBatchIndexAcks.remove(messageId); + if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) { + flush(); + } + }); + if (acknowledgementGroupTimeMicros == 0) { + flush(); + } + } + + @Override public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map properties) { if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) { // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an @@ -213,6 +239,7 @@ private boolean doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int batchInde /** * Flush all the pending acks and send them to the broker */ + @Override public void flush() { ClientCnx cnx = consumer.getClientCnx(); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java index eac424e95163f..1eb5b4276742b 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java @@ -117,6 +117,64 @@ public void testAckTracker() throws Exception { tracker.close(); } + @Test + public void testBatchAckTracker() throws Exception { + ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); + conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(10)); + PersistentAcknowledgmentsGroupingTracker tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup); + + MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0); + MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0); + MessageIdImpl msg3 = new MessageIdImpl(5, 3, 0); + MessageIdImpl msg4 = new MessageIdImpl(5, 4, 0); + MessageIdImpl msg5 = new MessageIdImpl(5, 5, 0); + MessageIdImpl msg6 = new MessageIdImpl(5, 6, 0); + + assertFalse(tracker.isDuplicate(msg1)); + + tracker.addListAcknowledgment(Collections.singletonList(msg1), AckType.Individual, Collections.emptyMap()); + assertTrue(tracker.isDuplicate(msg1)); + + assertFalse(tracker.isDuplicate(msg2)); + + tracker.addListAcknowledgment(Collections.singletonList(msg5), AckType.Cumulative, Collections.emptyMap()); + assertTrue(tracker.isDuplicate(msg1)); + assertTrue(tracker.isDuplicate(msg2)); + assertTrue(tracker.isDuplicate(msg3)); + + assertTrue(tracker.isDuplicate(msg4)); + assertTrue(tracker.isDuplicate(msg5)); + assertFalse(tracker.isDuplicate(msg6)); + + // Flush while disconnected. the internal tracking will not change + tracker.flush(); + + assertTrue(tracker.isDuplicate(msg1)); + assertTrue(tracker.isDuplicate(msg2)); + assertTrue(tracker.isDuplicate(msg3)); + + assertTrue(tracker.isDuplicate(msg4)); + assertTrue(tracker.isDuplicate(msg5)); + assertFalse(tracker.isDuplicate(msg6)); + + tracker.addListAcknowledgment(Collections.singletonList(msg6), AckType.Individual, Collections.emptyMap()); + assertTrue(tracker.isDuplicate(msg6)); + + when(consumer.getClientCnx()).thenReturn(cnx); + + tracker.flush(); + + assertTrue(tracker.isDuplicate(msg1)); + assertTrue(tracker.isDuplicate(msg2)); + assertTrue(tracker.isDuplicate(msg3)); + + assertTrue(tracker.isDuplicate(msg4)); + assertTrue(tracker.isDuplicate(msg5)); + assertFalse(tracker.isDuplicate(msg6)); + + tracker.close(); + } + @Test public void testImmediateAckingTracker() throws Exception { ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); @@ -144,6 +202,35 @@ public void testImmediateAckingTracker() throws Exception { tracker.close(); } + @Test + public void testImmediateBatchAckingTracker() throws Exception { + ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); + conf.setAcknowledgementsGroupTimeMicros(0); + PersistentAcknowledgmentsGroupingTracker tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup); + + MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0); + MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0); + + assertFalse(tracker.isDuplicate(msg1)); + + when(consumer.getClientCnx()).thenReturn(null); + + tracker.addListAcknowledgment(Collections.singletonList(msg1), AckType.Individual, Collections.emptyMap()); + tracker.flush(); + //cnx is null can not flush + assertTrue(tracker.isDuplicate(msg1)); + + when(consumer.getClientCnx()).thenReturn(cnx); + + tracker.flush(); + assertFalse(tracker.isDuplicate(msg1)); + + tracker.addListAcknowledgment(Collections.singletonList(msg2), AckType.Individual, Collections.emptyMap()); + // Since we were connected, the ack went out immediately + assertFalse(tracker.isDuplicate(msg2)); + tracker.close(); + } + @Test public void testAckTrackerMultiAck() throws Exception { ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); @@ -203,4 +290,64 @@ public void testAckTrackerMultiAck() throws Exception { tracker.close(); } + + @Test + public void testBatchAckTrackerMultiAck() throws Exception { + ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); + conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(10)); + PersistentAcknowledgmentsGroupingTracker tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup); + + when(cnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12_VALUE); + + MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0); + MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0); + MessageIdImpl msg3 = new MessageIdImpl(5, 3, 0); + MessageIdImpl msg4 = new MessageIdImpl(5, 4, 0); + MessageIdImpl msg5 = new MessageIdImpl(5, 5, 0); + MessageIdImpl msg6 = new MessageIdImpl(5, 6, 0); + + assertFalse(tracker.isDuplicate(msg1)); + + tracker.addListAcknowledgment(Collections.singletonList(msg1), AckType.Individual, Collections.emptyMap()); + assertTrue(tracker.isDuplicate(msg1)); + + assertFalse(tracker.isDuplicate(msg2)); + + tracker.addListAcknowledgment(Collections.singletonList(msg5), AckType.Cumulative, Collections.emptyMap()); + assertTrue(tracker.isDuplicate(msg1)); + assertTrue(tracker.isDuplicate(msg2)); + assertTrue(tracker.isDuplicate(msg3)); + + assertTrue(tracker.isDuplicate(msg4)); + assertTrue(tracker.isDuplicate(msg5)); + assertFalse(tracker.isDuplicate(msg6)); + + // Flush while disconnected. the internal tracking will not change + tracker.flush(); + + assertTrue(tracker.isDuplicate(msg1)); + assertTrue(tracker.isDuplicate(msg2)); + assertTrue(tracker.isDuplicate(msg3)); + + assertTrue(tracker.isDuplicate(msg4)); + assertTrue(tracker.isDuplicate(msg5)); + assertFalse(tracker.isDuplicate(msg6)); + + tracker.addListAcknowledgment(Collections.singletonList(msg6), AckType.Individual, Collections.emptyMap()); + assertTrue(tracker.isDuplicate(msg6)); + + when(consumer.getClientCnx()).thenReturn(cnx); + + tracker.flush(); + + assertTrue(tracker.isDuplicate(msg1)); + assertTrue(tracker.isDuplicate(msg2)); + assertTrue(tracker.isDuplicate(msg3)); + + assertTrue(tracker.isDuplicate(msg4)); + assertTrue(tracker.isDuplicate(msg5)); + assertFalse(tracker.isDuplicate(msg6)); + + tracker.close(); + } } From 59c2b7fb287809f4c6b3ed62906fe89eb71c702a Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Sat, 1 Aug 2020 20:56:58 +0800 Subject: [PATCH 3/4] change style --- .../client/api/ConsumerBatchReceiveTest.java | 80 +++++++++---------- .../apache/pulsar/client/api/Consumer.java | 8 +- .../pulsar/client/impl/ConsumerBase.java | 4 +- .../client/impl/MultiTopicsConsumerImpl.java | 7 +- .../pulsar/PulsarConsumerSourceTests.java | 10 +++ 5 files changed, 58 insertions(+), 51 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java index 06a87628f62d6..eb6519819ae6e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java @@ -104,73 +104,73 @@ public Object[][] batchReceivePolicyProvider() { }, // Number of message limitation exceed receiverQueue size { - BatchReceivePolicy.builder() - .maxNumMessages(70) - .build(), true, 50 + BatchReceivePolicy.builder() + .maxNumMessages(70) + .build(), true, 50 }, // Number of message limitation exceed receiverQueue size and timeout limitation { - BatchReceivePolicy.builder() - .maxNumMessages(50) - .timeout(10, TimeUnit.MILLISECONDS) - .build(), true, 30 + BatchReceivePolicy.builder() + .maxNumMessages(50) + .timeout(10, TimeUnit.MILLISECONDS) + .build(), true, 30 }, // Number of message limitation is negative and timeout limitation { - BatchReceivePolicy.builder() - .maxNumMessages(-10) - .timeout(10, TimeUnit.MILLISECONDS) - .build(), true, 10 + BatchReceivePolicy.builder() + .maxNumMessages(-10) + .timeout(10, TimeUnit.MILLISECONDS) + .build(), true, 10 }, // Size of message limitation is negative and timeout limitation { - BatchReceivePolicy.builder() - .maxNumBytes(-100) - .timeout(50, TimeUnit.MILLISECONDS) - .build(), true, 30 + BatchReceivePolicy.builder() + .maxNumBytes(-100) + .timeout(50, TimeUnit.MILLISECONDS) + .build(), true, 30 }, // Number of message limitation and size of message limitation are both negative and timeout limitation { - BatchReceivePolicy.builder() - .maxNumMessages(-10) - .maxNumBytes(-100) - .timeout(50, TimeUnit.MILLISECONDS) - .build(), true, 30 + BatchReceivePolicy.builder() + .maxNumMessages(-10) + .maxNumBytes(-100) + .timeout(50, TimeUnit.MILLISECONDS) + .build(), true, 30 }, // Number of message limitation exceed receiverQueue size { - BatchReceivePolicy.builder() - .maxNumMessages(70) - .build(), false, 50 + BatchReceivePolicy.builder() + .maxNumMessages(70) + .build(), false, 50 }, // Number of message limitation exceed receiverQueue size and timeout limitation { - BatchReceivePolicy.builder() - .maxNumMessages(50) - .timeout(50, TimeUnit.MILLISECONDS) - .build(), false, 30 + BatchReceivePolicy.builder() + .maxNumMessages(50) + .timeout(50, TimeUnit.MILLISECONDS) + .build(), false, 30 }, // Number of message limitation is negative and timeout limitation { - BatchReceivePolicy.builder() - .maxNumMessages(-10) - .timeout(50, TimeUnit.MILLISECONDS) - .build(), false, 30 + BatchReceivePolicy.builder() + .maxNumMessages(-10) + .timeout(50, TimeUnit.MILLISECONDS) + .build(), false, 30 }, // Size of message limitation is negative and timeout limitation { - BatchReceivePolicy.builder() - .maxNumBytes(-100) - .timeout(50, TimeUnit.MILLISECONDS) - .build(), false, 30 + BatchReceivePolicy.builder() + .maxNumBytes(-100) + .timeout(50, TimeUnit.MILLISECONDS) + .build(), false, 30 }, // Number of message limitation and size of message limitation are both negative and timeout limitation { - BatchReceivePolicy.builder() - .maxNumMessages(-10) - .maxNumBytes(-100) - .timeout(50, TimeUnit.MILLISECONDS) - .build(), false, 30 + BatchReceivePolicy.builder() + .maxNumMessages(-10) + .maxNumBytes(-100) + .timeout(50, TimeUnit.MILLISECONDS) + .build(), false, 30 } }; } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java index 35b5712206784..801896caee138 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java @@ -166,10 +166,8 @@ public interface Consumer extends Closeable { /** * Acknowledge the consumption of a list of message. - * - * @param messages messages - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed + * @param messageIdList + * @throws PulsarClientException */ void acknowledge(List messageIdList) throws PulsarClientException; @@ -377,7 +375,7 @@ public interface Consumer extends Closeable { /** * Asynchronously acknowledge the consumption of a list of message. - * @param messages + * @param messageIdList * @return */ CompletableFuture acknowledgeAsync(List messageIdList); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 6a909bc977fe7..7cc6587920dd5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -432,8 +432,8 @@ protected abstract CompletableFuture doAcknowledge(MessageId messageId, Ac TransactionImpl txn); protected abstract CompletableFuture doAcknowledge(List messageIdList, AckType ackType, - Map properties, - TransactionImpl txn); + Map properties, + TransactionImpl txn); protected abstract CompletableFuture doReconsumeLater(Message message, AckType ackType, Map properties, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index ebae323348f9f..f212144cf40a1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -459,10 +459,10 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack @Override protected CompletableFuture doAcknowledge(List messageIdList, AckType ackType, Map properties, TransactionImpl txn) { + List> resultFutures = new ArrayList<>(); if (ackType == AckType.Cumulative) { - List> completableFutures = new ArrayList<>(); - messageIdList.forEach(messageId -> completableFutures.add(doAcknowledge(messageId, ackType, properties, txn))); - return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])); + messageIdList.forEach(messageId -> resultFutures.add(doAcknowledge(messageId, ackType, properties, txn))); + return CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0])); } else { if (getState() != State.Ready) { return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed")); @@ -476,7 +476,6 @@ protected CompletableFuture doAcknowledge(List messageIdList, A topicToMessageIdMap.putIfAbsent(topicMessageId.getTopicPartitionName(), new ArrayList<>()); topicToMessageIdMap.get(topicMessageId.getTopicPartitionName()).add(topicMessageId.getInnerMessageId()); } - List> resultFutures = new ArrayList<>(); topicToMessageIdMap.forEach((topicPartitionName, messageIds) -> { ConsumerImpl consumer = consumers.get(topicPartitionName); resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, ackType, properties, txn) diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java index 99962d8cf5b4b..9d8abbf96acc0 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java +++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java @@ -446,6 +446,11 @@ public void acknowledge(Messages messages) throws PulsarClientException { } + @Override + public void acknowledge(List messageIdList) throws PulsarClientException { + + } + @Override public void negativeAcknowledge(Message message) { } @@ -485,6 +490,11 @@ public CompletableFuture acknowledgeAsync(Messages messages) { return null; } + @Override + public CompletableFuture acknowledgeAsync(List messageIdList) { + return null; + } + @Override public CompletableFuture acknowledgeCumulativeAsync(Message message) { return null; From 95721b3a21d302c7256f347df6b28d9029df9165 Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Sat, 1 Aug 2020 22:52:34 +0800 Subject: [PATCH 4/4] fix unit test --- .../client/api/ConsumerAckListTest.java | 59 ++++++------------- .../pulsar/client/impl/ConsumerImpl.java | 4 +- 2 files changed, 19 insertions(+), 44 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java index c357e4f70893f..bd098139e9445 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; public class ConsumerAckListTest extends ProducerConsumerBase { @@ -45,69 +46,43 @@ protected void cleanup() throws Exception { super.internalCleanup(); } - @Test(timeOut = 20000) + @Test(timeOut = 30000) public void testBatchListAck() throws Exception { - nonPartitionAckBatchMessage(true); - nonPartitionAckBatchMessage(false); + ackListMessage(true,true); + ackListMessage(true,false); + ackListMessage(false,false); + ackListMessage(false,true); } - public void nonPartitionAckBatchMessage(boolean isBatch) throws Exception { + public void ackListMessage(boolean isBatch, boolean isPartitioned) throws Exception { final String topic = "persistent://my-property/my-ns/batch-ack-" + UUID.randomUUID(); final String subName = "testBatchAck-sub" + UUID.randomUUID(); - final int messageNum = 100; - ProducerBuilder producerBuilder = pulsarClient.newProducer(Schema.STRING).enableBatching(isBatch).topic(topic); - @Cleanup - Producer producer = producerBuilder.create(); - @Cleanup - Consumer consumer = pulsarClient.newConsumer(Schema.STRING) - .subscriptionType(SubscriptionType.Shared) - .topic(topic) - .ackTimeout(1001, TimeUnit.MILLISECONDS) - .negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS) - .subscriptionName(subName) - .subscribe(); - sendMessagesAsyncAndWait(producer, messageNum); - List messages = new ArrayList<>(); - for (int i = 0; i < messageNum; i++) { - messages.add(consumer.receive().getMessageId()); + final int messageNum = ThreadLocalRandom.current().nextInt(50, 100); + if (isPartitioned) { + admin.topics().createPartitionedTopic(topic, 3); } - consumer.acknowledge(messages); - consumer.redeliverUnacknowledgedMessages(); - Message msg = consumer.receive(3, TimeUnit.SECONDS); - Assert.assertNull(msg); - } - - @Test(timeOut = 20000) - public void testPartitionedTopicAckBatchMessage() throws Exception { - partitionedTopicAckBatchMessage(true); - partitionedTopicAckBatchMessage(false); - } - - public void partitionedTopicAckBatchMessage(boolean isBatch) throws Exception { - final String topic = "persistent://my-property/my-ns/batch-ack-" + UUID.randomUUID(); - final String subName = "testBatchAck-sub" + UUID.randomUUID(); - final int messageNum = 100; - admin.topics().createPartitionedTopic(topic, 3); - ProducerBuilder producerBuilder = pulsarClient.newProducer(Schema.STRING).enableBatching(isBatch).topic(topic); @Cleanup - Producer producer = producerBuilder.create(); + Producer producer = pulsarClient.newProducer(Schema.STRING) + .enableBatching(isBatch) + .batchingMaxPublishDelay(50, TimeUnit.MILLISECONDS) + .topic(topic).create(); @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING) .subscriptionType(SubscriptionType.Shared) .topic(topic) - .ackTimeout(1001, TimeUnit.MILLISECONDS) .negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS) .subscriptionName(subName) .subscribe(); sendMessagesAsyncAndWait(producer, messageNum); - List messages = new ArrayList<>(); for (int i = 0; i < messageNum; i++) { messages.add(consumer.receive().getMessageId()); } consumer.acknowledge(messages); + //Wait ack send. + Thread.sleep(1000); consumer.redeliverUnacknowledgedMessages(); - Message msg = consumer.receive(3, TimeUnit.SECONDS); + Message msg = consumer.receive(2, TimeUnit.SECONDS); Assert.assertNull(msg); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index e30f620f2ee80..0576cfdbc56ae 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -570,8 +570,8 @@ protected CompletableFuture doAcknowledge(List messageIdList, A return FutureUtil.failedFuture(exception); } List nonBatchMessageIds = new ArrayList<>(); - MessageIdImpl messageIdImpl; for (MessageId messageId : messageIdList) { + MessageIdImpl messageIdImpl; if (messageId instanceof BatchMessageIdImpl && !markAckForBatchMessage((BatchMessageIdImpl) messageId, ackType, properties)) { BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; @@ -583,7 +583,7 @@ protected CompletableFuture doAcknowledge(List messageIdList, A } else { messageIdImpl = (MessageIdImpl) messageId; stats.incrementNumAcksSent(1); - nonBatchMessageIds.add((MessageIdImpl) messageId); + nonBatchMessageIds.add(messageIdImpl); } unAckedMessageTracker.remove(messageIdImpl); if (possibleSendToDeadLetterTopicMessages != null) {