Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions lib/AckGroupingTracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uin
return true;
}

static std::ostream& operator<<(std::ostream& os, const std::set<MessageId>& msgIds) {
bool first = true;
for (auto&& msgId : msgIds) {
if (first) {
first = false;
} else {
os << ", ";
}
os << "[" << msgId << "]";
}
return os;
}

bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId,
const std::set<MessageId>& msgIds) {
auto cnx = connWeakPtr.lock();
Expand All @@ -54,8 +67,15 @@ bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uin
return false;
}

for (const auto& msgId : msgIds) {
sendAck(cnx, consumerId, msgId, CommandAck_AckType_Individual);
if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
auto cmd = Commands::newMultiMessageAck(consumerId, msgIds);
cnx->sendCommand(cmd);
LOG_DEBUG("ACK request is sent for " << msgIds.size() << " messages: " << msgIds);
} else {
// Broker does not support multi-message ACK, use multiple individual ACKs instead.
for (const auto& msgId : msgIds) {
sendAck(cnx, consumerId, msgId, CommandAck_AckType_Individual);
}
}
return true;
}
Expand Down
8 changes: 8 additions & 0 deletions lib/AckGroupingTrackerDisabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId) {
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Individual);
}

void AckGroupingTrackerDisabled::addAcknowledgeList(const MessageIdList& msgIds) {
std::set<MessageId> msgIdSet;
for (auto&& msgId : msgIds) {
msgIdSet.emplace(msgId);
}
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgIdSet);
}

void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& msgId) {
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Cumulative);
}
Expand Down
1 change: 1 addition & 0 deletions lib/AckGroupingTrackerDisabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class AckGroupingTrackerDisabled : public AckGroupingTracker {
AckGroupingTrackerDisabled(HandlerBase& handler, uint64_t consumerId);

void addAcknowledge(const MessageId& msgId) override;
void addAcknowledgeList(const MessageIdList& msgIds) override;
void addAcknowledgeCumulative(const MessageId& msgId) override;

private:
Expand Down
8 changes: 1 addition & 7 deletions lib/AckGroupingTrackerEnabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,7 @@ void AckGroupingTrackerEnabled::flush() {
// Send ACK for individual ACK requests.
std::lock_guard<std::recursive_mutex> lock(this->rmutexPendingIndAcks_);
if (!this->pendingIndividualAcks_.empty()) {
if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
auto cmd = Commands::newMultiMessageAck(this->consumerId_, this->pendingIndividualAcks_);
cnx->sendCommand(cmd);
} else {
// Broker does not support multi-message ACK, use multiple individual ACK instead.
this->doImmediateAck(cnx, this->consumerId_, this->pendingIndividualAcks_);
}
this->doImmediateAck(cnx, consumerId_, this->pendingIndividualAcks_);
this->pendingIndividualAcks_.clear();
}
}
Expand Down
154 changes: 154 additions & 0 deletions tests/AcknowledgeTest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>

#include "HttpHelper.h"
#include "PulsarFriend.h"
#include "lib/LogUtils.h"

DECLARE_LOG_OBJECT()

using namespace pulsar;

static std::string lookupUrl = "pulsar://localhost:6650";
static std::string adminUrl = "http://localhost:8080/";

extern std::string unique_str();

class AcknowledgeTest : public testing::TestWithParam<int> {};

TEST_P(AcknowledgeTest, testAckMsgList) {
Client client(lookupUrl);
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);

constexpr auto numMsg = 100;
std::string uniqueChunk = unique_str();
std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;
const std::string subName = "sub-ack-list";

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

ConsumerConfiguration consumerConfig;
consumerConfig.setAckGroupingMaxSize(numMsg);
consumerConfig.setAckGroupingTimeMs(GetParam());
consumerConfig.setUnAckedMessagesTimeoutMs(10000);
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));

// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

std::vector<MessageId> recvMsgId;
for (auto count = 0; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
recvMsgId.emplace_back(msg.getMessageId());
}
ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));

// try redeliver unack messages.
consumer.redeliverUnacknowledgedMessages();

auto consumerStats = PulsarFriend::getConsumerStatsPtr(consumer);
auto ackMap = consumerStats->getAckedMsgMap();
unsigned long totalAck = ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
ASSERT_EQ(totalAck, numMsg);

Message msg;
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();

producer.close();
consumer.close();
client.close();
}

TEST_P(AcknowledgeTest, testAckMsgListWithMultiConsumer) {
Client client(lookupUrl);
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);

std::string uniqueChunk = unique_str();
std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;

// call admin api to make it partitioned
std::string url =
adminUrl + "admin/v2/persistent/public/default/test-ack-msgs" + uniqueChunk + "/partitions";
int res = makePutRequest(url, "5");
LOG_INFO("res = " << res);
ASSERT_FALSE(res != 204 && res != 409);

