diff --git a/pulsar-client-cpp/.gitignore b/pulsar-client-cpp/.gitignore index 0d8d323e7ff97..349be85a476eb 100644 --- a/pulsar-client-cpp/.gitignore +++ b/pulsar-client-cpp/.gitignore @@ -71,6 +71,7 @@ apidocs/ generated/ # CMAKE +.cmake Makefile cmake_install.cmake CMakeFiles diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h b/pulsar-client-cpp/include/pulsar/Consumer.h index f1e180aea2db9..6c0ab27b06c75 100644 --- a/pulsar-client-cpp/include/pulsar/Consumer.h +++ b/pulsar-client-cpp/include/pulsar/Consumer.h @@ -407,7 +407,6 @@ class PULSAR_PUBLIC Consumer { friend class PulsarFriend; friend class PulsarWrapper; - friend class PartitionedConsumerImpl; friend class MultiTopicsConsumerImpl; friend class ConsumerImpl; friend class ClientImpl; diff --git a/pulsar-client-cpp/include/pulsar/Message.h b/pulsar-client-cpp/include/pulsar/Message.h index 9cea48f26580d..935236bd5bb5b 100644 --- a/pulsar-client-cpp/include/pulsar/Message.h +++ b/pulsar-client-cpp/include/pulsar/Message.h @@ -179,7 +179,6 @@ class PULSAR_PUBLIC Message { Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload, proto::SingleMessageMetadata& singleMetadata, const std::string& topicName); friend class PartitionedProducerImpl; - friend class PartitionedConsumerImpl; friend class MultiTopicsConsumerImpl; friend class MessageBuilder; friend class ConsumerImpl; diff --git a/pulsar-client-cpp/include/pulsar/MessageId.h b/pulsar-client-cpp/include/pulsar/MessageId.h index de64d1d822413..06be790c1ea4b 100644 --- a/pulsar-client-cpp/include/pulsar/MessageId.h +++ b/pulsar-client-cpp/include/pulsar/MessageId.h @@ -94,7 +94,6 @@ class PULSAR_PUBLIC MessageId { friend class MessageImpl; friend class Commands; friend class PartitionedProducerImpl; - friend class PartitionedConsumerImpl; friend class MultiTopicsConsumerImpl; friend class UnAckedMessageTrackerEnabled; friend class BatchAcknowledgementTracker; diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc index d15e247347a6f..fba9a7e1593a6 100644 --- a/pulsar-client-cpp/lib/ClientImpl.cc +++ b/pulsar-client-cpp/lib/ClientImpl.cc @@ -23,7 +23,6 @@ #include "ProducerImpl.h" #include "ReaderImpl.h" #include "PartitionedProducerImpl.h" -#include "PartitionedConsumerImpl.h" #include "MultiTopicsConsumerImpl.h" #include "PatternMultiTopicsConsumerImpl.h" #include "TimeUtils.h" @@ -381,8 +380,9 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr callback(ResultInvalidConfiguration, Consumer()); return; } - consumer = std::make_shared( - shared_from_this(), subscriptionName, topicName, partitionMetadata->getPartitions(), conf); + consumer = std::make_shared(shared_from_this(), topicName, + partitionMetadata->getPartitions(), + subscriptionName, conf, lookupServicePtr_); } else { auto consumerImpl = std::make_shared(shared_from_this(), topicName->toString(), subscriptionName, conf); diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h index 78140f84b54a8..70fda0170cc1a 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.h +++ b/pulsar-client-cpp/lib/ConsumerImpl.h @@ -307,7 +307,6 @@ class ConsumerImpl : public ConsumerImplBase, // these two declared friend to access setNegativeAcknowledgeEnabledForTesting friend class MultiTopicsConsumerImpl; - friend class PartitionedConsumerImpl; FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery); diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index ad30d5cca5012..a9bdbea908245 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -17,6 +17,7 @@ * under the License. */ #include "MultiTopicsConsumerImpl.h" +#include "MultiResultCallback.h" DECLARE_LOG_OBJECT() @@ -25,7 +26,7 @@ using namespace pulsar; MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector& topics, const std::string& subscriptionName, TopicNamePtr topicName, const ConsumerConfiguration& conf, - const LookupServicePtr lookupServicePtr) + LookupServicePtr lookupServicePtr) : client_(client), subscriptionName_(subscriptionName), topic_(topicName ? topicName->toString() : "EmptyTopics"), @@ -52,6 +53,12 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std } else { unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled()); } + auto partitionsUpdateInterval = static_cast(client_->conf().getPartitionsUpdateInterval()); + if (partitionsUpdateInterval > 0) { + partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer(); + partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval); + lookupServicePtr_ = client_->getLookup(); + } } void MultiTopicsConsumerImpl::start() { @@ -125,25 +132,32 @@ Future MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s } // subscribe for each partition, when all partitions completed, complete promise - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(std::bind( - &MultiTopicsConsumerImpl::subscribeTopicPartitions, shared_from_this(), std::placeholders::_1, - std::placeholders::_2, topicName, subscriptionName_, conf_, topicPromise)); + Lock lock(mutex_); + auto entry = topicsPartitions_.find(topic); + if (entry == topicsPartitions_.end()) { + lock.unlock(); + lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + [this, topicName, topicPromise](Result result, const LookupDataResultPtr& lookupDataResult) { + if (result != ResultOk) { + LOG_ERROR("Error Checking/Getting Partition Metadata while MultiTopics Subscribing- " + << consumerStr_ << " result: " << result) + topicPromise->setFailed(result); + return; + } + subscribeTopicPartitions(lookupDataResult->getPartitions(), topicName, subscriptionName_, + topicPromise); + }); + } else { + auto numPartitions = entry->second; + lock.unlock(); + subscribeTopicPartitions(numPartitions, topicName, subscriptionName_, topicPromise); + } return topicPromise->getFuture(); } -void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result, - const LookupDataResultPtr partitionMetadata, - TopicNamePtr topicName, +void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicNamePtr topicName, const std::string& consumerName, - ConsumerConfiguration conf, ConsumerSubResultPromisePtr topicSubResultPromise) { - if (result != ResultOk) { - LOG_ERROR("Error Checking/Getting Partition Metadata while MultiTopics Subscribing- " - << consumerStr_ << " result: " << result) - topicSubResultPromise->setFailed(result); - return; - } - std::shared_ptr consumer; ConsumerConfiguration config = conf_.clone(); ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get(); @@ -151,7 +165,6 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result, config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); - int numPartitions = partitionMetadata->getPartitions(); int partitions = numPartitions == 0 ? 1 : numPartitions; // Apply total limit of receiver queue size across partitions @@ -160,7 +173,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result, (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / partitions))); Lock lock(mutex_); - topicsPartitions_.insert(std::make_pair(topicName->toString(), partitions)); + topicsPartitions_[topicName->toString()] = partitions; lock.unlock(); numberTopicPartitions_->fetch_add(partitions); @@ -214,10 +227,13 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated( return; } - LOG_DEBUG("Successfully Subscribed to a single partition of topic in TopicsConsumer. " - << "Partitions need to create - " << previous - 1); + LOG_INFO("Successfully Subscribed to a single partition of topic in TopicsConsumer. " + << "Partitions need to create : " << previous - 1); if (partitionsNeedCreate->load() == 0) { + if (partitionsUpdateTimer_) { + runPartitionUpdateTask(); + } topicSubResultPromise->setValue(Consumer(shared_from_this())); } } @@ -274,13 +290,17 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result, } void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, ResultCallback callback) { + Lock lock(mutex_); std::map::iterator it = topicsPartitions_.find(topic); if (it == topicsPartitions_.end()) { + lock.unlock(); LOG_ERROR("TopicsConsumer does not subscribe topic : " << topic << " subscription - " << subscriptionName_); callback(ResultTopicNotFound); return; } + int numberPartitions = it->second; + lock.unlock(); const auto state = state_.load(); if (state == Closing || state == Closed) { @@ -295,7 +315,6 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, LOG_ERROR("TopicName invalid: " << topic); callback(ResultUnknownError); } - int numberPartitions = it->second; std::shared_ptr> consumerUnsubed = std::make_shared>(0); for (int i = 0; i < numberPartitions; i++) { @@ -683,7 +702,15 @@ void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback c } void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { - callback(ResultOperationNotSupported); + if (state_ != Ready) { + callback(ResultAlreadyClosed); + return; + } + + MultiResultCallback multiResultCallback(callback, consumers_.size()); + consumers_.forEachValue([×tamp, &multiResultCallback](ConsumerImplPtr consumer) { + consumer->seekAsync(timestamp, multiResultCallback); + }); } void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) { @@ -711,3 +738,85 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() { }); return numberOfConnectedConsumer; } +void MultiTopicsConsumerImpl::runPartitionUpdateTask() { + partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_); + auto self = shared_from_this(); + partitionsUpdateTimer_->async_wait([self](const boost::system::error_code& ec) { + // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it + // cannot continue at this time, and the request needs to be ignored. + if (!ec) { + self->topicPartitionUpdate(); + } + }); +} +void MultiTopicsConsumerImpl::topicPartitionUpdate() { + using namespace std::placeholders; + Lock lock(mutex_); + auto topicsPartitions = topicsPartitions_; + lock.unlock(); + for (const auto& item : topicsPartitions) { + auto topicName = TopicName::get(item.first); + auto currentNumPartitions = item.second; + lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + std::bind(&MultiTopicsConsumerImpl::handleGetPartitions, shared_from_this(), topicName, + std::placeholders::_1, std::placeholders::_2, currentNumPartitions)); + } +} +void MultiTopicsConsumerImpl::handleGetPartitions(TopicNamePtr topicName, Result result, + const LookupDataResultPtr& lookupDataResult, + int currentNumPartitions) { + if (state_ != Ready) { + return; + } + if (!result) { + const auto newNumPartitions = static_cast(lookupDataResult->getPartitions()); + if (newNumPartitions > currentNumPartitions) { + LOG_INFO("new partition count: " << newNumPartitions + << " current partition count: " << currentNumPartitions); + auto partitionsNeedCreate = + std::make_shared>(newNumPartitions - currentNumPartitions); + ConsumerSubResultPromisePtr topicPromise = std::make_shared>(); + Lock lock(mutex_); + topicsPartitions_[topicName->toString()] = newNumPartitions; + lock.unlock(); + numberTopicPartitions_->fetch_add(newNumPartitions - currentNumPartitions); + for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) { + subscribeSingleNewConsumer(newNumPartitions, topicName, i, topicPromise, + partitionsNeedCreate); + } + // `runPartitionUpdateTask()` will be called in `handleSingleConsumerCreated()` + return; + } + } else { + LOG_WARN("Failed to getPartitionMetadata: " << strResult(result)); + } + runPartitionUpdateTask(); +} + +void MultiTopicsConsumerImpl::subscribeSingleNewConsumer( + int numPartitions, TopicNamePtr topicName, int partitionIndex, + ConsumerSubResultPromisePtr topicSubResultPromise, + std::shared_ptr> partitionsNeedCreate) { + ConsumerConfiguration config = conf_.clone(); + ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get(); + config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(), + std::placeholders::_1, std::placeholders::_2)); + + // Apply total limit of receiver queue size across partitions + config.setReceiverQueueSize( + std::min(conf_.getReceiverQueueSize(), + (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions))); + + std::string topicPartitionName = topicName->getTopicPartitionName(partitionIndex); + + auto consumer = std::make_shared(client_, topicPartitionName, subscriptionName_, config, + internalListenerExecutor, true, Partitioned); + consumer->getConsumerCreatedFuture().addListener( + std::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(), + std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise)); + consumer->setPartitionIndex(partitionIndex); + consumer->start(); + consumers_.emplace(topicPartitionName, consumer); + LOG_INFO("Add Creating Consumer for - " << topicPartitionName << " - " << consumerStr_ + << " consumerSize: " << consumers_.size()); +} diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h index 0f111110c44a1..95c24f68c5b78 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h @@ -51,7 +51,14 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, }; MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector& topics, const std::string& subscriptionName, TopicNamePtr topicName, - const ConsumerConfiguration& conf, const LookupServicePtr lookupServicePtr_); + const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_); + MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, int numPartitions, + const std::string& subscriptionName, const ConsumerConfiguration& conf, + LookupServicePtr lookupServicePtr) + : MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf, + lookupServicePtr) { + topicsPartitions_[topicName->toString()] = numPartitions; + } ~MultiTopicsConsumerImpl(); // overrided methods from ConsumerImplBase Future getConsumerCreatedFuture() override; @@ -101,14 +108,16 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, std::mutex pendingReceiveMutex_; std::atomic state_{Pending}; BlockingQueue messages_; - ExecutorServicePtr listenerExecutor_; + const ExecutorServicePtr listenerExecutor_; MessageListener messageListener_; + DeadlineTimerPtr partitionsUpdateTimer_; + boost::posix_time::time_duration partitionsUpdateInterval_; LookupServicePtr lookupServicePtr_; std::shared_ptr> numberTopicPartitions_; std::atomic failedResult{ResultOk}; Promise multiTopicsConsumerCreatedPromise_; UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_; - const std::vector& topics_; + const std::vector topics_; std::queue pendingReceives_; /* methods */ @@ -122,9 +131,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, void handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic, std::shared_ptr> topicsNeedCreate); - void subscribeTopicPartitions(const Result result, const LookupDataResultPtr partitionMetadata, - TopicNamePtr topicName, const std::string& consumerName, - ConsumerConfiguration conf, + void subscribeTopicPartitions(int numPartitions, TopicNamePtr topicName, const std::string& consumerName, ConsumerSubResultPromisePtr topicSubResultPromise); void handleSingleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, std::shared_ptr> partitionsNeedCreate, @@ -134,11 +141,19 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, void handleOneTopicUnsubscribedAsync(Result result, std::shared_ptr> consumerUnsubed, int numberPartitions, TopicNamePtr topicNamePtr, std::string& topicPartitionName, ResultCallback callback); + void runPartitionUpdateTask(); + void topicPartitionUpdate(); + void handleGetPartitions(TopicNamePtr topicName, Result result, + const LookupDataResultPtr& lookupDataResult, int currentNumPartitions); + void subscribeSingleNewConsumer(int numPartitions, TopicNamePtr topicName, int partitionIndex, + ConsumerSubResultPromisePtr topicSubResultPromise, + std::shared_ptr> partitionsNeedCreate); private: void setNegativeAcknowledgeEnabledForTesting(bool enabled) override; FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery); + FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery); }; typedef std::shared_ptr MultiTopicsConsumerImplPtr; diff --git a/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.cc b/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.cc deleted file mode 100644 index 9d5965b24bd92..0000000000000 --- a/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.cc +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -#include -#include -#include -#include - -namespace pulsar { - -const std::string PartitionedBrokerConsumerStatsImpl::DELIMITER = ";"; - -PartitionedBrokerConsumerStatsImpl::PartitionedBrokerConsumerStatsImpl(size_t size) { - statsList_.resize(size); -} - -bool PartitionedBrokerConsumerStatsImpl::isValid() const { - bool isValid = true; - for (int i = 0; i < statsList_.size(); i++) { - isValid &= statsList_[i].isValid(); - } - return isValid; -} - -std::ostream& operator<<(std::ostream& os, const PartitionedBrokerConsumerStatsImpl& obj) { - os << "\nPartitionedBrokerConsumerStatsImpl [" - << "validTill_ = " << obj.isValid() << ", msgRateOut_ = " << obj.getMsgRateOut() - << ", msgThroughputOut_ = " << obj.getMsgThroughputOut() - << ", msgRateRedeliver_ = " << obj.getMsgRateRedeliver() - << ", consumerName_ = " << obj.getConsumerName() - << ", availablePermits_ = " << obj.getAvailablePermits() - << ", unackedMessages_ = " << obj.getUnackedMessages() - << ", blockedConsumerOnUnackedMsgs_ = " << obj.isBlockedConsumerOnUnackedMsgs() - << ", address_ = " << obj.getAddress() << ", connectedSince_ = " << obj.getConnectedSince() - << ", type_ = " << obj.getType() << ", msgRateExpired_ = " << obj.getMsgRateExpired() - << ", msgBacklog_ = " << obj.getMsgBacklog() << "]"; - return os; -} - -double PartitionedBrokerConsumerStatsImpl::getMsgRateOut() const { - double sum = 0; - for (int i = 0; i < statsList_.size(); i++) { - sum += statsList_[i].getMsgRateOut(); - } - return sum; -} - -double PartitionedBrokerConsumerStatsImpl::getMsgThroughputOut() const { - double sum = 0; - for (int i = 0; i < statsList_.size(); i++) { - sum += statsList_[i].getMsgThroughputOut(); - } - return sum; -} - -double PartitionedBrokerConsumerStatsImpl::getMsgRateRedeliver() const { - double sum = 0; - for (int i = 0; i < statsList_.size(); i++) { - sum += statsList_[i].getMsgRateRedeliver(); - } - return sum; -} - -const std::string PartitionedBrokerConsumerStatsImpl::getConsumerName() const { - std::string str; - for (int i = 0; i < statsList_.size(); i++) { - str += statsList_[i].getConsumerName() + DELIMITER; - } - return str; -} - -uint64_t PartitionedBrokerConsumerStatsImpl::getAvailablePermits() const { - uint64_t sum = 0; - for (int i = 0; i < statsList_.size(); i++) { - sum += statsList_[i].getAvailablePermits(); - } - return sum; -} - -uint64_t PartitionedBrokerConsumerStatsImpl::getUnackedMessages() const { - uint64_t sum = 0; - for (int i = 0; i < statsList_.size(); i++) { - sum += statsList_[i].getUnackedMessages(); - } - return sum; -} - -bool PartitionedBrokerConsumerStatsImpl::isBlockedConsumerOnUnackedMsgs() const { - if (statsList_.size() == 0) { - return false; - } - - bool isValid = true; - for (int i = 0; i < statsList_.size(); i++) { - isValid &= statsList_[i].isValid(); - } - return isValid; -} - -const std::string PartitionedBrokerConsumerStatsImpl::getAddress() const { - std::string str; - for (int i = 0; i < statsList_.size(); i++) { - str += statsList_[i].getAddress() + DELIMITER; - } - return str; -} - -const std::string PartitionedBrokerConsumerStatsImpl::getConnectedSince() const { - std::string str; - for (int i = 0; i < statsList_.size(); i++) { - str += statsList_[i].getConnectedSince() + DELIMITER; - } - return str; -} - -const ConsumerType PartitionedBrokerConsumerStatsImpl::getType() const { - if (!statsList_.size()) { - return ConsumerExclusive; - } - return statsList_[0].getType(); -} - -double PartitionedBrokerConsumerStatsImpl::getMsgRateExpired() const { - double sum = 0; - for (int i = 0; i < statsList_.size(); i++) { - sum += statsList_[i].getMsgRateExpired(); - } - return sum; -} - -uint64_t PartitionedBrokerConsumerStatsImpl::getMsgBacklog() const { - uint64_t sum = 0; - for (int i = 0; i < statsList_.size(); i++) { - sum += statsList_[i].getMsgBacklog(); - } - return sum; -} - -BrokerConsumerStats PartitionedBrokerConsumerStatsImpl::getBrokerConsumerStats(int index) { - return statsList_[index]; -} - -void PartitionedBrokerConsumerStatsImpl::add(BrokerConsumerStats stats, int index) { - statsList_[index] = stats; -} - -void PartitionedBrokerConsumerStatsImpl::clear() { statsList_.clear(); } -} // namespace pulsar diff --git a/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.h b/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.h deleted file mode 100644 index 683f5245dbb2c..0000000000000 --- a/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.h +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -#ifndef PULSAR_CPP_PARTITIONEDBROKERCONSUMERSTATSIMPL_H -#define PULSAR_CPP_PARTITIONEDBROKERCONSUMERSTATSIMPL_H - -#include -#include -#include -#include -#include -#include -#include -#include - -namespace pulsar { -class PULSAR_PUBLIC PartitionedBrokerConsumerStatsImpl : public BrokerConsumerStatsImplBase { - private: - std::vector statsList_; - static const std::string DELIMITER; - - public: - PartitionedBrokerConsumerStatsImpl(size_t size); - - /** Returns true if the Stats are still valid **/ - virtual bool isValid() const; - - /** Returns the rate of messages delivered to the consumer. msg/s */ - virtual double getMsgRateOut() const; - - /** Returns the throughput delivered to the consumer. bytes/s */ - virtual double getMsgThroughputOut() const; - - /** Returns the rate of messages redelivered by this consumer. msg/s */ - virtual double getMsgRateRedeliver() const; - - /** Returns the Name of the consumer */ - virtual const std::string getConsumerName() const; - - /** Returns the Number of available message permits for the consumer */ - virtual uint64_t getAvailablePermits() const; - - /** Returns the Number of unacknowledged messages for the consumer */ - virtual uint64_t getUnackedMessages() const; - - /** Returns true if the consumer is blocked due to unacked messages. */ - virtual bool isBlockedConsumerOnUnackedMsgs() const; - - /** Returns the Address of this consumer */ - virtual const std::string getAddress() const; - - /** Returns the Timestamp of connection */ - virtual const std::string getConnectedSince() const; - - /** Returns Whether this subscription is Exclusive or Shared or Failover */ - virtual const ConsumerType getType() const; - - /** Returns the rate of messages expired on this subscription. msg/s */ - virtual double getMsgRateExpired() const; - - /** Returns the Number of messages in the subscription backlog */ - virtual uint64_t getMsgBacklog() const; - - /** Returns the BrokerConsumerStatsImpl at of ith partition */ - BrokerConsumerStats getBrokerConsumerStats(int index); - - void add(BrokerConsumerStats stats, int index); - - void clear(); - - friend std::ostream &operator<<(std::ostream &os, const PartitionedBrokerConsumerStatsImpl &obj); -}; -typedef std::shared_ptr PartitionedBrokerConsumerStatsPtr; -} // namespace pulsar -#endif // PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc deleted file mode 100644 index 54bb4b72c900d..0000000000000 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ /dev/null @@ -1,615 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -#include "PartitionedConsumerImpl.h" -#include "MultiResultCallback.h" - -DECLARE_LOG_OBJECT() - -namespace pulsar { - -PartitionedConsumerImpl::PartitionedConsumerImpl(ClientImplPtr client, const std::string& subscriptionName, - const TopicNamePtr topicName, - const unsigned int numPartitions, - const ConsumerConfiguration& conf) - : client_(client), - subscriptionName_(subscriptionName), - topicName_(topicName), - numPartitions_(numPartitions), - conf_(conf), - messages_(1000), - listenerExecutor_(client->getListenerExecutorProvider()->get()), - messageListener_(conf.getMessageListener()), - topic_(topicName->toString()) { - std::stringstream consumerStrStream; - consumerStrStream << "[Partitioned Consumer: " << topic_ << "," << subscriptionName << "," - << numPartitions << "]"; - if (conf.getUnAckedMessagesTimeoutMs() != 0) { - if (conf.getTickDurationInMs() > 0) { - unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled( - conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this)); - } else { - unAckedMessageTrackerPtr_.reset( - new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this)); - } - } else { - unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled()); - } - auto partitionsUpdateInterval = static_cast(client_->conf().getPartitionsUpdateInterval()); - if (partitionsUpdateInterval > 0) { - partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer(); - partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval); - lookupServicePtr_ = client_->getLookup(); - } -} - -PartitionedConsumerImpl::~PartitionedConsumerImpl() {} - -Future PartitionedConsumerImpl::getConsumerCreatedFuture() { - return partitionedConsumerCreatedPromise_.getFuture(); -} -const std::string& PartitionedConsumerImpl::getSubscriptionName() const { return subscriptionName_; } - -const std::string& PartitionedConsumerImpl::getTopic() const { return topic_; } - -Result PartitionedConsumerImpl::receive(Message& msg) { - if (state_ != Ready) { - return ResultAlreadyClosed; - } - // See comments in `receive(Message&, int)` - - if (messageListener_) { - LOG_ERROR("Can not receive when a listener has been set"); - return ResultInvalidConfiguration; - } - - messages_.pop(msg); - unAckedMessageTrackerPtr_->add(msg.getMessageId()); - return ResultOk; -} - -Result PartitionedConsumerImpl::receive(Message& msg, int timeout) { - if (state_ != Ready) { - return ResultAlreadyClosed; - } - - if (messageListener_) { - LOG_ERROR("Can not receive when a listener has been set"); - return ResultInvalidConfiguration; - } - - if (messages_.pop(msg, std::chrono::milliseconds(timeout))) { - unAckedMessageTrackerPtr_->add(msg.getMessageId()); - return ResultOk; - } else { - if (state_ != Ready) { - return ResultAlreadyClosed; - } - return ResultTimeout; - } -} - -void PartitionedConsumerImpl::receiveAsync(ReceiveCallback& callback) { - Message msg; - - // fail the callback if consumer is closing or closed - if (state_ != Ready) { - callback(ResultAlreadyClosed, msg); - return; - } - - Lock lock(pendingReceiveMutex_); - if (messages_.pop(msg, std::chrono::milliseconds(0))) { - lock.unlock(); - unAckedMessageTrackerPtr_->add(msg.getMessageId()); - callback(ResultOk, msg); - } else { - pendingReceives_.push(callback); - } -} - -void PartitionedConsumerImpl::unsubscribeAsync(ResultCallback callback) { - LOG_INFO("[" << topicName_->toString() << "," << subscriptionName_ << "] Unsubscribing"); - // change state to Closing, so that no Ready state operation is permitted during unsubscribe - state_ = Closing; - // do not accept un subscribe until we have subscribe to all of the partitions of a topic - // it's a logical single topic so it should behave like a single topic, even if it's sharded - unsigned int index = 0; - for (ConsumerList::const_iterator consumer = consumers_.begin(); consumer != consumers_.end(); - consumer++) { - LOG_DEBUG("Unsubcribing Consumer - " << index << " for Subscription - " << subscriptionName_ - << " for Topic - " << topicName_->toString()); - (*consumer)->unsubscribeAsync(std::bind(&PartitionedConsumerImpl::handleUnsubscribeAsync, - shared_from_this(), std::placeholders::_1, index++, - callback)); - } -} - -void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int consumerIndex, - ResultCallback callback) { - if (state_ == Failed) { - // we have already informed the client that unsubcribe has failed so, ignore this callbacks - // or do we still go ahead and check how many could we close successfully? - LOG_DEBUG("handleUnsubscribeAsync callback received in Failed State for consumerIndex - " - << consumerIndex << "with Result - " << result << " for Subscription - " - << subscriptionName_ << " for Topic - " << topicName_->toString()); - return; - } - if (result != ResultOk) { - state_ = Failed; - LOG_ERROR("Error Closing one of the parition consumers, consumerIndex - " << consumerIndex); - callback(ResultUnknownError); - return; - } - const auto numPartitions = getNumPartitionsWithLock(); - assert(unsubscribedSoFar_ <= numPartitions); - assert(consumerIndex <= numPartitions); - // this means we have successfully closed this partition consumer and no unsubscribe has failed so far - LOG_INFO("Successfully Unsubscribed Consumer - " << consumerIndex << " for Subscription - " - << subscriptionName_ << " for Topic - " - << topicName_->toString()); - unsubscribedSoFar_++; - if (unsubscribedSoFar_ == numPartitions) { - LOG_DEBUG("Unsubscribed all of the partition consumer for subscription - " << subscriptionName_); - state_ = Closed; - callback(ResultOk); - return; - } -} - -void PartitionedConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) { - int32_t partition = msgId.partition(); -#ifndef NDEBUG - Lock consumersLock(consumersMutex_); - assert(partition < getNumPartitions() && partition >= 0 && consumers_.size() > partition); - consumersLock.unlock(); -#endif - unAckedMessageTrackerPtr_->remove(msgId); - consumers_[partition]->acknowledgeAsync(msgId, callback); -} - -void PartitionedConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) { - callback(ResultOperationNotSupported); -} - -void PartitionedConsumerImpl::negativeAcknowledge(const MessageId& msgId) { - int32_t partition = msgId.partition(); - unAckedMessageTrackerPtr_->remove(msgId); - consumers_[partition]->negativeAcknowledge(msgId); -} - -unsigned int PartitionedConsumerImpl::getNumPartitions() const { return numPartitions_; } - -unsigned int PartitionedConsumerImpl::getNumPartitionsWithLock() const { - Lock consumersLock(consumersMutex_); - return getNumPartitions(); -} - -ConsumerConfiguration PartitionedConsumerImpl::getSinglePartitionConsumerConfig() const { - using namespace std::placeholders; - - ConsumerConfiguration config = conf_.clone(); - // all the partitioned-consumer belonging to one partitioned topic should have same name - config.setConsumerName(conf_.getConsumerName()); - config.setConsumerType(conf_.getConsumerType()); - config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs()); - - const auto shared_this = const_cast(this)->shared_from_this(); - config.setMessageListener(std::bind(&PartitionedConsumerImpl::messageReceived, shared_this, - std::placeholders::_1, std::placeholders::_2)); - - // Apply total limit of receiver queue size across partitions - // NOTE: if it's called by handleGetPartitions(), the queue size of new internal consumers may be smaller - // than previous created internal consumers. - config.setReceiverQueueSize( - std::min(conf_.getReceiverQueueSize(), - (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / getNumPartitions()))); - - return config; -} - -ConsumerImplPtr PartitionedConsumerImpl::newInternalConsumer(unsigned int partition, - const ConsumerConfiguration& config) const { - using namespace std::placeholders; - - std::string topicPartitionName = topicName_->getTopicPartitionName(partition); - auto consumer = std::make_shared(client_, topicPartitionName, subscriptionName_, config, - internalListenerExecutor_, true, Partitioned); - - const auto shared_this = const_cast(this)->shared_from_this(); - consumer->getConsumerCreatedFuture().addListener( - std::bind(&PartitionedConsumerImpl::handleSinglePartitionConsumerCreated, shared_this, - std::placeholders::_1, std::placeholders::_2, partition)); - consumer->setPartitionIndex(partition); - - LOG_DEBUG("Creating Consumer for single Partition - " << topicPartitionName << "SubName - " - << subscriptionName_); - return consumer; -} - -void PartitionedConsumerImpl::start() { - internalListenerExecutor_ = client_->getPartitionListenerExecutorProvider()->get(); - const auto config = getSinglePartitionConsumerConfig(); - - // create consumer on each partition - // Here we don't need `consumersMutex` to protect `consumers_`, because `consumers_` can only be increased - // when `state_` is Ready - for (unsigned int i = 0; i < getNumPartitions(); i++) { - consumers_.push_back(newInternalConsumer(i, config)); - } - for (ConsumerList::const_iterator consumer = consumers_.begin(); consumer != consumers_.end(); - consumer++) { - (*consumer)->start(); - } -} - -void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated( - Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, unsigned int partitionIndex) { - ResultCallback nullCallbackForCleanup = NULL; - if (state_ == Failed) { - // one of the consumer creation failed, and we are cleaning up - return; - } - const auto numPartitions = getNumPartitionsWithLock(); - assert(numConsumersCreated_ < numPartitions); - - if (result != ResultOk) { - state_ = Failed; - partitionedConsumerCreatedPromise_.setFailed(result); - // unsubscribed all of the successfully subscribed partitioned consumers - closeAsync(nullCallbackForCleanup); - LOG_ERROR("Unable to create Consumer for partition - " << partitionIndex << " Error - " << result); - return; - } - - assert(partitionIndex < numPartitions && partitionIndex >= 0); - Lock lock(mutex_); - numConsumersCreated_++; - lock.unlock(); - if (numConsumersCreated_ == numPartitions) { - LOG_INFO("Successfully Subscribed to Partitioned Topic - " << topicName_->toString() << " with - " - << numPartitions << " Partitions."); - state_ = Ready; - if (partitionsUpdateTimer_) { - runPartitionUpdateTask(); - } - receiveMessages(); - partitionedConsumerCreatedPromise_.setValue(shared_from_this()); - return; - } -} - -void PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result, unsigned int partitionIndex, - CloseCallback callback) { - if (state_ == Failed) { - // we should have already notified the client by callback - return; - } - if (result != ResultOk) { - state_ = Failed; - LOG_ERROR("Closing the consumer failed for partition - " << partitionIndex); - partitionedConsumerCreatedPromise_.setFailed(result); - if (callback) { - callback(result); - } - return; - } - assert(partitionIndex < getNumPartitionsWithLock() && partitionIndex >= 0); - Lock lock(mutex_); - if (numConsumersCreated_ > 0) { - numConsumersCreated_--; - } - lock.unlock(); - // closed all successfully - if (!numConsumersCreated_) { - state_ = Closed; - // set the producerCreatedPromise to failure - partitionedConsumerCreatedPromise_.setFailed(ResultUnknownError); - if (callback) { - callback(result); - } - return; - } -} -void PartitionedConsumerImpl::closeAsync(ResultCallback callback) { - Lock lock(consumersMutex_); - if (consumers_.empty()) { - notifyResult(callback); - return; - } - state_ = Closed; - unsigned int consumerAlreadyClosed = 0; - // close successfully subscribed consumers - // Here we don't need `consumersMutex` to protect `consumers_`, because `consumers_` can only be increased - // when `state_` is Ready - for (auto& consumer : consumers_) { - if (!consumer->isClosed()) { - auto self = shared_from_this(); - const auto partition = consumer->getPartitionIndex(); - consumer->closeAsync([this, self, partition, callback](Result result) { - handleSinglePartitionConsumerClose(result, partition, callback); - }); - } else { - if (++consumerAlreadyClosed == consumers_.size()) { - // everything is closed already. so we are good. - notifyResult(callback); - return; - } - } - } - - // fail pending recieve - failPendingReceiveCallback(); -} - -void PartitionedConsumerImpl::notifyResult(CloseCallback closeCallback) { - if (closeCallback) { - // this means client invoked the closeAsync with a valid callback - state_ = Closed; - closeCallback(ResultOk); - } else { - // consumer create failed, closeAsync called to cleanup the successfully created producers - state_ = Failed; - partitionedConsumerCreatedPromise_.setFailed(ResultUnknownError); - } -} - -void PartitionedConsumerImpl::shutdown() {} - -bool PartitionedConsumerImpl::isClosed() { return state_ == Closed; } - -bool PartitionedConsumerImpl::isOpen() { return state_ == Ready; } - -void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message& msg) { - LOG_DEBUG("Received Message from one of the partition - " << msg.impl_->messageId.partition()); - const std::string& topicPartitionName = consumer.getTopic(); - msg.impl_->setTopicName(topicPartitionName); - // messages_ is a blocking queue: if queue is already full then no need of lock as receiveAsync already - // gets available-msg and no need to put request in pendingReceives_ - Lock lock(pendingReceiveMutex_); - if (!pendingReceives_.empty()) { - ReceiveCallback callback = pendingReceives_.front(); - pendingReceives_.pop(); - lock.unlock(); - unAckedMessageTrackerPtr_->add(msg.getMessageId()); - listenerExecutor_->postWork(std::bind(callback, ResultOk, msg)); - } else { - if (messages_.full()) { - lock.unlock(); - } - if (messages_.push(msg) && messageListener_) { - unAckedMessageTrackerPtr_->add(msg.getMessageId()); - listenerExecutor_->postWork( - std::bind(&PartitionedConsumerImpl::internalListener, shared_from_this(), consumer)); - } - } -} - -void PartitionedConsumerImpl::failPendingReceiveCallback() { - Message msg; - - messages_.close(); - - Lock lock(pendingReceiveMutex_); - while (!pendingReceives_.empty()) { - ReceiveCallback callback = pendingReceives_.front(); - pendingReceives_.pop(); - listenerExecutor_->postWork(std::bind(callback, ResultAlreadyClosed, msg)); - } - lock.unlock(); -} - -void PartitionedConsumerImpl::internalListener(Consumer consumer) { - Message m; - messages_.pop(m); - try { - messageListener_(Consumer(shared_from_this()), m); - } catch (const std::exception& e) { - LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << e.what()); - } -} - -void PartitionedConsumerImpl::receiveMessages() { - for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) { - ConsumerImplPtr consumer = *i; - consumer->sendFlowPermitsToBroker(consumer->getCnx().lock(), conf_.getReceiverQueueSize()); - LOG_DEBUG("Sending FLOW command for consumer - " << consumer->getConsumerId()); - } -} - -Result PartitionedConsumerImpl::pauseMessageListener() { - if (!messageListener_) { - return ResultInvalidConfiguration; - } - for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) { - (*i)->pauseMessageListener(); - } - return ResultOk; -} - -Result PartitionedConsumerImpl::resumeMessageListener() { - if (!messageListener_) { - return ResultInvalidConfiguration; - } - for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) { - (*i)->resumeMessageListener(); - } - return ResultOk; -} - -void PartitionedConsumerImpl::redeliverUnacknowledgedMessages() { - LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer."); - for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) { - (*i)->redeliverUnacknowledgedMessages(); - } - unAckedMessageTrackerPtr_->clear(); -} - -void PartitionedConsumerImpl::redeliverUnacknowledgedMessages(const std::set& messageIds) { - if (messageIds.empty()) { - return; - } - if (conf_.getConsumerType() != ConsumerShared && conf_.getConsumerType() != ConsumerKeyShared) { - redeliverUnacknowledgedMessages(); - return; - } - LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer."); - for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) { - (*i)->redeliverUnacknowledgedMessages(messageIds); - } -} - -const std::string& PartitionedConsumerImpl::getName() const { return partitionStr_; } - -int PartitionedConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); } - -void PartitionedConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) { - if (state_ != Ready) { - callback(ResultConsumerNotInitialized, BrokerConsumerStats()); - return; - } - const auto numPartitions = getNumPartitionsWithLock(); - PartitionedBrokerConsumerStatsPtr statsPtr = - std::make_shared(numPartitions); - LatchPtr latchPtr = std::make_shared(numPartitions); - ConsumerList consumerList = consumers_; - for (int i = 0; i < consumerList.size(); i++) { - consumerList[i]->getBrokerConsumerStatsAsync( - std::bind(&PartitionedConsumerImpl::handleGetConsumerStats, shared_from_this(), - std::placeholders::_1, std::placeholders::_2, latchPtr, statsPtr, i, callback)); - } -} - -void PartitionedConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerStats brokerConsumerStats, - LatchPtr latchPtr, - PartitionedBrokerConsumerStatsPtr statsPtr, size_t index, - BrokerConsumerStatsCallback callback) { - Lock lock(mutex_); - if (res == ResultOk) { - latchPtr->countdown(); - statsPtr->add(brokerConsumerStats, index); - } else { - lock.unlock(); - callback(res, BrokerConsumerStats()); - return; - } - if (latchPtr->getCount() == 0) { - lock.unlock(); - callback(ResultOk, BrokerConsumerStats(statsPtr)); - } -} - -void PartitionedConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) { - callback(ResultOperationNotSupported); -} - -void PartitionedConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { - if (state_ != Ready) { - callback(ResultAlreadyClosed); - return; - } - - // consumers_ could only be modified when state_ is Ready, so we needn't lock consumersMutex_ here - ConsumerList consumerList = consumers_; - - MultiResultCallback multiResultCallback(callback, consumers_.size()); - for (ConsumerList::const_iterator i = consumerList.begin(); i != consumerList.end(); i++) { - (*i)->seekAsync(timestamp, multiResultCallback); - } -} - -void PartitionedConsumerImpl::runPartitionUpdateTask() { - partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_); - partitionsUpdateTimer_->async_wait( - std::bind(&PartitionedConsumerImpl::getPartitionMetadata, shared_from_this())); -} - -void PartitionedConsumerImpl::getPartitionMetadata() { - using namespace std::placeholders; - lookupServicePtr_->getPartitionMetadataAsync(topicName_) - .addListener(std::bind(&PartitionedConsumerImpl::handleGetPartitions, shared_from_this(), - std::placeholders::_1, std::placeholders::_2)); -} - -void PartitionedConsumerImpl::handleGetPartitions(Result result, - const LookupDataResultPtr& lookupDataResult) { - if (state_ != Ready) { - return; - } - - if (!result) { - const auto newNumPartitions = static_cast(lookupDataResult->getPartitions()); - Lock consumersLock(consumersMutex_); - const auto currentNumPartitions = getNumPartitions(); - assert(currentNumPartitions == consumers_.size()); - if (newNumPartitions > currentNumPartitions) { - LOG_INFO("new partition count: " << newNumPartitions); - numPartitions_ = newNumPartitions; - const auto config = getSinglePartitionConsumerConfig(); - for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) { - auto consumer = newInternalConsumer(i, config); - consumer->start(); - consumers_.push_back(consumer); - } - // `runPartitionUpdateTask()` will be called in `handleSinglePartitionConsumerCreated()` - return; - } - } else { - LOG_WARN("Failed to getPartitionMetadata: " << strResult(result)); - } - - runPartitionUpdateTask(); -} - -void PartitionedConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) { - Lock lock(mutex_); - for (auto&& c : consumers_) { - c->setNegativeAcknowledgeEnabledForTesting(enabled); - } -} - -bool PartitionedConsumerImpl::isConnected() const { - if (state_ != Ready) { - return false; - } - - Lock consumersLock(consumersMutex_); - const auto consumers = consumers_; - consumersLock.unlock(); - for (const auto& consumer : consumers_) { - if (!consumer->isConnected()) { - return false; - } - } - return true; -} - -uint64_t PartitionedConsumerImpl::getNumberOfConnectedConsumer() { - uint64_t numberOfConnectedConsumer = 0; - Lock consumersLock(consumersMutex_); - const auto consumers = consumers_; - consumersLock.unlock(); - for (const auto& consumer : consumers) { - if (consumer->isConnected()) { - numberOfConnectedConsumer++; - } - } - return numberOfConnectedConsumer; -} - -} // namespace pulsar diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h deleted file mode 100644 index 8f4faf09954f2..0000000000000 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -#ifndef PULSAR_PARTITIONED_CONSUMER_HEADER -#define PULSAR_PARTITIONED_CONSUMER_HEADER -#include "lib/TestUtil.h" -#include "ConsumerImpl.h" -#include "ClientImpl.h" -#include -#include - -#include -#include "ConsumerImplBase.h" -#include "lib/UnAckedMessageTrackerDisabled.h" -#include -#include -#include - -namespace pulsar { -class PartitionedConsumerImpl; -class PartitionedConsumerImpl : public ConsumerImplBase, - public std::enable_shared_from_this { - public: - enum PartitionedConsumerState - { - Pending, - Ready, - Closing, - Closed, - Failed - }; - PartitionedConsumerImpl(ClientImplPtr client, const std::string& subscriptionName, - const TopicNamePtr topicName, const unsigned int numPartitions, - const ConsumerConfiguration& conf); - ~PartitionedConsumerImpl(); - // overrided methods from ConsumerImplBase - Future getConsumerCreatedFuture() override; - const std::string& getSubscriptionName() const override; - const std::string& getTopic() const override; - Result receive(Message& msg) override; - Result receive(Message& msg, int timeout) override; - void receiveAsync(ReceiveCallback& callback) override; - void unsubscribeAsync(ResultCallback callback) override; - void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) override; - void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) override; - void closeAsync(ResultCallback callback) override; - void start() override; - void shutdown() override; - bool isClosed() override; - bool isOpen() override; - Result pauseMessageListener() override; - Result resumeMessageListener() override; - void redeliverUnacknowledgedMessages() override; - void redeliverUnacknowledgedMessages(const std::set& messageIds) override; - const std::string& getName() const override; - int getNumOfPrefetchedMessages() const override; - void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) override; - void seekAsync(const MessageId& msgId, ResultCallback callback) override; - void seekAsync(uint64_t timestamp, ResultCallback callback) override; - void negativeAcknowledge(const MessageId& msgId) override; - bool isConnected() const override; - uint64_t getNumberOfConnectedConsumer() override; - - void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, PartitionedBrokerConsumerStatsPtr, - size_t, BrokerConsumerStatsCallback); - - private: - const ClientImplPtr client_; - const std::string subscriptionName_; - const TopicNamePtr topicName_; - unsigned int numPartitions_; - unsigned int numConsumersCreated_ = 0; - const ConsumerConfiguration conf_; - typedef std::vector ConsumerList; - ConsumerList consumers_; - // consumersMutex_ is used to share consumers_ and numPartitions_ - mutable std::mutex consumersMutex_; - mutable std::mutex mutex_; - std::mutex pendingReceiveMutex_; - std::atomic state_{Pending}; - unsigned int unsubscribedSoFar_ = 0; - BlockingQueue messages_; - ExecutorServicePtr listenerExecutor_; - MessageListener messageListener_; - const std::string topic_; - const std::string name_; - const std::string partitionStr_; - ExecutorServicePtr internalListenerExecutor_; - DeadlineTimerPtr partitionsUpdateTimer_; - boost::posix_time::time_duration partitionsUpdateInterval_; - LookupServicePtr lookupServicePtr_; - - unsigned int getNumPartitions() const; - unsigned int getNumPartitionsWithLock() const; - ConsumerConfiguration getSinglePartitionConsumerConfig() const; - ConsumerImplPtr newInternalConsumer(unsigned int partition, const ConsumerConfiguration& config) const; - void handleUnsubscribeAsync(Result result, unsigned int consumerIndex, ResultCallback callback); - void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, - unsigned int partitionIndex); - void handleSinglePartitionConsumerClose(Result result, unsigned int partitionIndex, - CloseCallback callback); - void notifyResult(CloseCallback closeCallback); - void messageReceived(Consumer consumer, const Message& msg); - void internalListener(Consumer consumer); - void receiveMessages(); - void failPendingReceiveCallback(); - void setNegativeAcknowledgeEnabledForTesting(bool enabled) override; - Promise partitionedConsumerCreatedPromise_; - UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_; - std::queue pendingReceives_; - void runPartitionUpdateTask(); - void getPartitionMetadata(); - void handleGetPartitions(const Result result, const LookupDataResultPtr& lookupDataResult); - - friend class PulsarFriend; - - FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery); -}; -typedef std::weak_ptr PartitionedConsumerImplWeakPtr; -typedef std::shared_ptr PartitionedConsumerImplPtr; -} // namespace pulsar -#endif // PULSAR_PARTITIONED_CONSUMER_HEADER diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index da5c60952dd18..e67b7a4576f16 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -1708,7 +1708,7 @@ TEST(BasicEndToEndTest, testSeekOnPartitionedTopic) { ASSERT_EQ(expected.str(), msgReceived.getDataAsString()); ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived)); ASSERT_EQ(ResultOk, consumer.unsubscribe()); - ASSERT_EQ(ResultOk, consumer.close()); + ASSERT_EQ(ResultAlreadyClosed, consumer.close()); ASSERT_EQ(ResultOk, producer.close()); ASSERT_EQ(ResultOk, client.close()); } diff --git a/pulsar-client-cpp/tests/ConsumerStatsTest.cc b/pulsar-client-cpp/tests/ConsumerStatsTest.cc index ada86760a2a02..c398a532e68cc 100644 --- a/pulsar-client-cpp/tests/ConsumerStatsTest.cc +++ b/pulsar-client-cpp/tests/ConsumerStatsTest.cc @@ -20,15 +20,12 @@ #include #include #include -#include "CustomRoutingPolicy.h" #include "lib/Future.h" #include "lib/Utils.h" #include "PulsarFriend.h" #include "ConsumerTest.h" #include "HttpHelper.h" #include -#include -#include #include #include @@ -42,8 +39,8 @@ static std::string adminUrl = "http://localhost:8080/"; void partitionedCallbackFunction(Result result, BrokerConsumerStats brokerConsumerStats, long expectedBacklog, Latch& latch, int index, bool accurate) { ASSERT_EQ(result, ResultOk); - PartitionedBrokerConsumerStatsImpl* statsPtr = - (PartitionedBrokerConsumerStatsImpl*)(brokerConsumerStats.getImpl().get()); + MultiTopicsBrokerConsumerStatsImpl* statsPtr = + (MultiTopicsBrokerConsumerStatsImpl*)(brokerConsumerStats.getImpl().get()); LOG_DEBUG(statsPtr); if (accurate) { ASSERT_EQ(expectedBacklog, statsPtr->getBrokerConsumerStats(index).getMsgBacklog()); diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc index 575abb696356c..694a568eb154a 100644 --- a/pulsar-client-cpp/tests/ConsumerTest.cc +++ b/pulsar-client-cpp/tests/ConsumerTest.cc @@ -30,7 +30,6 @@ #include "lib/Future.h" #include "lib/Utils.h" #include "lib/LogUtils.h" -#include "lib/PartitionedConsumerImpl.h" #include "lib/MultiTopicsConsumerImpl.h" #include "HttpHelper.h" @@ -406,8 +405,9 @@ TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery) { consumerConfig.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs); consumerConfig.setTickDurationInMs(tickDurationInMs); ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, subName, consumerConfig, consumer)); - PartitionedConsumerImplPtr partitionedConsumerImplPtr = - PulsarFriend::getPartitionedConsumerImplPtr(consumer); + + MultiTopicsConsumerImplPtr partitionedConsumerImplPtr = + PulsarFriend::getMultiTopicsConsumerImplPtr(consumer); ASSERT_EQ(numPartitions, partitionedConsumerImplPtr->consumers_.size()); // send messages @@ -442,8 +442,10 @@ TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery) { ASSERT_EQ(numOfMessages, partitionedTracker->size()); ASSERT_FALSE(partitionedTracker->isEmpty()); for (auto i = 0; i < numPartitions; i++) { + auto topicName = + "persistent://public/default/" + partitionedTopic + "-partition-" + std::to_string(i); ASSERT_EQ(numOfMessages / numPartitions, messageIds[i].size()); - auto subConsumerPtr = partitionedConsumerImplPtr->consumers_[i]; + auto subConsumerPtr = partitionedConsumerImplPtr->consumers_.find(topicName).value(); auto tracker = static_cast(subConsumerPtr->unAckedMessageTrackerPtr_.get()); ASSERT_EQ(0, tracker->size()); diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h index b74c0e1241bda..b6fb219eabb72 100644 --- a/pulsar-client-cpp/tests/PulsarFriend.h +++ b/pulsar-client-cpp/tests/PulsarFriend.h @@ -23,7 +23,6 @@ #include "lib/ProducerImpl.h" #include "lib/PartitionedProducerImpl.h" #include "lib/ConsumerImpl.h" -#include "lib/PartitionedConsumerImpl.h" #include "lib/MultiTopicsConsumerImpl.h" #include "lib/ReaderImpl.h" @@ -92,10 +91,6 @@ class PulsarFriend { return consumerImpl->chunkedMessageCache_; } - static std::shared_ptr getPartitionedConsumerImplPtr(Consumer consumer) { - return std::static_pointer_cast(consumer.impl_); - } - static std::shared_ptr getMultiTopicsConsumerImplPtr(Consumer consumer) { return std::static_pointer_cast(consumer.impl_); }