Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions include/pulsar/Consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ class PULSAR_PUBLIC Consumer {
*/
Result acknowledge(const MessageId& messageId);

/**
* Acknowledge the consumption of a list of message.
* @param messageIdList
*/
Result acknowledge(const MessageIdList& messageIdList);

/**
* Asynchronously acknowledge the reception of a single message.
*
Expand All @@ -186,6 +192,14 @@ class PULSAR_PUBLIC Consumer {
*/
void acknowledgeAsync(const MessageId& messageId, ResultCallback callback);

/**
* Asynchronously acknowledge the consumption of a list of message.
* @param messageIdList
* @param callback the callback that is triggered when the message has been acknowledged or not
* @return
*/
void acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback);

/**
* Acknowledge the reception of all the messages in the stream up to (and including)
* the provided message.
Expand Down
3 changes: 3 additions & 0 deletions include/pulsar/MessageId.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <iosfwd>
#include <memory>
#include <string>
#include <vector>

namespace pulsar {

Expand Down Expand Up @@ -107,6 +108,8 @@ class PULSAR_PUBLIC MessageId {
typedef std::shared_ptr<MessageIdImpl> MessageIdImplPtr;
MessageIdImplPtr impl_;
};

typedef std::vector<MessageId> MessageIdList;
} // namespace pulsar

#endif // MESSAGE_ID_H
6 changes: 6 additions & 0 deletions lib/AckGroupingTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
*/
virtual void addAcknowledge(const MessageId& msgId) {}

/**
* Adding message ID list into ACK group for individual ACK.
* @param[in] msgIds of the message to be ACKed.
*/
virtual void addAcknowledgeList(const MessageIdList& msgIds) {}

