diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java new file mode 100644 index 0000000000000..bd098139e9445 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import lombok.Cleanup; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +public class ConsumerAckListTest extends ProducerConsumerBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = 30000) + public void testBatchListAck() throws Exception { + ackListMessage(true,true); + ackListMessage(true,false); + ackListMessage(false,false); + ackListMessage(false,true); + } + + public void ackListMessage(boolean isBatch, boolean isPartitioned) throws Exception { + final String topic = "persistent://my-property/my-ns/batch-ack-" + UUID.randomUUID(); + final String subName = "testBatchAck-sub" + UUID.randomUUID(); + final int messageNum = ThreadLocalRandom.current().nextInt(50, 100); + if (isPartitioned) { + admin.topics().createPartitionedTopic(topic, 3); + } + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .enableBatching(isBatch) + .batchingMaxPublishDelay(50, TimeUnit.MILLISECONDS) + .topic(topic).create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .subscriptionType(SubscriptionType.Shared) + .topic(topic) + .negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS) + .subscriptionName(subName) + .subscribe(); + sendMessagesAsyncAndWait(producer, messageNum); + List messages = new ArrayList<>(); + for (int i = 0; i < messageNum; i++) { + messages.add(consumer.receive().getMessageId()); + } + consumer.acknowledge(messages); + //Wait ack send. + Thread.sleep(1000); + consumer.redeliverUnacknowledgedMessages(); + Message msg = consumer.receive(2, TimeUnit.SECONDS); + Assert.assertNull(msg); + } + + private void sendMessagesAsyncAndWait(Producer producer, int messages) throws Exception { + CountDownLatch latch = new CountDownLatch(messages); + for (int i = 0; i < messages; i++) { + String message = "my-message-" + i; + producer.sendAsync(message).thenAccept(messageId -> { + if (messageId != null) { + latch.countDown(); + } + }); + } + latch.await(); + } + +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java index 1ee5bfa23f43b..801896caee138 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.api; import java.io.Closeable; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -163,6 +164,13 @@ public interface Consumer extends Closeable { */ void acknowledge(Messages messages) throws PulsarClientException; + /** + * Acknowledge the consumption of a list of message. + * @param messageIdList + * @throws PulsarClientException + */ + void acknowledge(List messageIdList) throws PulsarClientException; + /** * Acknowledge the failure to process a single message. * @@ -365,6 +373,13 @@ public interface Consumer extends Closeable { */ CompletableFuture acknowledgeAsync(Messages messages); + /** + * Asynchronously acknowledge the consumption of a list of message. + * @param messageIdList + * @return + */ + CompletableFuture acknowledgeAsync(List messageIdList); + /** * Asynchronously reconsumeLater the consumption of a single message. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java index 1517f124bc599..a624e1536f607 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import java.util.List; import java.util.Map; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; @@ -31,6 +32,8 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable { void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map properties); + void addListAcknowledgment(List messageIds, AckType ackType, Map properties); + void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType, Map properties); void flush(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 0b329e68054d9..57fc65bc77cdb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -22,6 +22,7 @@ import com.google.common.collect.Queues; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -241,6 +242,15 @@ public void acknowledge(MessageId messageId) throws PulsarClientException { } } + @Override + public void acknowledge(List messageIdList) throws PulsarClientException { + try { + acknowledgeAsync(messageIdList).get(); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); + } + } + @Override public void acknowledge(Messages messages) throws PulsarClientException { try { @@ -322,6 +332,11 @@ public CompletableFuture acknowledgeAsync(Messages messages) { } } + @Override + public CompletableFuture acknowledgeAsync(List messageIdList) { + return doAcknowledgeWithTxn(messageIdList, AckType.Individual, Collections.emptyMap(), null); + } + @Override public CompletableFuture reconsumeLaterAsync(Message message, long delayTime, TimeUnit unit) { if (!conf.isRetryEnable()) { @@ -408,6 +423,18 @@ public void negativeAcknowledge(Message message) { negativeAcknowledge(message.getMessageId()); } + protected CompletableFuture doAcknowledgeWithTxn(List messageIdList, AckType ackType, + Map properties, + TransactionImpl txn) { + CompletableFuture ackFuture = doAcknowledge(messageIdList, ackType, properties, txn); + if (txn != null) { + txn.registerAckedTopic(getTopic()); + return txn.registerAckOp(ackFuture); + } else { + return ackFuture; + } + } + protected CompletableFuture doAcknowledgeWithTxn(MessageId messageId, AckType ackType, Map properties, TransactionImpl txn) { @@ -428,6 +455,10 @@ protected abstract CompletableFuture doAcknowledge(MessageId messageId, Ac Map properties, TransactionImpl txn); + protected abstract CompletableFuture doAcknowledge(List messageIdList, AckType ackType, + Map properties, + TransactionImpl txn); + protected abstract CompletableFuture doReconsumeLater(Message message, AckType ackType, Map properties, long delayTime, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index f9546e99a395a..0576cfdbc56ae 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -556,6 +556,47 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack return sendAcknowledge(messageId, ackType, properties, txnImpl); } + @Override + protected CompletableFuture doAcknowledge(List messageIdList, AckType ackType, Map properties, TransactionImpl txn) { + if (AckType.Cumulative.equals(ackType)) { + List> completableFutures = new ArrayList<>(); + messageIdList.forEach(messageId -> completableFutures.add(doAcknowledge(messageId, ackType, properties, txn))); + return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])); + } + if (getState() != State.Ready && getState() != State.Connecting) { + stats.incrementNumAcksFailed(); + PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState()); + messageIdList.forEach(messageId -> onAcknowledge(messageId, exception)); + return FutureUtil.failedFuture(exception); + } + List nonBatchMessageIds = new ArrayList<>(); + for (MessageId messageId : messageIdList) { + MessageIdImpl messageIdImpl; + if (messageId instanceof BatchMessageIdImpl + && !markAckForBatchMessage((BatchMessageIdImpl) messageId, ackType, properties)) { + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; + messageIdImpl = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId() + , batchMessageId.getPartitionIndex()); + acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId, batchMessageId.getBatchIndex(), + batchMessageId.getBatchSize(), ackType, properties); + stats.incrementNumAcksSent(batchMessageId.getBatchSize()); + } else { + messageIdImpl = (MessageIdImpl) messageId; + stats.incrementNumAcksSent(1); + nonBatchMessageIds.add(messageIdImpl); + } + unAckedMessageTracker.remove(messageIdImpl); + if (possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.remove(messageIdImpl); + } + onAcknowledge(messageId, null); + } + if (nonBatchMessageIds.size() > 0) { + acknowledgmentsGroupingTracker.addListAcknowledgment(nonBatchMessageIds, ackType, properties); + } + return CompletableFuture.completedFuture(null); + } + @SuppressWarnings("unchecked") @Override protected CompletableFuture doReconsumeLater(Message message, AckType ackType, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index ba750a0e33e1d..6a724cb3ab757 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -47,6 +47,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -456,6 +457,34 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack } } + @Override + protected CompletableFuture doAcknowledge(List messageIdList, AckType ackType, Map properties, TransactionImpl txn) { + List> resultFutures = new ArrayList<>(); + if (ackType == AckType.Cumulative) { + messageIdList.forEach(messageId -> resultFutures.add(doAcknowledge(messageId, ackType, properties, txn))); + return CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0])); + } else { + if (getState() != State.Ready) { + return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed")); + } + Map> topicToMessageIdMap = new HashMap<>(); + for (MessageId messageId : messageIdList) { + if (!(messageId instanceof TopicMessageIdImpl)) { + return FutureUtil.failedFuture(new IllegalArgumentException("messageId is not instance of TopicMessageIdImpl")); + } + TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId; + topicToMessageIdMap.putIfAbsent(topicMessageId.getTopicPartitionName(), new ArrayList<>()); + topicToMessageIdMap.get(topicMessageId.getTopicPartitionName()).add(topicMessageId.getInnerMessageId()); + } + topicToMessageIdMap.forEach((topicPartitionName, messageIds) -> { + ConsumerImpl consumer = consumers.get(topicPartitionName); + resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, ackType, properties, txn) + .thenAccept((res) -> messageIdList.forEach(unAckedMessageTracker::remove))); + }); + return CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0])); + } + } + @Override protected CompletableFuture doReconsumeLater(Message message, AckType ackType, Map properties, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java index 9061ac30bd61a..d70c7ce86c3e1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import java.util.List; import java.util.Map; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; @@ -45,6 +46,11 @@ public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map messageIds, AckType ackType, Map properties) { + // no-op + } + @Override public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int BatchSize, AckType ackType, Map properties) { // no-op diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 690897998b83d..17e8c223ede33 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -99,6 +99,7 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl consumer, Consum * Since the ack are delayed, we need to do some best-effort duplicate check to discard messages that are being * resent after a disconnection and for which the user has already sent an acknowledgement. */ + @Override public boolean isDuplicate(MessageId messageId) { if (messageId.compareTo(lastCumulativeAck) <= 0) { // Already included in a cumulative ack @@ -108,6 +109,31 @@ public boolean isDuplicate(MessageId messageId) { } } + @Override + public void addListAcknowledgment(List messageIds, AckType ackType, Map properties) { + if (ackType == AckType.Cumulative) { + messageIds.forEach(messageId -> doCumulativeAck(messageId, null)); + return; + } + messageIds.forEach(messageId -> { + if (messageId instanceof BatchMessageIdImpl) { + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; + pendingIndividualAcks.add(new MessageIdImpl(batchMessageId.getLedgerId(), + batchMessageId.getEntryId(), batchMessageId.getPartitionIndex())); + } else { + pendingIndividualAcks.add(messageId); + } + pendingIndividualBatchIndexAcks.remove(messageId); + if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) { + flush(); + } + }); + if (acknowledgementGroupTimeMicros == 0) { + flush(); + } + } + + @Override public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map properties) { if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) { // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an @@ -213,6 +239,7 @@ private boolean doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int batchInde /** * Flush all the pending acks and send them to the broker */ + @Override public void flush() { ClientCnx cnx = consumer.getClientCnx(); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java index eac424e95163f..1eb5b4276742b 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java @@ -117,6 +117,64 @@ public void testAckTracker() throws Exception { tracker.close(); } + @Test + public void testBatchAckTracker() throws Exception { + ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); + conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(10)); + PersistentAcknowledgmentsGroupingTracker tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup); + + MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0); + MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0); + MessageIdImpl msg3 = new MessageIdImpl(5, 3, 0); + MessageIdImpl msg4 = new MessageIdImpl(5, 4, 0); + MessageIdImpl msg5 = new MessageIdImpl(5, 5, 0); + MessageIdImpl msg6 = new MessageIdImpl(5, 6, 0); + + assertFalse(tracker.isDuplicate(msg1)); + + tracker.addListAcknowledgment(Collections.singletonList(msg1), AckType.Individual, Collections.emptyMap()); + assertTrue(tracker.isDuplicate(msg1)); + + assertFalse(tracker.isDuplicate(msg2)); + + tracker.addListAcknowledgment(Collections.singletonList(msg5), AckType.Cumulative, Collections.emptyMap()); + assertTrue(tracker.isDuplicate(msg1)); + assertTrue(tracker.isDuplicate(msg2)); + assertTrue(tracker.isDuplicate(msg3)); + + assertTrue(tracker.isDuplicate(msg4)); + assertTrue(tracker.isDuplicate(msg5)); + assertFalse(tracker.isDuplicate(msg6)); + + // Flush while disconnected. the internal tracking will not change + tracker.flush(); + + assertTrue(tracker.isDuplicate(msg1)); + assertTrue(tracker.isDuplicate(msg2)); + assertTrue(tracker.isDuplicate(msg3)); + + assertTrue(tracker.isDuplicate(msg4)); + assertTrue(tracker.isDuplicate(msg5)); + assertFalse(tracker.isDuplicate(msg6)); + + tracker.addListAcknowledgment(Collections.singletonList(msg6), AckType.Individual, Collections.emptyMap()); + assertTrue(tracker.isDuplicate(msg6)); + + when(consumer.getClientCnx()).thenReturn(cnx); + + tracker.flush(); + + assertTrue(tracker.isDuplicate(msg1)); + assertTrue(tracker.isDuplicate(msg2)); + assertTrue(tracker.isDuplicate(msg3)); + + assertTrue(tracker.isDuplicate(msg4)); + assertTrue(tracker.isDuplicate(msg5)); + assertFalse(tracker.isDuplicate(msg6)); + + tracker.close(); + } + @Test public void testImmediateAckingTracker() throws Exception { ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); @@ -144,6 +202,35 @@ public void testImmediateAckingTracker() throws Exception { tracker.close(); } + @Test + public void testImmediateBatchAckingTracker() throws Exception { + ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); + conf.setAcknowledgementsGroupTimeMicros(0); + PersistentAcknowledgmentsGroupingTracker tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup); + + MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0); + MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0); + + assertFalse(tracker.isDuplicate(msg1)); + + when(consumer.getClientCnx()).thenReturn(null); + + tracker.addListAcknowledgment(Collections.singletonList(msg1), AckType.Individual, Collections.emptyMap()); + tracker.flush(); + //cnx is null can not flush + assertTrue(tracker.isDuplicate(msg1)); + + when(consumer.getClientCnx()).thenReturn(cnx); + + tracker.flush(); + assertFalse(tracker.isDuplicate(msg1)); + + tracker.addListAcknowledgment(Collections.singletonList(msg2), AckType.Individual, Collections.emptyMap()); + // Since we were connected, the ack went out immediately + assertFalse(tracker.isDuplicate(msg2)); + tracker.close(); + } + @Test public void testAckTrackerMultiAck() throws Exception { ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); @@ -203,4 +290,64 @@ public void testAckTrackerMultiAck() throws Exception { tracker.close(); } + + @Test + public void testBatchAckTrackerMultiAck() throws Exception { + ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); + conf.setAcknowledgementsGroupTimeMicros(TimeUnit.SECONDS.toMicros(10)); + PersistentAcknowledgmentsGroupingTracker tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup); + + when(cnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12_VALUE); + + MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0); + MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0); + MessageIdImpl msg3 = new MessageIdImpl(5, 3, 0); + MessageIdImpl msg4 = new MessageIdImpl(5, 4, 0); + MessageIdImpl msg5 = new MessageIdImpl(5, 5, 0); + MessageIdImpl msg6 = new MessageIdImpl(5, 6, 0); + + assertFalse(tracker.isDuplicate(msg1)); + + tracker.addListAcknowledgment(Collections.singletonList(msg1), AckType.Individual, Collections.emptyMap()); + assertTrue(tracker.isDuplicate(msg1)); + + assertFalse(tracker.isDuplicate(msg2)); + + tracker.addListAcknowledgment(Collections.singletonList(msg5), AckType.Cumulative, Collections.emptyMap()); + assertTrue(tracker.isDuplicate(msg1)); + assertTrue(tracker.isDuplicate(msg2)); + assertTrue(tracker.isDuplicate(msg3)); + + assertTrue(tracker.isDuplicate(msg4)); + assertTrue(tracker.isDuplicate(msg5)); + assertFalse(tracker.isDuplicate(msg6)); + + // Flush while disconnected. the internal tracking will not change + tracker.flush(); + + assertTrue(tracker.isDuplicate(msg1)); + assertTrue(tracker.isDuplicate(msg2)); + assertTrue(tracker.isDuplicate(msg3)); + + assertTrue(tracker.isDuplicate(msg4)); + assertTrue(tracker.isDuplicate(msg5)); + assertFalse(tracker.isDuplicate(msg6)); + + tracker.addListAcknowledgment(Collections.singletonList(msg6), AckType.Individual, Collections.emptyMap()); + assertTrue(tracker.isDuplicate(msg6)); + + when(consumer.getClientCnx()).thenReturn(cnx); + + tracker.flush(); + + assertTrue(tracker.isDuplicate(msg1)); + assertTrue(tracker.isDuplicate(msg2)); + assertTrue(tracker.isDuplicate(msg3)); + + assertTrue(tracker.isDuplicate(msg4)); + assertTrue(tracker.isDuplicate(msg5)); + assertFalse(tracker.isDuplicate(msg6)); + + tracker.close(); + } } diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java index 99962d8cf5b4b..9d8abbf96acc0 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java +++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java @@ -446,6 +446,11 @@ public void acknowledge(Messages messages) throws PulsarClientException { } + @Override + public void acknowledge(List messageIdList) throws PulsarClientException { + + } + @Override public void negativeAcknowledge(Message message) { } @@ -485,6 +490,11 @@ public CompletableFuture acknowledgeAsync(Messages messages) { return null; } + @Override + public CompletableFuture acknowledgeAsync(List messageIdList) { + return null; + } + @Override public CompletableFuture acknowledgeCumulativeAsync(Message message) { return null;