From 26189547b1159b740999ddc624d4781c29ab56c3 Mon Sep 17 00:00:00 2001 From: Jai ASher Date: Tue, 28 Feb 2017 20:36:04 -0800 Subject: [PATCH 1/7] C++ Client - Async call for getting ConsumerStats from broker --- pulsar-client-cpp/include/pulsar/Consumer.h | 42 ++++++--- pulsar-client-cpp/lib/ClientConnection.cc | 5 +- pulsar-client-cpp/lib/ClientConnection.h | 2 - pulsar-client-cpp/lib/Consumer.cc | 24 +++-- pulsar-client-cpp/lib/ConsumerImpl.cc | 38 +++++--- pulsar-client-cpp/lib/ConsumerImpl.h | 5 +- pulsar-client-cpp/lib/ConsumerImplBase.h | 2 +- .../lib/PartitionedConsumerImpl.cc | 15 ++- .../lib/PartitionedConsumerImpl.h | 3 +- pulsar-client-cpp/tests/ConsumerStatsTest.cc | 93 ++++++++++++++++++- 10 files changed, 174 insertions(+), 55 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h b/pulsar-client-cpp/include/pulsar/Consumer.h index 598c6b1319a63..a9ec05e7433b3 100644 --- a/pulsar-client-cpp/include/pulsar/Consumer.h +++ b/pulsar-client-cpp/include/pulsar/Consumer.h @@ -58,16 +58,16 @@ enum ConsumerType { class BrokerConsumerStats { private: - /* - * validTillInMs_ - Stats will be valid till this time. - */ + static const long CONSUMER_STATS_TTL_IN_MS = 30 * 1000; // 30 seconds + + /** validTill_ - Stats will be valid till this time.*/ boost::posix_time::ptime validTill_; public: BrokerConsumerStats(); - BrokerConsumerStats(boost::posix_time::ptime& validTill, double msgRateOut, double msgThroughputOut, - double msgRateRedeliver, std::string consumerName, int availablePermits, - int unackedMessages, bool blockedConsumerOnUnackedMsgs, std::string address, - std::string connectedSince, std::string type, double msgRateExpired, long msgBacklog); + BrokerConsumerStats(double msgRateOut, double msgThroughputOut, + double msgRateRedeliver, std::string consumerName, uint64_t availablePermits, + uint64_t unackedMessages, bool blockedConsumerOnUnackedMsgs, std::string address, + std::string connectedSince, std::string type, double msgRateExpired, uint64_t msgBacklog); /** Returns true if the Message is Expired **/ bool isValid() const; @@ -85,10 +85,10 @@ class BrokerConsumerStats { std::string consumerName_; /** Number of available message permits for the consumer */ - int availablePermits_; + uint64_t availablePermits_; /** Number of unacknowledged messages for the consumer */ - int unackedMessages_; + uint64_t unackedMessages_; /** Flag to verify if consumer is blocked due to reaching threshold of unacked messages */ bool blockedConsumerOnUnackedMsgs_; @@ -99,19 +99,22 @@ class BrokerConsumerStats { /** Timestamp of connection */ std::string connectedSince_; - /// Whether this subscription is Exclusive or Shared or Failover + /** Whether this subscription is Exclusive or Shared or Failover */ std::string type_; - /// Total rate of messages expired on this subscription. msg/s + /** Total rate of messages expired on this subscription. msg/s */ double msgRateExpired_; - /// Number of messages in the subscription backlog - long msgBacklog_; + /** Number of messages in the subscription backlog */ + uint64_t msgBacklog_; friend std::ostream& operator<<(std::ostream& os, const BrokerConsumerStats& obj); }; -/** +/// Callback definition for BrokerConsumerStats +typedef boost::function BrokerConsumerStatsCallback; + + /** * Class specifying the configuration of a consumer. */ class ConsumerConfiguration { @@ -353,6 +356,17 @@ class Consumer { */ Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex = -1); + /** + * Asynchronous call to gets Consumer Stats from broker. + * The stats are cached for 30 seconds, if a call is made before the stats returned by the previous call expires + * then cached data will be returned. BrokerConsumerStats::isValid() function can be used to check if the stats are + * still valid. + * + * @param callback - callback function to get the brokerConsumerStats, + * if result is ResultOk then the brokerConsumerStats will be populated + * @param partitionIndex - optional parameter which is to be populated only if the topic is partitioned. + */ + void getConsumerStatsAsync(BrokerConsumerStatsCallback callback, int partitionIndex = -1); private: typedef boost::shared_ptr ConsumerImplBasePtr; friend class PulsarFriend; diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index bcb35be7a64bd..1448c08fdc84d 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -124,7 +124,6 @@ havePendingPingRequest_(false), keepAliveTimer_(), maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()), consumerStatsRequestTimer_(executor_->createDeadlineTimer()), -consumerStatsTTLMs_(30 * 1000), numOfPendingLookupRequest_(0), isTlsAllowInsecureConnection_(false) { if (clientConfiguration.isUseTls()) { @@ -730,10 +729,8 @@ void ClientConnection::handleIncomingCommand() { cnxString_ << "ConsumerStatsResponse command - Received consumer stats response from server. req_id: " << consumerStatsResponse.request_id() << " Stats: "); - boost::posix_time::ptime validTill = now() + milliseconds(consumerStatsTTLMs_); BrokerConsumerStats brokerStats = - BrokerConsumerStats(validTill, - consumerStatsResponse.msgrateout(), + BrokerConsumerStats(consumerStatsResponse.msgrateout(), consumerStatsResponse.msgthroughputout(), consumerStatsResponse.msgrateredeliver(), consumerStatsResponse.consumername(), diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h index 0617b46b58aa3..27f16fdd9428b 100644 --- a/pulsar-client-cpp/lib/ClientConnection.h +++ b/pulsar-client-cpp/lib/ClientConnection.h @@ -135,8 +135,6 @@ class ClientConnection : public boost::enable_shared_from_this Future newConsumerStats(const std::string topicName, const std::string subscriptionName, uint64_t consumerId, uint64_t requestId) ; private: - long consumerStatsTTLMs_ ; - struct PendingRequestData { Promise promise; DeadlineTimerPtr timer; diff --git a/pulsar-client-cpp/lib/Consumer.cc b/pulsar-client-cpp/lib/Consumer.cc index f05d0035853e2..69138b47f7d3b 100644 --- a/pulsar-client-cpp/lib/Consumer.cc +++ b/pulsar-client-cpp/lib/Consumer.cc @@ -18,6 +18,7 @@ #include #include "ConsumerImpl.h" #include "Utils.h" +#include namespace pulsar { @@ -25,11 +26,11 @@ const std::string EMPTY_STRING; BrokerConsumerStats::BrokerConsumerStats():validTill_(now()) {}; -BrokerConsumerStats::BrokerConsumerStats(boost::posix_time::ptime& validTill, double msgRateOut, double msgThroughputOut, - double msgRateRedeliver, std::string consumerName, int availablePermits, - int unackedMessages, bool blockedConsumerOnUnackedMsgs, std::string address, - std::string connectedSince, std::string type, double msgRateExpired, long msgBacklog): - validTill_(validTill), +BrokerConsumerStats::BrokerConsumerStats(double msgRateOut, double msgThroughputOut, + double msgRateRedeliver, std::string consumerName, uint64_t availablePermits, + uint64_t unackedMessages, bool blockedConsumerOnUnackedMsgs, std::string address, + std::string connectedSince, std::string type, double msgRateExpired, uint64_t msgBacklog): + validTill_(microsec_clock::universal_time() + milliseconds(CONSUMER_STATS_TTL_IN_MS)), msgRateOut_(msgRateOut), msgThroughputOut_(msgThroughputOut), msgRateRedeliver_(msgRateRedeliver), @@ -304,10 +305,19 @@ void Consumer::redeliverUnacknowledgedMessages() { } } -Result Consumer::getConsumerStats(BrokerConsumerStats& BrokerConsumerStats, int partitionIndex) { +Result Consumer::getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex) { if (!impl_) { return ResultConsumerNotInitialized; } - return impl_->getConsumerStats(BrokerConsumerStats, partitionIndex); + Promise promise; + impl_->getConsumerStatsAsync(WaitForCallbackValue(promise), partitionIndex); + return promise.getFuture().get(brokerConsumerStats);; +} + +void Consumer::getConsumerStatsAsync(BrokerConsumerStatsCallback callback, int partitionIndex) { + if (!impl_) { + return callback(ResultConsumerNotInitialized, BrokerConsumerStats()); + } + return impl_->getConsumerStatsAsync(callback, partitionIndex); } } diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index 3f9b8541d8eca..9db3891d8b349 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -687,22 +687,25 @@ int ConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); } -Result ConsumerImpl::getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex) { +void ConsumerImpl::getConsumerStatsAsync(BrokerConsumerStatsCallback callback, int partitionIndex) { if (partitionIndex != -1) { LOG_WARN(getName() << "Ignoring the partitionIndex since the topic is not partitioned") } - if (!isOpen()) { + Lock lock(mutex_); + if (state_ != Ready) { LOG_ERROR(getName() << "Client connection is not open, please try again later.") - return ResultConsumerNotInitialized; + lock.unlock(); + return callback(ResultConsumerNotInitialized, BrokerConsumerStats()); } if (brokerConsumerStats_.isValid()) { LOG_DEBUG(getName() << "Serving data from cache"); - brokerConsumerStats = brokerConsumerStats_; - return ResultOk; + BrokerConsumerStats brokerConsumerStats = brokerConsumerStats_; + lock.unlock(); + return callback(ResultOk, brokerConsumerStats); } - + lock.unlock(); ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { @@ -712,19 +715,26 @@ Result ConsumerImpl::getConsumerStats(BrokerConsumerStats& brokerConsumerStats, LOG_DEBUG(getName() << " Sending ConsumerStats Command for Consumer - " << getConsumerId() << ", requestId - "<newConsumerStats(topic_, subscription_, consumerId_, requestId).get(consumerStats); - if (res == ResultOk) { - brokerConsumerStats = brokerConsumerStats_ = consumerStats; - } - return res; + cnx->newConsumerStats(topic_, subscription_, consumerId_, requestId).addListener( + boost::bind(&ConsumerImpl::brokerConsumerStatsListener, shared_from_this(), _1, _2, callback)); + return; } else { LOG_ERROR(getName() << " Operation not supported since server protobuf version " << cnx->getServerProtocolVersion() << " is older than proto::v7"); - return ResultOperationNotSupported; + return callback(ResultUnsupportedVersionError, BrokerConsumerStats()); } } LOG_ERROR(getName() << " Client Connection not ready for Consumer"); - return ResultNotConnected; + return callback(ResultNotConnected, BrokerConsumerStats()); +} + +void ConsumerImpl::brokerConsumerStatsListener(Result res, const BrokerConsumerStats brokerConsumerStats + , BrokerConsumerStatsCallback callback) { + + if (res == ResultOk) { + Lock lock(mutex_); + brokerConsumerStats_ = brokerConsumerStats; + } + callback(res, brokerConsumerStats); } } /* namespace pulsar */ diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h index 15ba7539ec4ff..d7944be04a5f3 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.h +++ b/pulsar-client-cpp/lib/ConsumerImpl.h @@ -91,8 +91,8 @@ enum ConsumerTopicType { virtual Result pauseMessageListener(); virtual Result resumeMessageListener(); virtual void redeliverUnacknowledgedMessages(); - virtual Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex = -1); -protected: + virtual void getConsumerStatsAsync(BrokerConsumerStatsCallback callback, int partitionIndex = -1); + protected: void connectionOpened(const ClientConnectionPtr& cnx); void connectionFailed(Result result); void handleCreateConsumer(const ClientConnectionPtr& cnx, Result result); @@ -114,6 +114,7 @@ enum ConsumerTopicType { void increaseAvailablePermits(const ClientConnectionPtr& currentCnx); void drainIncomingMessageQueue(size_t count); unsigned int receiveIndividualMessagesFromBatch(Message &batchedMessage); + void brokerConsumerStatsListener(Result, const BrokerConsumerStats, BrokerConsumerStatsCallback); boost::mutex mutexForReceiveWithZeroQueueSize; const ConsumerConfiguration config_; diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h index 5d48317f8fdc4..650a1fac8dc71 100644 --- a/pulsar-client-cpp/lib/ConsumerImplBase.h +++ b/pulsar-client-cpp/lib/ConsumerImplBase.h @@ -47,7 +47,7 @@ class ConsumerImplBase { virtual void redeliverUnacknowledgedMessages() = 0; virtual const std::string& getName() const = 0; virtual int getNumOfPrefetchedMessages() const = 0; - virtual Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex = -1) = 0; + virtual void getConsumerStatsAsync(BrokerConsumerStatsCallback callback, int partitionIndex = -1) = 0; }; } #endif //PULSAR_CONSUMER_IMPL_BASE_HEADER diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc index 5de6993902c5c..ae0268006fd71 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc @@ -380,12 +380,17 @@ namespace pulsar { return messages_.size(); } - Result PartitionedConsumerImpl::getConsumerStats(BrokerConsumerStats& BrokerConsumerStats, int partitionIndex) { - if (partitionIndex >= numPartitions_ && partitionIndex < 0 && consumers_.size() <= partitionIndex) + void PartitionedConsumerImpl::getConsumerStatsAsync(BrokerConsumerStatsCallback callback, int partitionIndex) { + Lock lock(mutex_); + if (partitionIndex >= numPartitions_ || partitionIndex < 0 || partitionIndex >= consumers_.size()) { - LOG_ERROR(getName() << " PartitionIndex must be positive and less than number of partitiones") - return ResultInvalidConfiguration; + lock.unlock(); + LOG_ERROR(getName() << " PartitionIndex must be positive and less than number of partitions") + return callback(ResultInvalidConfiguration, BrokerConsumerStats()); } - return consumers_[partitionIndex]->getConsumerStats(BrokerConsumerStats); + ConsumerImplPtr c = consumers_[partitionIndex]; + lock.unlock(); + + return c->getConsumerStatsAsync(callback); } } diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h index 3338adda125a2..8234da9985d54 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h @@ -60,7 +60,8 @@ namespace pulsar { virtual void redeliverUnacknowledgedMessages(); virtual const std::string& getName() const; virtual int getNumOfPrefetchedMessages() const ; - virtual Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex); + virtual void getConsumerStatsAsync(BrokerConsumerStatsCallback callback, int partitionIndex = -1); + private: const ClientImplPtr client_; const std::string subscriptionName_; diff --git a/pulsar-client-cpp/tests/ConsumerStatsTest.cc b/pulsar-client-cpp/tests/ConsumerStatsTest.cc index b1bfecba04fd0..a7b178c401aa6 100644 --- a/pulsar-client-cpp/tests/ConsumerStatsTest.cc +++ b/pulsar-client-cpp/tests/ConsumerStatsTest.cc @@ -18,21 +18,17 @@ #include #include #include -#include #include "DestinationName.h" #include -#include #include "boost/date_time/posix_time/posix_time.hpp" #include "CustomRoutingPolicy.h" #include #include "lib/Future.h" #include "lib/Utils.h" -#include -#include "LogUtils.h" #include "PulsarFriend.h" -#include #include "ConsumerTest.h" #include "HttpHelper.h" +#include DECLARE_LOG_OBJECT(); using namespace pulsar; @@ -40,6 +36,11 @@ using namespace pulsar; static std::string lookupUrl = "pulsar://localhost:8885"; static std::string adminUrl = "http://localhost:8765/"; +void callbackFunction(Result result, const BrokerConsumerStats brokerConsumerStats, long expectedBacklog, Latch& latch) { + LOG_DEBUG(brokerConsumerStats); + ASSERT_EQ(expectedBacklog, brokerConsumerStats.msgBacklog_); + latch.countdown(); +} TEST(ConsumerStatsTest, testBacklogInfo) { long epochTime=time(NULL); @@ -74,6 +75,7 @@ TEST(ConsumerStatsTest, testBacklogInfo) { producer.send(msg); } + LOG_DEBUG("Calling consumer.getConsumerStats"); BrokerConsumerStats consumerStats; Result res = consumer.getConsumerStats(consumerStats); ASSERT_EQ(res, ResultOk); @@ -212,4 +214,85 @@ TEST(ConsumerStatsTest, testCachingMechanism) { consumer.unsubscribe(); ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); +} + + +TEST(ConsumerStatsTest, testAsyncCallOnPartitionedTopic) { + long epochTime=time(NULL); + std::string testName="testAsyncCallOnPartitionedTopic-" + boost::lexical_cast(epochTime); + Client client(lookupUrl); + std::string topicName = "persistent://property/cluster/namespace/" + testName; + std::string subName = "subscription-name"; + + // call admin api to create partitioned topics + std::string url = adminUrl + "admin/persistent/property/cluster/namespace/" + testName + "/partitions"; + int res = makePutRequest(url, "7"); + + LOG_INFO("res = "< consumerPromise; + BrokerConsumerStats consumerStats; + client.subscribeAsync(topicName, subName, WaitForCallbackValue(consumerPromise)); + ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); + Future consumerFuture = consumerPromise.getFuture(); + Result result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); + + // handling dangling subscriptions + consumer.unsubscribe(); + ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); + client.subscribe(topicName, subName, consumer); + + // Producing messages + Producer producer; + int numOfMessages = 7 * 5; // 5 message per partition + Promise producerPromise; + ProducerConfiguration config; + config.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution); + client.createProducerAsync(topicName, config, WaitForCallbackValue(producerPromise)); + Future producerFuture = producerPromise.getFuture(); + result = producerFuture.get(producer); + ASSERT_EQ(ResultOk, result); + + std::string prefix = testName + "-"; + for (int i = 0; i(i); + Message msg = MessageBuilder().build(); + producer.send(msg); + } + + // Expecting return from 4 callbacks + Latch latch(4); + consumer.getConsumerStatsAsync(boost::bind(callbackFunction, _1, _2, 5, latch), 0); + + BrokerConsumerStats brokerConsumerStats; + // Expecting error since partitionIndex is invalid + ASSERT_EQ(ResultInvalidConfiguration, consumer.getConsumerStats(brokerConsumerStats, -1)); + ASSERT_EQ(ResultInvalidConfiguration, consumer.getConsumerStats(brokerConsumerStats, 7)); + + // Now we have 10 messages per partition + for (int i = numOfMessages; i<(numOfMessages*2); i++) { + std::string messageContent = prefix + boost::lexical_cast(i); + Message msg = MessageBuilder().build(); + producer.send(msg); + } + + // Expecting cached result + consumer.getConsumerStatsAsync(boost::bind(callbackFunction, _1, _2, 5, latch), 0); + + // Expecting fresh results since the partition index is different + consumer.getConsumerStatsAsync(boost::bind(callbackFunction, _1, _2, 10, latch), 2); + + Message msg; + while (consumer.receive(msg)) { + // Do nothing + } + + // Expecting the backlog to be the same since we didn't acknowledge the messages + consumer.getConsumerStatsAsync(boost::bind(callbackFunction, _1, _2, 10, latch), 3); + + // Wait for ten seconds only + ASSERT_TRUE(latch.wait(milliseconds(10 * 1000))); } \ No newline at end of file From 3480d8bf475ffd5328cd7dae2d92bb23f6d3732a Mon Sep 17 00:00:00 2001 From: Jai ASher Date: Mon, 20 Mar 2017 16:05:41 -0700 Subject: [PATCH 2/7] Removed partitionIndex --- pulsar-client-cpp/include/pulsar/Consumer.h | 6 +- pulsar-client-cpp/lib/Consumer.cc | 8 +- pulsar-client-cpp/lib/ConsumerImpl.cc | 6 +- pulsar-client-cpp/lib/ConsumerImpl.h | 2 +- pulsar-client-cpp/lib/ConsumerImplBase.h | 2 +- .../lib/PartitionedConsumerImpl.cc | 5 +- .../lib/PartitionedConsumerImpl.h | 2 +- pulsar-client-cpp/tests/ConsumerStatsTest.cc | 298 ------------------ 8 files changed, 14 insertions(+), 315 deletions(-) delete mode 100644 pulsar-client-cpp/tests/ConsumerStatsTest.cc diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h b/pulsar-client-cpp/include/pulsar/Consumer.h index a9ec05e7433b3..c321dab59533a 100644 --- a/pulsar-client-cpp/include/pulsar/Consumer.h +++ b/pulsar-client-cpp/include/pulsar/Consumer.h @@ -350,11 +350,10 @@ class Consumer { * still valid. * * @param brokerConsumerStats - if the function returns ResultOk, this object will contain consumer stats - * @param partitionIndex - optional parameter which is to be populated only if the topic is partitioned. * * @note This is a blocking call with timeout of thirty seconds. */ - Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex = -1); + Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats); /** * Asynchronous call to gets Consumer Stats from broker. @@ -364,9 +363,8 @@ class Consumer { * * @param callback - callback function to get the brokerConsumerStats, * if result is ResultOk then the brokerConsumerStats will be populated - * @param partitionIndex - optional parameter which is to be populated only if the topic is partitioned. */ - void getConsumerStatsAsync(BrokerConsumerStatsCallback callback, int partitionIndex = -1); + void getConsumerStatsAsync(BrokerConsumerStatsCallback callback); private: typedef boost::shared_ptr ConsumerImplBasePtr; friend class PulsarFriend; diff --git a/pulsar-client-cpp/lib/Consumer.cc b/pulsar-client-cpp/lib/Consumer.cc index 69138b47f7d3b..5543a4e0fb81d 100644 --- a/pulsar-client-cpp/lib/Consumer.cc +++ b/pulsar-client-cpp/lib/Consumer.cc @@ -305,19 +305,19 @@ void Consumer::redeliverUnacknowledgedMessages() { } } -Result Consumer::getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex) { +Result Consumer::getConsumerStats(BrokerConsumerStats& brokerConsumerStats) { if (!impl_) { return ResultConsumerNotInitialized; } Promise promise; - impl_->getConsumerStatsAsync(WaitForCallbackValue(promise), partitionIndex); + impl_->getConsumerStatsAsync(WaitForCallbackValue(promise)); return promise.getFuture().get(brokerConsumerStats);; } -void Consumer::getConsumerStatsAsync(BrokerConsumerStatsCallback callback, int partitionIndex) { +void Consumer::getConsumerStatsAsync(BrokerConsumerStatsCallback callback) { if (!impl_) { return callback(ResultConsumerNotInitialized, BrokerConsumerStats()); } - return impl_->getConsumerStatsAsync(callback, partitionIndex); + return impl_->getConsumerStatsAsync(callback); } } diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index 9db3891d8b349..9612bd92affea 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -687,11 +687,7 @@ int ConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); } -void ConsumerImpl::getConsumerStatsAsync(BrokerConsumerStatsCallback callback, int partitionIndex) { - if (partitionIndex != -1) { - LOG_WARN(getName() << "Ignoring the partitionIndex since the topic is not partitioned") - } - +void ConsumerImpl::getConsumerStatsAsync(BrokerConsumerStatsCallback callback) { Lock lock(mutex_); if (state_ != Ready) { LOG_ERROR(getName() << "Client connection is not open, please try again later.") diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h index d7944be04a5f3..d4b5726e833b0 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.h +++ b/pulsar-client-cpp/lib/ConsumerImpl.h @@ -91,7 +91,7 @@ enum ConsumerTopicType { virtual Result pauseMessageListener(); virtual Result resumeMessageListener(); virtual void redeliverUnacknowledgedMessages(); - virtual void getConsumerStatsAsync(BrokerConsumerStatsCallback callback, int partitionIndex = -1); + virtual void getConsumerStatsAsync(BrokerConsumerStatsCallback callback); protected: void connectionOpened(const ClientConnectionPtr& cnx); void connectionFailed(Result result); diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h index 650a1fac8dc71..d675d6896bda0 100644 --- a/pulsar-client-cpp/lib/ConsumerImplBase.h +++ b/pulsar-client-cpp/lib/ConsumerImplBase.h @@ -47,7 +47,7 @@ class ConsumerImplBase { virtual void redeliverUnacknowledgedMessages() = 0; virtual const std::string& getName() const = 0; virtual int getNumOfPrefetchedMessages() const = 0; - virtual void getConsumerStatsAsync(BrokerConsumerStatsCallback callback, int partitionIndex = -1) = 0; + virtual void getConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0; }; } #endif //PULSAR_CONSUMER_IMPL_BASE_HEADER diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc index ae0268006fd71..1e7f0ee23d056 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc @@ -380,8 +380,10 @@ namespace pulsar { return messages_.size(); } - void PartitionedConsumerImpl::getConsumerStatsAsync(BrokerConsumerStatsCallback callback, int partitionIndex) { + void PartitionedConsumerImpl::getConsumerStatsAsync(BrokerConsumerStatsCallback callback) { Lock lock(mutex_); + // TODO - think about this code change + /* if (partitionIndex >= numPartitions_ || partitionIndex < 0 || partitionIndex >= consumers_.size()) { lock.unlock(); @@ -392,5 +394,6 @@ namespace pulsar { lock.unlock(); return c->getConsumerStatsAsync(callback); + */ } } diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h index 8234da9985d54..d3328cc0c28af 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h @@ -60,7 +60,7 @@ namespace pulsar { virtual void redeliverUnacknowledgedMessages(); virtual const std::string& getName() const; virtual int getNumOfPrefetchedMessages() const ; - virtual void getConsumerStatsAsync(BrokerConsumerStatsCallback callback, int partitionIndex = -1); + virtual void getConsumerStatsAsync(BrokerConsumerStatsCallback callback); private: const ClientImplPtr client_; diff --git a/pulsar-client-cpp/tests/ConsumerStatsTest.cc b/pulsar-client-cpp/tests/ConsumerStatsTest.cc deleted file mode 100644 index a7b178c401aa6..0000000000000 --- a/pulsar-client-cpp/tests/ConsumerStatsTest.cc +++ /dev/null @@ -1,298 +0,0 @@ -/** - * Copyright 2016 Yahoo Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include -#include "DestinationName.h" -#include -#include "boost/date_time/posix_time/posix_time.hpp" -#include "CustomRoutingPolicy.h" -#include -#include "lib/Future.h" -#include "lib/Utils.h" -#include "PulsarFriend.h" -#include "ConsumerTest.h" -#include "HttpHelper.h" -#include -DECLARE_LOG_OBJECT(); - -using namespace pulsar; - -static std::string lookupUrl = "pulsar://localhost:8885"; -static std::string adminUrl = "http://localhost:8765/"; - -void callbackFunction(Result result, const BrokerConsumerStats brokerConsumerStats, long expectedBacklog, Latch& latch) { - LOG_DEBUG(brokerConsumerStats); - ASSERT_EQ(expectedBacklog, brokerConsumerStats.msgBacklog_); - latch.countdown(); -} - -TEST(ConsumerStatsTest, testBacklogInfo) { - long epochTime=time(NULL); - std::string testName="testBacklogInfo-" + boost::lexical_cast(epochTime); - Client client(lookupUrl); - std::string topicName = "persistent://property/cluster/namespace/" + testName; - std::string subName = "subscription-name"; - Consumer consumer; - Promise consumerPromise; - client.subscribeAsync(topicName, subName, WaitForCallbackValue(consumerPromise)); - Future consumerFuture = consumerPromise.getFuture(); - Result result = consumerFuture.get(consumer); - ASSERT_EQ(ResultOk, result); - - // handling dangling subscriptions - consumer.unsubscribe(); - client.subscribe(topicName, subName, consumer); - - // Producing messages - Producer producer; - int numOfMessages = 10; - Promise producerPromise; - client.createProducerAsync(topicName, WaitForCallbackValue(producerPromise)); - Future producerFuture = producerPromise.getFuture(); - result = producerFuture.get(producer); - ASSERT_EQ(ResultOk, result); - - std::string prefix = testName + "-"; - for (int i = 0; i(i); - Message msg = MessageBuilder().build(); - producer.send(msg); - } - - LOG_DEBUG("Calling consumer.getConsumerStats"); - BrokerConsumerStats consumerStats; - Result res = consumer.getConsumerStats(consumerStats); - ASSERT_EQ(res, ResultOk); - - LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages); - - for (int i = numOfMessages; i<(numOfMessages*2); i++) { - std::string messageContent = prefix + boost::lexical_cast(i); - Message msg = MessageBuilder().build(); - producer.send(msg); - } - - usleep(35 * 1000 * 1000); - res = consumer.getConsumerStats(consumerStats); - ASSERT_EQ(res, ResultOk); - - LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, 2 * numOfMessages); - consumer.unsubscribe(); -} - -TEST(ConsumerStatsTest, testFailure) { - long epochTime=time(NULL); - std::string testName="testFailure-" + boost::lexical_cast(epochTime); - Client client(lookupUrl); - std::string topicName = "persistent://property/cluster/namespace/" + testName; - std::string subName = "subscription-name"; - Consumer consumer; - Promise consumerPromise; - BrokerConsumerStats consumerStats; - client.subscribeAsync(topicName, subName, WaitForCallbackValue(consumerPromise)); - ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); - Future consumerFuture = consumerPromise.getFuture(); - Result result = consumerFuture.get(consumer); - ASSERT_EQ(ResultOk, result); - - // handling dangling subscriptions - consumer.unsubscribe(); - ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); - client.subscribe(topicName, subName, consumer); - - // Producing messages - Producer producer; - int numOfMessages = 5; - Promise producerPromise; - client.createProducerAsync(topicName, WaitForCallbackValue(producerPromise)); - Future producerFuture = producerPromise.getFuture(); - result = producerFuture.get(producer); - ASSERT_EQ(ResultOk, result); - - std::string prefix = testName + "-"; - for (int i = 0; i(i); - Message msg = MessageBuilder().build(); - producer.send(msg); - } - - ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); - - LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages); - - consumer.unsubscribe(); - ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); -} - -TEST(ConsumerStatsTest, testCachingMechanism) { - long epochTime=time(NULL); - std::string testName="testCachingMechanism-" + boost::lexical_cast(epochTime); - Client client(lookupUrl); - std::string topicName = "persistent://property/cluster/namespace/" + testName; - std::string subName = "subscription-name"; - Consumer consumer; - Promise consumerPromise; - BrokerConsumerStats consumerStats; - client.subscribeAsync(topicName, subName, WaitForCallbackValue(consumerPromise)); - ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); - Future consumerFuture = consumerPromise.getFuture(); - Result result = consumerFuture.get(consumer); - ASSERT_EQ(ResultOk, result); - - // handling dangling subscriptions - consumer.unsubscribe(); - ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); - client.subscribe(topicName, subName, consumer); - - // Producing messages - Producer producer; - int numOfMessages = 5; - Promise producerPromise; - client.createProducerAsync(topicName, WaitForCallbackValue(producerPromise)); - Future producerFuture = producerPromise.getFuture(); - result = producerFuture.get(producer); - ASSERT_EQ(ResultOk, result); - - std::string prefix = testName + "-"; - for (int i = 0; i(i); - Message msg = MessageBuilder().build(); - producer.send(msg); - } - - ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); - - LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages); - - for (int i = numOfMessages; i<(numOfMessages*2); i++) { - std::string messageContent = prefix + boost::lexical_cast(i); - Message msg = MessageBuilder().build(); - producer.send(msg); - } - - LOG_DEBUG("Expecting cached results"); - ASSERT_TRUE(consumerStats.isValid()); - ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); - LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages); - - LOG_DEBUG("Still Expecting cached results"); - usleep(10 * 1000 * 1000); - ASSERT_TRUE(consumerStats.isValid()); - ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); - - LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages); - - LOG_DEBUG("Now expecting new results"); - usleep(25 * 1000 * 1000); - ASSERT_FALSE(consumerStats.isValid()); - ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); - - LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages * 2); - - consumer.unsubscribe(); - ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); -} - - -TEST(ConsumerStatsTest, testAsyncCallOnPartitionedTopic) { - long epochTime=time(NULL); - std::string testName="testAsyncCallOnPartitionedTopic-" + boost::lexical_cast(epochTime); - Client client(lookupUrl); - std::string topicName = "persistent://property/cluster/namespace/" + testName; - std::string subName = "subscription-name"; - - // call admin api to create partitioned topics - std::string url = adminUrl + "admin/persistent/property/cluster/namespace/" + testName + "/partitions"; - int res = makePutRequest(url, "7"); - - LOG_INFO("res = "< consumerPromise; - BrokerConsumerStats consumerStats; - client.subscribeAsync(topicName, subName, WaitForCallbackValue(consumerPromise)); - ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); - Future consumerFuture = consumerPromise.getFuture(); - Result result = consumerFuture.get(consumer); - ASSERT_EQ(ResultOk, result); - - // handling dangling subscriptions - consumer.unsubscribe(); - ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); - client.subscribe(topicName, subName, consumer); - - // Producing messages - Producer producer; - int numOfMessages = 7 * 5; // 5 message per partition - Promise producerPromise; - ProducerConfiguration config; - config.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution); - client.createProducerAsync(topicName, config, WaitForCallbackValue(producerPromise)); - Future producerFuture = producerPromise.getFuture(); - result = producerFuture.get(producer); - ASSERT_EQ(ResultOk, result); - - std::string prefix = testName + "-"; - for (int i = 0; i(i); - Message msg = MessageBuilder().build(); - producer.send(msg); - } - - // Expecting return from 4 callbacks - Latch latch(4); - consumer.getConsumerStatsAsync(boost::bind(callbackFunction, _1, _2, 5, latch), 0); - - BrokerConsumerStats brokerConsumerStats; - // Expecting error since partitionIndex is invalid - ASSERT_EQ(ResultInvalidConfiguration, consumer.getConsumerStats(brokerConsumerStats, -1)); - ASSERT_EQ(ResultInvalidConfiguration, consumer.getConsumerStats(brokerConsumerStats, 7)); - - // Now we have 10 messages per partition - for (int i = numOfMessages; i<(numOfMessages*2); i++) { - std::string messageContent = prefix + boost::lexical_cast(i); - Message msg = MessageBuilder().build(); - producer.send(msg); - } - - // Expecting cached result - consumer.getConsumerStatsAsync(boost::bind(callbackFunction, _1, _2, 5, latch), 0); - - // Expecting fresh results since the partition index is different - consumer.getConsumerStatsAsync(boost::bind(callbackFunction, _1, _2, 10, latch), 2); - - Message msg; - while (consumer.receive(msg)) { - // Do nothing - } - - // Expecting the backlog to be the same since we didn't acknowledge the messages - consumer.getConsumerStatsAsync(boost::bind(callbackFunction, _1, _2, 10, latch), 3); - - // Wait for ten seconds only - ASSERT_TRUE(latch.wait(milliseconds(10 * 1000))); -} \ No newline at end of file From fdfa65373d40e7320806bef6e3d9f5c530621021 Mon Sep 17 00:00:00 2001 From: Jai ASher Date: Mon, 20 Mar 2017 16:27:16 -0700 Subject: [PATCH 3/7] Separated ConsumerType.h --- pulsar-client-cpp/include/pulsar/Consumer.h | 21 ++------------ .../include/pulsar/ConsumerType.h | 28 +++++++++++++++++++ 2 files changed, 30 insertions(+), 19 deletions(-) create mode 100644 pulsar-client-cpp/include/pulsar/ConsumerType.h diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h b/pulsar-client-cpp/include/pulsar/Consumer.h index c321dab59533a..09ae044a33859 100644 --- a/pulsar-client-cpp/include/pulsar/Consumer.h +++ b/pulsar-client-cpp/include/pulsar/Consumer.h @@ -23,6 +23,8 @@ #include #include #include +#include + #pragma GCC visibility push(default) class PulsarFriend; @@ -37,25 +39,6 @@ typedef boost::function ResultCallback; /// Callback definition for MessageListener typedef boost::function MessageListener; -enum ConsumerType { - /** - * There can be only 1 consumer on the same topic with the same consumerName - */ - ConsumerExclusive, - - /** - * Multiple consumers will be able to use the same consumerName and the messages - * will be dispatched according to a round-robin rotation between the connected consumers - */ - ConsumerShared, - - /** Only one consumer is active on the subscription; Subscription can have N consumers - * connected one of which will get promoted to master if the current master becomes inactive - */ - - ConsumerFailover -}; - class BrokerConsumerStats { private: static const long CONSUMER_STATS_TTL_IN_MS = 30 * 1000; // 30 seconds diff --git a/pulsar-client-cpp/include/pulsar/ConsumerType.h b/pulsar-client-cpp/include/pulsar/ConsumerType.h new file mode 100644 index 0000000000000..85f2bb3e1a90e --- /dev/null +++ b/pulsar-client-cpp/include/pulsar/ConsumerType.h @@ -0,0 +1,28 @@ +// +// Created by Jai Asher on 3/20/17. +// + +#ifndef PULSAR_CPP_CONSUMERTYPE_H +#define PULSAR_CPP_CONSUMERTYPE_H + +namespace pulsar { + enum ConsumerType { + /** + * There can be only 1 consumer on the same topic with the same consumerName + */ + ConsumerExclusive, + + /** + * Multiple consumers will be able to use the same consumerName and the messages + * will be dispatched according to a round-robin rotation between the connected consumers + */ + ConsumerShared, + + /** Only one consumer is active on the subscription; Subscription can have N consumers + * connected one of which will get promoted to master if the current master becomes inactive + */ + ConsumerFailover + }; +} + +#endif //PULSAR_CPP_CONSUMERTYPE_H From 0196f39edd4450eb4fb0e5dba2726cf8564615ab Mon Sep 17 00:00:00 2001 From: Jai ASher Date: Mon, 20 Mar 2017 18:15:09 -0700 Subject: [PATCH 4/7] Working so far --- pulsar-client-cpp/include/pulsar/Consumer.h | 62 +-------------------- pulsar-client-cpp/lib/ClientConnection.cc | 11 ++-- pulsar-client-cpp/lib/ClientConnection.h | 5 +- pulsar-client-cpp/lib/Consumer.cc | 51 ++--------------- pulsar-client-cpp/lib/ConsumerImpl.cc | 14 +++-- pulsar-client-cpp/lib/ConsumerImpl.h | 5 +- 6 files changed, 26 insertions(+), 122 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h b/pulsar-client-cpp/include/pulsar/Consumer.h index 09ae044a33859..d24e1c7aded5d 100644 --- a/pulsar-client-cpp/include/pulsar/Consumer.h +++ b/pulsar-client-cpp/include/pulsar/Consumer.h @@ -24,7 +24,7 @@ #include #include #include - +#include #pragma GCC visibility push(default) class PulsarFriend; @@ -39,65 +39,7 @@ typedef boost::function ResultCallback; /// Callback definition for MessageListener typedef boost::function MessageListener; -class BrokerConsumerStats { - private: - static const long CONSUMER_STATS_TTL_IN_MS = 30 * 1000; // 30 seconds - - /** validTill_ - Stats will be valid till this time.*/ - boost::posix_time::ptime validTill_; - public: - BrokerConsumerStats(); - BrokerConsumerStats(double msgRateOut, double msgThroughputOut, - double msgRateRedeliver, std::string consumerName, uint64_t availablePermits, - uint64_t unackedMessages, bool blockedConsumerOnUnackedMsgs, std::string address, - std::string connectedSince, std::string type, double msgRateExpired, uint64_t msgBacklog); - - /** Returns true if the Message is Expired **/ - bool isValid() const; - - /** Total rate of messages delivered to the consumer. msg/s */ - double msgRateOut_; - - /** Total throughput delivered to the consumer. bytes/s */ - double msgThroughputOut_; - - /** Total rate of messages redelivered by this consumer. msg/s */ - double msgRateRedeliver_; - - /** Name of the consumer */ - std::string consumerName_; - - /** Number of available message permits for the consumer */ - uint64_t availablePermits_; - - /** Number of unacknowledged messages for the consumer */ - uint64_t unackedMessages_; - - /** Flag to verify if consumer is blocked due to reaching threshold of unacked messages */ - bool blockedConsumerOnUnackedMsgs_; - - /** Address of this consumer */ - std::string address_; - - /** Timestamp of connection */ - std::string connectedSince_; - - /** Whether this subscription is Exclusive or Shared or Failover */ - std::string type_; - - /** Total rate of messages expired on this subscription. msg/s */ - double msgRateExpired_; - - /** Number of messages in the subscription backlog */ - uint64_t msgBacklog_; - - friend std::ostream& operator<<(std::ostream& os, const BrokerConsumerStats& obj); -}; - -/// Callback definition for BrokerConsumerStats -typedef boost::function BrokerConsumerStatsCallback; - - /** +/** * Class specifying the configuration of a consumer. */ class ConsumerConfiguration { diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 1448c08fdc84d..e3c668e0260e6 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -204,7 +204,7 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte } void ClientConnection::startConsumerStatsTimer(std::vector consumerStatsRequests) { - std::vector > consumerStatsPromises; + std::vector > consumerStatsPromises; Lock lock(mutex_); for (int i = 0; i < consumerStatsRequests.size(); i++) { @@ -714,7 +714,7 @@ void ClientConnection::handleIncomingCommand() { PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.find( consumerStatsResponse.request_id()); if (it != pendingConsumerStatsMap_.end()) { - Promise consumerStatsPromise = it->second; + Promise consumerStatsPromise = it->second; pendingConsumerStatsMap_.erase(it); lock.unlock(); @@ -729,8 +729,7 @@ void ClientConnection::handleIncomingCommand() { cnxString_ << "ConsumerStatsResponse command - Received consumer stats response from server. req_id: " << consumerStatsResponse.request_id() << " Stats: "); - BrokerConsumerStats brokerStats = - BrokerConsumerStats(consumerStatsResponse.msgrateout(), + BrokerConsumerStatsImpl brokerStats(consumerStatsResponse.msgrateout(), consumerStatsResponse.msgthroughputout(), consumerStatsResponse.msgrateredeliver(), consumerStatsResponse.consumername(), @@ -920,11 +919,11 @@ void ClientConnection::handleIncomingCommand() { } } -Future +Future ClientConnection::newConsumerStats(const std::string topicName, const std::string subscriptionName, uint64_t consumerId, uint64_t requestId) { Lock lock(mutex_); - Promise promise; + Promise promise; if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString_ << " Client is not connected to the broker"); diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h index 27f16fdd9428b..981e70ac09fbb 100644 --- a/pulsar-client-cpp/lib/ClientConnection.h +++ b/pulsar-client-cpp/lib/ClientConnection.h @@ -40,6 +40,7 @@ #include "UtilAllocator.h" #include #include +#include using namespace pulsar; @@ -132,7 +133,7 @@ class ClientConnection : public boost::enable_shared_from_this Commands::ChecksumType getChecksumType() const; - Future newConsumerStats(const std::string topicName, const std::string subscriptionName, + Future newConsumerStats(const std::string topicName, const std::string subscriptionName, uint64_t consumerId, uint64_t requestId) ; private: struct PendingRequestData { @@ -252,7 +253,7 @@ class ClientConnection : public boost::enable_shared_from_this typedef std::map ConsumersMap; ConsumersMap consumers_; - typedef std::map > PendingConsumerStatsMap; + typedef std::map > PendingConsumerStatsMap; PendingConsumerStatsMap pendingConsumerStatsMap_; diff --git a/pulsar-client-cpp/lib/Consumer.cc b/pulsar-client-cpp/lib/Consumer.cc index 5543a4e0fb81d..ac01b7c364bce 100644 --- a/pulsar-client-cpp/lib/Consumer.cc +++ b/pulsar-client-cpp/lib/Consumer.cc @@ -18,56 +18,12 @@ #include #include "ConsumerImpl.h" #include "Utils.h" -#include +#include namespace pulsar { const std::string EMPTY_STRING; -BrokerConsumerStats::BrokerConsumerStats():validTill_(now()) {}; - -BrokerConsumerStats::BrokerConsumerStats(double msgRateOut, double msgThroughputOut, - double msgRateRedeliver, std::string consumerName, uint64_t availablePermits, - uint64_t unackedMessages, bool blockedConsumerOnUnackedMsgs, std::string address, - std::string connectedSince, std::string type, double msgRateExpired, uint64_t msgBacklog): - validTill_(microsec_clock::universal_time() + milliseconds(CONSUMER_STATS_TTL_IN_MS)), - msgRateOut_(msgRateOut), - msgThroughputOut_(msgThroughputOut), - msgRateRedeliver_(msgRateRedeliver), - consumerName_(consumerName), - availablePermits_(availablePermits), - unackedMessages_(unackedMessages), - blockedConsumerOnUnackedMsgs_(blockedConsumerOnUnackedMsgs), - address_(address), - connectedSince_(connectedSince), - type_(type), - msgRateExpired_(msgRateExpired), - msgBacklog_(msgBacklog) -{} - -bool BrokerConsumerStats::isValid() const { - return now() <= validTill_; -} - -std::ostream& operator<<(std::ostream& os, const BrokerConsumerStats& obj) { - os << "\nBrokerConsumerStats [" - << "validTill_ = " << obj.validTill_ - << ", msgRateOut_ = " << obj.msgRateOut_ - << ", msgThroughputOut_ = " << obj.msgThroughputOut_ - << ", msgRateRedeliver_ = " << obj.msgRateRedeliver_ - << ", consumerName_ = " << obj.consumerName_ - << ", availablePermits_ = " << obj.availablePermits_ - << ", unackedMessages_ = " << obj.unackedMessages_ - << ", blockedConsumerOnUnackedMsgs_ = " << obj.blockedConsumerOnUnackedMsgs_ - << ", address_ = " << obj.address_ - << ", connectedSince_ = " << obj.connectedSince_ - << ", type_ = " << obj.type_ - << ", msgRateExpired_ = " << obj.msgRateExpired_ - << ", msgBacklog_ = " << obj.msgBacklog_ - << "]"; - return os; -} - struct ConsumerConfiguration::Impl { long unAckedMessagesTimeoutMs; ConsumerType consumerType; @@ -311,12 +267,13 @@ Result Consumer::getConsumerStats(BrokerConsumerStats& brokerConsumerStats) { } Promise promise; impl_->getConsumerStatsAsync(WaitForCallbackValue(promise)); - return promise.getFuture().get(brokerConsumerStats);; + return promise.getFuture().get(brokerConsumerStats); } void Consumer::getConsumerStatsAsync(BrokerConsumerStatsCallback callback) { if (!impl_) { - return callback(ResultConsumerNotInitialized, BrokerConsumerStats()); + BrokerConsumerStatsImpl impl; + return callback(ResultConsumerNotInitialized, impl); } return impl_->getConsumerStatsAsync(callback); } diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index 9612bd92affea..ff5e69bd8585b 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -692,7 +692,7 @@ void ConsumerImpl::getConsumerStatsAsync(BrokerConsumerStatsCallback callback) { if (state_ != Ready) { LOG_ERROR(getName() << "Client connection is not open, please try again later.") lock.unlock(); - return callback(ResultConsumerNotInitialized, BrokerConsumerStats()); + return callback(ResultConsumerNotInitialized, brokerConsumerStats_); } if (brokerConsumerStats_.isValid()) { @@ -716,21 +716,25 @@ void ConsumerImpl::getConsumerStatsAsync(BrokerConsumerStatsCallback callback) { return; } else { LOG_ERROR(getName() << " Operation not supported since server protobuf version " << cnx->getServerProtocolVersion() << " is older than proto::v7"); - return callback(ResultUnsupportedVersionError, BrokerConsumerStats()); + return callback(ResultUnsupportedVersionError, brokerConsumerStats_); } } LOG_ERROR(getName() << " Client Connection not ready for Consumer"); - return callback(ResultNotConnected, BrokerConsumerStats()); + return callback(ResultNotConnected, brokerConsumerStats_); } -void ConsumerImpl::brokerConsumerStatsListener(Result res, const BrokerConsumerStats brokerConsumerStats +void ConsumerImpl::brokerConsumerStatsListener(Result res, BrokerConsumerStatsImpl brokerConsumerStats , BrokerConsumerStatsCallback callback) { if (res == ResultOk) { Lock lock(mutex_); brokerConsumerStats_ = brokerConsumerStats; + // TODO - add logic to set expiry time + } + + if (!callback.empty()) { + callback(res, brokerConsumerStats_); } - callback(res, brokerConsumerStats); } } /* namespace pulsar */ diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h index d4b5726e833b0..7c2fe50cd4443 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.h +++ b/pulsar-client-cpp/lib/ConsumerImpl.h @@ -36,6 +36,7 @@ #include #include "BatchAcknowledgementTracker.h" #include +#include using namespace pulsar; @@ -114,7 +115,7 @@ enum ConsumerTopicType { void increaseAvailablePermits(const ClientConnectionPtr& currentCnx); void drainIncomingMessageQueue(size_t count); unsigned int receiveIndividualMessagesFromBatch(Message &batchedMessage); - void brokerConsumerStatsListener(Result, const BrokerConsumerStats, BrokerConsumerStatsCallback); + void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, BrokerConsumerStatsCallback); boost::mutex mutexForReceiveWithZeroQueueSize; const ConsumerConfiguration config_; @@ -135,7 +136,7 @@ enum ConsumerTopicType { CompressionCodecProvider compressionCodecProvider_; UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_; BatchAcknowledgementTracker batchAcknowledgementTracker_; - BrokerConsumerStats brokerConsumerStats_; + BrokerConsumerStatsImpl brokerConsumerStats_; }; } /* namespace pulsar */ From 9dde11ce2216649ce0ef036ff2d553bc520ded7c Mon Sep 17 00:00:00 2001 From: Jai ASher Date: Mon, 20 Mar 2017 18:15:37 -0700 Subject: [PATCH 5/7] Working so far --- .../include/pulsar/BrokerConsumerStats.h | 62 ++++++++++ pulsar-client-cpp/lib/BrokerConsumerStats.cc | 76 ++++++++++++ .../lib/BrokerConsumerStatsImpl.cc | 117 ++++++++++++++++++ .../lib/BrokerConsumerStatsImpl.h | 115 +++++++++++++++++ 4 files changed, 370 insertions(+) create mode 100644 pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h create mode 100644 pulsar-client-cpp/lib/BrokerConsumerStats.cc create mode 100644 pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc create mode 100644 pulsar-client-cpp/lib/BrokerConsumerStatsImpl.h diff --git a/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h b/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h new file mode 100644 index 0000000000000..f3696f5634b61 --- /dev/null +++ b/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h @@ -0,0 +1,62 @@ +// +// Created by Jai Asher on 3/20/17. +// + +#ifndef PULSAR_CPP_BROKERCONSUMERSTATS_H +#define PULSAR_CPP_BROKERCONSUMERSTATS_H + +#include +#include +#include +#include +#include +#include + +namespace pulsar { +class BrokerConsumerStats { + public: + /** Returns true if the Message is Expired **/ + virtual bool isValid() const; + + /** Returns the rate of messages delivered to the consumer. msg/s */ + virtual double getMsgRateOut() const; + + /** Returns the throughput delivered to the consumer. bytes/s */ + virtual double getMsgThroughputOut() const; + + /** Returns the rate of messages redelivered by this consumer. msg/s */ + virtual double getMsgRateRedeliver() const; + + /** Returns the Name of the consumer */ + virtual const std::string &getConsumerName() const; + + /** Returns the Number of available message permits for the consumer */ + virtual uint64_t getAvailablePermits() const; + + /** Returns the Number of unacknowledged messages for the consumer */ + virtual uint64_t getUnackedMessages() const; + + /** Returns true if the consumer is blocked due to unacked messages. */ + virtual bool isBlockedConsumerOnUnackedMsgs() const; + + /** Returns the Address of this consumer */ + virtual const std::string &getAddress() const; + + /** Returns the Timestamp of connection */ + virtual const std::string &getConnectedSince() const; + + /** Returns Whether this subscription is Exclusive or Shared or Failover */ + virtual const ConsumerType &getType() const; + + /** Returns the rate of messages expired on this subscription. msg/s */ + virtual double getMsgRateExpired() const; + + /** Returns the Number of messages in the subscription backlog */ + virtual uint64_t getMsgBacklog() const; + + friend std::ostream &operator<<(std::ostream &os, const BrokerConsumerStats &obj); +}; +typedef boost::function BrokerConsumerStatsCallback; + +} +#endif //PULSAR_CPP_BROKERCONSUMERSTATS_H diff --git a/pulsar-client-cpp/lib/BrokerConsumerStats.cc b/pulsar-client-cpp/lib/BrokerConsumerStats.cc new file mode 100644 index 0000000000000..5185dcc8d2920 --- /dev/null +++ b/pulsar-client-cpp/lib/BrokerConsumerStats.cc @@ -0,0 +1,76 @@ +// +// Created by Jai Asher on 3/20/17. +// +#include + +namespace pulsar { + bool BrokerConsumerStats::isValid() const { + return false; + } + + std::ostream &operator<<(std::ostream &os, const BrokerConsumerStats& obj) { + os << "\nBrokerConsumerStats [" + << ", msgRateOut_ = " << obj.getMsgRateOut() + << ", msgThroughputOut_ = " << obj.getMsgThroughputOut() + << ", msgRateRedeliver_ = " << obj.getMsgRateRedeliver() + << ", consumerName_ = " << obj.getConsumerName() + << ", availablePermits_ = " << obj.getAvailablePermits() + << ", unackedMessages_ = " << obj.getUnackedMessages() + << ", blockedConsumerOnUnackedMsgs_ = " << obj.isBlockedConsumerOnUnackedMsgs() + << ", address_ = " << obj.getAddress() + << ", connectedSince_ = " << obj.getConnectedSince() + << ", type_ = " << obj.getType() + << ", msgRateExpired_ = " << obj.getMsgRateExpired() + << ", msgBacklog_ = " << obj.getMsgBacklog() + << "]"; + return os; + } + + double BrokerConsumerStats::getMsgRateOut() const { + return 0; + } + + double BrokerConsumerStats::getMsgThroughputOut() const { + return 0; + } + + double BrokerConsumerStats::getMsgRateRedeliver() const { + return 0; + } + + const std::string& BrokerConsumerStats::getConsumerName() const { + return ""; + } + + uint64_t BrokerConsumerStats::getAvailablePermits() const { + return 0; + } + + uint64_t BrokerConsumerStats::getUnackedMessages() const { + return 0; + } + + bool BrokerConsumerStats::isBlockedConsumerOnUnackedMsgs() const { + return false; + } + + const std::string& BrokerConsumerStats::getAddress() const { + return ""; + } + + const std::string& BrokerConsumerStats::getConnectedSince() const { + return ""; + } + + const ConsumerType& BrokerConsumerStats::getType() const { + return ConsumerExclusive; + } + + double BrokerConsumerStats::getMsgRateExpired() const { + return 0; + } + + uint64_t BrokerConsumerStats::getMsgBacklog() const { + return 0; + } +} \ No newline at end of file diff --git a/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc new file mode 100644 index 0000000000000..138cb63775741 --- /dev/null +++ b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc @@ -0,0 +1,117 @@ +// +// Created by Jai Asher on 3/20/17. +// +#include +#include + +namespace pulsar { + + const std::string BrokerConsumerStatsImpl::DELIMITER = ";"; + + BrokerConsumerStatsImpl::BrokerConsumerStatsImpl() : validTill_(boost::posix_time::microsec_clock::universal_time()) {}; + + BrokerConsumerStatsImpl::BrokerConsumerStatsImpl(double msgRateOut, double msgThroughputOut, + double msgRateRedeliver, std::string consumerName, + uint64_t availablePermits, + uint64_t unackedMessages, bool blockedConsumerOnUnackedMsgs, + std::string address, + std::string connectedSince, const std::string& type, + double msgRateExpired, uint64_t msgBacklog) : + msgRateOut_(msgRateOut), + msgThroughputOut_(msgThroughputOut), + msgRateRedeliver_(msgRateRedeliver), + consumerName_(consumerName), + availablePermits_(availablePermits), + unackedMessages_(unackedMessages), + blockedConsumerOnUnackedMsgs_(blockedConsumerOnUnackedMsgs), + address_(address), + connectedSince_(connectedSince), + type_(convertStringToConsumerType(type)), + msgRateExpired_(msgRateExpired), + msgBacklog_(msgBacklog) {} + + bool BrokerConsumerStatsImpl::isValid() const { + return boost::posix_time::microsec_clock::universal_time() <= validTill_; + } + + std::ostream &operator<<(std::ostream &os, const BrokerConsumerStatsImpl& obj) { + os << "\nBrokerConsumerStatsImpl [" + << "validTill_ = " << obj.validTill_ + << ", msgRateOut_ = " << obj.getMsgRateOut() + << ", msgThroughputOut_ = " << obj.getMsgThroughputOut() + << ", msgRateRedeliver_ = " << obj.getMsgRateRedeliver() + << ", consumerName_ = " << obj.getConsumerName() + << ", availablePermits_ = " << obj.getAvailablePermits() + << ", unackedMessages_ = " << obj.getUnackedMessages() + << ", blockedConsumerOnUnackedMsgs_ = " << obj.isBlockedConsumerOnUnackedMsgs() + << ", address_ = " << obj.getAddress() + << ", connectedSince_ = " << obj.getConnectedSince() + << ", type_ = " << obj.getType() + << ", msgRateExpired_ = " << obj.getMsgRateExpired() + << ", msgBacklog_ = " << obj.getMsgBacklog() + << "]"; + return os; + } + + double BrokerConsumerStatsImpl::getMsgRateOut() const { + return msgRateOut_; + } + + double BrokerConsumerStatsImpl::getMsgThroughputOut() const { + return msgThroughputOut_; + } + + double BrokerConsumerStatsImpl::getMsgRateRedeliver() const { + return msgRateRedeliver_; + } + + const std::string& BrokerConsumerStatsImpl::getConsumerName() const { + return consumerName_; + } + + uint64_t BrokerConsumerStatsImpl::getAvailablePermits() const { + return availablePermits_; + } + + uint64_t BrokerConsumerStatsImpl::getUnackedMessages() const { + return unackedMessages_; + } + + bool BrokerConsumerStatsImpl::isBlockedConsumerOnUnackedMsgs() const { + return blockedConsumerOnUnackedMsgs_; + } + + const std::string& BrokerConsumerStatsImpl::getAddress() const { + return address_; + } + + const std::string& BrokerConsumerStatsImpl::getConnectedSince() const { + return connectedSince_; + } + + const ConsumerType& BrokerConsumerStatsImpl::getType() const { + return type_; + } + + double BrokerConsumerStatsImpl::getMsgRateExpired() const { + return msgRateExpired_; + } + + uint64_t BrokerConsumerStatsImpl::getMsgBacklog() const { + return msgBacklog_; + } + + void BrokerConsumerStatsImpl::setCacheTime(uint64_t cacehTimeInMs) { + validTill_ = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(cacehTimeInMs); + } + + ConsumerType BrokerConsumerStatsImpl::convertStringToConsumerType(const std::string& str) { + if (str == "ConsumerFailover" || str == "Failover") { + return ConsumerFailover; + } else if (str == "ConsumerShared" || str == "Shared") { + return ConsumerShared; + } else { + return ConsumerExclusive; + } + } +} \ No newline at end of file diff --git a/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.h b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.h new file mode 100644 index 0000000000000..9a77d2b835383 --- /dev/null +++ b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.h @@ -0,0 +1,115 @@ +// +// Created by Jai Asher on 3/20/17. +// + +#ifndef PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H +#define PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H + +#include +#include +#include +#include +#include +#include + +namespace pulsar { +class BrokerConsumerStatsImpl : public BrokerConsumerStats { + private: + static const std::string DELIMITER; + + /** validTill_ - Stats will be valid till this time.*/ + boost::posix_time::ptime validTill_; + + /** Total rate of messages delivered to the consumer. msg/s */ + double msgRateOut_; + + /** Total throughput delivered to the consumer. bytes/s */ + double msgThroughputOut_; + + /** Total rate of messages redelivered by this consumer. msg/s */ + double msgRateRedeliver_; + + /** Name of the consumer */ + std::string consumerName_; + + /** Number of available message permits for the consumer */ + uint64_t availablePermits_; + + /** Number of unacknowledged messages for the consumer */ + uint64_t unackedMessages_; + + /** Flag to verify if consumer is blocked due to reaching threshold of unacked messages */ + bool blockedConsumerOnUnackedMsgs_; + + /** Address of this consumer */ + std::string address_; + + /** Timestamp of connection */ + std::string connectedSince_; + + /** Whether this subscription is Exclusive or Shared or Failover */ + ConsumerType type_; + + /** Total rate of messages expired on this subscription. msg/s */ + double msgRateExpired_; + + /** Number of messages in the subscription backlog */ + uint64_t msgBacklog_; + +public: + + BrokerConsumerStatsImpl(); + + BrokerConsumerStatsImpl(double msgRateOut, double msgThroughputOut, double msgRateRedeliver, + std::string consumerName, uint64_t availablePermits, + uint64_t unackedMessages, bool blockedConsumerOnUnackedMsgs, + std::string address, std::string connectedSince, const std::string& type, + double msgRateExpired, uint64_t msgBacklog); + + /** Returns true if the Message is Expired **/ + virtual bool isValid() const; + + /** Returns the rate of messages delivered to the consumer. msg/s */ + virtual double getMsgRateOut() const; + + /** Returns the throughput delivered to the consumer. bytes/s */ + virtual double getMsgThroughputOut() const; + + /** Returns the rate of messages redelivered by this consumer. msg/s */ + virtual double getMsgRateRedeliver() const; + + /** Returns the Name of the consumer */ + virtual const std::string &getConsumerName() const; + + /** Returns the Number of available message permits for the consumer */ + virtual uint64_t getAvailablePermits() const; + + /** Returns the Number of unacknowledged messages for the consumer */ + virtual uint64_t getUnackedMessages() const; + + /** Returns true if the consumer is blocked due to unacked messages. */ + virtual bool isBlockedConsumerOnUnackedMsgs() const; + + /** Returns the Address of this consumer */ + virtual const std::string &getAddress() const; + + /** Returns the Timestamp of connection */ + virtual const std::string &getConnectedSince() const; + + /** Returns Whether this subscription is Exclusive or Shared or Failover */ + virtual const ConsumerType &getType() const; + + /** Returns the rate of messages expired on this subscription. msg/s */ + virtual double getMsgRateExpired() const; + + /** Returns the Number of messages in the subscription backlog */ + virtual uint64_t getMsgBacklog() const; + + void setCacheTime(uint64_t cacehTimeInMs); + + friend std::ostream &operator<<(std::ostream &os, const BrokerConsumerStatsImpl &obj); + + static ConsumerType convertStringToConsumerType(const std::string& str); +}; +} +#endif //PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H From f852087ac4a9d57daf263d3f8a5746c8a781d73f Mon Sep 17 00:00:00 2001 From: Jai ASher Date: Mon, 20 Mar 2017 22:30:49 -0700 Subject: [PATCH 6/7] Need to figure out how to handle future for partitioned topics --- .../include/pulsar/BrokerConsumerStats.h | 10 +- pulsar-client-cpp/lib/BrokerConsumerStats.cc | 11 +- .../lib/BrokerConsumerStatsImpl.cc | 15 +- .../lib/BrokerConsumerStatsImpl.h | 12 +- .../lib/PartitionedBrokerConsumerStatsImpl.cc | 154 ++++++++++++++++++ .../lib/PartitionedBrokerConsumerStatsImpl.h | 74 +++++++++ .../lib/PartitionedConsumerImpl.cc | 7 +- 7 files changed, 252 insertions(+), 31 deletions(-) create mode 100644 pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.cc create mode 100644 pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.h diff --git a/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h b/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h index f3696f5634b61..23232661bcab8 100644 --- a/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h +++ b/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h @@ -28,7 +28,7 @@ class BrokerConsumerStats { virtual double getMsgRateRedeliver() const; /** Returns the Name of the consumer */ - virtual const std::string &getConsumerName() const; + virtual const std::string getConsumerName() const; /** Returns the Number of available message permits for the consumer */ virtual uint64_t getAvailablePermits() const; @@ -40,13 +40,13 @@ class BrokerConsumerStats { virtual bool isBlockedConsumerOnUnackedMsgs() const; /** Returns the Address of this consumer */ - virtual const std::string &getAddress() const; + virtual const std::string getAddress() const; /** Returns the Timestamp of connection */ - virtual const std::string &getConnectedSince() const; + virtual const std::string getConnectedSince() const; /** Returns Whether this subscription is Exclusive or Shared or Failover */ - virtual const ConsumerType &getType() const; + virtual const ConsumerType getType() const; /** Returns the rate of messages expired on this subscription. msg/s */ virtual double getMsgRateExpired() const; @@ -54,7 +54,7 @@ class BrokerConsumerStats { /** Returns the Number of messages in the subscription backlog */ virtual uint64_t getMsgBacklog() const; - friend std::ostream &operator<<(std::ostream &os, const BrokerConsumerStats &obj); + friend std::ostream& operator<<(std::ostream &os, const BrokerConsumerStats &obj); }; typedef boost::function BrokerConsumerStatsCallback; diff --git a/pulsar-client-cpp/lib/BrokerConsumerStats.cc b/pulsar-client-cpp/lib/BrokerConsumerStats.cc index 5185dcc8d2920..249bcb8f4feb8 100644 --- a/pulsar-client-cpp/lib/BrokerConsumerStats.cc +++ b/pulsar-client-cpp/lib/BrokerConsumerStats.cc @@ -8,8 +8,9 @@ namespace pulsar { return false; } - std::ostream &operator<<(std::ostream &os, const BrokerConsumerStats& obj) { + std::ostream& operator<<(std::ostream &os, const BrokerConsumerStats& obj) { os << "\nBrokerConsumerStats [" + << "validTill_ = " << obj.isValid() << ", msgRateOut_ = " << obj.getMsgRateOut() << ", msgThroughputOut_ = " << obj.getMsgThroughputOut() << ", msgRateRedeliver_ = " << obj.getMsgRateRedeliver() @@ -38,7 +39,7 @@ namespace pulsar { return 0; } - const std::string& BrokerConsumerStats::getConsumerName() const { + const std::string BrokerConsumerStats::getConsumerName() const { return ""; } @@ -54,15 +55,15 @@ namespace pulsar { return false; } - const std::string& BrokerConsumerStats::getAddress() const { + const std::string BrokerConsumerStats::getAddress() const { return ""; } - const std::string& BrokerConsumerStats::getConnectedSince() const { + const std::string BrokerConsumerStats::getConnectedSince() const { return ""; } - const ConsumerType& BrokerConsumerStats::getType() const { + const ConsumerType BrokerConsumerStats::getType() const { return ConsumerExclusive; } diff --git a/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc index 138cb63775741..fe6e204132c34 100644 --- a/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc +++ b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc @@ -5,9 +5,6 @@ #include namespace pulsar { - - const std::string BrokerConsumerStatsImpl::DELIMITER = ";"; - BrokerConsumerStatsImpl::BrokerConsumerStatsImpl() : validTill_(boost::posix_time::microsec_clock::universal_time()) {}; BrokerConsumerStatsImpl::BrokerConsumerStatsImpl(double msgRateOut, double msgThroughputOut, @@ -34,9 +31,9 @@ namespace pulsar { return boost::posix_time::microsec_clock::universal_time() <= validTill_; } - std::ostream &operator<<(std::ostream &os, const BrokerConsumerStatsImpl& obj) { + std::ostream& operator<<(std::ostream &os, const BrokerConsumerStatsImpl& obj) { os << "\nBrokerConsumerStatsImpl [" - << "validTill_ = " << obj.validTill_ + << "validTill_ = " << obj.isValid() << ", msgRateOut_ = " << obj.getMsgRateOut() << ", msgThroughputOut_ = " << obj.getMsgThroughputOut() << ", msgRateRedeliver_ = " << obj.getMsgRateRedeliver() @@ -65,7 +62,7 @@ namespace pulsar { return msgRateRedeliver_; } - const std::string& BrokerConsumerStatsImpl::getConsumerName() const { + const std::string BrokerConsumerStatsImpl::getConsumerName() const { return consumerName_; } @@ -81,15 +78,15 @@ namespace pulsar { return blockedConsumerOnUnackedMsgs_; } - const std::string& BrokerConsumerStatsImpl::getAddress() const { + const std::string BrokerConsumerStatsImpl::getAddress() const { return address_; } - const std::string& BrokerConsumerStatsImpl::getConnectedSince() const { + const std::string BrokerConsumerStatsImpl::getConnectedSince() const { return connectedSince_; } - const ConsumerType& BrokerConsumerStatsImpl::getType() const { + const ConsumerType BrokerConsumerStatsImpl::getType() const { return type_; } diff --git a/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.h b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.h index 9a77d2b835383..67fb47b36a70d 100644 --- a/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.h +++ b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.h @@ -15,8 +15,6 @@ namespace pulsar { class BrokerConsumerStatsImpl : public BrokerConsumerStats { private: - static const std::string DELIMITER; - /** validTill_ - Stats will be valid till this time.*/ boost::posix_time::ptime validTill_; @@ -79,7 +77,7 @@ class BrokerConsumerStatsImpl : public BrokerConsumerStats { virtual double getMsgRateRedeliver() const; /** Returns the Name of the consumer */ - virtual const std::string &getConsumerName() const; + virtual const std::string getConsumerName() const; /** Returns the Number of available message permits for the consumer */ virtual uint64_t getAvailablePermits() const; @@ -91,13 +89,13 @@ class BrokerConsumerStatsImpl : public BrokerConsumerStats { virtual bool isBlockedConsumerOnUnackedMsgs() const; /** Returns the Address of this consumer */ - virtual const std::string &getAddress() const; + virtual const std::string getAddress() const; /** Returns the Timestamp of connection */ - virtual const std::string &getConnectedSince() const; + virtual const std::string getConnectedSince() const; /** Returns Whether this subscription is Exclusive or Shared or Failover */ - virtual const ConsumerType &getType() const; + virtual const ConsumerType getType() const; /** Returns the rate of messages expired on this subscription. msg/s */ virtual double getMsgRateExpired() const; @@ -107,7 +105,7 @@ class BrokerConsumerStatsImpl : public BrokerConsumerStats { void setCacheTime(uint64_t cacehTimeInMs); - friend std::ostream &operator<<(std::ostream &os, const BrokerConsumerStatsImpl &obj); + friend std::ostream& operator<<(std::ostream &os, const BrokerConsumerStatsImpl &obj); static ConsumerType convertStringToConsumerType(const std::string& str); }; diff --git a/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.cc b/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.cc new file mode 100644 index 0000000000000..37d45b0d03fa7 --- /dev/null +++ b/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.cc @@ -0,0 +1,154 @@ +// +// Created by Jai Asher on 3/20/17. +// +#include +#include +#include +#include + +namespace pulsar { + + const std::string PartitionedBrokerConsumerStatsImpl::DELIMITER = ";"; + + PartitionedBrokerConsumerStatsImpl::PartitionedBrokerConsumerStatsImpl(size_t size) { + statsList_.resize(size); + } + + bool PartitionedBrokerConsumerStatsImpl::isValid() const { + bool isValid = true; + for (int i = 0; i +#include +#include +#include +#include +#include +#include + +namespace pulsar { +class PartitionedBrokerConsumerStatsImpl : public BrokerConsumerStats { + private: + std::vector statsList_; + static const std::string DELIMITER; + public: + + PartitionedBrokerConsumerStatsImpl(size_t size); + + /** Returns true if the Message is Expired **/ + virtual bool isValid() const; + + /** Returns the rate of messages delivered to the consumer. msg/s */ + virtual double getMsgRateOut() const; + + /** Returns the throughput delivered to the consumer. bytes/s */ + virtual double getMsgThroughputOut() const; + + /** Returns the rate of messages redelivered by this consumer. msg/s */ + virtual double getMsgRateRedeliver() const; + + /** Returns the Name of the consumer */ + virtual const std::string getConsumerName() const; + + /** Returns the Number of available message permits for the consumer */ + virtual uint64_t getAvailablePermits() const; + + /** Returns the Number of unacknowledged messages for the consumer */ + virtual uint64_t getUnackedMessages() const; + + /** Returns true if the consumer is blocked due to unacked messages. */ + virtual bool isBlockedConsumerOnUnackedMsgs() const; + + /** Returns the Address of this consumer */ + virtual const std::string getAddress() const; + + /** Returns the Timestamp of connection */ + virtual const std::string getConnectedSince() const; + + /** Returns Whether this subscription is Exclusive or Shared or Failover */ + virtual const ConsumerType getType() const; + + /** Returns the rate of messages expired on this subscription. msg/s */ + virtual double getMsgRateExpired() const; + + /** Returns the Number of messages in the subscription backlog */ + virtual uint64_t getMsgBacklog() const; + + /** Returns the BrokerConsumerStatsImpl at of ith partition */ + BrokerConsumerStatsImpl getBrokerConsumerStats(int index); + + void add(BrokerConsumerStatsImpl stats, int index); + + void clear(); + + friend std::ostream& operator<<(std::ostream &os, const PartitionedBrokerConsumerStatsImpl &obj); +}; +} +#endif //PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc index 1e7f0ee23d056..36c6407c1b993 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc @@ -15,11 +15,7 @@ */ #include "PartitionedConsumerImpl.h" -#include "LogUtils.h" -#include -#include "pulsar/Result.h" -#include "MessageImpl.h" -#include "Utils.h" +#include DECLARE_LOG_OBJECT() @@ -382,6 +378,7 @@ namespace pulsar { void PartitionedConsumerImpl::getConsumerStatsAsync(BrokerConsumerStatsCallback callback) { Lock lock(mutex_); + PartitionedBrokerConsumerStatsImpl stats; // TODO - think about this code change /* if (partitionIndex >= numPartitions_ || partitionIndex < 0 || partitionIndex >= consumers_.size()) From cfe19e9d1d7f2814dd3dfdafb802a69434af53cb Mon Sep 17 00:00:00 2001 From: Jai ASher Date: Tue, 21 Mar 2017 16:06:01 -0700 Subject: [PATCH 7/7] Able to compile --- .../include/pulsar/BrokerConsumerStats.h | 1 + pulsar-client-cpp/lib/Consumer.cc | 25 +- pulsar-client-cpp/lib/ConsumerImpl.cc | 5 +- .../lib/PartitionedBrokerConsumerStatsImpl.h | 4 + .../lib/PartitionedConsumerImpl.cc | 39 ++- .../lib/PartitionedConsumerImpl.h | 5 + pulsar-client-cpp/lib/ProducerImpl.cc | 2 +- pulsar-client-cpp/tests/ConsumerStatsTest.cc | 299 ++++++++++++++++++ 8 files changed, 363 insertions(+), 17 deletions(-) create mode 100644 pulsar-client-cpp/tests/ConsumerStatsTest.cc diff --git a/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h b/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h index 23232661bcab8..e24606059b2d5 100644 --- a/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h +++ b/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h @@ -56,6 +56,7 @@ class BrokerConsumerStats { friend std::ostream& operator<<(std::ostream &os, const BrokerConsumerStats &obj); }; +typedef boost::shared_ptr BrokerConsumerStatsPtr; typedef boost::function BrokerConsumerStatsCallback; } diff --git a/pulsar-client-cpp/lib/Consumer.cc b/pulsar-client-cpp/lib/Consumer.cc index ac01b7c364bce..36b135ab5b555 100644 --- a/pulsar-client-cpp/lib/Consumer.cc +++ b/pulsar-client-cpp/lib/Consumer.cc @@ -19,6 +19,7 @@ #include "ConsumerImpl.h" #include "Utils.h" #include +#include namespace pulsar { @@ -261,13 +262,31 @@ void Consumer::redeliverUnacknowledgedMessages() { } } +static void listener(Result result, BrokerConsumerStats& brokerConsumerStats, + BrokerConsumerStats* stats, Result &res, LatchPtr latchPtr) { + std::cout<<"JAI 1: "<<*stats; + std::cout<<"JAI 2: "<countdown(); +} + Result Consumer::getConsumerStats(BrokerConsumerStats& brokerConsumerStats) { if (!impl_) { return ResultConsumerNotInitialized; } - Promise promise; - impl_->getConsumerStatsAsync(WaitForCallbackValue(promise)); - return promise.getFuture().get(brokerConsumerStats); + // Can't use promises or future here since it leads to data being copied which leads to object splicing + Result res; + LatchPtr latchPtr = boost::make_shared(1); + getConsumerStatsAsync(boost::bind(listener, _1, _2, &brokerConsumerStats, boost::ref(res), latchPtr)); + latchPtr->wait(); + std::cout<<"JAI 2: "< #include #include +#include +#include namespace pulsar { class PartitionedBrokerConsumerStatsImpl : public BrokerConsumerStats { @@ -70,5 +72,7 @@ class PartitionedBrokerConsumerStatsImpl : public BrokerConsumerStats { friend std::ostream& operator<<(std::ostream &os, const PartitionedBrokerConsumerStatsImpl &obj); }; +typedef boost::shared_ptr PartitionedBrokerConsumerStatsPtr; + } #endif //PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc index 36c6407c1b993..917b765672e9e 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc @@ -15,7 +15,6 @@ */ #include "PartitionedConsumerImpl.h" -#include DECLARE_LOG_OBJECT() @@ -378,19 +377,37 @@ namespace pulsar { void PartitionedConsumerImpl::getConsumerStatsAsync(BrokerConsumerStatsCallback callback) { Lock lock(mutex_); - PartitionedBrokerConsumerStatsImpl stats; - // TODO - think about this code change - /* - if (partitionIndex >= numPartitions_ || partitionIndex < 0 || partitionIndex >= consumers_.size()) - { + PartitionedBrokerConsumerStatsPtr statsPtr = boost::make_shared(numPartitions_); + if (numPartitions_ != consumers_.size()) { lock.unlock(); - LOG_ERROR(getName() << " PartitionIndex must be positive and less than number of partitions") - return callback(ResultInvalidConfiguration, BrokerConsumerStats()); + return callback(ResultConsumerNotInitialized, *statsPtr); } - ConsumerImplPtr c = consumers_[partitionIndex]; + LatchPtr latchPtr = boost::make_shared(numPartitions_); + ConsumerList consumerList = consumers_; lock.unlock(); - return c->getConsumerStatsAsync(callback); - */ + for (int i = 0; igetConsumerStatsAsync(boost::bind(&PartitionedConsumerImpl::handleGetConsumerStats, + shared_from_this(), _1, _2, latchPtr, + statsPtr, i, callback)); + } + + } + + void PartitionedConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerStats& brokerConsumerStats, + LatchPtr latchPtr, PartitionedBrokerConsumerStatsPtr statsPtr, + size_t index, BrokerConsumerStatsCallback callback) { + Lock lock(mutex_); + if (res == ResultOk) { + latchPtr->countdown(); + statsPtr->add((BrokerConsumerStatsImpl&)brokerConsumerStats, index); + } else { + lock.unlock(); + return callback(res, *statsPtr); + } + if (latchPtr->getCount() == 0) { + lock.unlock(); + callback(ResultOk, *statsPtr); + } } } diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h index d3328cc0c28af..3f7c0fb77b57f 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h @@ -25,6 +25,9 @@ #include "boost/enable_shared_from_this.hpp" #include "ConsumerImplBase.h" #include "lib/UnAckedMessageTrackerDisabled.h" +#include +#include + namespace pulsar { class PartitionedConsumerImpl; class PartitionedConsumerImpl: public ConsumerImplBase, public boost::enable_shared_from_this { @@ -61,6 +64,8 @@ namespace pulsar { virtual const std::string& getName() const; virtual int getNumOfPrefetchedMessages() const ; virtual void getConsumerStatsAsync(BrokerConsumerStatsCallback callback); + void handleGetConsumerStats(Result , BrokerConsumerStats&, LatchPtr, + PartitionedBrokerConsumerStatsPtr, size_t, BrokerConsumerStatsCallback); private: const ClientImplPtr client_; diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index 3225dee3daf29..02868cf8786ee 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -281,7 +281,7 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) { // reserving a spot and going forward - not blocking if (!conf_.getBlockIfQueueFull() && !pendingMessagesQueue_.tryReserve(1)) { LOG_DEBUG(getName() << " - Producer Queue is full"); - // If queue is full sending the batch immediately, no point waiting till batchMessageimeout + // If queue is full sending the batch immediately, no point waiting till batchMessagetimeout if (batchMessageContainer) { LOG_DEBUG(getName() << " - sending batch message immediately"); batchMessageContainer->sendMessage(); diff --git a/pulsar-client-cpp/tests/ConsumerStatsTest.cc b/pulsar-client-cpp/tests/ConsumerStatsTest.cc new file mode 100644 index 0000000000000..917eaa8df2ead --- /dev/null +++ b/pulsar-client-cpp/tests/ConsumerStatsTest.cc @@ -0,0 +1,299 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include "boost/date_time/posix_time/posix_time.hpp" +#include "CustomRoutingPolicy.h" +#include +#include "lib/Future.h" +#include "lib/Utils.h" +#include "PulsarFriend.h" +#include "ConsumerTest.h" +#include "HttpHelper.h" +#include +#include +DECLARE_LOG_OBJECT(); + +using namespace pulsar; + +static std::string lookupUrl = "pulsar://localhost:8885"; +static std::string adminUrl = "http://localhost:8765/"; + +void callbackFunction(Result result, BrokerConsumerStats& brokerConsumerStats, long expectedBacklog, Latch& latch, int index) { + PartitionedBrokerConsumerStatsImpl stats = (PartitionedBrokerConsumerStatsImpl&) brokerConsumerStats; + LOG_DEBUG(stats); + ASSERT_EQ(expectedBacklog, stats.getBrokerConsumerStats(index).getMsgBacklog()); + latch.countdown(); +} + +void simpleCallbackFunction(Result result, BrokerConsumerStats& brokerConsumerStats, Result expectedResult, + uint64_t expectedBacklog, ConsumerType expectedConsumerType) { + LOG_DEBUG(brokerConsumerStats); + ASSERT_EQ(result, expectedResult); + ASSERT_EQ(brokerConsumerStats.getMsgBacklog(), expectedBacklog); + ASSERT_EQ(brokerConsumerStats.getType(), expectedConsumerType); +} +TEST(ConsumerStatsTest, testBacklogInfo) { + long epochTime=time(NULL); + std::string testName="testBacklogInfo-" + boost::lexical_cast(epochTime); + Client client(lookupUrl); + std::string topicName = "persistent://property/cluster/namespace/" + testName; + std::string subName = "subscription-name"; + Consumer consumer; + Promise consumerPromise; + client.subscribeAsync(topicName, subName, WaitForCallbackValue(consumerPromise)); + Future consumerFuture = consumerPromise.getFuture(); + Result result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); + + // handling dangling subscriptions + consumer.unsubscribe(); + client.subscribe(topicName, subName, consumer); + + // Producing messages + Producer producer; + int numOfMessages = 10; + Promise producerPromise; + client.createProducerAsync(topicName, WaitForCallbackValue(producerPromise)); + Future producerFuture = producerPromise.getFuture(); + result = producerFuture.get(producer); + ASSERT_EQ(ResultOk, result); + + std::string prefix = testName + "-"; + for (int i = 0; i(i); + Message msg = MessageBuilder().build(); + producer.send(msg); + } + + LOG_DEBUG("Calling consumer.getConsumerStats"); + BrokerConsumerStats consumerStats; + consumer.getConsumerStatsAsync(boost::bind(simpleCallbackFunction, _1, _2, ResultOk, numOfMessages, ConsumerExclusive)); + + for (int i = numOfMessages; i<(numOfMessages*2); i++) { + std::string messageContent = prefix + boost::lexical_cast(i); + Message msg = MessageBuilder().build(); + producer.send(msg); + } + + usleep(35 * 1000 * 1000); + Result res = consumer.getConsumerStats(consumerStats); + ASSERT_EQ(res, ResultOk); + + LOG_DEBUG(consumerStats); + ASSERT_EQ(consumerStats.getMsgBacklog(), 2 * numOfMessages); + ASSERT_EQ(consumerStats.getType(), ConsumerExclusive); + consumer.unsubscribe(); +} + +TEST(ConsumerStatsTest, testFailure) { + long epochTime=time(NULL); + std::string testName="testFailure-" + boost::lexical_cast(epochTime); + Client client(lookupUrl); + std::string topicName = "persistent://property/cluster/namespace/" + testName; + std::string subName = "subscription-name"; + Consumer consumer; + Promise consumerPromise; + BrokerConsumerStats consumerStats; + client.subscribeAsync(topicName, subName, WaitForCallbackValue(consumerPromise)); + ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); + Future consumerFuture = consumerPromise.getFuture(); + Result result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); + + // handling dangling subscriptions + consumer.unsubscribe(); + ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); + client.subscribe(topicName, subName, consumer); + + // Producing messages + Producer producer; + int numOfMessages = 5; + Promise producerPromise; + client.createProducerAsync(topicName, WaitForCallbackValue(producerPromise)); + Future producerFuture = producerPromise.getFuture(); + result = producerFuture.get(producer); + ASSERT_EQ(ResultOk, result); + + std::string prefix = testName + "-"; + for (int i = 0; i(i); + Message msg = MessageBuilder().build(); + producer.send(msg); + } + + ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); + + LOG_DEBUG(consumerStats); + ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages); + + consumer.unsubscribe(); + ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); +} + +TEST(ConsumerStatsTest, testCachingMechanism) { + long epochTime=time(NULL); + std::string testName="testCachingMechanism-" + boost::lexical_cast(epochTime); + Client client(lookupUrl); + std::string topicName = "persistent://property/cluster/namespace/" + testName; + std::string subName = "subscription-name"; + Consumer consumer; + Promise consumerPromise; + BrokerConsumerStats consumerStats; + client.subscribeAsync(topicName, subName, WaitForCallbackValue(consumerPromise)); + ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); + Future consumerFuture = consumerPromise.getFuture(); + Result result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); + + // handling dangling subscriptions + consumer.unsubscribe(); + ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); + client.subscribe(topicName, subName, consumer); + + // Producing messages + Producer producer; + int numOfMessages = 5; + Promise producerPromise; + client.createProducerAsync(topicName, WaitForCallbackValue(producerPromise)); + Future producerFuture = producerPromise.getFuture(); + result = producerFuture.get(producer); + ASSERT_EQ(ResultOk, result); + + std::string prefix = testName + "-"; + for (int i = 0; i(i); + Message msg = MessageBuilder().build(); + producer.send(msg); + } + + ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); + + LOG_DEBUG(consumerStats); + ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages); + + for (int i = numOfMessages; i<(numOfMessages*2); i++) { + std::string messageContent = prefix + boost::lexical_cast(i); + Message msg = MessageBuilder().build(); + producer.send(msg); + } + + LOG_DEBUG("Expecting cached results"); + ASSERT_TRUE(consumerStats.isValid()); + ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); + LOG_DEBUG(consumerStats); + ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages); + + LOG_DEBUG("Still Expecting cached results"); + usleep(10 * 1000 * 1000); + ASSERT_TRUE(consumerStats.isValid()); + ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); + + LOG_DEBUG(consumerStats); + ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages); + + LOG_DEBUG("Now expecting new results"); + usleep(25 * 1000 * 1000); + ASSERT_FALSE(consumerStats.isValid()); + ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); + + LOG_DEBUG(consumerStats); + ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages * 2); + + consumer.unsubscribe(); + ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); +} + + +TEST(ConsumerStatsTest, testAsyncCallOnPartitionedTopic) { + long epochTime=time(NULL); + std::string testName="testAsyncCallOnPartitionedTopic-" + boost::lexical_cast(epochTime); + Client client(lookupUrl); + std::string topicName = "persistent://property/cluster/namespace/" + testName; + std::string subName = "subscription-name"; + + // call admin api to create partitioned topics + std::string url = adminUrl + "admin/persistent/property/cluster/namespace/" + testName + "/partitions"; + int res = makePutRequest(url, "7"); + + LOG_INFO("res = "< consumerPromise; + BrokerConsumerStats consumerStats; + client.subscribeAsync(topicName, subName, WaitForCallbackValue(consumerPromise)); + ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); + Future consumerFuture = consumerPromise.getFuture(); + Result result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); + + // handling dangling subscriptions + consumer.unsubscribe(); + ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); + client.subscribe(topicName, subName, consumer); + + // Producing messages + Producer producer; + int numOfMessages = 7 * 5; // 5 message per partition + Promise producerPromise; + ProducerConfiguration config; + config.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution); + client.createProducerAsync(topicName, config, WaitForCallbackValue(producerPromise)); + Future producerFuture = producerPromise.getFuture(); + result = producerFuture.get(producer); + ASSERT_EQ(ResultOk, result); + + std::string prefix = testName + "-"; + for (int i = 0; i(i); + Message msg = MessageBuilder().build(); + producer.send(msg); + } + + // Expecting return from 4 callbacks + Latch latch(4); + consumer.getConsumerStatsAsync(boost::bind(callbackFunction, _1, _2, 5, latch, 0)); + + // Now we have 10 messages per partition + for (int i = numOfMessages; i<(numOfMessages*2); i++) { + std::string messageContent = prefix + boost::lexical_cast(i); + Message msg = MessageBuilder().build(); + producer.send(msg); + } + + // Expecting cached result + consumer.getConsumerStatsAsync(boost::bind(callbackFunction, _1, _2, 5, latch, 0)); + + // Expecting fresh results since the partition index is different + consumer.getConsumerStatsAsync(boost::bind(callbackFunction, _1, _2, 10, latch, 2)); + + Message msg; + while (consumer.receive(msg)) { + // Do nothing + } + + // Expecting the backlog to be the same since we didn't acknowledge the messages + consumer.getConsumerStatsAsync(boost::bind(callbackFunction, _1, _2, 10, latch, 3)); + + // Wait for ten seconds only + ASSERT_TRUE(latch.wait(milliseconds(10 * 1000))); +} \ No newline at end of file