Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pulsar-client-cpp/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ apidocs/
generated/

# CMAKE
.cmake
Makefile
cmake_install.cmake
CMakeFiles
Expand Down
1 change: 0 additions & 1 deletion pulsar-client-cpp/include/pulsar/Consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,6 @@ class PULSAR_PUBLIC Consumer {

friend class PulsarFriend;
friend class PulsarWrapper;
friend class PartitionedConsumerImpl;
friend class MultiTopicsConsumerImpl;
friend class ConsumerImpl;
friend class ClientImpl;
Expand Down
1 change: 0 additions & 1 deletion pulsar-client-cpp/include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ class PULSAR_PUBLIC Message {
Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload,
proto::SingleMessageMetadata& singleMetadata, const std::string& topicName);
friend class PartitionedProducerImpl;
friend class PartitionedConsumerImpl;
friend class MultiTopicsConsumerImpl;
friend class MessageBuilder;
friend class ConsumerImpl;
Expand Down
1 change: 0 additions & 1 deletion pulsar-client-cpp/include/pulsar/MessageId.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ class PULSAR_PUBLIC MessageId {
friend class MessageImpl;
friend class Commands;
friend class PartitionedProducerImpl;
friend class PartitionedConsumerImpl;
friend class MultiTopicsConsumerImpl;
friend class UnAckedMessageTrackerEnabled;
friend class BatchAcknowledgementTracker;
Expand Down
6 changes: 3 additions & 3 deletions pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include "ProducerImpl.h"
#include "ReaderImpl.h"
#include "PartitionedProducerImpl.h"
#include "PartitionedConsumerImpl.h"
#include "MultiTopicsConsumerImpl.h"
#include "PatternMultiTopicsConsumerImpl.h"
#include "TimeUtils.h"
Expand Down Expand Up @@ -381,8 +380,9 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
callback(ResultInvalidConfiguration, Consumer());
return;
}
consumer = std::make_shared<PartitionedConsumerImpl>(
shared_from_this(), subscriptionName, topicName, partitionMetadata->getPartitions(), conf);
consumer = std::make_shared<MultiTopicsConsumerImpl>(shared_from_this(), topicName,
partitionMetadata->getPartitions(),
subscriptionName, conf, lookupServicePtr_);
} else {
auto consumerImpl = std::make_shared<ConsumerImpl>(shared_from_this(), topicName->toString(),
subscriptionName, conf);
Expand Down
1 change: 0 additions & 1 deletion pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,6 @@ class ConsumerImpl : public ConsumerImplBase,

// these two declared friend to access setNegativeAcknowledgeEnabledForTesting
friend class MultiTopicsConsumerImpl;
friend class PartitionedConsumerImpl;

FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
Expand Down
151 changes: 130 additions & 21 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* under the License.
*/
#include "MultiTopicsConsumerImpl.h"
#include "MultiResultCallback.h"

DECLARE_LOG_OBJECT()

Expand All @@ -25,7 +26,7 @@ using namespace pulsar;
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
const std::string& subscriptionName, TopicNamePtr topicName,
const ConsumerConfiguration& conf,
const LookupServicePtr lookupServicePtr)
LookupServicePtr lookupServicePtr)
: client_(client),
subscriptionName_(subscriptionName),
topic_(topicName ? topicName->toString() : "EmptyTopics"),
Expand All @@ -52,6 +53,12 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
} else {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
}
auto partitionsUpdateInterval = static_cast<unsigned int>(client_->conf().getPartitionsUpdateInterval());
if (partitionsUpdateInterval > 0) {
partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval);
lookupServicePtr_ = client_->getLookup();
}
}

void MultiTopicsConsumerImpl::start() {
Expand Down Expand Up @@ -125,33 +132,39 @@ Future<Result, Consumer> MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s
}

// subscribe for each partition, when all partitions completed, complete promise
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(std::bind(
&MultiTopicsConsumerImpl::subscribeTopicPartitions, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, topicName, subscriptionName_, conf_, topicPromise));
Lock lock(mutex_);
auto entry = topicsPartitions_.find(topic);
if (entry == topicsPartitions_.end()) {
lock.unlock();
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
[this, topicName, topicPromise](Result result, const LookupDataResultPtr& lookupDataResult) {
if (result != ResultOk) {
LOG_ERROR("Error Checking/Getting Partition Metadata while MultiTopics Subscribing- "
<< consumerStr_ << " result: " << result)
topicPromise->setFailed(result);
return;
}
subscribeTopicPartitions(lookupDataResult->getPartitions(), topicName, subscriptionName_,
topicPromise);
});
} else {
auto numPartitions = entry->second;
lock.unlock();
subscribeTopicPartitions(numPartitions, topicName, subscriptionName_, topicPromise);
}
return topicPromise->getFuture();
}