constexpr auto numMsg = 100;
const std::string subName = "sub-ack-list";

Producer producer;
ProducerConfiguration producerConfig;
// Turn off batch to ensure even distribution
producerConfig.setBatchingEnabled(false);
producerConfig.setPartitionsRoutingMode(pulsar::ProducerConfiguration::RoundRobinDistribution);
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));

ConsumerConfiguration consumerConfig;
// set ack grouping max size is 10
consumerConfig.setAckGroupingMaxSize(10);
consumerConfig.setAckGroupingTimeMs(GetParam());
consumerConfig.setUnAckedMessagesTimeoutMs(10000);
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));

// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

std::vector<MessageId> recvMsgId;
for (auto count = 0; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
recvMsgId.emplace_back(msg.getMessageId());
}
ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));

// try redeliver unack messages.
consumer.redeliverUnacknowledgedMessages();

// assert stats
unsigned long totalAck = 0;
auto consumerStatsList = PulsarFriend::getConsumerStatsPtrList(consumer);
for (auto consumerStats : consumerStatsList) {
auto ackMap = consumerStats->getAckedMsgMap();
totalAck += ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
}
ASSERT_EQ(totalAck, numMsg);

Message msg;
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();

producer.close();
consumer.close();
client.close();
}

INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest, testing::Values(100, 0));
118 changes: 2 additions & 116 deletions tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <pulsar/Client.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstring>
#include <functional>
Expand Down Expand Up @@ -58,7 +59,7 @@ static int globalCount = 0;
static long globalResendMessageCount = 0;
std::string lookupUrl = "pulsar://localhost:6650";
static std::string adminUrl = "http://localhost:8080/";
static int uniqueCounter = 0;
static std::atomic_int uniqueCounter{0};

std::string unique_str() {
long nanos = std::chrono::duration_cast<std::chrono::milliseconds>(
Expand Down Expand Up @@ -4276,118 +4277,3 @@ void testBatchReceiveClose(bool multiConsumer) {
TEST(BasicEndToEndTest, testBatchReceiveClose) { testBatchReceiveClose(false); }

TEST(BasicEndToEndTest, testBatchReceiveCloseWithMultiConsumer) { testBatchReceiveClose(true); }

TEST(BasicEndToEndTest, testAckMsgList) {
Client client(lookupUrl);
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);

constexpr auto numMsg = 100;
std::string uniqueChunk = unique_str();
std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;
const std::string subName = "sub-ack-list";

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

ConsumerConfiguration consumerConfig;
consumerConfig.setAckGroupingMaxSize(numMsg);
consumerConfig.setUnAckedMessagesTimeoutMs(10000);
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));

// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

std::vector<MessageId> recvMsgId;
for (auto count = 0; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
recvMsgId.emplace_back(msg.getMessageId());
}
ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));

// try redeliver unack messages.
consumer.redeliverUnacknowledgedMessages();

auto consumerStats = PulsarFriend::getConsumerStatsPtr(consumer);
auto ackMap = consumerStats->getAckedMsgMap();
unsigned long totalAck = ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
ASSERT_EQ(totalAck, numMsg);

Message msg;
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();

producer.close();
consumer.close();
client.close();
}

TEST(BasicEndToEndTest, testAckMsgListWithMultiConsumer) {
Client client(lookupUrl);
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);

std::string uniqueChunk = unique_str();
std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;

// call admin api to make it partitioned
std::string url =
adminUrl + "admin/v2/persistent/public/default/test-ack-msgs" + uniqueChunk + "/partitions";
int res = makePutRequest(url, "5");
LOG_INFO("res = " << res);
ASSERT_FALSE(res != 204 && res != 409);

constexpr auto numMsg = 100;
const std::string subName = "sub-ack-list";

Producer producer;
ProducerConfiguration producerConfig;
// Turn off batch to ensure even distribution
producerConfig.setBatchingEnabled(false);
producerConfig.setPartitionsRoutingMode(pulsar::ProducerConfiguration::RoundRobinDistribution);
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));

ConsumerConfiguration consumerConfig;
// set ack grouping max size is 10
consumerConfig.setAckGroupingMaxSize(10);
consumerConfig.setUnAckedMessagesTimeoutMs(10000);
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));

// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

std::vector<MessageId> recvMsgId;
for (auto count = 0; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
recvMsgId.emplace_back(msg.getMessageId());
}
ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));

// try redeliver unack messages.
consumer.redeliverUnacknowledgedMessages();

// assert stats
unsigned long totalAck = 0;
auto consumerStatsList = PulsarFriend::getConsumerStatsPtrList(consumer);
for (auto consumerStats : consumerStatsList) {
auto ackMap = consumerStats->getAckedMsgMap();
totalAck += ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
}
ASSERT_EQ(totalAck, numMsg);

Message msg;
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();

producer.close();
consumer.close();
client.close();
}