From 6a953e2539f810483f7b61bee5aad5cbc93bc323 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 25 Oct 2022 15:09:28 +0800 Subject: [PATCH 1/3] [feat] Support acknowledging a list of messages --- include/pulsar/Consumer.h | 14 ++++ include/pulsar/MessageId.h | 3 + lib/AckGroupingTracker.h | 6 ++ lib/AckGroupingTrackerEnabled.cc | 10 +++ lib/AckGroupingTrackerEnabled.h | 1 + lib/Consumer.cc | 20 +++++ lib/ConsumerImpl.cc | 23 ++++-- lib/ConsumerImpl.h | 4 +- lib/ConsumerImplBase.h | 1 + lib/MultiTopicsConsumerImpl.cc | 35 +++++++++ lib/MultiTopicsConsumerImpl.h | 2 + lib/UnAckedMessageTrackerDisabled.h | 1 + lib/UnAckedMessageTrackerEnabled.cc | 23 ++++-- lib/UnAckedMessageTrackerEnabled.h | 3 +- lib/UnAckedMessageTrackerInterface.h | 1 + lib/stats/ConsumerStatsBase.h | 2 +- lib/stats/ConsumerStatsDisabled.h | 2 +- lib/stats/ConsumerStatsImpl.cc | 6 +- lib/stats/ConsumerStatsImpl.h | 2 +- tests/BasicEndToEndTest.cc | 113 +++++++++++++++++++++++++++ tests/PulsarFriend.h | 16 ++++ 21 files changed, 266 insertions(+), 22 deletions(-) diff --git a/include/pulsar/Consumer.h b/include/pulsar/Consumer.h index d37c7f91..d35defd8 100644 --- a/include/pulsar/Consumer.h +++ b/include/pulsar/Consumer.h @@ -164,6 +164,12 @@ class PULSAR_PUBLIC Consumer { */ Result acknowledge(const MessageId& messageId); + /** + * Acknowledge the consumption of a list of message. + * @param messageIdList + */ + Result acknowledge(const MessageIdList& messageIdList); + /** * Asynchronously acknowledge the reception of a single message. * @@ -186,6 +192,14 @@ class PULSAR_PUBLIC Consumer { */ void acknowledgeAsync(const MessageId& messageId, ResultCallback callback); + /** + * Asynchronously acknowledge the consumption of a list of message. + * @param messageIdList + * @param callback the callback that is triggered when the message has been acknowledged or not + * @return + */ + void acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback); + /** * Acknowledge the reception of all the messages in the stream up to (and including) * the provided message. diff --git a/include/pulsar/MessageId.h b/include/pulsar/MessageId.h index fd17df6a..7c9626c1 100644 --- a/include/pulsar/MessageId.h +++ b/include/pulsar/MessageId.h @@ -25,6 +25,7 @@ #include #include #include +#include namespace pulsar { @@ -107,6 +108,8 @@ class PULSAR_PUBLIC MessageId { typedef std::shared_ptr MessageIdImplPtr; MessageIdImplPtr impl_; }; + +typedef std::vector MessageIdList; } // namespace pulsar #endif // MESSAGE_ID_H diff --git a/lib/AckGroupingTracker.h b/lib/AckGroupingTracker.h index d0e800db..0a986a01 100644 --- a/lib/AckGroupingTracker.h +++ b/lib/AckGroupingTracker.h @@ -62,6 +62,12 @@ class AckGroupingTracker : public std::enable_shared_from_this lock(this->rmutexPendingIndAcks_); + for (const auto& msgId : msgIds) { + this->pendingIndividualAcks_.emplace(msgId); + } + if (this->ackGroupingMaxSize_ > 0 && this->pendingIndividualAcks_.size() >= this->ackGroupingMaxSize_) { + this->flush(); + } +} + void AckGroupingTrackerEnabled::addAcknowledgeCumulative(const MessageId& msgId) { std::lock_guard lock(this->mutexCumulativeAckMsgId_); if (msgId > this->nextCumulativeAckMsgId_) { diff --git a/lib/AckGroupingTrackerEnabled.h b/lib/AckGroupingTrackerEnabled.h index 89b13f8e..f3cd93af 100644 --- a/lib/AckGroupingTrackerEnabled.h +++ b/lib/AckGroupingTrackerEnabled.h @@ -61,6 +61,7 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker { void start() override; bool isDuplicate(const MessageId& msgId) override; void addAcknowledge(const MessageId& msgId) override; + void addAcknowledgeList(const MessageIdList& msgIds) override; void addAcknowledgeCumulative(const MessageId& msgId) override; void close() override; void flush() override; diff --git a/lib/Consumer.cc b/lib/Consumer.cc index 8e5c7ddf..07afcea7 100644 --- a/lib/Consumer.cc +++ b/lib/Consumer.cc @@ -115,6 +115,17 @@ Result Consumer::acknowledge(const MessageId& messageId) { return result; } +Result Consumer::acknowledge(const MessageIdList& messageIdList) { + if (!impl_) { + return ResultConsumerNotInitialized; + } + Promise promise; + impl_->acknowledgeAsync(messageIdList, WaitForCallback(promise)); + Result result; + promise.getFuture().get(result); + return result; +} + void Consumer::acknowledgeAsync(const Message& message, ResultCallback callback) { if (!impl_) { callback(ResultConsumerNotInitialized); @@ -133,6 +144,15 @@ void Consumer::acknowledgeAsync(const MessageId& messageId, ResultCallback callb impl_->acknowledgeAsync(messageId, callback); } +void Consumer::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) { + if (!impl_) { + callback(ResultConsumerNotInitialized); + return; + } + + impl_->acknowledgeAsync(messageIdList, callback); +} + Result Consumer::acknowledgeCumulative(const Message& message) { return acknowledgeCumulative(message.getMessageId()); } diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 19d5055c..bff5258f 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -943,16 +943,17 @@ inline CommandSubscribe_InitialPosition ConsumerImpl::getInitialPosition() { BOOST_THROW_EXCEPTION(std::logic_error("Invalid InitialPosition enumeration value")); } -void ConsumerImpl::statsCallback(Result res, ResultCallback callback, CommandAck_AckType ackType) { - consumerStatsBasePtr_->messageAcknowledged(res, ackType); +void ConsumerImpl::statsAckCallback(Result res, ResultCallback callback, CommandAck_AckType ackType, + uint32_t numAcks) { + consumerStatsBasePtr_->messageAcknowledged(res, ackType, numAcks); if (callback) { callback(res); } } void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) { - ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, get_shared_this_ptr(), std::placeholders::_1, - callback, CommandAck_AckType_Individual); + ResultCallback cb = std::bind(&ConsumerImpl::statsAckCallback, get_shared_this_ptr(), + std::placeholders::_1, callback, CommandAck_AckType_Individual, 1); if (msgId.batchIndex() != -1 && !batchAcknowledgementTracker_.isBatchReady(msgId, CommandAck_AckType_Individual)) { cb(ResultOk); @@ -961,9 +962,19 @@ void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callb doAcknowledgeIndividual(msgId, cb); } +void ConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) { + ResultCallback cb = + std::bind(&ConsumerImpl::statsAckCallback, get_shared_this_ptr(), std::placeholders::_1, callback, + proto::CommandAck_AckType_Individual, messageIdList.size()); + // Currently not supported batch message id individual index ack. + this->ackGroupingTrackerPtr_->addAcknowledgeList(messageIdList); + this->unAckedMessageTrackerPtr_->remove(messageIdList); + cb(ResultOk); +} + void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) { - ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, get_shared_this_ptr(), std::placeholders::_1, - callback, CommandAck_AckType_Cumulative); + ResultCallback cb = std::bind(&ConsumerImpl::statsAckCallback, get_shared_this_ptr(), + std::placeholders::_1, callback, CommandAck_AckType_Cumulative, 1); if (!isCumulativeAcknowledgementAllowed(config_.getConsumerType())) { cb(ResultCumulativeAcknowledgementNotAllowedError); return; diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index d65676ba..693180fd 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -109,6 +109,7 @@ class ConsumerImpl : public ConsumerImplBase { void receiveAsync(ReceiveCallback& callback) override; void unsubscribeAsync(ResultCallback callback) override; void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) override; + void acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) override; void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) override; void closeAsync(ResultCallback callback) override; void start() override; @@ -181,8 +182,9 @@ class ConsumerImpl : public ConsumerImplBase { // TODO - Convert these functions to lambda when we move to C++11 Result receiveHelper(Message& msg); Result receiveHelper(Message& msg, int timeout); - void statsCallback(Result, ResultCallback, CommandAck_AckType); void executeNotifyCallback(Message& msg); + void statsAckCallback(Result res, ResultCallback callback, CommandAck_AckType ackType, + uint32_t numAcks = 1); void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback); void failPendingReceiveCallback(); void setNegativeAcknowledgeEnabledForTesting(bool enabled) override; diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h index 37b66462..74a8810e 100644 --- a/lib/ConsumerImplBase.h +++ b/lib/ConsumerImplBase.h @@ -55,6 +55,7 @@ class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this void batchReceiveAsync(BatchReceiveCallback callback); virtual void unsubscribeAsync(ResultCallback callback) = 0; virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) = 0; + virtual void acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) = 0; virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) = 0; virtual void closeAsync(ResultCallback callback) = 0; virtual void start() = 0; diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index a170faf6..57fe54b0 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -647,6 +647,41 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal } } +void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) { + if (state_ != Ready) { + callback(ResultAlreadyClosed); + return; + } + + std::unordered_map topicToMessageId; + for (const MessageId& messageId : messageIdList) { + auto topicName = messageId.getTopicName(); + topicToMessageId[topicName].emplace_back(messageId); + } + + auto needCallBack = std::make_shared>(topicToMessageId.size()); + Result res = ResultOk; + auto cb = [callback, needCallBack, &res](Result result) { + if (result != ResultOk) { + res = result; + } + needCallBack->fetch_sub(1); + if (needCallBack->load() == 0) { + callback(res); + } + }; + for (const auto& kv : topicToMessageId) { + auto optConsumer = consumers_.find(kv.first); + if (optConsumer.is_present()) { + unAckedMessageTrackerPtr_->remove(kv.second); + optConsumer.value()->acknowledgeAsync(kv.second, cb); + } else { + LOG_ERROR("Message of topic: " << kv.first << " not in unAckedMessageTracker"); + callback(ResultUnknownError); + } + } +} + void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) { callback(ResultOperationNotSupported); } diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index 2b4f83df..ac25c840 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -68,6 +68,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { void receiveAsync(ReceiveCallback& callback) override; void unsubscribeAsync(ResultCallback callback) override; void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) override; + void acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) override; void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) override; void closeAsync(ResultCallback callback) override; void start() override; @@ -152,6 +153,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { bool hasEnoughMessagesForBatchReceive() const override; void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override; void beforeConnectionChange(ClientConnection& cnx) override; + friend class PulsarFriend; private: std::shared_ptr get_shared_this_ptr(); diff --git a/lib/UnAckedMessageTrackerDisabled.h b/lib/UnAckedMessageTrackerDisabled.h index bd12b3ef..d39e969c 100644 --- a/lib/UnAckedMessageTrackerDisabled.h +++ b/lib/UnAckedMessageTrackerDisabled.h @@ -25,6 +25,7 @@ class UnAckedMessageTrackerDisabled : public UnAckedMessageTrackerInterface { public: bool add(const MessageId& m) { return false; } bool remove(const MessageId& m) { return false; } + void remove(const MessageIdList& msgIds) {} void removeMessagesTill(const MessageId& msgId) {} void removeTopicMessage(const std::string& topic) {} diff --git a/lib/UnAckedMessageTrackerEnabled.cc b/lib/UnAckedMessageTrackerEnabled.cc index 1bc878b8..0579777e 100644 --- a/lib/UnAckedMessageTrackerEnabled.cc +++ b/lib/UnAckedMessageTrackerEnabled.cc @@ -44,7 +44,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() { } void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() { - std::unique_lock acquire(lock_); + std::unique_lock acquire(lock_); LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for consumerPtr_ " << consumerReference_.getName().c_str()); @@ -95,7 +95,7 @@ UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long } bool UnAckedMessageTrackerEnabled::add(const MessageId& msgId) { - std::lock_guard acquire(lock_); + std::lock_guard acquire(lock_); MessageId id(msgId.partition(), msgId.ledgerId(), msgId.entryId(), -1); if (messageIdPartitionMap.count(id) == 0) { std::set& partition = timePartitions.back(); @@ -107,12 +107,12 @@ bool UnAckedMessageTrackerEnabled::add(const MessageId& msgId) { } bool UnAckedMessageTrackerEnabled::isEmpty() { - std::lock_guard acquire(lock_); + std::lock_guard acquire(lock_); return messageIdPartitionMap.empty(); } bool UnAckedMessageTrackerEnabled::remove(const MessageId& msgId) { - std::lock_guard acquire(lock_); + std::lock_guard acquire(lock_); MessageId id(msgId.partition(), msgId.ledgerId(), msgId.entryId(), -1); bool removed = false; @@ -124,13 +124,20 @@ bool UnAckedMessageTrackerEnabled::remove(const MessageId& msgId) { return removed; } +void UnAckedMessageTrackerEnabled::remove(const MessageIdList& msgIds) { + std::lock_guard acquire(lock_); + for (const auto& msgId : msgIds) { + remove(msgId); + } +} + long UnAckedMessageTrackerEnabled::size() { - std::lock_guard acquire(lock_); + std::lock_guard acquire(lock_); return messageIdPartitionMap.size(); } void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) { - std::lock_guard acquire(lock_); + std::lock_guard acquire(lock_); for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end();) { MessageId msgIdInMap = it->first; if (msgIdInMap <= msgId) { @@ -144,7 +151,7 @@ void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) { // this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, should remove all it's message. void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic) { - std::lock_guard acquire(lock_); + std::lock_guard acquire(lock_); for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end();) { MessageId msgIdInMap = it->first; if (msgIdInMap.getTopicName().compare(topic) == 0) { @@ -157,7 +164,7 @@ void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic) } void UnAckedMessageTrackerEnabled::clear() { - std::lock_guard acquire(lock_); + std::lock_guard acquire(lock_); messageIdPartitionMap.clear(); for (auto it = timePartitions.begin(); it != timePartitions.end(); it++) { it->clear(); diff --git a/lib/UnAckedMessageTrackerEnabled.h b/lib/UnAckedMessageTrackerEnabled.h index 13dee211..0bdcc852 100644 --- a/lib/UnAckedMessageTrackerEnabled.h +++ b/lib/UnAckedMessageTrackerEnabled.h @@ -41,6 +41,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface { UnAckedMessageTrackerEnabled(long timeoutMs, long tickDuration, ClientImplPtr, ConsumerImplBase&); bool add(const MessageId& msgId); bool remove(const MessageId& msgId); + void remove(const MessageIdList& msgIds); void removeMessagesTill(const MessageId& msgId); void removeTopicMessage(const std::string& topic); void timeoutHandler(); @@ -53,7 +54,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface { long size(); std::map&> messageIdPartitionMap; std::deque> timePartitions; - std::mutex lock_; + std::recursive_mutex lock_; ConsumerImplBase& consumerReference_; ClientImplPtr client_; DeadlineTimerPtr timer_; // DO NOT place this before client_! diff --git a/lib/UnAckedMessageTrackerInterface.h b/lib/UnAckedMessageTrackerInterface.h index 3bcaaa5f..d1fe7893 100644 --- a/lib/UnAckedMessageTrackerInterface.h +++ b/lib/UnAckedMessageTrackerInterface.h @@ -30,6 +30,7 @@ class UnAckedMessageTrackerInterface { UnAckedMessageTrackerInterface() {} virtual bool add(const MessageId& m) = 0; virtual bool remove(const MessageId& m) = 0; + virtual void remove(const MessageIdList& msgIds) = 0; virtual void removeMessagesTill(const MessageId& msgId) = 0; virtual void clear() = 0; // this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, should remove all it's diff --git a/lib/stats/ConsumerStatsBase.h b/lib/stats/ConsumerStatsBase.h index 13ca1549..cc4a1595 100644 --- a/lib/stats/ConsumerStatsBase.h +++ b/lib/stats/ConsumerStatsBase.h @@ -28,7 +28,7 @@ namespace pulsar { class ConsumerStatsBase { public: virtual void receivedMessage(Message&, Result) = 0; - virtual void messageAcknowledged(Result, CommandAck_AckType) = 0; + virtual void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums = 1) = 0; virtual ~ConsumerStatsBase() {} }; diff --git a/lib/stats/ConsumerStatsDisabled.h b/lib/stats/ConsumerStatsDisabled.h index f32d0262..01e87108 100644 --- a/lib/stats/ConsumerStatsDisabled.h +++ b/lib/stats/ConsumerStatsDisabled.h @@ -27,7 +27,7 @@ namespace pulsar { class ConsumerStatsDisabled : public ConsumerStatsBase { public: virtual void receivedMessage(Message&, Result) {} - virtual void messageAcknowledged(Result, CommandAck_AckType) {} + virtual void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums) {} }; } /* namespace pulsar */ diff --git a/lib/stats/ConsumerStatsImpl.cc b/lib/stats/ConsumerStatsImpl.cc index 833dcd19..4f3b0fda 100644 --- a/lib/stats/ConsumerStatsImpl.cc +++ b/lib/stats/ConsumerStatsImpl.cc @@ -85,10 +85,10 @@ void ConsumerStatsImpl::receivedMessage(Message& msg, Result res) { totalReceivedMsgMap_[res] += 1; } -void ConsumerStatsImpl::messageAcknowledged(Result res, CommandAck_AckType ackType) { +void ConsumerStatsImpl::messageAcknowledged(Result res, CommandAck_AckType ackType, uint32_t ackNums) { Lock lock(mutex_); - ackedMsgMap_[std::make_pair(res, ackType)] += 1; - totalAckedMsgMap_[std::make_pair(res, ackType)] += 1; + ackedMsgMap_[std::make_pair(res, ackType)] += ackNums; + totalAckedMsgMap_[std::make_pair(res, ackType)] += ackNums; } std::ostream& operator<<(std::ostream& os, diff --git a/lib/stats/ConsumerStatsImpl.h b/lib/stats/ConsumerStatsImpl.h index a301e12b..1309ff6b 100644 --- a/lib/stats/ConsumerStatsImpl.h +++ b/lib/stats/ConsumerStatsImpl.h @@ -59,7 +59,7 @@ class ConsumerStatsImpl : public ConsumerStatsBase { ConsumerStatsImpl(const ConsumerStatsImpl& stats); void flushAndReset(const boost::system::error_code&); virtual void receivedMessage(Message&, Result); - virtual void messageAcknowledged(Result, CommandAck_AckType); + virtual void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums = 1); virtual ~ConsumerStatsImpl(); const inline std::map, unsigned long>& getAckedMsgMap() const { diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc index 61cf28a9..84ee9d57 100644 --- a/tests/BasicEndToEndTest.cc +++ b/tests/BasicEndToEndTest.cc @@ -4276,3 +4276,116 @@ void testBatchReceiveClose(bool multiConsumer) { TEST(BasicEndToEndTest, testBatchReceiveClose) { testBatchReceiveClose(false); } TEST(BasicEndToEndTest, testBatchReceiveCloseWithMultiConsumer) { testBatchReceiveClose(true); } + +TEST(BasicEndToEndTest, testAckMsgList) { + Client client(lookupUrl); + auto clientImplPtr = PulsarFriend::getClientImplPtr(client); + + constexpr auto numMsg = 100; + std::string uniqueChunk = unique_str(); + std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk; + const std::string subName = "sub-ack-list"; + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + + ConsumerConfiguration consumerConfig; + consumerConfig.setAckGroupingMaxSize(numMsg); + consumerConfig.setUnAckedMessagesTimeoutMs(10000); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer)); + + // Sending and receiving messages. + for (auto count = 0; count < numMsg; ++count) { + Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + + std::vector recvMsgId; + for (auto count = 0; count < numMsg; ++count) { + Message msg; + ASSERT_EQ(ResultOk, consumer.receive(msg, 1000)); + recvMsgId.emplace_back(msg.getMessageId()); + } + ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId)); + + // try redeliver unack messages. + consumer.redeliverUnacknowledgedMessages(); + + auto consumerStats = PulsarFriend::getConsumerStatsPtr(consumer); + auto ackMap = consumerStats->getAckedMsgMap(); + unsigned long totalAck = ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)]; + ASSERT_EQ(totalAck, numMsg); + + Message msg; + auto ret = consumer.receive(msg, 1000); + ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId(); + + producer.close(); + consumer.close(); + client.close(); +} + +TEST(BasicEndToEndTest, testAckMsgListWithMultiConsumer) { + Client client(lookupUrl); + auto clientImplPtr = PulsarFriend::getClientImplPtr(client); + + std::string uniqueChunk = unique_str(); + std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk; + + // call admin api to make it partitioned + std::string url = + adminUrl + "admin/v2/persistent/public/default/test-ack-msgs" + uniqueChunk + "/partitions"; + int res = makePutRequest(url, "5"); + LOG_INFO("res = " << res); + ASSERT_FALSE(res != 204 && res != 409); + + constexpr auto numMsg = 100; + const std::string subName = "sub-ack-list"; + + Producer producer; + ProducerConfiguration producerConfig; + producerConfig.setPartitionsRoutingMode(pulsar::ProducerConfiguration::RoundRobinDistribution); + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer)); + + ConsumerConfiguration consumerConfig; + // set ack grouping max size is 10 + consumerConfig.setAckGroupingMaxSize(10); + consumerConfig.setUnAckedMessagesTimeoutMs(10000); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer)); + + // Sending and receiving messages. + for (auto count = 0; count < numMsg; ++count) { + Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + + std::vector recvMsgId; + for (auto count = 0; count < numMsg; ++count) { + Message msg; + ASSERT_EQ(ResultOk, consumer.receive(msg, 1000)); + recvMsgId.emplace_back(msg.getMessageId()); + } + ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId)); + + // try redeliver unack messages. + consumer.redeliverUnacknowledgedMessages(); + + // assert stats + unsigned long totalAck = 0; + auto consumerStatsList = PulsarFriend::getConsumerStatsPtrList(consumer); + for (auto consumerStats : consumerStatsList) { + auto ackMap = consumerStats->getAckedMsgMap(); + totalAck += ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)]; + } + ASSERT_EQ(totalAck, numMsg); + + Message msg; + auto ret = consumer.receive(msg, 1000); + ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId(); + + producer.close(); + consumer.close(); + client.close(); +} diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index 18f2bb66..938b2844 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -61,6 +61,22 @@ class PulsarFriend { return std::static_pointer_cast(consumerImpl->consumerStatsBasePtr_); } + static std::vector getConsumerStatsPtrList(Consumer consumer) { + if (MultiTopicsConsumerImpl* multiTopicsConsumer = + dynamic_cast(consumer.impl_.get())) { + std::vector consumerStatsList; + for (const auto& kv : multiTopicsConsumer->consumers_.toPairVector()) { + auto consumerStats = + std::static_pointer_cast(kv.second->consumerStatsBasePtr_); + consumerStatsList.emplace_back(consumerStats); + } + return consumerStatsList; + + } else { + throw std::runtime_error("Consumer must is MultiTopicConsumer."); + } + } + static ProducerImpl& getProducerImpl(Producer producer) { ProducerImpl* producerImpl = static_cast(producer.impl_.get()); return *producerImpl; From 663858cf2611ba4079fca0452a1734b5ecadceb0 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 1 Nov 2022 21:01:06 +0800 Subject: [PATCH 2/3] Fix code reviews. --- lib/MultiTopicsConsumerImpl.cc | 15 ++++++++------- tests/BasicEndToEndTest.cc | 2 ++ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 57fe54b0..6384698d 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -660,14 +660,15 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis } auto needCallBack = std::make_shared>(topicToMessageId.size()); - Result res = ResultOk; - auto cb = [callback, needCallBack, &res](Result result) { + auto cb = [callback, needCallBack](Result result) { if (result != ResultOk) { - res = result; + LOG_ERROR("Filed when acknowledge list: " << result); + callback(result); + // set needCallBack is -1 to avoid repeated callback. + needCallBack->store(-1); } - needCallBack->fetch_sub(1); - if (needCallBack->load() == 0) { - callback(res); + if (--(*needCallBack) == 0) { + callback(result); } }; for (const auto& kv : topicToMessageId) { @@ -676,7 +677,7 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis unAckedMessageTrackerPtr_->remove(kv.second); optConsumer.value()->acknowledgeAsync(kv.second, cb); } else { - LOG_ERROR("Message of topic: " << kv.first << " not in unAckedMessageTracker"); + LOG_ERROR("Message of topic: " << kv.first << " not in consumers"); callback(ResultUnknownError); } } diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc index 84ee9d57..aee679e4 100644 --- a/tests/BasicEndToEndTest.cc +++ b/tests/BasicEndToEndTest.cc @@ -4345,6 +4345,8 @@ TEST(BasicEndToEndTest, testAckMsgListWithMultiConsumer) { Producer producer; ProducerConfiguration producerConfig; + // Turn off batch to ensure even distribution + producerConfig.setBatchingEnabled(false); producerConfig.setPartitionsRoutingMode(pulsar::ProducerConfiguration::RoundRobinDistribution); ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer)); From ff4dcb27430387dfc2a3656886af5e31e69bc2ec Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 1 Nov 2022 21:40:33 +0800 Subject: [PATCH 3/3] Set needCallBack is -1 before callback. --- lib/MultiTopicsConsumerImpl.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 6384698d..d14c3cae 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -663,9 +663,10 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis auto cb = [callback, needCallBack](Result result) { if (result != ResultOk) { LOG_ERROR("Filed when acknowledge list: " << result); - callback(result); // set needCallBack is -1 to avoid repeated callback. needCallBack->store(-1); + callback(result); + return; } if (--(*needCallBack) == 0) { callback(result);