diff --git a/lib/AckGroupingTracker.cc b/lib/AckGroupingTracker.cc index 4abfcb19..0379bc6d 100644 --- a/lib/AckGroupingTracker.cc +++ b/lib/AckGroupingTracker.cc @@ -46,6 +46,19 @@ bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uin return true; } +static std::ostream& operator<<(std::ostream& os, const std::set& msgIds) { + bool first = true; + for (auto&& msgId : msgIds) { + if (first) { + first = false; + } else { + os << ", "; + } + os << "[" << msgId << "]"; + } + return os; +} + bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId, const std::set& msgIds) { auto cnx = connWeakPtr.lock(); @@ -54,8 +67,15 @@ bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uin return false; } - for (const auto& msgId : msgIds) { - sendAck(cnx, consumerId, msgId, CommandAck_AckType_Individual); + if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) { + auto cmd = Commands::newMultiMessageAck(consumerId, msgIds); + cnx->sendCommand(cmd); + LOG_DEBUG("ACK request is sent for " << msgIds.size() << " messages: " << msgIds); + } else { + // Broker does not support multi-message ACK, use multiple individual ACKs instead. + for (const auto& msgId : msgIds) { + sendAck(cnx, consumerId, msgId, CommandAck_AckType_Individual); + } } return true; } diff --git a/lib/AckGroupingTrackerDisabled.cc b/lib/AckGroupingTrackerDisabled.cc index ca53792e..2fe2b2ef 100644 --- a/lib/AckGroupingTrackerDisabled.cc +++ b/lib/AckGroupingTrackerDisabled.cc @@ -36,6 +36,14 @@ void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId) { this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Individual); } +void AckGroupingTrackerDisabled::addAcknowledgeList(const MessageIdList& msgIds) { + std::set msgIdSet; + for (auto&& msgId : msgIds) { + msgIdSet.emplace(msgId); + } + this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgIdSet); +} + void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& msgId) { this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Cumulative); } diff --git a/lib/AckGroupingTrackerDisabled.h b/lib/AckGroupingTrackerDisabled.h index ef6bfbed..7b416862 100644 --- a/lib/AckGroupingTrackerDisabled.h +++ b/lib/AckGroupingTrackerDisabled.h @@ -44,6 +44,7 @@ class AckGroupingTrackerDisabled : public AckGroupingTracker { AckGroupingTrackerDisabled(HandlerBase& handler, uint64_t consumerId); void addAcknowledge(const MessageId& msgId) override; + void addAcknowledgeList(const MessageIdList& msgIds) override; void addAcknowledgeCumulative(const MessageId& msgId) override; private: diff --git a/lib/AckGroupingTrackerEnabled.cc b/lib/AckGroupingTrackerEnabled.cc index 4c39c6fa..6eba2d38 100644 --- a/lib/AckGroupingTrackerEnabled.cc +++ b/lib/AckGroupingTrackerEnabled.cc @@ -132,13 +132,7 @@ void AckGroupingTrackerEnabled::flush() { // Send ACK for individual ACK requests. std::lock_guard lock(this->rmutexPendingIndAcks_); if (!this->pendingIndividualAcks_.empty()) { - if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) { - auto cmd = Commands::newMultiMessageAck(this->consumerId_, this->pendingIndividualAcks_); - cnx->sendCommand(cmd); - } else { - // Broker does not support multi-message ACK, use multiple individual ACK instead. - this->doImmediateAck(cnx, this->consumerId_, this->pendingIndividualAcks_); - } + this->doImmediateAck(cnx, consumerId_, this->pendingIndividualAcks_); this->pendingIndividualAcks_.clear(); } } diff --git a/tests/AcknowledgeTest.cc b/tests/AcknowledgeTest.cc new file mode 100644 index 00000000..e0746be6 --- /dev/null +++ b/tests/AcknowledgeTest.cc @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include "HttpHelper.h" +#include "PulsarFriend.h" +#include "lib/LogUtils.h" + +DECLARE_LOG_OBJECT() + +using namespace pulsar; + +static std::string lookupUrl = "pulsar://localhost:6650"; +static std::string adminUrl = "http://localhost:8080/"; + +extern std::string unique_str(); + +class AcknowledgeTest : public testing::TestWithParam {}; + +TEST_P(AcknowledgeTest, testAckMsgList) { + Client client(lookupUrl); + auto clientImplPtr = PulsarFriend::getClientImplPtr(client); + + constexpr auto numMsg = 100; + std::string uniqueChunk = unique_str(); + std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk; + const std::string subName = "sub-ack-list"; + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + + ConsumerConfiguration consumerConfig; + consumerConfig.setAckGroupingMaxSize(numMsg); + consumerConfig.setAckGroupingTimeMs(GetParam()); + consumerConfig.setUnAckedMessagesTimeoutMs(10000); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer)); + + // Sending and receiving messages. + for (auto count = 0; count < numMsg; ++count) { + Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + + std::vector recvMsgId; + for (auto count = 0; count < numMsg; ++count) { + Message msg; + ASSERT_EQ(ResultOk, consumer.receive(msg, 1000)); + recvMsgId.emplace_back(msg.getMessageId()); + } + ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId)); + + // try redeliver unack messages. + consumer.redeliverUnacknowledgedMessages(); + + auto consumerStats = PulsarFriend::getConsumerStatsPtr(consumer); + auto ackMap = consumerStats->getAckedMsgMap(); + unsigned long totalAck = ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)]; + ASSERT_EQ(totalAck, numMsg); + + Message msg; + auto ret = consumer.receive(msg, 1000); + ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId(); + + producer.close(); + consumer.close(); + client.close(); +} + +TEST_P(AcknowledgeTest, testAckMsgListWithMultiConsumer) { + Client client(lookupUrl); + auto clientImplPtr = PulsarFriend::getClientImplPtr(client); + + std::string uniqueChunk = unique_str(); + std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk; + + // call admin api to make it partitioned + std::string url = + adminUrl + "admin/v2/persistent/public/default/test-ack-msgs" + uniqueChunk + "/partitions"; + int res = makePutRequest(url, "5"); + LOG_INFO("res = " << res); + ASSERT_FALSE(res != 204 && res != 409); + + constexpr auto numMsg = 100; + const std::string subName = "sub-ack-list"; + + Producer producer; + ProducerConfiguration producerConfig; + // Turn off batch to ensure even distribution + producerConfig.setBatchingEnabled(false); + producerConfig.setPartitionsRoutingMode(pulsar::ProducerConfiguration::RoundRobinDistribution); + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer)); + + ConsumerConfiguration consumerConfig; + // set ack grouping max size is 10 + consumerConfig.setAckGroupingMaxSize(10); + consumerConfig.setAckGroupingTimeMs(GetParam()); + consumerConfig.setUnAckedMessagesTimeoutMs(10000); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer)); + + // Sending and receiving messages. + for (auto count = 0; count < numMsg; ++count) { + Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + + std::vector recvMsgId; + for (auto count = 0; count < numMsg; ++count) { + Message msg; + ASSERT_EQ(ResultOk, consumer.receive(msg, 1000)); + recvMsgId.emplace_back(msg.getMessageId()); + } + ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId)); + + // try redeliver unack messages. + consumer.redeliverUnacknowledgedMessages(); + + // assert stats + unsigned long totalAck = 0; + auto consumerStatsList = PulsarFriend::getConsumerStatsPtrList(consumer); + for (auto consumerStats : consumerStatsList) { + auto ackMap = consumerStats->getAckedMsgMap(); + totalAck += ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)]; + } + ASSERT_EQ(totalAck, numMsg); + + Message msg; + auto ret = consumer.receive(msg, 1000); + ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId(); + + producer.close(); + consumer.close(); + client.close(); +} + +INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest, testing::Values(100, 0)); diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc index 4b181d0f..b801d381 100644 --- a/tests/BasicEndToEndTest.cc +++ b/tests/BasicEndToEndTest.cc @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -58,7 +59,7 @@ static int globalCount = 0; static long globalResendMessageCount = 0; std::string lookupUrl = "pulsar://localhost:6650"; static std::string adminUrl = "http://localhost:8080/"; -static int uniqueCounter = 0; +static std::atomic_int uniqueCounter{0}; std::string unique_str() { long nanos = std::chrono::duration_cast( @@ -4276,118 +4277,3 @@ void testBatchReceiveClose(bool multiConsumer) { TEST(BasicEndToEndTest, testBatchReceiveClose) { testBatchReceiveClose(false); } TEST(BasicEndToEndTest, testBatchReceiveCloseWithMultiConsumer) { testBatchReceiveClose(true); } - -TEST(BasicEndToEndTest, testAckMsgList) { - Client client(lookupUrl); - auto clientImplPtr = PulsarFriend::getClientImplPtr(client); - - constexpr auto numMsg = 100; - std::string uniqueChunk = unique_str(); - std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk; - const std::string subName = "sub-ack-list"; - - Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); - - ConsumerConfiguration consumerConfig; - consumerConfig.setAckGroupingMaxSize(numMsg); - consumerConfig.setUnAckedMessagesTimeoutMs(10000); - Consumer consumer; - ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer)); - - // Sending and receiving messages. - for (auto count = 0; count < numMsg; ++count) { - Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build(); - ASSERT_EQ(ResultOk, producer.send(msg)); - } - - std::vector recvMsgId; - for (auto count = 0; count < numMsg; ++count) { - Message msg; - ASSERT_EQ(ResultOk, consumer.receive(msg, 1000)); - recvMsgId.emplace_back(msg.getMessageId()); - } - ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId)); - - // try redeliver unack messages. - consumer.redeliverUnacknowledgedMessages(); - - auto consumerStats = PulsarFriend::getConsumerStatsPtr(consumer); - auto ackMap = consumerStats->getAckedMsgMap(); - unsigned long totalAck = ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)]; - ASSERT_EQ(totalAck, numMsg); - - Message msg; - auto ret = consumer.receive(msg, 1000); - ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId(); - - producer.close(); - consumer.close(); - client.close(); -} - -TEST(BasicEndToEndTest, testAckMsgListWithMultiConsumer) { - Client client(lookupUrl); - auto clientImplPtr = PulsarFriend::getClientImplPtr(client); - - std::string uniqueChunk = unique_str(); - std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk; - - // call admin api to make it partitioned - std::string url = - adminUrl + "admin/v2/persistent/public/default/test-ack-msgs" + uniqueChunk + "/partitions"; - int res = makePutRequest(url, "5"); - LOG_INFO("res = " << res); - ASSERT_FALSE(res != 204 && res != 409); - - constexpr auto numMsg = 100; - const std::string subName = "sub-ack-list"; - - Producer producer; - ProducerConfiguration producerConfig; - // Turn off batch to ensure even distribution - producerConfig.setBatchingEnabled(false); - producerConfig.setPartitionsRoutingMode(pulsar::ProducerConfiguration::RoundRobinDistribution); - ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer)); - - ConsumerConfiguration consumerConfig; - // set ack grouping max size is 10 - consumerConfig.setAckGroupingMaxSize(10); - consumerConfig.setUnAckedMessagesTimeoutMs(10000); - Consumer consumer; - ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer)); - - // Sending and receiving messages. - for (auto count = 0; count < numMsg; ++count) { - Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build(); - ASSERT_EQ(ResultOk, producer.send(msg)); - } - - std::vector recvMsgId; - for (auto count = 0; count < numMsg; ++count) { - Message msg; - ASSERT_EQ(ResultOk, consumer.receive(msg, 1000)); - recvMsgId.emplace_back(msg.getMessageId()); - } - ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId)); - - // try redeliver unack messages. - consumer.redeliverUnacknowledgedMessages(); - - // assert stats - unsigned long totalAck = 0; - auto consumerStatsList = PulsarFriend::getConsumerStatsPtrList(consumer); - for (auto consumerStats : consumerStatsList) { - auto ackMap = consumerStats->getAckedMsgMap(); - totalAck += ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)]; - } - ASSERT_EQ(totalAck, numMsg); - - Message msg; - auto ret = consumer.receive(msg, 1000); - ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId(); - - producer.close(); - consumer.close(); - client.close(); -}