diff --git a/include/pulsar/BatchReceivePolicy.h b/include/pulsar/BatchReceivePolicy.h new file mode 100644 index 00000000..3c66da2f --- /dev/null +++ b/include/pulsar/BatchReceivePolicy.h @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef BATCH_RECEIVE_POLICY_HPP_ +#define BATCH_RECEIVE_POLICY_HPP_ + +#include +#include + +namespace pulsar { + +struct BatchReceivePolicyImpl; + +/** + * Configuration for message batch receive {@link Consumer#batchReceive()} {@link + * Consumer#batchReceiveAsync()}. + * + *

Batch receive policy can limit the number and bytes of messages in a single batch, and can specify a + * timeout for waiting for enough messages for this batch. + * + *

A batch receive action is completed as long as any one of the + * conditions (the batch has enough number or size of messages, or the waiting timeout is passed) are met. + * + *

Examples: + * 1.If set maxNumMessages = 10, maxSizeOfMessages = 1MB and without timeout, it + * means {@link Consumer#batchReceive()} will always wait until there is enough messages. + * 2.If set maxNumberOfMessages = 0, maxNumBytes = 0 and timeout = 100ms, it + * means {@link Consumer#batchReceive()} will wait for 100ms no matter whether there are enough messages. + * + *

Note: + * Must specify messages limitation(maxNumMessages, maxNumBytes) or wait timeout. + * Otherwise, {@link Messages} ingest {@link Message} will never end. + * + * @since 2.4.1 + */ +class PULSAR_PUBLIC BatchReceivePolicy { + public: + /** + * Default value: {maxNumMessage: -1, maxNumBytes: 10 * 1024 * 1024, timeoutMs: 100} + */ + BatchReceivePolicy(); + + /** + * + * @param maxNumMessage Max num message, if less than 0, it means no limit. + * @param maxNumBytes Max num bytes, if less than 0, it means no limit. + * @param timeoutMs If less than 0, it means no limit. + */ + BatchReceivePolicy(int maxNumMessage, long maxNumBytes, long timeoutMs); + + /** + * Get max time out ms. + * + * @return + */ + long getTimeoutMs() const; + + /** + * Get the maximum number of messages. + * @return + */ + int getMaxNumMessages() const; + + /** + * Get max num bytes. + * @return + */ + long getMaxNumBytes() const; + + private: + std::shared_ptr impl_; +}; +} // namespace pulsar + +#endif /* BATCH_RECEIVE_POLICY_HPP_ */ diff --git a/include/pulsar/Consumer.h b/include/pulsar/Consumer.h index 6c0ab27b..c7911b98 100644 --- a/include/pulsar/Consumer.h +++ b/include/pulsar/Consumer.h @@ -113,6 +113,31 @@ class PULSAR_PUBLIC Consumer { */ void receiveAsync(ReceiveCallback callback); + /** + * Batch receiving messages. + * + *