/**
* Adding message ID into ACK group for cumulative ACK.
* @param[in] msgId ID of the message to be ACKed.
Expand Down
10 changes: 10 additions & 0 deletions lib/AckGroupingTrackerEnabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ void AckGroupingTrackerEnabled::addAcknowledge(const MessageId& msgId) {
}
}

void AckGroupingTrackerEnabled::addAcknowledgeList(const MessageIdList& msgIds) {
std::lock_guard<std::recursive_mutex> lock(this->rmutexPendingIndAcks_);
for (const auto& msgId : msgIds) {
this->pendingIndividualAcks_.emplace(msgId);
}
if (this->ackGroupingMaxSize_ > 0 && this->pendingIndividualAcks_.size() >= this->ackGroupingMaxSize_) {
this->flush();
}
}

void AckGroupingTrackerEnabled::addAcknowledgeCumulative(const MessageId& msgId) {
std::lock_guard<std::mutex> lock(this->mutexCumulativeAckMsgId_);
if (msgId > this->nextCumulativeAckMsgId_) {
Expand Down
1 change: 1 addition & 0 deletions lib/AckGroupingTrackerEnabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker {
void start() override;
bool isDuplicate(const MessageId& msgId) override;
void addAcknowledge(const MessageId& msgId) override;
void addAcknowledgeList(const MessageIdList& msgIds) override;
void addAcknowledgeCumulative(const MessageId& msgId) override;
void close() override;
void flush() override;
Expand Down
20 changes: 20 additions & 0 deletions lib/Consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,17 @@ Result Consumer::acknowledge(const MessageId& messageId) {
return result;
}

Result Consumer::acknowledge(const MessageIdList& messageIdList) {
if (!impl_) {
return ResultConsumerNotInitialized;
}
Promise<bool, Result> promise;
impl_->acknowledgeAsync(messageIdList, WaitForCallback(promise));
Result result;
promise.getFuture().get(result);
return result;
}

void Consumer::acknowledgeAsync(const Message& message, ResultCallback callback) {
if (!impl_) {
callback(ResultConsumerNotInitialized);
Expand All @@ -133,6 +144,15 @@ void Consumer::acknowledgeAsync(const MessageId& messageId, ResultCallback callb
impl_->acknowledgeAsync(messageId, callback);
}

void Consumer::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
if (!impl_) {
callback(ResultConsumerNotInitialized);
return;
}

impl_->acknowledgeAsync(messageIdList, callback);
}

Result Consumer::acknowledgeCumulative(const Message& message) {
return acknowledgeCumulative(message.getMessageId());
}
Expand Down
23 changes: 17 additions & 6 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -943,16 +943,17 @@ inline CommandSubscribe_InitialPosition ConsumerImpl::getInitialPosition() {
BOOST_THROW_EXCEPTION(std::logic_error("Invalid InitialPosition enumeration value"));
}

void ConsumerImpl::statsCallback(Result res, ResultCallback callback, CommandAck_AckType ackType) {
consumerStatsBasePtr_->messageAcknowledged(res, ackType);
void ConsumerImpl::statsAckCallback(Result res, ResultCallback callback, CommandAck_AckType ackType,
uint32_t numAcks) {
consumerStatsBasePtr_->messageAcknowledged(res, ackType, numAcks);
if (callback) {
callback(res);
}
}

void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, get_shared_this_ptr(), std::placeholders::_1,
callback, CommandAck_AckType_Individual);
ResultCallback cb = std::bind(&ConsumerImpl::statsAckCallback, get_shared_this_ptr(),
std::placeholders::_1, callback, CommandAck_AckType_Individual, 1);
if (msgId.batchIndex() != -1 &&
!batchAcknowledgementTracker_.isBatchReady(msgId, CommandAck_AckType_Individual)) {
cb(ResultOk);
Expand All @@ -961,9 +962,19 @@ void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callb
doAcknowledgeIndividual(msgId, cb);
}

void ConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
ResultCallback cb =
std::bind(&ConsumerImpl::statsAckCallback, get_shared_this_ptr(), std::placeholders::_1, callback,
proto::CommandAck_AckType_Individual, messageIdList.size());
// Currently not supported batch message id individual index ack.
this->ackGroupingTrackerPtr_->addAcknowledgeList(messageIdList);
this->unAckedMessageTrackerPtr_->remove(messageIdList);
cb(ResultOk);
}

void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, get_shared_this_ptr(), std::placeholders::_1,
callback, CommandAck_AckType_Cumulative);
ResultCallback cb = std::bind(&ConsumerImpl::statsAckCallback, get_shared_this_ptr(),
std::placeholders::_1, callback, CommandAck_AckType_Cumulative, 1);
if (!isCumulativeAcknowledgementAllowed(config_.getConsumerType())) {
cb(ResultCumulativeAcknowledgementNotAllowedError);
return;
Expand Down
4 changes: 3 additions & 1 deletion lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class ConsumerImpl : public ConsumerImplBase {
void receiveAsync(ReceiveCallback& callback) override;
void unsubscribeAsync(ResultCallback callback) override;
void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) override;
void acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) override;
void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) override;
void closeAsync(ResultCallback callback) override;
void start() override;
Expand Down Expand Up @@ -181,8 +182,9 @@ class ConsumerImpl : public ConsumerImplBase {
// TODO - Convert these functions to lambda when we move to C++11
Result receiveHelper(Message& msg);
Result receiveHelper(Message& msg, int timeout);
void statsCallback(Result, ResultCallback, CommandAck_AckType);
void executeNotifyCallback(Message& msg);
void statsAckCallback(Result res, ResultCallback callback, CommandAck_AckType ackType,
uint32_t numAcks = 1);
void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
void failPendingReceiveCallback();
void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
Expand Down
1 change: 1 addition & 0 deletions lib/ConsumerImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this
void batchReceiveAsync(BatchReceiveCallback callback);
virtual void unsubscribeAsync(ResultCallback callback) = 0;
virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) = 0;
virtual void acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) = 0;
virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) = 0;
virtual void closeAsync(ResultCallback callback) = 0;
virtual void start() = 0;
Expand Down
37 changes: 37 additions & 0 deletions lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,43 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
}
}

void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
if (state_ != Ready) {
callback(ResultAlreadyClosed);
return;
}

std::unordered_map<std::string, MessageIdList> topicToMessageId;
for (const MessageId& messageId : messageIdList) {
auto topicName = messageId.getTopicName();
topicToMessageId[topicName].emplace_back(messageId);
}

auto needCallBack = std::make_shared<std::atomic<int>>(topicToMessageId.size());
auto cb = [callback, needCallBack](Result result) {
if (result != ResultOk) {
LOG_ERROR("Filed when acknowledge list: " << result);
// set needCallBack is -1 to avoid repeated callback.
needCallBack->store(-1);
callback(result);
return;
}
if (--(*needCallBack) == 0) {
callback(result);
}
};
for (const auto& kv : topicToMessageId) {
auto optConsumer = consumers_.find(kv.first);
if (optConsumer.is_present()) {
unAckedMessageTrackerPtr_->remove(kv.second);
optConsumer.value()->acknowledgeAsync(kv.second, cb);
} else {
LOG_ERROR("Message of topic: " << kv.first << " not in consumers");
callback(ResultUnknownError);
}
}
}

void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
callback(ResultOperationNotSupported);
}
Expand Down
2 changes: 2 additions & 0 deletions lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
void receiveAsync(ReceiveCallback& callback) override;
void unsubscribeAsync(ResultCallback callback) override;
void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) override;
void acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) override;
void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) override;
void closeAsync(ResultCallback callback) override;
void start() override;
Expand Down Expand Up @@ -152,6 +153,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
bool hasEnoughMessagesForBatchReceive() const override;
void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override;
void beforeConnectionChange(ClientConnection& cnx) override;
friend class PulsarFriend;

private:
std::shared_ptr<MultiTopicsConsumerImpl> get_shared_this_ptr();
Expand Down
1 change: 1 addition & 0 deletions lib/UnAckedMessageTrackerDisabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class UnAckedMessageTrackerDisabled : public UnAckedMessageTrackerInterface {
public:
bool add(const MessageId& m) { return false; }
bool remove(const MessageId& m) { return false; }
void remove(const MessageIdList& msgIds) {}
void removeMessagesTill(const MessageId& msgId) {}
void removeTopicMessage(const std::string& topic) {}

Expand Down
23 changes: 15 additions & 8 deletions lib/UnAckedMessageTrackerEnabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
}

void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
std::unique_lock<std::mutex> acquire(lock_);
std::unique_lock<std::recursive_mutex> acquire(lock_);
LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for consumerPtr_ "
<< consumerReference_.getName().c_str());

Expand Down Expand Up @@ -95,7 +95,7 @@ UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long
}

bool UnAckedMessageTrackerEnabled::add(const MessageId& msgId) {
std::lock_guard<std::mutex> acquire(lock_);
std::lock_guard<std::recursive_mutex> acquire(lock_);
MessageId id(msgId.partition(), msgId.ledgerId(), msgId.entryId(), -1);
if (messageIdPartitionMap.count(id) == 0) {
std::set<MessageId>& partition = timePartitions.back();
Expand All @@ -107,12 +107,12 @@ bool UnAckedMessageTrackerEnabled::add(const MessageId& msgId) {
}

bool UnAckedMessageTrackerEnabled::isEmpty() {
std::lock_guard<std::mutex> acquire(lock_);
std::lock_guard<std::recursive_mutex> acquire(lock_);
return messageIdPartitionMap.empty();
}

bool UnAckedMessageTrackerEnabled::remove(const MessageId& msgId) {
std::lock_guard<std::mutex> acquire(lock_);
std::lock_guard<std::recursive_mutex> acquire(lock_);
MessageId id(msgId.partition(), msgId.ledgerId(), msgId.entryId(), -1);
bool removed = false;

Expand All @@ -124,13 +124,20 @@ bool UnAckedMessageTrackerEnabled::remove(const MessageId& msgId) {
return removed;
}

void UnAckedMessageTrackerEnabled::remove(const MessageIdList& msgIds) {
std::lock_guard<std::recursive_mutex> acquire(lock_);
for (const auto& msgId : msgIds) {
remove(msgId);
}
}

long UnAckedMessageTrackerEnabled::size() {
std::lock_guard<std::mutex> acquire(lock_);
std::lock_guard<std::recursive_mutex> acquire(lock_);
return messageIdPartitionMap.size();
}

void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
std::lock_guard<std::mutex> acquire(lock_);
std::lock_guard<std::recursive_mutex> acquire(lock_);
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end();) {
MessageId msgIdInMap = it->first;
if (msgIdInMap <= msgId) {
Expand All @@ -144,7 +151,7 @@ void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {

// this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, should remove all it's message.
void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic) {
std::lock_guard<std::mutex> acquire(lock_);
std::lock_guard<std::recursive_mutex> acquire(lock_);
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end();) {
MessageId msgIdInMap = it->first;
if (msgIdInMap.getTopicName().compare(topic) == 0) {
Expand All @@ -157,7 +164,7 @@ void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic)
}

void UnAckedMessageTrackerEnabled::clear() {
std::lock_guard<std::mutex> acquire(lock_);
std::lock_guard<std::recursive_mutex> acquire(lock_);
messageIdPartitionMap.clear();
for (auto it = timePartitions.begin(); it != timePartitions.end(); it++) {
it->clear();
Expand Down
3 changes: 2 additions & 1 deletion lib/UnAckedMessageTrackerEnabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
UnAckedMessageTrackerEnabled(long timeoutMs, long tickDuration, ClientImplPtr, ConsumerImplBase&);
bool add(const MessageId& msgId);
bool remove(const MessageId& msgId);
void remove(const MessageIdList& msgIds);
void removeMessagesTill(const MessageId& msgId);
void removeTopicMessage(const std::string& topic);
void timeoutHandler();
Expand All @@ -53,7 +54,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
long size();
std::map<MessageId, std::set<MessageId>&> messageIdPartitionMap;
std::deque<std::set<MessageId>> timePartitions;
std::mutex lock_;
std::recursive_mutex lock_;
ConsumerImplBase& consumerReference_;
ClientImplPtr client_;
DeadlineTimerPtr timer_; // DO NOT place this before client_!
Expand Down
1 change: 1 addition & 0 deletions lib/UnAckedMessageTrackerInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class UnAckedMessageTrackerInterface {
UnAckedMessageTrackerInterface() {}
virtual bool add(const MessageId& m) = 0;
virtual bool remove(const MessageId& m) = 0;
virtual void remove(const MessageIdList& msgIds) = 0;
virtual void removeMessagesTill(const MessageId& msgId) = 0;
virtual void clear() = 0;
// this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, should remove all it's
Expand Down
2 changes: 1 addition & 1 deletion lib/stats/ConsumerStatsBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace pulsar {
class ConsumerStatsBase {
public:
virtual void receivedMessage(Message&, Result) = 0;
virtual void messageAcknowledged(Result, CommandAck_AckType) = 0;
virtual void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums = 1) = 0;
virtual ~ConsumerStatsBase() {}
};

Expand Down
2 changes: 1 addition & 1 deletion lib/stats/ConsumerStatsDisabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace pulsar {
class ConsumerStatsDisabled : public ConsumerStatsBase {
public:
virtual void receivedMessage(Message&, Result) {}
virtual void messageAcknowledged(Result, CommandAck_AckType) {}
virtual void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums) {}
};

} /* namespace pulsar */
Expand Down
6 changes: 3 additions & 3 deletions lib/stats/ConsumerStatsImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ void ConsumerStatsImpl::receivedMessage(Message& msg, Result res) {
totalReceivedMsgMap_[res] += 1;
}

void ConsumerStatsImpl::messageAcknowledged(Result res, CommandAck_AckType ackType) {
void ConsumerStatsImpl::messageAcknowledged(Result res, CommandAck_AckType ackType, uint32_t ackNums) {
Lock lock(mutex_);
ackedMsgMap_[std::make_pair(res, ackType)] += 1;
totalAckedMsgMap_[std::make_pair(res, ackType)] += 1;
ackedMsgMap_[std::make_pair(res, ackType)] += ackNums;
totalAckedMsgMap_[std::make_pair(res, ackType)] += ackNums;
}

std::ostream& operator<<(std::ostream& os,
Expand Down
2 changes: 1 addition & 1 deletion lib/stats/ConsumerStatsImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class ConsumerStatsImpl : public ConsumerStatsBase {
ConsumerStatsImpl(const ConsumerStatsImpl& stats);
void flushAndReset(const boost::system::error_code&);
virtual void receivedMessage(Message&, Result);
virtual void messageAcknowledged(Result, CommandAck_AckType);
virtual void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums = 1);
virtual ~ConsumerStatsImpl();

const inline std::map<std::pair<Result, CommandAck_AckType>, unsigned long>& getAckedMsgMap() const {
Expand Down
Loading