void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
const LookupDataResultPtr partitionMetadata,
TopicNamePtr topicName,
void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicNamePtr topicName,
const std::string& consumerName,
ConsumerConfiguration conf,
ConsumerSubResultPromisePtr topicSubResultPromise) {
if (result != ResultOk) {
LOG_ERROR("Error Checking/Getting Partition Metadata while MultiTopics Subscribing- "
<< consumerStr_ << " result: " << result)
topicSubResultPromise->setFailed(result);
return;
}

std::shared_ptr<ConsumerImpl> consumer;
ConsumerConfiguration config = conf_.clone();
ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();

config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(),
std::placeholders::_1, std::placeholders::_2));

int numPartitions = partitionMetadata->getPartitions();
int partitions = numPartitions == 0 ? 1 : numPartitions;

// Apply total limit of receiver queue size across partitions
Expand All @@ -160,7 +173,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
(int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / partitions)));

Lock lock(mutex_);
topicsPartitions_.insert(std::make_pair(topicName->toString(), partitions));
topicsPartitions_[topicName->toString()] = partitions;
lock.unlock();
numberTopicPartitions_->fetch_add(partitions);

Expand Down Expand Up @@ -214,10 +227,13 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
return;
}

LOG_DEBUG("Successfully Subscribed to a single partition of topic in TopicsConsumer. "
<< "Partitions need to create - " << previous - 1);
LOG_INFO("Successfully Subscribed to a single partition of topic in TopicsConsumer. "
<< "Partitions need to create : " << previous - 1);

if (partitionsNeedCreate->load() == 0) {
if (partitionsUpdateTimer_) {
runPartitionUpdateTask();
}
topicSubResultPromise->setValue(Consumer(shared_from_this()));
}
}
Expand Down Expand Up @@ -274,13 +290,17 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
}

void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, ResultCallback callback) {
Lock lock(mutex_);
std::map<std::string, int>::iterator it = topicsPartitions_.find(topic);
if (it == topicsPartitions_.end()) {
lock.unlock();
LOG_ERROR("TopicsConsumer does not subscribe topic : " << topic << " subscription - "
<< subscriptionName_);
callback(ResultTopicNotFound);
return;
}
int numberPartitions = it->second;
lock.unlock();

const auto state = state_.load();
if (state == Closing || state == Closed) {
Expand All @@ -295,7 +315,6 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
LOG_ERROR("TopicName invalid: " << topic);
callback(ResultUnknownError);
}
int numberPartitions = it->second;
std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0);

for (int i = 0; i < numberPartitions; i++) {
Expand Down Expand Up @@ -683,7 +702,15 @@ void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback c
}

void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
callback(ResultOperationNotSupported);
if (state_ != Ready) {
callback(ResultAlreadyClosed);
return;
}

MultiResultCallback multiResultCallback(callback, consumers_.size());
consumers_.forEachValue([&timestamp, &multiResultCallback](ConsumerImplPtr consumer) {
consumer->seekAsync(timestamp, multiResultCallback);
});
}

void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
Expand Down Expand Up @@ -711,3 +738,85 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
});
return numberOfConnectedConsumer;
}
void MultiTopicsConsumerImpl::runPartitionUpdateTask() {
partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
auto self = shared_from_this();
partitionsUpdateTimer_->async_wait([self](const boost::system::error_code& ec) {
// If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
// cannot continue at this time, and the request needs to be ignored.
if (!ec) {
self->topicPartitionUpdate();
}
});
}
void MultiTopicsConsumerImpl::topicPartitionUpdate() {
using namespace std::placeholders;
Lock lock(mutex_);
auto topicsPartitions = topicsPartitions_;
lock.unlock();
for (const auto& item : topicsPartitions) {
auto topicName = TopicName::get(item.first);
auto currentNumPartitions = item.second;
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
std::bind(&MultiTopicsConsumerImpl::handleGetPartitions, shared_from_this(), topicName,
std::placeholders::_1, std::placeholders::_2, currentNumPartitions));
}
}
void MultiTopicsConsumerImpl::handleGetPartitions(TopicNamePtr topicName, Result result,
const LookupDataResultPtr& lookupDataResult,
int currentNumPartitions) {
if (state_ != Ready) {
return;
}
if (!result) {
const auto newNumPartitions = static_cast<unsigned int>(lookupDataResult->getPartitions());
if (newNumPartitions > currentNumPartitions) {
LOG_INFO("new partition count: " << newNumPartitions
<< " current partition count: " << currentNumPartitions);
auto partitionsNeedCreate =
std::make_shared<std::atomic<int>>(newNumPartitions - currentNumPartitions);
ConsumerSubResultPromisePtr topicPromise = std::make_shared<Promise<Result, Consumer>>();
Lock lock(mutex_);
topicsPartitions_[topicName->toString()] = newNumPartitions;
lock.unlock();
numberTopicPartitions_->fetch_add(newNumPartitions - currentNumPartitions);
for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) {
subscribeSingleNewConsumer(newNumPartitions, topicName, i, topicPromise,
partitionsNeedCreate);
}
// `runPartitionUpdateTask()` will be called in `handleSingleConsumerCreated()`
return;
}
} else {
LOG_WARN("Failed to getPartitionMetadata: " << strResult(result));
}
runPartitionUpdateTask();
}