This calls blocks until has enough messages or wait timeout, more details to see {@link + * BatchReceivePolicy}. + * + * @param msgs a non-const reference where the received messages will be copied + * @return ResultOk when a message is received + * @return ResultInvalidConfiguration if a message listener had been set in the configuration + */ + Result batchReceive(Messages& msgs); + + /** + * Async Batch receiving messages. + *

+ * Retrieves a message when it will be available and completes callback with received message. + *

+ *

+ * batchReceiveAsync() should be called subsequently once callback gets completed with received message. + * Else it creates backlog of receive requests in the application. + *

+ * @param BatchReceiveCallback will be completed when messages are available. + */ + void batchReceiveAsync(BatchReceiveCallback callback); + /** * Acknowledge the reception of a single message. * diff --git a/include/pulsar/ConsumerConfiguration.h b/include/pulsar/ConsumerConfiguration.h index 4347c3b2..13d5cc02 100644 --- a/include/pulsar/ConsumerConfiguration.h +++ b/include/pulsar/ConsumerConfiguration.h @@ -31,6 +31,7 @@ #include #include #include +#include "BatchReceivePolicy.h" namespace pulsar { @@ -38,8 +39,10 @@ class Consumer; class PulsarWrapper; /// Callback definition for non-data operation +typedef std::vector Messages; typedef std::function ResultCallback; typedef std::function ReceiveCallback; +typedef std::function BatchReceiveCallback; typedef std::function GetLastMessageIdCallback; /// Callback definition for MessageListener @@ -378,6 +381,21 @@ class PULSAR_PUBLIC ConsumerConfiguration { */ InitialPosition getSubscriptionInitialPosition() const; + /** + * Set batch receive policy. + * + * @param batchReceivePolicy the default is + * {maxNumMessage: -1, maxNumBytes: 10 * 1024 * 1024, timeoutMs: 100} + */ + void setBatchReceivePolicy(const BatchReceivePolicy& batchReceivePolicy); + + /** + * Get batch receive policy. + * + * @return batch receive policy + */ + const BatchReceivePolicy& getBatchReceivePolicy() const; + /** * Set whether the subscription status should be replicated. * The default value is `false`. diff --git a/lib/BatchReceivePolicy.cc b/lib/BatchReceivePolicy.cc new file mode 100644 index 00000000..08aa3687 --- /dev/null +++ b/lib/BatchReceivePolicy.cc @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include "BatchReceivePolicyImpl.h" +#include "LogUtils.h" + +using namespace pulsar; + +namespace pulsar { + +DECLARE_LOG_OBJECT() + +BatchReceivePolicy::BatchReceivePolicy() : BatchReceivePolicy(-1, 10 * 1024 * 1024, 100) {} + +BatchReceivePolicy::BatchReceivePolicy(int maxNumMessage, long maxNumBytes, long timeoutMs) + : impl_(std::make_shared()) { + if (maxNumMessage <= 0 && maxNumBytes <= 0 && timeoutMs <= 0) { + throw std::invalid_argument( + "At least one of maxNumMessages, maxNumBytes and timeoutMs must be specified."); + } + if (maxNumMessage <= 0 && maxNumBytes <= 0) { + impl_->maxNumMessage = -1; + impl_->maxNumBytes = 10 * 1024 * 1024; + LOG_WARN( + "BatchReceivePolicy maxNumMessages and maxNumBytes is less than 0. Reset to default: " + "maxNumMessage(-1), maxNumBytes(10 * 1024 * 10)"); + } else { + impl_->maxNumMessage = maxNumMessage; + impl_->maxNumBytes = maxNumBytes; + } + impl_->timeoutMs = timeoutMs; +} + +long BatchReceivePolicy::getTimeoutMs() const { return impl_->timeoutMs; } + +int BatchReceivePolicy::getMaxNumMessages() const { return impl_->maxNumMessage; } + +long BatchReceivePolicy::getMaxNumBytes() const { return impl_->maxNumBytes; } + +} // namespace pulsar diff --git a/lib/BatchReceivePolicyImpl.h b/lib/BatchReceivePolicyImpl.h new file mode 100644 index 00000000..e7ba4317 --- /dev/null +++ b/lib/BatchReceivePolicyImpl.h @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +namespace pulsar { + +struct BatchReceivePolicyImpl { + int maxNumMessage; + long maxNumBytes; + long timeoutMs; +}; + +} // namespace pulsar diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 466461ae..e8e77082 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -28,14 +28,12 @@ #include #include #include "ProducerImplBase.h" -#include "ConsumerImplBase.h" #include #include #include "ServiceNameResolver.h" namespace pulsar { -class ClientImpl; class PulsarFriend; typedef std::shared_ptr ClientImplPtr; typedef std::weak_ptr ClientImplWeakPtr; @@ -44,6 +42,9 @@ class ReaderImpl; typedef std::shared_ptr ReaderImplPtr; typedef std::weak_ptr ReaderImplWeakPtr; +class ConsumerImplBase; +typedef std::weak_ptr ConsumerImplBaseWeakPtr; + std::string generateRandomName(); class ClientImpl : public std::enable_shared_from_this { diff --git a/lib/Consumer.cc b/lib/Consumer.cc index 5d163629..13fb9f4a 100644 --- a/lib/Consumer.cc +++ b/lib/Consumer.cc @@ -82,6 +82,24 @@ void Consumer::receiveAsync(ReceiveCallback callback) { impl_->receiveAsync(callback); } +Result Consumer::batchReceive(Messages& msgs) { + if (!impl_) { + return ResultConsumerNotInitialized; + } + Promise promise; + impl_->batchReceiveAsync(WaitForCallbackValue(promise)); + return promise.getFuture().get(msgs); +} + +void Consumer::batchReceiveAsync(BatchReceiveCallback callback) { + if (!impl_) { + Messages msgs; + callback(ResultConsumerNotInitialized, msgs); + return; + } + impl_->batchReceiveAsync(callback); +} + Result Consumer::acknowledge(const Message& message) { return acknowledge(message.getMessageId()); } Result Consumer::acknowledge(const MessageId& messageId) { diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc index f9fe499b..0705cca3 100644 --- a/lib/ConsumerConfiguration.cc +++ b/lib/ConsumerConfiguration.cc @@ -19,6 +19,7 @@ #include #include +#include namespace pulsar { @@ -267,4 +268,12 @@ ConsumerConfiguration& ConsumerConfiguration::setStartMessageIdInclusive(bool st bool ConsumerConfiguration::isStartMessageIdInclusive() const { return impl_->startMessageIdInclusive; } +void ConsumerConfiguration::setBatchReceivePolicy(const BatchReceivePolicy& batchReceivePolicy) { + impl_->batchReceivePolicy = batchReceivePolicy; +} + +const BatchReceivePolicy& ConsumerConfiguration::getBatchReceivePolicy() const { + return impl_->batchReceivePolicy; +} + } // namespace pulsar diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h index cca83a38..444fedf9 100644 --- a/lib/ConsumerConfigurationImpl.h +++ b/lib/ConsumerConfigurationImpl.h @@ -45,6 +45,7 @@ struct ConsumerConfigurationImpl { ConsumerCryptoFailureAction cryptoFailureAction{ConsumerCryptoFailureAction::FAIL}; bool readCompacted{false}; InitialPosition subscriptionInitialPosition{InitialPosition::InitialPositionLatest}; + BatchReceivePolicy batchReceivePolicy{}; int patternAutoDiscoveryPeriod{60}; bool replicateSubscriptionStateEnabled{false}; std::map properties; diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 37fcd952..54e346f0 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -18,6 +18,7 @@ */ #include "ConsumerImpl.h" #include "MessageImpl.h" +#include "MessagesImpl.h" #include "Commands.h" #include "LogUtils.h" #include "TimeUtils.h" @@ -43,7 +44,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, bool hasParent /* = false by default */, const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */, Commands::SubscriptionMode subscriptionMode, Optional startMessageId) - : HandlerBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0))), + : ConsumerImplBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf, + listenerExecutor ? listenerExecutor : client->getListenerExecutorProvider()->get()), waitingForZeroQueueSizeMessage(false), config_(conf), subscription_(subscriptionName), @@ -85,13 +87,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled()); } - // Initialize listener executor. - if (listenerExecutor) { - listenerExecutor_ = listenerExecutor; - } else { - listenerExecutor_ = client->getListenerExecutorProvider()->get(); - } - // Setup stats reporter. unsigned int statsIntervalInSeconds = client->getClientConfig().getStatsIntervalInSeconds(); if (statsIntervalInSeconds) { @@ -145,12 +140,12 @@ const std::string& ConsumerImpl::getTopic() const { return topic_; } void ConsumerImpl::start() { HandlerBase::start(); - // Initialize ackGroupingTrackerPtr_ here because the shared_from_this() was not initialized until the + // Initialize ackGroupingTrackerPtr_ here because the get_shared_this_ptr() was not initialized until the // constructor completed. if (TopicName::get(topic_)->isPersistent()) { if (config_.getAckGroupingTimeMs() > 0) { ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled( - client_.lock(), shared_from_this(), consumerId_, config_.getAckGroupingTimeMs(), + client_.lock(), get_shared_this_ptr(), consumerId_, config_.getAckGroupingTimeMs(), config_.getAckGroupingMaxSize())); } else { ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(*this, consumerId_)); @@ -169,7 +164,7 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { // Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after // sending the subscribe request. - cnx->registerConsumer(consumerId_, shared_from_this()); + cnx->registerConsumer(consumerId_, get_shared_this_ptr()); if (duringSeek_) { ackGroupingTrackerPtr_->flushAndClean(); @@ -195,13 +190,13 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(), config_.getKeySharedPolicy(), config_.getPriorityLevel()); cnx->sendRequestWithId(cmd, requestId) - .addListener( - std::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(), cnx, std::placeholders::_1)); + .addListener(std::bind(&ConsumerImpl::handleCreateConsumer, get_shared_this_ptr(), cnx, + std::placeholders::_1)); } void ConsumerImpl::connectionFailed(Result result) { // Keep a reference to ensure object is kept alive - ConsumerImplPtr ptr = shared_from_this(); + auto ptr = get_shared_this_ptr(); if (consumerCreatedPromise_.setFailed(result)) { state_ = Failed; @@ -244,7 +239,7 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r sendFlowPermitsToBroker(cnx, 1); } } - consumerCreatedPromise_.setValue(shared_from_this()); + consumerCreatedPromise_.setValue(get_shared_this_ptr()); } else { if (result == ResultTimeout) { // Creating the consumer has timed out. We need to ensure the broker closes the consumer @@ -257,12 +252,12 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r if (consumerCreatedPromise_.isComplete()) { // Consumer had already been initially created, we need to retry connecting in any case LOG_WARN(getName() << "Failed to reconnect consumer: " << strResult(result)); - scheduleReconnection(shared_from_this()); + scheduleReconnection(get_shared_this_ptr()); } else { // Consumer was not yet created, retry to connect to broker if it's possible if (isRetriableError(result) && (creationTimestamp_ + operationTimeut_ < TimeUtils::now())) { LOG_WARN(getName() << "Temporary error in creating consumer : " << strResult(result)); - scheduleReconnection(shared_from_this()); + scheduleReconnection(get_shared_this_ptr()); } else { LOG_ERROR(getName() << "Failed to create consumer: " << strResult(result)); consumerCreatedPromise_.setFailed(result); @@ -292,7 +287,7 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback callback) { int requestId = client->newRequestId(); SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId); cnx->sendRequestWithId(cmd, requestId) - .addListener(std::bind(&ConsumerImpl::handleUnsubscribe, shared_from_this(), + .addListener(std::bind(&ConsumerImpl::handleUnsubscribe, get_shared_this_ptr(), std::placeholders::_1, callback)); } else { Result result = ResultNotConnected; @@ -460,34 +455,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: << startMessageId.value()); return; } - - Lock lock(pendingReceiveMutex_); - // if asyncReceive is waiting then notify callback without adding to incomingMessages queue - bool asyncReceivedWaiting = !pendingReceives_.empty(); - ReceiveCallback callback; - if (asyncReceivedWaiting) { - callback = pendingReceives_.front(); - pendingReceives_.pop(); - } - lock.unlock(); - - if (asyncReceivedWaiting) { - listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback, - shared_from_this(), ResultOk, m, callback)); - return; - } - - // config_.getReceiverQueueSize() != 0 or waiting For ZeroQueueSize Message` - if (config_.getReceiverQueueSize() != 0 || - (config_.getReceiverQueueSize() == 0 && messageListener_)) { - incomingMessages_.push(m); - } else { - Lock lock(mutex_); - if (waitingForZeroQueueSizeMessage) { - lock.unlock(); - incomingMessages_.push(m); - } - } + executeNotifyCallback(m); } if (messageListener_) { @@ -496,7 +464,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: } // Trigger message listener callback in a separate thread while (numOfMessageReceived--) { - listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, shared_from_this())); + listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr())); } } } @@ -504,16 +472,16 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: void ConsumerImpl::activeConsumerChanged(bool isActive) { if (eventListener_) { listenerExecutor_->postWork( - std::bind(&ConsumerImpl::internalConsumerChangeListener, shared_from_this(), isActive)); + std::bind(&ConsumerImpl::internalConsumerChangeListener, get_shared_this_ptr(), isActive)); } } void ConsumerImpl::internalConsumerChangeListener(bool isActive) { try { if (isActive) { - eventListener_->becameActive(Consumer(shared_from_this()), partitionIndex_); + eventListener_->becameActive(Consumer(get_shared_this_ptr()), partitionIndex_); } else { - eventListener_->becameInactive(Consumer(shared_from_this()), partitionIndex_); + eventListener_->becameInactive(Consumer(get_shared_this_ptr()), partitionIndex_); } } catch (const std::exception& e) { LOG_ERROR(getName() << "Exception thrown from event listener " << e.what()); @@ -527,11 +495,56 @@ void ConsumerImpl::failPendingReceiveCallback() { ReceiveCallback callback = pendingReceives_.front(); pendingReceives_.pop(); listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback, - shared_from_this(), ResultAlreadyClosed, msg, callback)); + get_shared_this_ptr(), ResultAlreadyClosed, msg, callback)); } lock.unlock(); } +void ConsumerImpl::executeNotifyCallback(Message& msg) { + Lock lock(pendingReceiveMutex_); + // if asyncReceive is waiting then notify callback without adding to incomingMessages queue + bool asyncReceivedWaiting = !pendingReceives_.empty(); + ReceiveCallback callback; + if (asyncReceivedWaiting) { + callback = pendingReceives_.front(); + pendingReceives_.pop(); + } + lock.unlock(); + + // has pending receive, direct callback. + if (asyncReceivedWaiting) { + listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback, + get_shared_this_ptr(), ResultOk, msg, callback)); + return; + } + + // try to add incoming messages. + // config_.getReceiverQueueSize() != 0 or waiting For ZeroQueueSize Message` + if (messageListener_ || config_.getReceiverQueueSize() != 0 || waitingForZeroQueueSizeMessage) { + incomingMessages_.push(msg); + incomingMessagesSize_.fetch_add(msg.getLength()); + } + + // try trigger pending batch messages + Lock batchOptionLock(batchReceiveOptionMutex_); + if (hasEnoughMessagesForBatchReceive()) { + ConsumerImplBase::notifyBatchPendingReceivedCallback(); + } +} + +void ConsumerImpl::notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) { + auto messages = std::make_shared(batchReceivePolicy_.getMaxNumMessages(), + batchReceivePolicy_.getMaxNumBytes()); + Message peekMsg; + while (incomingMessages_.pop(peekMsg, std::chrono::milliseconds(0)) && messages->canAdd(peekMsg)) { + messageProcessed(peekMsg); + messages->add(peekMsg); + } + auto self = get_shared_this_ptr(); + listenerExecutor_->postWork( + [callback, messages, self]() { callback(ResultOk, messages->getMessageList()); }); +} + void ConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg, const ReceiveCallback& callback) { if (result == ResultOk && config_.getReceiverQueueSize() != 0) { @@ -573,19 +586,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection } } - // - Lock lock(pendingReceiveMutex_); - if (!pendingReceives_.empty()) { - ReceiveCallback callback = pendingReceives_.front(); - pendingReceives_.pop(); - lock.unlock(); - listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback, - shared_from_this(), ResultOk, msg, callback)); - } else { - // Regular path, append individual message to incoming messages queue - incomingMessages_.push(msg); - lock.unlock(); - } + executeNotifyCallback(msg); } if (skippedMessages > 0) { @@ -698,7 +699,7 @@ void ConsumerImpl::internalListener() { try { consumerStatsBasePtr_->receivedMessage(msg, ResultOk); lastDequedMessageId_ = msg.getMessageId(); - messageListener_(Consumer(shared_from_this()), msg); + messageListener_(Consumer(get_shared_this_ptr()), msg); } catch (const std::exception& e) { LOG_ERROR(getName() << "Exception thrown from listener" << e.what()); } @@ -721,9 +722,7 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) { getName() << "The incoming message queue should never be greater than 0 when Queue size is 0"); incomingMessages_.clear(); } - Lock localLock(mutex_); waitingForZeroQueueSizeMessage = true; - localLock.unlock(); sendFlowPermitsToBroker(currentCnx, 1); @@ -745,7 +744,6 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) { } } } - return ResultOk; } Result ConsumerImpl::receive(Message& msg) { @@ -837,6 +835,8 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) { lastDequedMessageId_ = msg.getMessageId(); lock.unlock(); + incomingMessagesSize_.fetch_sub(msg.getLength()); + ClientConnectionPtr currentCnx = getCnx().lock(); if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) { LOG_DEBUG(getName() << "Not adding permit since connection is different."); @@ -934,7 +934,7 @@ void ConsumerImpl::statsCallback(Result res, ResultCallback callback, proto::Com } void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) { - ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, shared_from_this(), std::placeholders::_1, + ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, get_shared_this_ptr(), std::placeholders::_1, callback, proto::CommandAck_AckType_Individual); if (msgId.batchIndex() != -1 && !batchAcknowledgementTracker_.isBatchReady(msgId, proto::CommandAck_AckType_Individual)) { @@ -945,7 +945,7 @@ void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callb } void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) { - ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, shared_from_this(), std::placeholders::_1, + ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, get_shared_this_ptr(), std::placeholders::_1, callback, proto::CommandAck_AckType_Cumulative); if (!isCumulativeAcknowledgementAllowed(config_.getConsumerType())) { cb(ResultCumulativeAcknowledgementNotAllowedError); @@ -993,12 +993,12 @@ void ConsumerImpl::disconnectConsumer() { Lock lock(mutex_); connection_.reset(); lock.unlock(); - scheduleReconnection(shared_from_this()); + scheduleReconnection(get_shared_this_ptr()); } void ConsumerImpl::closeAsync(ResultCallback callback) { // Keep a reference to ensure object is kept alive - ConsumerImplPtr ptr = shared_from_this(); + ConsumerImplPtr ptr = get_shared_this_ptr(); if (state_ != Ready) { if (callback) { @@ -1041,12 +1041,16 @@ void ConsumerImpl::closeAsync(ResultCallback callback) { cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId); if (callback) { // Pass the shared pointer "ptr" to the handler to prevent the object from being destroyed - future.addListener( - std::bind(&ConsumerImpl::handleClose, shared_from_this(), std::placeholders::_1, callback, ptr)); + future.addListener(std::bind(&ConsumerImpl::handleClose, get_shared_this_ptr(), std::placeholders::_1, + callback, ptr)); } // fail pendingReceive callback failPendingReceiveCallback(); + failPendingBatchReceiveCallback(); + + // cancel timer + batchReceiveTimer_->cancel(); } void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer) { @@ -1102,7 +1106,7 @@ Result ConsumerImpl::resumeMessageListener() { for (size_t i = 0; i < count; i++) { // Trigger message listener callback in a separate thread - listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, shared_from_this())); + listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr())); } // Check current permits and determine whether to send FLOW command this->increaseAvailablePermits(getCnx().lock(), 0); @@ -1167,7 +1171,7 @@ void ConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callb << ", requestId - " << requestId); cnx->newConsumerStats(consumerId_, requestId) - .addListener(std::bind(&ConsumerImpl::brokerConsumerStatsListener, shared_from_this(), + .addListener(std::bind(&ConsumerImpl::brokerConsumerStatsListener, get_shared_this_ptr(), std::placeholders::_1, std::placeholders::_2, callback)); return; } else { @@ -1303,7 +1307,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time LOG_DEBUG(getName() << " Sending getLastMessageId Command for Consumer - " << getConsumerId() << ", requestId - " << requestId); - auto self = shared_from_this(); + auto self = get_shared_this_ptr(); cnx->newGetLastMessageId(consumerId_, requestId) .addListener([this, self, callback](Result result, const GetLastMessageIdResponse& response) { if (result == ResultOk) { @@ -1384,7 +1388,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const Me LOG_INFO(getName() << " Seeking subscription to " << seekId); } - std::weak_ptr weakSelf{shared_from_this()}; + std::weak_ptr weakSelf{get_shared_this_ptr()}; cnx->sendRequestWithId(seek, requestId) .addListener([this, weakSelf, callback, originalSeekMessageId](Result result, @@ -1419,4 +1423,18 @@ bool ConsumerImpl::isPriorEntryIndex(int64_t idx) { : idx <= startMessageId_.get().value().entryId(); } +bool ConsumerImpl::hasEnoughMessagesForBatchReceive() const { + if (batchReceivePolicy_.getMaxNumMessages() <= 0 && batchReceivePolicy_.getMaxNumBytes() <= 0) { + return false; + } + return (batchReceivePolicy_.getMaxNumMessages() > 0 && + incomingMessages_.size() >= batchReceivePolicy_.getMaxNumMessages()) || + (batchReceivePolicy_.getMaxNumBytes() > 0 && + incomingMessagesSize_ >= batchReceivePolicy_.getMaxNumBytes()); +} + +std::shared_ptr ConsumerImpl::get_shared_this_ptr() { + return std::dynamic_pointer_cast(shared_from_this()); +} + } /* namespace pulsar */ diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 1ad3a4c3..09d2c5cf 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -65,9 +65,7 @@ enum ConsumerTopicType Partitioned }; -class ConsumerImpl : public ConsumerImplBase, - public HandlerBase, - public std::enable_shared_from_this { +class ConsumerImpl : public ConsumerImplBase { public: ConsumerImpl(const ClientImplPtr client, const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration&, bool isPersistent, @@ -147,7 +145,10 @@ class ConsumerImpl : public ConsumerImplBase, // overrided methods from HandlerBase void connectionOpened(const ClientConnectionPtr& cnx) override; void connectionFailed(Result result) override; - HandlerBaseWeakPtr get_weak_from_this() override { return shared_from_this(); } + + // impl methods from ConsumerImpl base + bool hasEnoughMessagesForBatchReceive() const override; + void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override; void handleCreateConsumer(const ClientConnectionPtr& cnx, Result result); @@ -159,7 +160,8 @@ class ConsumerImpl : public ConsumerImplBase, ConsumerStatsBasePtr consumerStatsBasePtr_; private: - bool waitingForZeroQueueSizeMessage; + std::atomic_bool waitingForZeroQueueSizeMessage; + std::shared_ptr get_shared_this_ptr(); bool uncompressMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::MessageIdData& messageIdData, const proto::MessageMetadata& metadata, SharedBuffer& payload, bool checkMaxMessageSize); @@ -180,6 +182,7 @@ class ConsumerImpl : public ConsumerImplBase, Result receiveHelper(Message& msg); Result receiveHelper(Message& msg, int timeout); void statsCallback(Result, ResultCallback, proto::CommandAck_AckType); + void executeNotifyCallback(Message& msg); void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback); void failPendingReceiveCallback(); void setNegativeAcknowledgeEnabledForTesting(bool enabled) override; @@ -199,13 +202,13 @@ class ConsumerImpl : public ConsumerImplBase, const bool isPersistent_; MessageListener messageListener_; ConsumerEventListenerPtr eventListener_; - ExecutorServicePtr listenerExecutor_; bool hasParent_; ConsumerTopicType consumerTopicType_; const Commands::SubscriptionMode subscriptionMode_; UnboundedBlockingQueue incomingMessages_; + std::atomic_int incomingMessagesSize_ = {0}; std::queue pendingReceives_; std::atomic_int availablePermits_; const int receiverQueueRefillThreshold_; diff --git a/lib/ConsumerImplBase.cc b/lib/ConsumerImplBase.cc new file mode 100644 index 00000000..4a8c0276 --- /dev/null +++ b/lib/ConsumerImplBase.cc @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "ConsumerImpl.h" +#include "MessageImpl.h" +#include "MessagesImpl.h" +#include "LogUtils.h" +#include "TimeUtils.h" +#include "pulsar/Result.h" +#include "MessageIdUtil.h" +#include "AckGroupingTracker.h" +#include "ConsumerImplBase.h" + +#include + +DECLARE_LOG_OBJECT() + +namespace pulsar { + +ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff, + const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor) + : HandlerBase(client, topic, backoff), + listenerExecutor_(listenerExecutor), + batchReceivePolicy_(conf.getBatchReceivePolicy()) { + auto userBatchReceivePolicy = conf.getBatchReceivePolicy(); + if (userBatchReceivePolicy.getMaxNumMessages() > conf.getReceiverQueueSize()) { + batchReceivePolicy_ = + BatchReceivePolicy(conf.getReceiverQueueSize(), userBatchReceivePolicy.getMaxNumBytes(), + userBatchReceivePolicy.getTimeoutMs()); + LOG_WARN("BatchReceivePolicy maxNumMessages: {" << userBatchReceivePolicy.getMaxNumMessages() + << "} is greater than maxReceiverQueueSize: {" + << conf.getReceiverQueueSize() + << "}, reset to " + "maxReceiverQueueSize. "); + } + batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer(); +} + +void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) { + if (timeoutMs > 0) { + batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs)); + std::weak_ptr weakSelf{shared_from_this()}; + batchReceiveTimer_->async_wait([weakSelf](const boost::system::error_code& ec) { + auto self = weakSelf.lock(); + if (self && !ec) { + self->doBatchReceiveTimeTask(); + } + }); + } +} + +void ConsumerImplBase::doBatchReceiveTimeTask() { + if (state_ != Ready) { + return; + } + + bool hasPendingReceives = false; + long timeToWaitMs; + + Lock lock(batchPendingReceiveMutex_); + while (!batchPendingReceives_.empty()) { + OpBatchReceive& batchReceive = batchPendingReceives_.front(); + long diff = + batchReceivePolicy_.getTimeoutMs() - (TimeUtils::currentTimeMillis() - batchReceive.createAt_); + if (diff <= 0) { + Lock batchOptionLock(batchReceiveOptionMutex_); + notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_); + batchOptionLock.unlock(); + batchPendingReceives_.pop(); + } else { + hasPendingReceives = true; + timeToWaitMs = diff; + break; + } + } + lock.unlock(); + + if (hasPendingReceives) { + triggerBatchReceiveTimerTask(timeToWaitMs); + } +} + +void ConsumerImplBase::failPendingBatchReceiveCallback() { + Lock lock(batchPendingReceiveMutex_); + while (!batchPendingReceives_.empty()) { + OpBatchReceive opBatchReceive = batchPendingReceives_.front(); + batchPendingReceives_.pop(); + listenerExecutor_->postWork( + [opBatchReceive]() { opBatchReceive.batchReceiveCallback_(ResultAlreadyClosed, {}); }); + } +} + +void ConsumerImplBase::notifyBatchPendingReceivedCallback() { + Lock lock(batchPendingReceiveMutex_); + if (!batchPendingReceives_.empty()) { + OpBatchReceive& batchReceive = batchPendingReceives_.front(); + batchPendingReceives_.pop(); + lock.unlock(); + notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_); + } +} + +void ConsumerImplBase::batchReceiveAsync(BatchReceiveCallback callback) { + // fail the callback if consumer is closing or closed + if (state_ != Ready) { + callback(ResultAlreadyClosed, Messages()); + return; + } + + Lock batchOptionLock(batchReceiveOptionMutex_); + if (hasEnoughMessagesForBatchReceive()) { + notifyBatchPendingReceivedCallback(callback); + batchOptionLock.unlock(); + } else { + OpBatchReceive opBatchReceive(callback); + Lock lock(batchPendingReceiveMutex_); + batchPendingReceives_.emplace(opBatchReceive); + lock.unlock(); + triggerBatchReceiveTimerTask(batchReceivePolicy_.getTimeoutMs()); + } +} + +OpBatchReceive::OpBatchReceive(const BatchReceiveCallback& batchReceiveCallback) + : batchReceiveCallback_(batchReceiveCallback), createAt_(TimeUtils::currentTimeMillis()) {} + +} /* namespace pulsar */ diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h index 693d4da9..18b8bc1c 100644 --- a/lib/ConsumerImplBase.h +++ b/lib/ConsumerImplBase.h @@ -20,23 +20,38 @@ #define PULSAR_CONSUMER_IMPL_BASE_HEADER #include #include - +#include "HandlerBase.h" +#include #include namespace pulsar { class ConsumerImplBase; +class HandlerBase; typedef std::weak_ptr ConsumerImplBaseWeakPtr; -class ConsumerImplBase { +class OpBatchReceive { public: - virtual ~ConsumerImplBase() {} + OpBatchReceive(); + explicit OpBatchReceive(const BatchReceiveCallback& batchReceiveCallback); + const BatchReceiveCallback batchReceiveCallback_; + const int64_t createAt_; +}; + +class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this { + public: + virtual ~ConsumerImplBase(){}; + ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff, + const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor); + + // interface by consumer virtual Future getConsumerCreatedFuture() = 0; - virtual const std::string& getSubscriptionName() const = 0; virtual const std::string& getTopic() const = 0; + virtual const std::string& getSubscriptionName() const = 0; virtual Result receive(Message& msg) = 0; virtual Result receive(Message& msg, int timeout) = 0; virtual void receiveAsync(ReceiveCallback& callback) = 0; + void batchReceiveAsync(BatchReceiveCallback callback); virtual void unsubscribeAsync(ResultCallback callback) = 0; virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) = 0; virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) = 0; @@ -49,7 +64,6 @@ class ConsumerImplBase { virtual Result resumeMessageListener() = 0; virtual void redeliverUnacknowledgedMessages() = 0; virtual void redeliverUnacknowledgedMessages(const std::set& messageIds) = 0; - virtual const std::string& getName() const = 0; virtual int getNumOfPrefetchedMessages() const = 0; virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0; virtual void seekAsync(const MessageId& msgId, ResultCallback callback) = 0; @@ -57,6 +71,27 @@ class ConsumerImplBase { virtual void negativeAcknowledge(const MessageId& msgId) = 0; virtual bool isConnected() const = 0; virtual uint64_t getNumberOfConnectedConsumer() = 0; + // overrided methods from HandlerBase + virtual const std::string& getName() const override = 0; + + protected: + // overrided methods from HandlerBase + void connectionOpened(const ClientConnectionPtr& cnx) override {} + void connectionFailed(Result result) override {} + HandlerBaseWeakPtr get_weak_from_this() override { return shared_from_this(); } + + // consumer impl generic method. + ExecutorServicePtr listenerExecutor_; + std::queue batchPendingReceives_; + BatchReceivePolicy batchReceivePolicy_; + DeadlineTimerPtr batchReceiveTimer_; + std::mutex batchReceiveOptionMutex_; + void triggerBatchReceiveTimerTask(long timeoutMs); + void doBatchReceiveTimeTask(); + void failPendingBatchReceiveCallback(); + void notifyBatchPendingReceivedCallback(); + virtual void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) = 0; + virtual bool hasEnoughMessagesForBatchReceive() const = 0; private: virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled) = 0; diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index 1184746d..6fc3603d 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -90,6 +90,7 @@ class HandlerBase { ExecutorServicePtr executor_; mutable std::mutex mutex_; std::mutex pendingReceiveMutex_; + std::mutex batchPendingReceiveMutex_; ptime creationTimestamp_; const TimeDuration operationTimeut_; diff --git a/lib/MessagesImpl.cc b/lib/MessagesImpl.cc new file mode 100644 index 00000000..7d45cddc --- /dev/null +++ b/lib/MessagesImpl.cc @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "MessagesImpl.h" +#include "stdexcept" + +MessagesImpl::MessagesImpl(int maxNumberOfMessages, long maxSizeOfMessages) + : maxNumberOfMessages_(maxNumberOfMessages), + maxSizeOfMessages_(maxSizeOfMessages), + currentSizeOfMessages_(0) {} + +const std::vector& MessagesImpl::getMessageList() const { return messageList_; } + +bool MessagesImpl::canAdd(const Message& message) const { + if (messageList_.size() == 0) { + return true; + } + + if (maxNumberOfMessages_ > 0 && messageList_.size() + 1 > maxNumberOfMessages_) { + return false; + } + + if (maxSizeOfMessages_ > 0 && currentSizeOfMessages_ + message.getLength() > maxSizeOfMessages_) { + return false; + } + + return true; +} + +void MessagesImpl::add(const Message& message) { + if (!canAdd(message)) { + throw std::invalid_argument("No more space to add messages."); + } + currentSizeOfMessages_ += message.getLength(); + messageList_.emplace_back(message); +} + +int MessagesImpl::size() const { return messageList_.size(); } + +void MessagesImpl::clear() { + currentSizeOfMessages_ = 0; + messageList_.clear(); +} diff --git a/lib/MessagesImpl.h b/lib/MessagesImpl.h new file mode 100644 index 00000000..0c12768f --- /dev/null +++ b/lib/MessagesImpl.h @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef PULSAR_CPP_MESSAGESIMPL_H +#define PULSAR_CPP_MESSAGESIMPL_H + +#include +#include + +using namespace pulsar; + +namespace pulsar { + +class MessagesImpl { + public: + MessagesImpl(const int maxNumberOfMessages, const long maxSizeOfMessages); + const std::vector& getMessageList() const; + bool canAdd(const Message& message) const; + void add(const Message& message); + int size() const; + void clear(); + + private: + std::vector messageList_; + const int maxNumberOfMessages_; + const long maxSizeOfMessages_; + long currentSizeOfMessages_; +}; + +} // namespace pulsar +#endif // PULSAR_CPP_MESSAGESIMPL_H diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 0d730e15..573c33d9 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -18,6 +18,7 @@ */ #include "MultiTopicsConsumerImpl.h" #include "MultiResultCallback.h" +#include "MessagesImpl.h" DECLARE_LOG_OBJECT() @@ -27,12 +28,13 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std const std::string& subscriptionName, TopicNamePtr topicName, const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr) - : client_(client), + : ConsumerImplBase(client, topicName ? topicName->toString() : "EmptyTopics", + Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf, + client->getListenerExecutorProvider()->get()), + client_(client), subscriptionName_(subscriptionName), - topic_(topicName ? topicName->toString() : "EmptyTopics"), conf_(conf), - messages_(conf.getReceiverQueueSize()), - listenerExecutor_(client->getListenerExecutorProvider()->get()), + incomingMessages_(conf.getReceiverQueueSize()), messageListener_(conf.getMessageListener()), lookupServicePtr_(lookupServicePtr), numberTopicPartitions_(std::make_shared>(0)), @@ -59,14 +61,16 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval); lookupServicePtr_ = client_->getLookup(); } + + state_ = Pending; } void MultiTopicsConsumerImpl::start() { if (topics_.empty()) { - MultiTopicsConsumerState state = Pending; + State state = Pending; if (state_.compare_exchange_strong(state, Ready)) { LOG_DEBUG("No topics passed in when create MultiTopicsConsumer."); - multiTopicsConsumerCreatedPromise_.setValue(shared_from_this()); + multiTopicsConsumerCreatedPromise_.setValue(get_shared_this_ptr()); return; } else { LOG_ERROR("Consumer " << consumerStr_ << " in wrong state: " << state_); @@ -81,7 +85,7 @@ void MultiTopicsConsumerImpl::start() { // subscribe for each passed in topic for (std::vector::const_iterator itr = topics_.begin(); itr != topics_.end(); itr++) { subscribeOneTopicAsync(*itr).addListener(std::bind(&MultiTopicsConsumerImpl::handleOneTopicSubscribed, - shared_from_this(), std::placeholders::_1, + get_shared_this_ptr(), std::placeholders::_1, std::placeholders::_2, *itr, topicsNeedCreate)); } } @@ -100,10 +104,10 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c } if (--(*topicsNeedCreate) == 0) { - MultiTopicsConsumerState state = Pending; + State state = Pending; if (state_.compare_exchange_strong(state, Ready)) { LOG_INFO("Successfully Subscribed to Topics"); - multiTopicsConsumerCreatedPromise_.setValue(shared_from_this()); + multiTopicsConsumerCreatedPromise_.setValue(get_shared_this_ptr()); } else { LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result); // unsubscribed all of the successfully subscribed partitioned consumers @@ -162,7 +166,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN ConsumerConfiguration config = conf_.clone(); ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get(); - config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(), + config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, get_shared_this_ptr(), std::placeholders::_1, std::placeholders::_2)); int partitions = numPartitions == 0 ? 1 : numPartitions; @@ -186,8 +190,8 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN topicName->isPersistent(), internalListenerExecutor, true, NonPartitioned); consumer->getConsumerCreatedFuture().addListener(std::bind( - &MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(), std::placeholders::_1, - std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise)); + &MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(), + std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise)); consumers_.emplace(topicName->toString(), consumer); LOG_DEBUG("Creating Consumer for - " << topicName << " - " << consumerStr_); consumer->start(); @@ -199,7 +203,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN topicName->isPersistent(), internalListenerExecutor, true, Partitioned); consumer->getConsumerCreatedFuture().addListener(std::bind( - &MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(), + &MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(), std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise)); consumer->setPartitionIndex(i); consumers_.emplace(topicPartitionName, consumer); @@ -236,7 +240,7 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated( if (partitionsUpdateTimer_) { runPartitionUpdateTask(); } - topicSubResultPromise->setValue(Consumer(shared_from_this())); + topicSubResultPromise->setValue(Consumer(get_shared_this_ptr())); } } @@ -252,7 +256,7 @@ void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) { state_ = Closing; std::shared_ptr> consumerUnsubed = std::make_shared>(0); - auto self = shared_from_this(); + auto self = get_shared_this_ptr(); int numConsumers = 0; consumers_.forEachValue( [&numConsumers, &consumerUnsubed, &self, callback](const ConsumerImplPtr& consumer) { @@ -329,7 +333,7 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, } optConsumer.value()->unsubscribeAsync( - std::bind(&MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync, shared_from_this(), + std::bind(&MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync, get_shared_this_ptr(), std::placeholders::_1, consumerUnsubed, numberPartitions, topicName, topicPartitionName, callback)); } @@ -385,7 +389,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { state_ = Closing; - std::weak_ptr weakSelf{shared_from_this()}; + std::weak_ptr weakSelf{get_shared_this_ptr()}; int numConsumers = 0; consumers_.clear( [this, weakSelf, &numConsumers, callback](const std::string& name, const ConsumerImplPtr& consumer) { @@ -414,7 +418,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { } // closed all consumers if (numConsumersLeft == 0) { - messages_.clear(); + incomingMessages_.clear(); topicsPartitions_.clear(); unAckedMessageTrackerPtr_->clear(); @@ -440,6 +444,10 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { // fail pending receive failPendingReceiveCallback(); + failPendingBatchReceiveCallback(); + + // cancel timer + batchReceiveTimer_->cancel(); } void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) { @@ -454,25 +462,39 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& pendingReceives_.pop(); lock.unlock(); listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback, - shared_from_this(), ResultOk, msg, callback)); - } else { - if (messages_.full()) { - lock.unlock(); - } + get_shared_this_ptr(), ResultOk, msg, callback)); + return; + } - if (messages_.push(msg) && messageListener_) { - listenerExecutor_->postWork( - std::bind(&MultiTopicsConsumerImpl::internalListener, shared_from_this(), consumer)); - } + if (incomingMessages_.full()) { + lock.unlock(); + } + + // add message to block queue. + // when messages queue is full, will block listener thread on ConsumerImpl, + // then will not send permits to broker, will broker stop push message. + incomingMessages_.push(msg); + incomingMessagesSize_.fetch_add(msg.getLength()); + + // try trigger pending batch messages + Lock batchOptionLock(batchReceiveOptionMutex_); + if (hasEnoughMessagesForBatchReceive()) { + ConsumerImplBase::notifyBatchPendingReceivedCallback(); + } + batchOptionLock.unlock(); + + if (messageListener_) { + listenerExecutor_->postWork( + std::bind(&MultiTopicsConsumerImpl::internalListener, get_shared_this_ptr(), consumer)); } } void MultiTopicsConsumerImpl::internalListener(Consumer consumer) { Message m; - messages_.pop(m); - unAckedMessageTrackerPtr_->add(m.getMessageId()); + incomingMessages_.pop(m); try { - messageListener_(Consumer(shared_from_this()), m); + messageListener_(Consumer(get_shared_this_ptr()), m); + messageProcessed(m); } catch (const std::exception& e) { LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << e.what()); } @@ -487,9 +509,9 @@ Result MultiTopicsConsumerImpl::receive(Message& msg) { LOG_ERROR("Can not receive when a listener has been set"); return ResultInvalidConfiguration; } - messages_.pop(msg); + incomingMessages_.pop(msg); + messageProcessed(msg); - unAckedMessageTrackerPtr_->add(msg.getMessageId()); return ResultOk; } @@ -503,8 +525,8 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) { return ResultInvalidConfiguration; } - if (messages_.pop(msg, std::chrono::milliseconds(timeout))) { - unAckedMessageTrackerPtr_->add(msg.getMessageId()); + if (incomingMessages_.pop(msg, std::chrono::milliseconds(timeout))) { + messageProcessed(msg); return ResultOk; } else { if (state_ != Ready) { @@ -524,9 +546,9 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) { } Lock lock(pendingReceiveMutex_); - if (messages_.pop(msg, std::chrono::milliseconds(0))) { + if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) { lock.unlock(); - unAckedMessageTrackerPtr_->add(msg.getMessageId()); + messageProcessed(msg); callback(ResultOk, msg); } else { pendingReceives_.push(callback); @@ -536,14 +558,14 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) { void MultiTopicsConsumerImpl::failPendingReceiveCallback() { Message msg; - messages_.close(); + incomingMessages_.close(); Lock lock(pendingReceiveMutex_); while (!pendingReceives_.empty()) { ReceiveCallback callback = pendingReceives_.front(); pendingReceives_.pop(); listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback, - shared_from_this(), ResultAlreadyClosed, msg, callback)); + get_shared_this_ptr(), ResultAlreadyClosed, msg, callback)); } lock.unlock(); } @@ -649,7 +671,7 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set(numberTopicPartitions_->load()); lock.unlock(); - auto self = shared_from_this(); + auto self = get_shared_this_ptr(); size_t i = 0; consumers_.forEachValue([&self, &latchPtr, &statsPtr, &i, callback](const ConsumerImplPtr& consumer) { size_t index = i++; @@ -750,7 +772,7 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() { } void MultiTopicsConsumerImpl::runPartitionUpdateTask() { partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_); - std::weak_ptr weakSelf{shared_from_this()}; + std::weak_ptr weakSelf{get_shared_this_ptr()}; partitionsUpdateTimer_->async_wait([weakSelf](const boost::system::error_code& ec) { // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it // cannot continue at this time, and the request needs to be ignored. @@ -769,7 +791,7 @@ void MultiTopicsConsumerImpl::topicPartitionUpdate() { auto topicName = TopicName::get(item.first); auto currentNumPartitions = item.second; lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( - std::bind(&MultiTopicsConsumerImpl::handleGetPartitions, shared_from_this(), topicName, + std::bind(&MultiTopicsConsumerImpl::handleGetPartitions, get_shared_this_ptr(), topicName, std::placeholders::_1, std::placeholders::_2, currentNumPartitions)); } } @@ -810,7 +832,7 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer( std::shared_ptr> partitionsNeedCreate) { ConsumerConfiguration config = conf_.clone(); ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get(); - config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(), + config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, get_shared_this_ptr(), std::placeholders::_1, std::placeholders::_2)); // Apply total limit of receiver queue size across partitions @@ -824,7 +846,7 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer( topicName->isPersistent(), internalListenerExecutor, true, Partitioned); consumer->getConsumerCreatedFuture().addListener( - std::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(), + std::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(), std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise)); consumer->setPartitionIndex(partitionIndex); consumer->start(); @@ -832,3 +854,35 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer( LOG_INFO("Add Creating Consumer for - " << topicPartitionName << " - " << consumerStr_ << " consumerSize: " << consumers_.size()); } + +bool MultiTopicsConsumerImpl::hasEnoughMessagesForBatchReceive() const { + if (batchReceivePolicy_.getMaxNumMessages() <= 0 && batchReceivePolicy_.getMaxNumBytes() <= 0) { + return false; + } + return (batchReceivePolicy_.getMaxNumMessages() > 0 && + incomingMessages_.size() >= batchReceivePolicy_.getMaxNumMessages()) || + (batchReceivePolicy_.getMaxNumBytes() > 0 && + incomingMessagesSize_ >= batchReceivePolicy_.getMaxNumBytes()); +} + +void MultiTopicsConsumerImpl::notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) { + auto messages = std::make_shared(batchReceivePolicy_.getMaxNumMessages(), + batchReceivePolicy_.getMaxNumBytes()); + Message peekMsg; + while (incomingMessages_.pop(peekMsg, std::chrono::milliseconds(0)) && messages->canAdd(peekMsg)) { + messageProcessed(peekMsg); + messages->add(peekMsg); + } + auto self = get_shared_this_ptr(); + listenerExecutor_->postWork( + [callback, messages, self]() { callback(ResultOk, messages->getMessageList()); }); +} + +void MultiTopicsConsumerImpl::messageProcessed(Message& msg) { + incomingMessagesSize_.fetch_sub(msg.getLength()); + unAckedMessageTrackerPtr_->add(msg.getMessageId()); +} + +std::shared_ptr MultiTopicsConsumerImpl::get_shared_this_ptr() { + return std::dynamic_pointer_cast(shared_from_this()); +} diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index 8769d59b..044f4173 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -38,17 +38,8 @@ namespace pulsar { typedef std::shared_ptr> ConsumerSubResultPromisePtr; class MultiTopicsConsumerImpl; -class MultiTopicsConsumerImpl : public ConsumerImplBase, - public std::enable_shared_from_this { +class MultiTopicsConsumerImpl : public ConsumerImplBase { public: - enum MultiTopicsConsumerState - { - Pending, - Ready, - Closing, - Closed, - Failed - }; MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector& topics, const std::string& subscriptionName, TopicNamePtr topicName, const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_); @@ -99,16 +90,14 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, const ClientImplPtr client_; const std::string subscriptionName_; std::string consumerStr_; - std::string topic_; const ConsumerConfiguration conf_; typedef SynchronizedHashMap ConsumerMap; ConsumerMap consumers_; std::map topicsPartitions_; mutable std::mutex mutex_; std::mutex pendingReceiveMutex_; - std::atomic state_{Pending}; - BlockingQueue messages_; - const ExecutorServicePtr listenerExecutor_; + BlockingQueue incomingMessages_; + std::atomic_int incomingMessagesSize_ = {0}; MessageListener messageListener_; DeadlineTimerPtr partitionsUpdateTimer_; boost::posix_time::time_duration partitionsUpdateInterval_; @@ -125,6 +114,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, unsigned int partitionIndex); void notifyResult(CloseCallback closeCallback); void messageReceived(Consumer consumer, const Message& msg); + void messageProcessed(Message& msg); void internalListener(Consumer consumer); void receiveMessages(); void failPendingReceiveCallback(); @@ -149,8 +139,12 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, void subscribeSingleNewConsumer(int numPartitions, TopicNamePtr topicName, int partitionIndex, ConsumerSubResultPromisePtr topicSubResultPromise, std::shared_ptr> partitionsNeedCreate); + // impl consumer base virtual method + bool hasEnoughMessagesForBatchReceive() const override; + void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override; private: + std::shared_ptr get_shared_this_ptr(); void setNegativeAcknowledgeEnabledForTesting(bool enabled) override; FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery); diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc index 54a07bbc..d3e424e6 100644 --- a/tests/BasicEndToEndTest.cc +++ b/tests/BasicEndToEndTest.cc @@ -4098,3 +4098,184 @@ TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledCumulativeAck) { consumer.close(); client.close(); } + +void testBatchReceive(bool multiConsumer) { + ClientConfiguration config; + Client client(lookupUrl); + + std::string uniqueChunk = unique_str(); + std::string topicName = "persistent://public/default/test-batch-receive" + uniqueChunk; + + if (multiConsumer) { + // call admin api to make it partitioned + std::string url = + adminUrl + "admin/v2/persistent/public/default/test-batch-receive" + uniqueChunk + "/partitions"; + int res = makePutRequest(url, "5"); + LOG_INFO("res = " << res); + ASSERT_FALSE(res != 204 && res != 409); + } + + std::string subName = "subscription-name"; + Producer producer; + + Promise producerPromise; + client.createProducerAsync(topicName, WaitForCallbackValue(producerPromise)); + Future producerFuture = producerPromise.getFuture(); + Result result = producerFuture.get(producer); + ASSERT_EQ(ResultOk, result); + + Consumer consumer; + ConsumerConfiguration consumerConfig; + // when receiver queue size > maxNumMessages, use receiver queue size. + consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, -1)); + consumerConfig.setReceiverQueueSize(10); + consumerConfig.setProperty("consumer-name", "test-consumer-name"); + consumerConfig.setProperty("consumer-id", "test-consumer-id"); + Promise consumerPromise; + client.subscribeAsync(topicName, subName, consumerConfig, + WaitForCallbackValue(consumerPromise)); + Future consumerFuture = consumerPromise.getFuture(); + result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); + + // sync batch receive test + std::string prefix = "batch-receive-msg"; + int numOfMessages = 10; + for (int i = 0; i < numOfMessages; i++) { + std::string messageContent = prefix + std::to_string(i); + Message msg = MessageBuilder().setContent(messageContent).build(); + producer.send(msg); + } + + Messages messages; + Result receive = consumer.batchReceive(messages); + ASSERT_EQ(receive, ResultOk); + ASSERT_EQ(messages.size(), numOfMessages); + + // async batch receive test + Latch latch(1); + BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result, Messages messages) { + ASSERT_EQ(result, ResultOk); + ASSERT_EQ(messages.size(), numOfMessages); + latch.countdown(); + }; + consumer.batchReceiveAsync(batchReceiveCallback); + for (int i = 0; i < numOfMessages; i++) { + std::string messageContent = prefix + std::to_string(i); + Message msg = MessageBuilder().setContent(messageContent).build(); + producer.send(msg); + } + ASSERT_TRUE(latch.wait(std::chrono::seconds(10))); + + producer.close(); + consumer.close(); + client.close(); +} + +TEST(BasicEndToEndTest, testBatchReceive) { testBatchReceive(false); } + +TEST(BasicEndToEndTest, testBatchReceiveWithMultiConsumer) { testBatchReceive(true); } + +void testBatchReceiveTimeout(bool multiConsumer) { + ClientConfiguration config; + Client client(lookupUrl); + std::string uniqueChunk = unique_str(); + std::string topicName = "persistent://public/default/test-batch-receive-timeout" + uniqueChunk; + + if (multiConsumer) { + // call admin api to make it partitioned + std::string url = adminUrl + "admin/v2/persistent/public/default/test-batch-receive-timeout" + + uniqueChunk + "/partitions"; + int res = makePutRequest(url, "5"); + LOG_INFO("res = " << res); + ASSERT_FALSE(res != 204 && res != 409); + } + + std::string subName = "subscription-name"; + Producer producer; + + Promise producerPromise; + client.createProducerAsync(topicName, WaitForCallbackValue(producerPromise)); + Future producerFuture = producerPromise.getFuture(); + Result result = producerFuture.get(producer); + ASSERT_EQ(ResultOk, result); + + Consumer consumer; + ConsumerConfiguration consumerConfig; + consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000)); + Promise consumerPromise; + client.subscribeAsync(topicName, subName, consumerConfig, + WaitForCallbackValue(consumerPromise)); + Future consumerFuture = consumerPromise.getFuture(); + result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); + + std::string prefix = "batch-receive-msg"; + int numOfMessages = 10; + + for (int i = 0; i < numOfMessages; i++) { + std::string messageContent = prefix + std::to_string(i); + Message msg = MessageBuilder().setContent(messageContent).build(); + producer.send(msg); + } + + Latch latch(1); + BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result, Messages messages) { + ASSERT_EQ(result, ResultOk); + ASSERT_EQ(messages.size(), numOfMessages); + latch.countdown(); + }; + consumer.batchReceiveAsync(batchReceiveCallback); + ASSERT_TRUE(latch.wait(std::chrono::seconds(10))); + + producer.close(); + consumer.close(); + client.close(); +} + +TEST(BasicEndToEndTest, testBatchReceiveTimeout) { testBatchReceiveTimeout(false); } + +TEST(BasicEndToEndTest, testBatchReceiveTimeoutWithMultiConsumer) { testBatchReceiveTimeout(true); } + +void testBatchReceiveClose(bool multiConsumer) { + ClientConfiguration config; + Client client(lookupUrl); + + std::string uniqueChunk = unique_str(); + std::string topicName = "persistent://public/default/test-batch-receive-close" + uniqueChunk; + + if (multiConsumer) { + // call admin api to make it partitioned + std::string url = adminUrl + "admin/v2/persistent/public/default/test-batch-receive-close" + + uniqueChunk + "/partitions"; + int res = makePutRequest(url, "5"); + LOG_INFO("res = " << res); + ASSERT_FALSE(res != 204 && res != 409); + } + + std::string subName = "subscription-name"; + Consumer consumer; + ConsumerConfiguration consumerConfig; + consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000)); + Promise consumerPromise; + client.subscribeAsync(topicName, subName, consumerConfig, + WaitForCallbackValue(consumerPromise)); + Future consumerFuture = consumerPromise.getFuture(); + Result result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); + + Latch latch(1); + BatchReceiveCallback batchReceiveCallback = [&latch](Result result, Messages messages) { + ASSERT_EQ(result, ResultAlreadyClosed); + latch.countdown(); + }; + consumer.batchReceiveAsync(batchReceiveCallback); + consumer.close(); + client.close(); + + ASSERT_TRUE(latch.wait(std::chrono::seconds(10))); +} + +TEST(BasicEndToEndTest, testBatchReceiveClose) { testBatchReceiveClose(false); } + +TEST(BasicEndToEndTest, testBatchReceiveCloseWithMultiConsumer) { testBatchReceiveClose(true); } diff --git a/tests/BatchReceivePolicyTest.cc b/tests/BatchReceivePolicyTest.cc new file mode 100644 index 00000000..ab9ffcc0 --- /dev/null +++ b/tests/BatchReceivePolicyTest.cc @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +using namespace pulsar; + +TEST(BatchReceivePolicyTest, testBatchReceivePolicy) { + ASSERT_THROW(BatchReceivePolicy(-1, -1, -1), std::invalid_argument); + + { + BatchReceivePolicy batchReceivePolicy; + ASSERT_EQ(batchReceivePolicy.getMaxNumMessages(), -1); + ASSERT_EQ(batchReceivePolicy.getMaxNumBytes(), 10 * 1024 * 1024); + ASSERT_EQ(batchReceivePolicy.getTimeoutMs(), 100); + } + + { + BatchReceivePolicy batchReceivePolicy(-1, -1, 123); + ASSERT_EQ(batchReceivePolicy.getMaxNumMessages(), -1); + ASSERT_EQ(batchReceivePolicy.getMaxNumBytes(), 10 * 1024 * 1024); + ASSERT_EQ(batchReceivePolicy.getTimeoutMs(), 123); + } +} diff --git a/tests/ConsumerConfigurationTest.cc b/tests/ConsumerConfigurationTest.cc index 24f541b5..20cd8f4b 100644 --- a/tests/ConsumerConfigurationTest.cc +++ b/tests/ConsumerConfigurationTest.cc @@ -61,6 +61,9 @@ TEST(ConsumerConfigurationTest, testDefaultConfig) { ASSERT_EQ(conf.getPriorityLevel(), 0); ASSERT_EQ(conf.getMaxPendingChunkedMessage(), 10); ASSERT_EQ(conf.isAutoAckOldestChunkedMessageOnQueueFull(), false); + ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumMessages(), -1); + ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumBytes(), 10 * 1024 * 1024); + ASSERT_EQ(conf.getBatchReceivePolicy().getTimeoutMs(), 100); } TEST(ConsumerConfigurationTest, testCustomConfig) { @@ -151,6 +154,11 @@ TEST(ConsumerConfigurationTest, testCustomConfig) { conf.setAutoAckOldestChunkedMessageOnQueueFull(true); ASSERT_TRUE(conf.isAutoAckOldestChunkedMessageOnQueueFull()); + + conf.setBatchReceivePolicy(BatchReceivePolicy(10, 10, 100)); + ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumMessages(), 10); + ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumBytes(), 10); + ASSERT_EQ(conf.getBatchReceivePolicy().getTimeoutMs(), 100); } TEST(ConsumerConfigurationTest, testReadCompactPersistentExclusive) { diff --git a/tests/MessagesImplTest.cc b/tests/MessagesImplTest.cc new file mode 100644 index 00000000..e963501a --- /dev/null +++ b/tests/MessagesImplTest.cc @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include +#include "pulsar/MessageBuilder.h" + +using namespace pulsar; + +TEST(MessagesImplTest, testMessage) { + // 0. test not limits + { + MessagesImpl messages(-1, -1); + ASSERT_TRUE(messages.canAdd(Message())); + } + + // 1. test max number of messages. + { + Message msg = MessageBuilder().setContent("c").build(); + MessagesImpl messages(10, -1); + for (int i = 0; i < 10; i++) { + messages.add(msg); + } + ASSERT_FALSE(messages.canAdd(msg)); + ASSERT_EQ(messages.size(), 10); + ASSERT_THROW(messages.add(msg), std::invalid_argument); + messages.clear(); + ASSERT_TRUE(messages.canAdd(msg)); + ASSERT_EQ(messages.size(), 0); + } + + // 2. test max size of messages. + { + Message msg = MessageBuilder().setContent("c").build(); + MessagesImpl messages(-1, 10); + for (int i = 0; i < 10; i++) { + messages.add(msg); + } + ASSERT_FALSE(messages.canAdd(msg)); + ASSERT_EQ(messages.size(), 10); + ASSERT_THROW(messages.add(msg), std::invalid_argument); + messages.clear(); + ASSERT_TRUE(messages.canAdd(msg)); + ASSERT_EQ(messages.size(), 0); + } +}