void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
int numPartitions, TopicNamePtr topicName, int partitionIndex,
ConsumerSubResultPromisePtr topicSubResultPromise,
std::shared_ptr<std::atomic<int>> partitionsNeedCreate) {
ConsumerConfiguration config = conf_.clone();
ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(),
std::placeholders::_1, std::placeholders::_2));

// Apply total limit of receiver queue size across partitions
config.setReceiverQueueSize(
std::min(conf_.getReceiverQueueSize(),
(int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions)));

std::string topicPartitionName = topicName->getTopicPartitionName(partitionIndex);

auto consumer = std::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config,
internalListenerExecutor, true, Partitioned);
consumer->getConsumerCreatedFuture().addListener(
std::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(),
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
consumer->setPartitionIndex(partitionIndex);
consumer->start();
consumers_.emplace(topicPartitionName, consumer);
LOG_INFO("Add Creating Consumer for - " << topicPartitionName << " - " << consumerStr_
<< " consumerSize: " << consumers_.size());
}
27 changes: 21 additions & 6 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,14 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
};
MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
const std::string& subscriptionName, TopicNamePtr topicName,
const ConsumerConfiguration& conf, const LookupServicePtr lookupServicePtr_);
const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_);
MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, int numPartitions,
const std::string& subscriptionName, const ConsumerConfiguration& conf,
LookupServicePtr lookupServicePtr)
: MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf,
lookupServicePtr) {
topicsPartitions_[topicName->toString()] = numPartitions;
}
~MultiTopicsConsumerImpl();
// overrided methods from ConsumerImplBase
Future<Result, ConsumerImplBaseWeakPtr> getConsumerCreatedFuture() override;
Expand Down Expand Up @@ -101,14 +108,16 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
std::mutex pendingReceiveMutex_;
std::atomic<MultiTopicsConsumerState> state_{Pending};
BlockingQueue<Message> messages_;
ExecutorServicePtr listenerExecutor_;
const ExecutorServicePtr listenerExecutor_;
MessageListener messageListener_;
DeadlineTimerPtr partitionsUpdateTimer_;
boost::posix_time::time_duration partitionsUpdateInterval_;
LookupServicePtr lookupServicePtr_;
std::shared_ptr<std::atomic<int>> numberTopicPartitions_;
std::atomic<Result> failedResult{ResultOk};
Promise<Result, ConsumerImplBaseWeakPtr> multiTopicsConsumerCreatedPromise_;
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
const std::vector<std::string>& topics_;
const std::vector<std::string> topics_;
std::queue<ReceiveCallback> pendingReceives_;

/* methods */
Expand All @@ -122,9 +131,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,

void handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic,
std::shared_ptr<std::atomic<int>> topicsNeedCreate);
void subscribeTopicPartitions(const Result result, const LookupDataResultPtr partitionMetadata,
TopicNamePtr topicName, const std::string& consumerName,
ConsumerConfiguration conf,
void subscribeTopicPartitions(int numPartitions, TopicNamePtr topicName, const std::string& consumerName,
ConsumerSubResultPromisePtr topicSubResultPromise);
void handleSingleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
std::shared_ptr<std::atomic<int>> partitionsNeedCreate,
Expand All @@ -134,11 +141,19 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
void handleOneTopicUnsubscribedAsync(Result result, std::shared_ptr<std::atomic<int>> consumerUnsubed,
int numberPartitions, TopicNamePtr topicNamePtr,
std::string& topicPartitionName, ResultCallback callback);
void runPartitionUpdateTask();
void topicPartitionUpdate();
void handleGetPartitions(TopicNamePtr topicName, Result result,
const LookupDataResultPtr& lookupDataResult, int currentNumPartitions);
void subscribeSingleNewConsumer(int numPartitions, TopicNamePtr topicName, int partitionIndex,
ConsumerSubResultPromisePtr topicSubResultPromise,
std::shared_ptr<std::atomic<int>> partitionsNeedCreate);

private:
void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;

FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
};

typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;
Expand Down
Loading