From 18dcdd55afa4694933d293cda4035365dcb720a4 Mon Sep 17 00:00:00 2001 From: A Date: Wed, 7 Dec 2022 10:13:35 +0700 Subject: [PATCH 1/5] Use boost::optional instead self-written Optional class (#138) Fixes #134 ### Motivation Switch to use boost::optional instead self-written Optional class ### Modifications Remove class Optional in Utils.h and change all usage to boost::optional --- lib/ClientConnection.cc | 5 ++-- lib/ClientConnection.h | 4 ++-- lib/Commands.cc | 8 +++---- lib/Commands.h | 22 ++++++++---------- lib/ConsumerImpl.cc | 40 ++++++++++++++++---------------- lib/ConsumerImpl.h | 17 +++++++------- lib/MultiTopicsConsumerImpl.cc | 15 ++++++------ lib/ProducerConfiguration.cc | 8 +++---- lib/ProducerConfigurationImpl.h | 7 +++--- lib/ProducerImpl.h | 4 ++-- lib/ReaderImpl.cc | 7 +++--- lib/SynchronizedHashMap.h | 17 +++++++------- lib/Utils.h | 32 ------------------------- tests/SynchronizedHashMapTest.cc | 18 +++++++------- 14 files changed, 84 insertions(+), 120 deletions(-) diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 5c144672..51a09f45 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -20,6 +20,7 @@ #include +#include #include #include "Commands.h" @@ -1093,9 +1094,9 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) { data.schemaVersion = producerSuccess.schema_version(); } if (producerSuccess.has_topic_epoch()) { - data.topicEpoch = Optional::of(producerSuccess.topic_epoch()); + data.topicEpoch = boost::make_optional(producerSuccess.topic_epoch()); } else { - data.topicEpoch = Optional::empty(); + data.topicEpoch = boost::none; } requestData.promise.setValue(data); requestData.timer->cancel(); diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index a07e2cd4..62a5e9bb 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -40,7 +41,6 @@ #include "LookupDataResult.h" #include "SharedBuffer.h" #include "UtilAllocator.h" -#include "Utils.h" namespace pulsar { @@ -83,7 +83,7 @@ struct ResponseData { std::string producerName; int64_t lastSequenceId; std::string schemaVersion; - Optional topicEpoch; + boost::optional topicEpoch; }; typedef std::shared_ptr> NamespaceTopicsPtr; diff --git a/lib/Commands.cc b/lib/Commands.cc index f97b0eb8..5bb9587e 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -296,7 +296,7 @@ SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication, SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription, uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType, const std::string& consumerName, SubscriptionMode subscriptionMode, - Optional startMessageId, bool readCompacted, + boost::optional startMessageId, bool readCompacted, const std::map& metadata, const std::map& subscriptionProperties, const SchemaInfo& schemaInfo, @@ -323,7 +323,7 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscribe->set_allocated_schema(getSchema(schemaInfo)); } - if (startMessageId.is_present()) { + if (startMessageId) { MessageIdData& messageIdData = *subscribe->mutable_start_message_id(); messageIdData.set_ledgerid(startMessageId.value().ledgerId()); messageIdData.set_entryid(startMessageId.value().entryId()); @@ -383,7 +383,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId const std::map& metadata, const SchemaInfo& schemaInfo, uint64_t epoch, bool userProvidedProducerName, bool encrypted, - ProducerAccessMode accessMode, Optional topicEpoch) { + ProducerAccessMode accessMode, boost::optional topicEpoch) { BaseCommand cmd; cmd.set_type(BaseCommand::PRODUCER); CommandProducer* producer = cmd.mutable_producer(); @@ -394,7 +394,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId producer->set_user_provided_producer_name(userProvidedProducerName); producer->set_encrypted(encrypted); producer->set_producer_access_mode(static_cast(accessMode)); - if (topicEpoch.is_present()) { + if (topicEpoch) { producer->set_topic_epoch(topicEpoch.value()); } diff --git a/lib/Commands.h b/lib/Commands.h index 6681f138..bbe96fdb 100644 --- a/lib/Commands.h +++ b/lib/Commands.h @@ -25,11 +25,11 @@ #include #include +#include #include #include "ProtoApiEnums.h" #include "SharedBuffer.h" -#include "Utils.h" using namespace pulsar; @@ -89,16 +89,14 @@ class Commands { uint64_t sequenceId, ChecksumType checksumType, const proto::MessageMetadata& metadata, const SharedBuffer& payload); - static SharedBuffer newSubscribe(const std::string& topic, const std::string& subscription, - uint64_t consumerId, uint64_t requestId, - CommandSubscribe_SubType subType, const std::string& consumerName, - SubscriptionMode subscriptionMode, Optional startMessageId, - bool readCompacted, const std::map& metadata, - const std::map& subscriptionProperties, - const SchemaInfo& schemaInfo, - CommandSubscribe_InitialPosition subscriptionInitialPosition, - bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy, - int priorityLevel = 0); + static SharedBuffer newSubscribe( + const std::string& topic, const std::string& subscription, uint64_t consumerId, uint64_t requestId, + CommandSubscribe_SubType subType, const std::string& consumerName, SubscriptionMode subscriptionMode, + boost::optional startMessageId, bool readCompacted, + const std::map& metadata, + const std::map& subscriptionProperties, const SchemaInfo& schemaInfo, + CommandSubscribe_InitialPosition subscriptionInitialPosition, bool replicateSubscriptionState, + KeySharedPolicy keySharedPolicy, int priorityLevel = 0); static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId); @@ -107,7 +105,7 @@ class Commands { const std::map& metadata, const SchemaInfo& schemaInfo, uint64_t epoch, bool userProvidedProducerName, bool encrypted, - ProducerAccessMode accessMode, Optional topicEpoch); + ProducerAccessMode accessMode, boost::optional topicEpoch); static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId, CommandAck_AckType ackType, CommandAck_ValidationError validationError); diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 41b13ae2..cf01e362 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -54,7 +54,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, const ExecutorServicePtr listenerExecutor /* = NULL by default */, bool hasParent /* = false by default */, const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */, - Commands::SubscriptionMode subscriptionMode, Optional startMessageId) + Commands::SubscriptionMode subscriptionMode, + boost::optional startMessageId) : ConsumerImplBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf, listenerExecutor ? listenerExecutor : client->getListenerExecutorProvider()->get()), waitingForZeroQueueSizeMessage(false), @@ -191,9 +192,8 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { Lock lockForMessageId(mutexForMessageId_); // Update startMessageId so that we can discard messages after delivery restarts const auto startMessageId = clearReceiveQueue(); - const auto subscribeMessageId = (subscriptionMode_ == Commands::SubscriptionModeNonDurable) - ? startMessageId - : Optional::empty(); + const auto subscribeMessageId = + (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId : boost::none; startMessageId_ = startMessageId; lockForMessageId.unlock(); @@ -373,11 +373,11 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() { }); } -Optional ConsumerImpl::processMessageChunk(const SharedBuffer& payload, - const proto::MessageMetadata& metadata, - const MessageId& messageId, - const proto::MessageIdData& messageIdData, - const ClientConnectionPtr& cnx) { +boost::optional ConsumerImpl::processMessageChunk(const SharedBuffer& payload, + const proto::MessageMetadata& metadata, + const MessageId& messageId, + const proto::MessageIdData& messageIdData, + const ClientConnectionPtr& cnx) { const auto chunkId = metadata.chunk_id(); const auto uuid = metadata.uuid(); LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << uuid @@ -422,14 +422,14 @@ Optional ConsumerImpl::processMessageChunk(const SharedBuffer& pay lock.unlock(); increaseAvailablePermits(cnx); trackMessage(messageId); - return Optional::empty(); + return boost::none; } chunkedMsgCtx.appendChunk(messageId, payload); if (!chunkedMsgCtx.isCompleted()) { lock.unlock(); increaseAvailablePermits(cnx); - return Optional::empty(); + return boost::none; } LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx @@ -438,9 +438,9 @@ Optional ConsumerImpl::processMessageChunk(const SharedBuffer& pay auto wholePayload = chunkedMsgCtx.getBuffer(); chunkedMessageCache_.remove(uuid); if (uncompressMessageIfNeeded(cnx, messageIdData, metadata, wholePayload, false)) { - return Optional::of(wholePayload); + return wholePayload; } else { - return Optional::empty(); + return boost::none; } } @@ -477,7 +477,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: const auto& messageIdData = msg.message_id(); auto messageId = MessageIdBuilder::from(messageIdData).build(); auto optionalPayload = processMessageChunk(payload, metadata, messageId, messageIdData, cnx); - if (optionalPayload.is_present()) { + if (optionalPayload) { payload = optionalPayload.value(); } else { return; @@ -512,7 +512,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: m.impl_->convertPayloadToKeyValue(config_.getSchema()); const auto startMessageId = startMessageId_.get(); - if (isPersistent_ && startMessageId.is_present() && + if (isPersistent_ && startMessageId && m.getMessageId().ledgerId() == startMessageId.value().ledgerId() && m.getMessageId().entryId() == startMessageId.value().entryId() && isPriorEntryIndex(m.getMessageId().entryId())) { @@ -637,7 +637,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection msg.impl_->setTopicName(batchedMessage.getTopicName()); msg.impl_->convertPayloadToKeyValue(config_.getSchema()); - if (startMessageId.is_present()) { + if (startMessageId) { const MessageId& msgId = msg.getMessageId(); // If we are receiving a batch message, we need to discard messages that were prior @@ -921,10 +921,10 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) { * was * not seen by the application */ -Optional ConsumerImpl::clearReceiveQueue() { +boost::optional ConsumerImpl::clearReceiveQueue() { bool expectedDuringSeek = true; if (duringSeek_.compare_exchange_strong(expectedDuringSeek, false)) { - return Optional::of(seekMessageId_.get()); + return seekMessageId_.get(); } else if (subscriptionMode_ == Commands::SubscriptionModeDurable) { return startMessageId_.get(); } @@ -943,12 +943,12 @@ Optional ConsumerImpl::clearReceiveQueue() { .ledgerId(nextMessageId.ledgerId()) .entryId(nextMessageId.entryId() - 1) .build(); - return Optional::of(previousMessageId); + return previousMessageId; } else if (lastDequedMessageId_ != MessageId::earliest()) { // If the queue was empty we need to restart from the message just after the last one that has been // dequeued // in the past - return Optional::of(lastDequedMessageId_); + return lastDequedMessageId_; } else { // No message was received or dequeued by this consumer. Next message would still be the // startMessageId diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index ce6c3f09..e9af3f6e 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -21,6 +21,7 @@ #include +#include #include #include @@ -71,7 +72,7 @@ class ConsumerImpl : public ConsumerImplBase { const ExecutorServicePtr listenerExecutor = ExecutorServicePtr(), bool hasParent = false, const ConsumerTopicType consumerTopicType = NonPartitioned, Commands::SubscriptionMode = Commands::SubscriptionModeDurable, - Optional startMessageId = Optional::empty()); + boost::optional startMessageId = boost::none); ~ConsumerImpl(); void setPartitionIndex(int partitionIndex); int getPartitionIndex(); @@ -193,7 +194,7 @@ class ConsumerImpl : public ConsumerImplBase { const DeadlineTimerPtr& timer, BrokerGetLastMessageIdCallback callback); - Optional clearReceiveQueue(); + boost::optional clearReceiveQueue(); void seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId, long timestamp, ResultCallback callback); @@ -236,7 +237,7 @@ class ConsumerImpl : public ConsumerImplBase { MessageId lastMessageIdInBroker_{MessageId::earliest()}; std::atomic_bool duringSeek_{false}; - Synchronized> startMessageId_{Optional::empty()}; + Synchronized> startMessageId_; Synchronized seekMessageId_{MessageId::earliest()}; class ChunkedMessageCtx { @@ -321,11 +322,11 @@ class ConsumerImpl : public ConsumerImplBase { * @return the concatenated payload if chunks are concatenated into a completed message payload * successfully, else Optional::empty() */ - Optional processMessageChunk(const SharedBuffer& payload, - const proto::MessageMetadata& metadata, - const MessageId& messageId, - const proto::MessageIdData& messageIdData, - const ClientConnectionPtr& cnx); + boost::optional processMessageChunk(const SharedBuffer& payload, + const proto::MessageMetadata& metadata, + const MessageId& messageId, + const proto::MessageIdData& messageIdData, + const ClientConnectionPtr& cnx); friend class PulsarFriend; diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index b8d55b49..d9d38732 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -373,7 +373,7 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, for (int i = 0; i < numberPartitions; i++) { std::string topicPartitionName = topicName->getTopicPartitionName(i); auto optConsumer = consumers_.find(topicPartitionName); - if (optConsumer.is_empty()) { + if (!optConsumer) { LOG_ERROR("TopicsConsumer not subscribed on topicPartitionName: " << topicPartitionName); callback(ResultUnknownError); continue; @@ -400,7 +400,7 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync( LOG_DEBUG("Successfully Unsubscribed one Consumer. topicPartitionName - " << topicPartitionName); auto optConsumer = consumers_.remove(topicPartitionName); - if (optConsumer.is_present()) { + if (optConsumer) { optConsumer.value()->pauseMessageListener(); } @@ -638,7 +638,7 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal const std::string& topicPartitionName = msgId.getTopicName(); auto optConsumer = consumers_.find(topicPartitionName); - if (optConsumer.is_present()) { + if (optConsumer) { unAckedMessageTrackerPtr_->remove(msgId); optConsumer.value()->acknowledgeAsync(msgId, callback); } else { @@ -674,7 +674,7 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis }; for (const auto& kv : topicToMessageId) { auto optConsumer = consumers_.find(kv.first); - if (optConsumer.is_present()) { + if (optConsumer) { unAckedMessageTrackerPtr_->remove(kv.second); optConsumer.value()->acknowledgeAsync(kv.second, cb); } else { @@ -691,7 +691,7 @@ void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) { auto optConsumer = consumers_.find(msgId.getTopicName()); - if (optConsumer.is_present()) { + if (optConsumer) { unAckedMessageTrackerPtr_->remove(msgId); optConsumer.value()->negativeAcknowledge(msgId); } @@ -868,9 +868,8 @@ bool MultiTopicsConsumerImpl::isConnected() const { return false; } - return consumers_ - .findFirstValueIf([](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); }) - .is_empty(); + return !consumers_.findFirstValueIf( + [](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); }); } uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() { diff --git a/lib/ProducerConfiguration.cc b/lib/ProducerConfiguration.cc index 9b3fdfbd..67e8b102 100644 --- a/lib/ProducerConfiguration.cc +++ b/lib/ProducerConfiguration.cc @@ -36,21 +36,21 @@ ProducerConfiguration& ProducerConfiguration::operator=(const ProducerConfigurat } ProducerConfiguration& ProducerConfiguration::setProducerName(const std::string& producerName) { - impl_->producerName = Optional::of(producerName); + impl_->producerName = boost::make_optional(producerName); return *this; } const std::string& ProducerConfiguration::getProducerName() const { - return impl_->producerName.is_present() ? impl_->producerName.value() : emptyString; + return !impl_->producerName ? emptyString : impl_->producerName.value(); } ProducerConfiguration& ProducerConfiguration::setInitialSequenceId(int64_t initialSequenceId) { - impl_->initialSequenceId = Optional::of(initialSequenceId); + impl_->initialSequenceId = boost::make_optional(initialSequenceId); return *this; } int64_t ProducerConfiguration::getInitialSequenceId() const { - return impl_->initialSequenceId.is_present() ? impl_->initialSequenceId.value() : -1ll; + return !impl_->initialSequenceId ? -1ll : impl_->initialSequenceId.value(); } ProducerConfiguration& ProducerConfiguration::setSendTimeout(int sendTimeoutMs) { diff --git a/lib/ProducerConfigurationImpl.h b/lib/ProducerConfigurationImpl.h index 6c2b19da..83723f61 100644 --- a/lib/ProducerConfigurationImpl.h +++ b/lib/ProducerConfigurationImpl.h @@ -21,16 +21,15 @@ #include +#include #include -#include "Utils.h" - namespace pulsar { struct ProducerConfigurationImpl { SchemaInfo schemaInfo; - Optional producerName; - Optional initialSequenceId; + boost::optional producerName; + boost::optional initialSequenceId; int sendTimeoutMs{30000}; CompressionType compressionType{CompressionNone}; int maxPendingMessages{1000}; diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h index cbf99ec7..928fca5e 100644 --- a/lib/ProducerImpl.h +++ b/lib/ProducerImpl.h @@ -19,6 +19,7 @@ #ifndef LIB_PRODUCERIMPL_H_ #define LIB_PRODUCERIMPL_H_ +#include #include #include "Future.h" @@ -31,7 +32,6 @@ #include "PeriodicTask.h" #include "ProducerImplBase.h" #include "Semaphore.h" -#include "Utils.h" namespace pulsar { @@ -195,7 +195,7 @@ class ProducerImpl : public HandlerBase, MemoryLimitController& memoryLimitController_; const bool chunkingEnabled_; - Optional topicEpoch{Optional::empty()}; + boost::optional topicEpoch; }; struct ProducerImplCmp { diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc index d1b25b58..a80e2e50 100644 --- a/lib/ReaderImpl.cc +++ b/lib/ReaderImpl.cc @@ -80,10 +80,9 @@ void ReaderImpl::start(const MessageId& startMessageId, test::consumerConfigOfReader = consumerConf.clone(); } - consumer_ = std::make_shared(client_.lock(), topic_, subscription, consumerConf, - TopicName::get(topic_)->isPersistent(), ExecutorServicePtr(), - false, NonPartitioned, Commands::SubscriptionModeNonDurable, - Optional::of(startMessageId)); + consumer_ = std::make_shared( + client_.lock(), topic_, subscription, consumerConf, TopicName::get(topic_)->isPersistent(), + ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable, startMessageId); consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_)); auto self = shared_from_this(); consumer_->getConsumerCreatedFuture().addListener( diff --git a/lib/SynchronizedHashMap.h b/lib/SynchronizedHashMap.h index b8a8c91e..a274bd87 100644 --- a/lib/SynchronizedHashMap.h +++ b/lib/SynchronizedHashMap.h @@ -18,14 +18,13 @@ */ #pragma once +#include #include #include #include #include #include -#include "Utils.h" - namespace pulsar { // V must be default constructible and copyable @@ -35,7 +34,7 @@ class SynchronizedHashMap { using Lock = std::lock_guard; public: - using OptValue = Optional; + using OptValue = boost::optional; using PairVector = std::vector>; using MapType = std::unordered_map; using Iterator = typename MapType::iterator; @@ -85,9 +84,9 @@ class SynchronizedHashMap { Lock lock(mutex_); auto it = data_.find(key); if (it != data_.end()) { - return OptValue::of(it->second); + return it->second; } else { - return OptValue::empty(); + return boost::none; } } @@ -95,21 +94,21 @@ class SynchronizedHashMap { Lock lock(mutex_); for (const auto& kv : data_) { if (f(kv.second)) { - return OptValue::of(kv.second); + return kv.second; } } - return OptValue::empty(); + return boost::none; } OptValue remove(const K& key) { Lock lock(mutex_); auto it = data_.find(key); if (it != data_.end()) { - auto result = OptValue::of(std::move(it->second)); + auto result = boost::make_optional(std::move(it->second)); data_.erase(it); return result; } else { - return OptValue::empty(); + return boost::none; } } diff --git a/lib/Utils.h b/lib/Utils.h index 016f09f2..d6303a38 100644 --- a/lib/Utils.h +++ b/lib/Utils.h @@ -69,38 +69,6 @@ inline std::ostream& operator<<(std::ostream& os, const std::map -class Optional { - public: - const T& value() const { return value_; } - - bool is_present() const { return present_; } - - bool is_empty() const { return !present_; } - - /** - * Create an Optional with the bound value - */ - static Optional of(const T& value) { return Optional(value); } - static Optional of(T&& value) { return Optional(std::move(value)); } - - /** - * Create an empty optional - */ - static Optional empty() { return Optional(); } - - Optional() : value_(), present_(false) {} - - private: - Optional(const T& value) : value_(value), present_(true) {} - Optional(T&& value) : value_(std::move(value)), present_(true) {} - - T value_; - bool present_; -}; } // namespace pulsar #endif /* UTILS_HPP_ */ diff --git a/tests/SynchronizedHashMapTest.cc b/tests/SynchronizedHashMapTest.cc index 87cbe1e3..85378e03 100644 --- a/tests/SynchronizedHashMapTest.cc +++ b/tests/SynchronizedHashMapTest.cc @@ -59,25 +59,25 @@ TEST(SynchronizedHashMap, testRemoveAndFind) { OptValue optValue; optValue = m.findFirstValueIf([](const int& x) { return x == 200; }); - ASSERT_TRUE(optValue.is_present()); + ASSERT_TRUE(optValue); ASSERT_EQ(optValue.value(), 200); optValue = m.findFirstValueIf([](const int& x) { return x >= 301; }); - ASSERT_FALSE(optValue.is_present()); + ASSERT_FALSE(optValue); optValue = m.find(1); - ASSERT_TRUE(optValue.is_present()); + ASSERT_TRUE(optValue); ASSERT_EQ(optValue.value(), 100); - ASSERT_FALSE(m.find(0).is_present()); - ASSERT_FALSE(m.remove(0).is_present()); + ASSERT_FALSE(m.find(0)); + ASSERT_FALSE(m.remove(0)); optValue = m.remove(1); - ASSERT_TRUE(optValue.is_present()); + ASSERT_TRUE(optValue); ASSERT_EQ(optValue.value(), 100); - ASSERT_FALSE(m.remove(1).is_present()); - ASSERT_FALSE(m.find(1).is_present()); + ASSERT_FALSE(m.remove(1)); + ASSERT_FALSE(m.find(1)); } TEST(SynchronizedHashMapTest, testForEach) { @@ -99,7 +99,7 @@ TEST(SynchronizedHashMap, testRecursiveMutex) { m.forEach([&m, &optValue](const int& key, const int& value) { optValue = m.find(key); // the internal mutex was locked again }); - ASSERT_TRUE(optValue.is_present()); + ASSERT_TRUE(optValue); ASSERT_EQ(optValue.value(), 100); } From f046416a575a83acb62115eef6b738da38b475e4 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 6 Dec 2022 20:16:22 +0800 Subject: [PATCH 2/5] [feat] Support Dead Letter Topic. --- include/pulsar/ConsumerConfiguration.h | 15 + include/pulsar/DeadLetterPolicy.h | 71 ++++ include/pulsar/DeadLetterPolicyBuilder.h | 81 +++++ include/pulsar/ProducerConfiguration.h | 12 + lib/Commands.cc | 6 +- lib/Commands.h | 3 +- lib/ConsumerConfiguration.cc | 6 + lib/ConsumerConfigurationImpl.h | 1 + lib/ConsumerImpl.cc | 146 ++++++++- lib/ConsumerImpl.h | 16 + lib/DeadLetterPolicyBuilder.cc | 55 ++++ lib/DeadLetterPolicyImpl.cc | 38 +++ lib/DeadLetterPolicyImpl.h | 32 ++ lib/MessageIdImpl.h | 25 ++ lib/MultiTopicsConsumerImpl.cc | 18 +- lib/MultiTopicsConsumerImpl.h | 1 + lib/NegativeAcksTracker.cc | 2 +- lib/ProducerConfiguration.cc | 12 + lib/ProducerConfigurationImpl.h | 1 + lib/ProducerImpl.cc | 9 +- tests/ConsumerConfigurationTest.cc | 15 + tests/DeadLetterPolicyTest.cc | 45 +++ tests/DeadLetterQueueTest.cc | 393 +++++++++++++++++++++++ 23 files changed, 991 insertions(+), 12 deletions(-) create mode 100644 include/pulsar/DeadLetterPolicy.h create mode 100644 include/pulsar/DeadLetterPolicyBuilder.h create mode 100644 lib/DeadLetterPolicyBuilder.cc create mode 100644 lib/DeadLetterPolicyImpl.cc create mode 100644 lib/DeadLetterPolicyImpl.h create mode 100644 tests/DeadLetterPolicyTest.cc create mode 100644 tests/DeadLetterQueueTest.cc diff --git a/include/pulsar/ConsumerConfiguration.h b/include/pulsar/ConsumerConfiguration.h index 520901c2..071e3378 100644 --- a/include/pulsar/ConsumerConfiguration.h +++ b/include/pulsar/ConsumerConfiguration.h @@ -34,6 +34,7 @@ #include #include "BatchReceivePolicy.h" +#include "DeadLetterPolicy.h" namespace pulsar { @@ -398,6 +399,20 @@ class PULSAR_PUBLIC ConsumerConfiguration { */ const BatchReceivePolicy& getBatchReceivePolicy() const; + /** + * Set dead letter policy. + * + * @param deadLetterPolicy thd default is empty + */ + void setDeadLetterPolicy(const DeadLetterPolicy& deadLetterPolicy); + + /** + * Get dead letter policy. + * + * @return dead letter policy + */ + const DeadLetterPolicy& getDeadLetterPolicy() const; + /** * Set whether the subscription status should be replicated. * The default value is `false`. diff --git a/include/pulsar/DeadLetterPolicy.h b/include/pulsar/DeadLetterPolicy.h new file mode 100644 index 00000000..42d5b654 --- /dev/null +++ b/include/pulsar/DeadLetterPolicy.h @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef DEAD_LETTER_POLICY_HPP_ +#define DEAD_LETTER_POLICY_HPP_ + +#include + +#include +#include + +namespace pulsar { + +struct DeadLetterPolicyImpl; + +/** + * Configuration for the "dead letter queue" feature in consumer. + * + * see @DeadLetterPolicyBuilder + */ +class PULSAR_PUBLIC DeadLetterPolicy { + public: + DeadLetterPolicy(); + + /** + * Get dead letter topic + * + * @return + */ + std::string getDeadLetterTopic() const; + + /** + * Get max redeliver count + * + * @return + */ + int getMaxRedeliverCount() const; + + /** + * Get initial subscription name + * + * @return + */ + std::string getInitialSubscriptionName() const; + + private: + friend class DeadLetterPolicyBuilder; + + typedef std::shared_ptr DeadLetterPolicyImplPtr; + DeadLetterPolicyImplPtr impl_; + + explicit DeadLetterPolicy(const DeadLetterPolicyImplPtr& impl); +}; +} // namespace pulsar + +#endif /* DEAD_LETTER_POLICY_HPP_ */ diff --git a/include/pulsar/DeadLetterPolicyBuilder.h b/include/pulsar/DeadLetterPolicyBuilder.h new file mode 100644 index 00000000..dd7d4d46 --- /dev/null +++ b/include/pulsar/DeadLetterPolicyBuilder.h @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef DEAD_LETTER_POLICY_BUILD_HPP_ +#define DEAD_LETTER_POLICY_BUILD_HPP_ + +#include +#include + +#include + +namespace pulsar { + +struct DeadLetterPolicyImpl; + +/** + * The builder to build a DeadLetterPolicyBuilder + * + * Example of building DeadLetterPolicy: + * + * ```c++ + * DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder() + * .deadLetterTopic("dlq-topic") + * .maxRedeliverCount(10) + * .initialSubscriptionName("init-sub-name") + * .build(); + * ``` + */ +class PULSAR_PUBLIC DeadLetterPolicyBuilder { + public: + DeadLetterPolicyBuilder(); + + /** + * Set dead letter topic + * + * @return + */ + DeadLetterPolicyBuilder& deadLetterTopic(const std::string& deadLetterTopic); + + /** + * Set max redeliver count + * + * @return + */ + DeadLetterPolicyBuilder& maxRedeliverCount(int maxRedeliverCount); + + /** + * Set initial subscription name + * + * @return + */ + DeadLetterPolicyBuilder& initialSubscriptionName(const std::string& initialSubscriptionName); + + /** + * Build DeadLetterPolicy. + * + * @return + */ + DeadLetterPolicy build(); + + private: + std::shared_ptr impl_; +}; +} // namespace pulsar + +#endif /* DEAD_LETTER_POLICY_BUILD_HPP_ */ diff --git a/include/pulsar/ProducerConfiguration.h b/include/pulsar/ProducerConfiguration.h index 873e1383..a4fae26c 100644 --- a/include/pulsar/ProducerConfiguration.h +++ b/include/pulsar/ProducerConfiguration.h @@ -532,6 +532,18 @@ class PULSAR_PUBLIC ProducerConfiguration { */ ProducerAccessMode getAccessMode() const; + /** + * Use this configuration to automatically create an initial subscription when creating a topic. + * + * If this field is not set, the initial subscription is not created. + */ + ProducerConfiguration& setInitialSubscriptionName(const std::string& initialSubscriptionName); + + /** + * Get initial subscription name. + */ + const std::string& getInitialSubscriptionName() const; + friend class PulsarWrapper; private: diff --git a/lib/Commands.cc b/lib/Commands.cc index 5bb9587e..4766daa1 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -383,7 +383,8 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId const std::map& metadata, const SchemaInfo& schemaInfo, uint64_t epoch, bool userProvidedProducerName, bool encrypted, - ProducerAccessMode accessMode, boost::optional topicEpoch) { + ProducerAccessMode accessMode, boost::optional topicEpoch, + std::string initialSubscriptionName) { BaseCommand cmd; cmd.set_type(BaseCommand::PRODUCER); CommandProducer* producer = cmd.mutable_producer(); @@ -397,6 +398,9 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId if (topicEpoch) { producer->set_topic_epoch(topicEpoch.value()); } + if (!initialSubscriptionName.empty()) { + producer->set_initial_subscription_name(initialSubscriptionName); + } for (std::map::const_iterator it = metadata.begin(); it != metadata.end(); it++) { diff --git a/lib/Commands.h b/lib/Commands.h index bbe96fdb..fd50ca53 100644 --- a/lib/Commands.h +++ b/lib/Commands.h @@ -105,7 +105,8 @@ class Commands { const std::map& metadata, const SchemaInfo& schemaInfo, uint64_t epoch, bool userProvidedProducerName, bool encrypted, - ProducerAccessMode accessMode, boost::optional topicEpoch); + ProducerAccessMode accessMode, boost::optional topicEpoch, + std::string initialSubscriptionName); static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId, CommandAck_AckType ackType, CommandAck_ValidationError validationError); diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc index f37e042d..fba142bb 100644 --- a/lib/ConsumerConfiguration.cc +++ b/lib/ConsumerConfiguration.cc @@ -287,4 +287,10 @@ const BatchReceivePolicy& ConsumerConfiguration::getBatchReceivePolicy() const { return impl_->batchReceivePolicy; } +void ConsumerConfiguration::setDeadLetterPolicy(const DeadLetterPolicy& deadLetterPolicy) { + impl_->deadLetterPolicy = deadLetterPolicy; +} + +const DeadLetterPolicy& ConsumerConfiguration::getDeadLetterPolicy() const { return impl_->deadLetterPolicy; } + } // namespace pulsar diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h index 259b9354..f795ed6a 100644 --- a/lib/ConsumerConfigurationImpl.h +++ b/lib/ConsumerConfigurationImpl.h @@ -46,6 +46,7 @@ struct ConsumerConfigurationImpl { bool readCompacted{false}; InitialPosition subscriptionInitialPosition{InitialPosition::InitialPositionLatest}; BatchReceivePolicy batchReceivePolicy{}; + DeadLetterPolicy deadLetterPolicy; int patternAutoDiscoveryPeriod{60}; bool replicateSubscriptionStateEnabled{false}; std::map properties; diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index cf01e362..c4aac41c 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -18,6 +18,7 @@ */ #include "ConsumerImpl.h" +#include #include #include @@ -114,6 +115,21 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, msgCrypto_ = std::make_shared(consumerStr_, false); } + // Config dlq + auto deadLetterPolicy = conf.getDeadLetterPolicy(); + if (deadLetterPolicy.getMaxRedeliverCount() > 0) { + auto deadLetterPolicyBuilder = + DeadLetterPolicyBuilder() + .maxRedeliverCount(deadLetterPolicy.getMaxRedeliverCount()) + .initialSubscriptionName(deadLetterPolicy.getInitialSubscriptionName()); + if (deadLetterPolicy.getDeadLetterTopic().empty()) { + deadLetterPolicyBuilder.deadLetterTopic(topic + "-" + subscriptionName + DLQ_GROUP_TOPIC_SUFFIX); + } else { + deadLetterPolicyBuilder.deadLetterTopic(deadLetterPolicy.getDeadLetterTopic()); + } + deadLetterPolicy_ = deadLetterPolicyBuilder.build(); + } + checkExpiredChunkedTimer_ = executor_->createDeadlineTimer(); } @@ -460,6 +476,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: return; } + auto redeliveryCount = msg.redelivery_count(); const bool isMessageUndecryptable = metadata.encryption_keys_size() > 0 && !config_.getCryptoKeyReader().get() && config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME; @@ -520,6 +537,10 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: << startMessageId.value()); return; } + if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) { + possibleSendToDeadLetterTopicMessages_.emplace(m.getMessageId(), std::vector{m}); + increaseAvailablePermits(cnx); + } executeNotifyCallback(m); } @@ -630,6 +651,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection int skippedMessages = 0; + std::vector possibleToDeadLetter; for (int i = 0; i < batchSize; i++) { // This is a cheap copy since message contains only one shared pointer (impl_) Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize); @@ -652,9 +674,17 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection } } + if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) { + possibleToDeadLetter.emplace_back(msg); + } + executeNotifyCallback(msg); } + if (!possibleToDeadLetter.empty()) { + possibleSendToDeadLetterTopicMessages_.emplace(batchedMessage.getMessageId(), possibleToDeadLetter); + } + if (skippedMessages > 0) { increaseAvailablePermits(cnx, skippedMessages); } @@ -1188,7 +1218,28 @@ void ConsumerImpl::redeliverUnacknowledgedMessages(const std::set& me redeliverUnacknowledgedMessages(); return; } - redeliverMessages(messageIds); + + ClientConnectionPtr cnx = getCnx().lock(); + if (cnx) { + if (cnx->getServerProtocolVersion() >= proto::v2) { + auto needRedeliverMsgs = std::make_shared>(); + auto needCallBack = std::make_shared>(messageIds.size()); + auto self = get_shared_this_ptr(); + for (const auto& msgId : messageIds) { + processPossibleToDLQ(msgId, + [self, needRedeliverMsgs, &msgId, needCallBack](bool processSuccess) { + if (!processSuccess) { + needRedeliverMsgs->emplace(msgId); + } + if (--(*needCallBack) == 0 && !needRedeliverMsgs->empty()) { + self->redeliverMessages(*needRedeliverMsgs); + } + }); + } + } + } else { + LOG_WARN("Connection not ready for Consumer - " << getConsumerId()); + } } void ConsumerImpl::redeliverMessages(const std::set& messageIds) { @@ -1199,7 +1250,7 @@ void ConsumerImpl::redeliverMessages(const std::set& messageIds) { LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for Consumer - " << getConsumerId()); } } else { - LOG_DEBUG("Connection not ready for Consumer - " << getConsumerId()); + LOG_WARN("Connection not ready for Consumer - " << getConsumerId()); } } @@ -1505,4 +1556,95 @@ void ConsumerImpl::cancelTimers() noexcept { checkExpiredChunkedTimer_->cancel(ec); } +void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb) { + auto messages = possibleSendToDeadLetterTopicMessages_.find(messageId); + if (messages.is_empty()) { + cb(false); + return; + } + + // Initialize deadLetterProducer_ + if (!deadLetterProducer_) { + Lock createLock(createProducerLock_); + if (!deadLetterProducer_) { + deadLetterProducer_ = std::make_shared>(); + ProducerConfiguration producerConfiguration; + producerConfiguration.setBlockIfQueueFull(false); + if (!deadLetterPolicy_.getInitialSubscriptionName().empty()) { + producerConfiguration.setInitialSubscriptionName( + deadLetterPolicy_.getInitialSubscriptionName()); + } + ClientImplPtr client = client_.lock(); + if (client) { + client->createProducerAsync( + deadLetterPolicy_.getDeadLetterTopic(), producerConfiguration, + [this](Result res, Producer producer) { + if (res == ResultOk) { + deadLetterProducer_->setValue(producer); + } else { + LOG_ERROR("Dead letter producer create exception with topic: " + << deadLetterPolicy_.getDeadLetterTopic() << " ex: " << res); + deadLetterProducer_.reset(); + } + }); + } else { + LOG_WARN(getName() << "Client is destroyed and cannot create dead letter producer."); + } + } + createLock.unlock(); + } + + for (const auto& message : messages.value()) { + auto self = get_shared_this_ptr(); + deadLetterProducer_->getFuture().addListener([self, message, cb](Result res, Producer producer) { + auto originMessageId = message.getMessageId(); + std::stringstream originMessageIdStr; + originMessageIdStr << originMessageId; + MessageBuilder msgBuilder; + msgBuilder.setAllocatedContent(const_cast(message.getData()), message.getLength()) + .setProperties(message.getProperties()) + .setProperty(PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr.str()) + .setProperty(SYSTEM_PROPERTY_REAL_TOPIC, message.getTopicName()); + if (message.hasPartitionKey()) { + msgBuilder.setPartitionKey(message.getPartitionKey()); + } + if (message.hasOrderingKey()) { + msgBuilder.setOrderingKey(message.getOrderingKey()); + } + producer.sendAsync(msgBuilder.build(), [self, originMessageId, cb](Result res, + const MessageId& messageId) { + if (res == ResultOk) { + if (self->state_ != Ready) { + LOG_WARN( + "Send to the DLQ successfully, but consumer is not ready. ignore acknowledge : " + << self->state_); + cb(false); + return; + } + self->acknowledgeAsync(originMessageId, [self, originMessageId, cb](Result result) { + if (result != ResultOk) { + LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {" + << self->consumerName_ << "} Failed to acknowledge the message {" + << originMessageId + << "} of the original topic but send to the DLQ successfully : " + << result); + cb(false); + } else { + LOG_DEBUG("Send msg:" << originMessageId + << "to DLQ success and acknowledge success."); + cb(true); + } + }); + } else { + LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {" + << self->consumerName_ << "} Failed to send DLQ message to {" + << self->deadLetterPolicy_.getDeadLetterTopic() << "} for message id " + << "{" << originMessageId << "} : " << res); + cb(false); + } + }); + }); + } +} + } /* namespace pulsar */ diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index e9af3f6e..2ed14de1 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -31,11 +31,13 @@ #include "CompressionCodec.h" #include "ConsumerImplBase.h" #include "MapCache.h" +#include "MessageIdImpl.h" #include "NegativeAcksTracker.h" #include "Synchronized.h" #include "TestUtil.h" #include "TimeUtils.h" #include "UnboundedBlockingQueue.h" +#include "lib/SynchronizedHashMap.h" namespace pulsar { class UnAckedMessageTrackerInterface; @@ -46,6 +48,7 @@ class MessageCrypto; class GetLastMessageIdResponse; typedef std::shared_ptr MessageCryptoPtr; typedef std::shared_ptr BackoffPtr; +typedef std::function ProcessDLQCallBack; class AckGroupingTracker; using AckGroupingTrackerPtr = std::shared_ptr; @@ -65,6 +68,10 @@ enum ConsumerTopicType Partitioned }; +const static std::string SYSTEM_PROPERTY_REAL_TOPIC = "REAL_TOPIC"; +const static std::string PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID"; +const static std::string DLQ_GROUP_TOPIC_SUFFIX = "-DLQ"; + class ConsumerImpl : public ConsumerImplBase { public: ConsumerImpl(const ClientImplPtr client, const std::string& topic, const std::string& subscriptionName, @@ -197,9 +204,11 @@ class ConsumerImpl : public ConsumerImplBase { boost::optional clearReceiveQueue(); void seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId, long timestamp, ResultCallback callback); + void processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb); std::mutex mutexForReceiveWithZeroQueueSize; const ConsumerConfiguration config_; + DeadLetterPolicy deadLetterPolicy_; const std::string subscription_; std::string originalSubscriptionName_; const bool isPersistent_; @@ -231,6 +240,10 @@ class ConsumerImpl : public ConsumerImplBase { MessageCryptoPtr msgCrypto_; const bool readCompacted_; + SynchronizedHashMap> possibleSendToDeadLetterTopicMessages_; + std::shared_ptr> deadLetterProducer_; + std::mutex createProducerLock_; + // Make the access to `lastDequedMessageId_` and `lastMessageIdInBroker_` thread safe mutable std::mutex mutexForMessageId_; MessageId lastDequedMessageId_{MessageId::earliest()}; @@ -336,6 +349,9 @@ class ConsumerImpl : public ConsumerImplBase { FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker); + FRIEND_TEST(DeadLetterQueueTest, testAutoSetDLQTopicName); + FRIEND_TEST(DeadLetterQueueTest, testSendDLQTriggerByAckTimeOutAndNeAck); + FRIEND_TEST(DeadLetterQueueTest, testWithoutConsumerReceiveImmediately); }; } /* namespace pulsar */ diff --git a/lib/DeadLetterPolicyBuilder.cc b/lib/DeadLetterPolicyBuilder.cc new file mode 100644 index 00000000..58284726 --- /dev/null +++ b/lib/DeadLetterPolicyBuilder.cc @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include + +#include "DeadLetterPolicyImpl.h" +#include "stdexcept" + +using namespace pulsar; + +namespace pulsar { + +DeadLetterPolicyBuilder::DeadLetterPolicyBuilder() : impl_(std::make_shared()) {} + +DeadLetterPolicyBuilder& DeadLetterPolicyBuilder::deadLetterTopic(const std::string& deadLetterTopic) { + impl_->deadLetterTopic = deadLetterTopic; + return *this; +} + +DeadLetterPolicyBuilder& DeadLetterPolicyBuilder::maxRedeliverCount(int maxRedeliverCount) { + impl_->maxRedeliverCount = maxRedeliverCount; + return *this; +} + +DeadLetterPolicyBuilder& DeadLetterPolicyBuilder::initialSubscriptionName( + const std::string& initialSubscriptionName) { + impl_->initialSubscriptionName = initialSubscriptionName; + return *this; +} + +DeadLetterPolicy DeadLetterPolicyBuilder::build() { + if (impl_->maxRedeliverCount <= 0) { + throw std::invalid_argument("maxRedeliverCount must be > 0."); + } + return DeadLetterPolicy(impl_); +} + +} // namespace pulsar diff --git a/lib/DeadLetterPolicyImpl.cc b/lib/DeadLetterPolicyImpl.cc new file mode 100644 index 00000000..cd172671 --- /dev/null +++ b/lib/DeadLetterPolicyImpl.cc @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "DeadLetterPolicyImpl.h" + +#include + +using namespace pulsar; + +namespace pulsar { + +DeadLetterPolicy::DeadLetterPolicy() : impl_(std::make_shared()) {} + +std::string DeadLetterPolicy::getDeadLetterTopic() const { return impl_->deadLetterTopic; } + +int DeadLetterPolicy::getMaxRedeliverCount() const { return impl_->maxRedeliverCount; } + +std::string DeadLetterPolicy::getInitialSubscriptionName() const { return impl_->initialSubscriptionName; } + +DeadLetterPolicy::DeadLetterPolicy(const DeadLetterPolicy::DeadLetterPolicyImplPtr& impl) : impl_(impl) {} + +} // namespace pulsar diff --git a/lib/DeadLetterPolicyImpl.h b/lib/DeadLetterPolicyImpl.h new file mode 100644 index 00000000..7c94b6b7 --- /dev/null +++ b/lib/DeadLetterPolicyImpl.h @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include "climits" +#include "string" + +namespace pulsar { + +struct DeadLetterPolicyImpl { + std::string deadLetterTopic; + int maxRedeliverCount{INT_MAX}; + std::string initialSubscriptionName; +}; + +} // namespace pulsar diff --git a/lib/MessageIdImpl.h b/lib/MessageIdImpl.h index 57d1c4eb..68f4237f 100644 --- a/lib/MessageIdImpl.h +++ b/lib/MessageIdImpl.h @@ -19,9 +19,34 @@ #pragma once +#include #include #include +namespace std { + +template <> +struct hash { + std::size_t operator()(const pulsar::MessageId& msgId) const { + using boost::hash_combine; + using boost::hash_value; + + // Start with a hash value of 0 . + std::size_t seed = 0; + + // Modify 'seed' by XORing and bit-shifting in + // one member of 'Key' after the other: + hash_combine(seed, hash_value(msgId.ledgerId())); + hash_combine(seed, hash_value(msgId.entryId())); + hash_combine(seed, hash_value(msgId.batchIndex())); + hash_combine(seed, hash_value(msgId.partition())); + + // Return the result. + return seed; + } +}; +} // namespace std + namespace pulsar { class MessageIdImpl { diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index d9d38732..4accac32 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -770,10 +770,22 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::setredeliverUnacknowledgedMessages(messageIds); - }); + std::unordered_map> topicToMessageId; + for (const MessageId& messageId : messageIds) { + auto topicName = messageId.getTopicName(); + topicToMessageId[topicName].emplace(messageId); + } + + for (const auto& kv : topicToMessageId) { + auto optConsumer = consumers_.find(kv.first); + if (optConsumer.is_present()) { + optConsumer.value()->redeliverUnacknowledgedMessages(kv.second); + } else { + LOG_ERROR("Message of topic: " << kv.first << " not in consumers"); + } + } } int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); } diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index da42b748..a3082c8c 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -167,6 +167,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery); + FRIEND_TEST(DeadLetterQueueTest, testSendDLQTriggerByAckTimeOutAndNeAck); }; typedef std::shared_ptr MultiTopicsConsumerImplPtr; diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc index 6ff322df..9dcca20f 100644 --- a/lib/NegativeAcksTracker.cc +++ b/lib/NegativeAcksTracker.cc @@ -80,7 +80,7 @@ void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) { } if (!messagesToRedeliver.empty()) { - consumer_.redeliverMessages(messagesToRedeliver); + consumer_.redeliverUnacknowledgedMessages(messagesToRedeliver); } scheduleTimer(); } diff --git a/lib/ProducerConfiguration.cc b/lib/ProducerConfiguration.cc index 67e8b102..a7130c15 100644 --- a/lib/ProducerConfiguration.cc +++ b/lib/ProducerConfiguration.cc @@ -16,6 +16,8 @@ * specific language governing permissions and limitations * under the License. */ +#include "pulsar/ProducerConfiguration.h" + #include #include "ProducerConfigurationImpl.h" @@ -266,4 +268,14 @@ ProducerConfiguration::ProducerAccessMode ProducerConfiguration::getAccessMode() return impl_->accessMode; } +ProducerConfiguration& ProducerConfiguration::setInitialSubscriptionName( + const std::string& initialSubscriptionName) { + impl_->initialSubscriptionName = initialSubscriptionName; + return *this; +} + +const std::string& ProducerConfiguration::getInitialSubscriptionName() const { + return impl_->initialSubscriptionName; +} + } // namespace pulsar diff --git a/lib/ProducerConfigurationImpl.h b/lib/ProducerConfigurationImpl.h index 83723f61..4c8fcdb1 100644 --- a/lib/ProducerConfigurationImpl.h +++ b/lib/ProducerConfigurationImpl.h @@ -50,6 +50,7 @@ struct ProducerConfigurationImpl { std::map properties; bool chunkingEnabled{false}; ProducerConfiguration::ProducerAccessMode accessMode{ProducerConfiguration::Shared}; + std::string initialSubscriptionName; }; } // namespace pulsar diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index a3a5a95a..c1d82df7 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -148,10 +148,11 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) { ClientImplPtr client = client_.lock(); int requestId = client->newRequestId(); - SharedBuffer cmd = Commands::newProducer( - topic_, producerId_, producerName_, requestId, conf_.getProperties(), conf_.getSchema(), epoch_, - userProvidedProducerName_, conf_.isEncryptionEnabled(), - static_cast(conf_.getAccessMode()), topicEpoch); + SharedBuffer cmd = Commands::newProducer(topic_, producerId_, producerName_, requestId, + conf_.getProperties(), conf_.getSchema(), epoch_, + userProvidedProducerName_, conf_.isEncryptionEnabled(), + static_cast(conf_.getAccessMode()), + topicEpoch, conf_.getInitialSubscriptionName()); cnx->sendRequestWithId(cmd, requestId) .addListener(std::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx, std::placeholders::_1, std::placeholders::_2)); diff --git a/tests/ConsumerConfigurationTest.cc b/tests/ConsumerConfigurationTest.cc index fde87364..265cb194 100644 --- a/tests/ConsumerConfigurationTest.cc +++ b/tests/ConsumerConfigurationTest.cc @@ -20,12 +20,15 @@ #include #include +#include + #include "NoOpsCryptoKeyReader.h" DECLARE_LOG_OBJECT() #include "../lib/Future.h" #include "../lib/Utils.h" +#include "pulsar/DeadLetterPolicyBuilder.h" using namespace pulsar; @@ -317,3 +320,15 @@ TEST(ConsumerConfigurationTest, testResetAckTimeOut) { config.setUnAckedMessagesTimeoutMs(0); ASSERT_EQ(0, config.getUnAckedMessagesTimeoutMs()); } + +TEST(ConsumerConfigurationTest, testDeadLetterPolicy) { + ConsumerConfiguration config; + auto dlqPolicy = config.getDeadLetterPolicy(); + ASSERT_TRUE(dlqPolicy.getDeadLetterTopic().empty()); + ASSERT_EQ(dlqPolicy.getMaxRedeliverCount(), INT_MAX); + ASSERT_TRUE(dlqPolicy.getInitialSubscriptionName().empty()); + + config.setDeadLetterPolicy(DeadLetterPolicyBuilder().maxRedeliverCount(10).build()); + auto dlqPolicy2 = config.getDeadLetterPolicy(); + ASSERT_EQ(dlqPolicy2.getMaxRedeliverCount(), 10); +} diff --git a/tests/DeadLetterPolicyTest.cc b/tests/DeadLetterPolicyTest.cc new file mode 100644 index 00000000..4c412830 --- /dev/null +++ b/tests/DeadLetterPolicyTest.cc @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include + +using namespace pulsar; + +TEST(DeadLetterPolicy, testDeadLetterPolicy) { + // test default value. + DeadLetterPolicy deadLetterPolicy; + ASSERT_EQ(deadLetterPolicy.getMaxRedeliverCount(), INT_MAX); + ASSERT_TRUE(deadLetterPolicy.getDeadLetterTopic().empty()); + ASSERT_TRUE(deadLetterPolicy.getInitialSubscriptionName().empty()); + + // test don't allowed max redeliver count less than 0. + ASSERT_THROW(DeadLetterPolicyBuilder().maxRedeliverCount(-1).build(), std::invalid_argument); + + // test create DeadLetterPolicy by builder. + deadLetterPolicy = DeadLetterPolicyBuilder() + .maxRedeliverCount(10) + .deadLetterTopic("topic-subscription-DLQ") + .initialSubscriptionName("init-DLQ-subscription") + .build(); + ASSERT_EQ(deadLetterPolicy.getMaxRedeliverCount(), 10); + ASSERT_EQ(deadLetterPolicy.getDeadLetterTopic(), "topic-subscription-DLQ"); + ASSERT_EQ(deadLetterPolicy.getInitialSubscriptionName(), "init-DLQ-subscription"); +} diff --git a/tests/DeadLetterQueueTest.cc b/tests/DeadLetterQueueTest.cc new file mode 100644 index 00000000..2849b903 --- /dev/null +++ b/tests/DeadLetterQueueTest.cc @@ -0,0 +1,393 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include "HttpHelper.h" +#include "PulsarFriend.h" +#include "lib/ClientConnection.h" +#include "lib/LogUtils.h" +#include "lib/MessageIdUtil.h" +#include "lib/UnAckedMessageTrackerEnabled.h" +#include "lib/Utils.h" +#include "pulsar/DeadLetterPolicyBuilder.h" + +static const std::string lookupUrl = "pulsar://localhost:6650"; +static const std::string adminUrl = "http://localhost:8080/"; + +DECLARE_LOG_OBJECT() + +namespace pulsar { + +// Because isSchemaValidationEnforced config defaults to false. +// Therefore, in scenarios where AUTO_PUBLISH schemas are not supported now, the unit test can pass. +// When implementing AUTO_PUBLISH schema, we should set isSchemaValidationEnforced to true to revalidate. +TEST(DeadLetterQueueTest, testAutoSchema) { + Client client(lookupUrl); + const std::string topic = "testAutoSchema-" + std::to_string(time(nullptr)); + const std::string subName = "dlq-sub"; + + static const std::string jsonSchema = + R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})"; + SchemaInfo schemaInfo(AUTO_PUBLISH, "test-json", jsonSchema); + + auto dlqPolicy = DeadLetterPolicyBuilder() + .maxRedeliverCount(3) + .deadLetterTopic(topic + subName + "-DLQ") + .initialSubscriptionName("init-sub") + .build(); + ConsumerConfiguration consumerConfig; + consumerConfig.setDeadLetterPolicy(dlqPolicy); + consumerConfig.setNegativeAckRedeliveryDelayMs(100); + consumerConfig.setConsumerType(ConsumerType::ConsumerShared); + consumerConfig.setSchema(schemaInfo); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer)); + + // Initialize the DLQ subscription first and make sure that DLQ topic is created and a schema exists. + ConsumerConfiguration dlqConsumerConfig; + dlqConsumerConfig.setConsumerType(ConsumerType::ConsumerShared); + dlqConsumerConfig.setSchema(schemaInfo); + Consumer deadLetterConsumer; + ASSERT_EQ(ResultOk, client.subscribe(dlqPolicy.getDeadLetterTopic(), subName, dlqConsumerConfig, + deadLetterConsumer)); + + Producer producer; + ProducerConfiguration producerConfig; + producerConfig.setSchema(schemaInfo); + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer)); + std::string data = "{\"re\":2.1,\"im\":1.23}"; + const int num = 1; + for (int i = 0; i < num; ++i) { + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(data).build())); + } + + // nack all msg. + Message msg; + for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) { + ASSERT_EQ(ResultOk, consumer.receive(msg)); + consumer.negativeAcknowledge(msg); + } + + // assert dlq msg. + for (int i = 0; i < num; i++) { + ASSERT_EQ(ResultOk, deadLetterConsumer.receive(msg, 5000)); + ASSERT_TRUE(!msg.getDataAsString().empty()); + ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic)); + ASSERT_TRUE(!msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty()); + } + ASSERT_EQ(ResultTimeout, deadLetterConsumer.receive(msg, 200)); + + client.close(); +} + +// If the user never receives this message, the message should not be delivered to the DLQ. +TEST(DeadLetterQueueTest, testWithoutConsumerReceiveImmediately) { + Client client(lookupUrl); + const std::string topic = "testWithoutConsumerReceiveImmediately-" + std::to_string(time(nullptr)); + const std::string subName = "dlq-sub"; + auto dlqPolicy = + DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build(); + ConsumerConfiguration consumerConfig; + consumerConfig.setDeadLetterPolicy(dlqPolicy); + consumerConfig.setNegativeAckRedeliveryDelayMs(100); + consumerConfig.setConsumerType(ConsumerType::ConsumerShared); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer)); + + // set ack timeout is 10 ms. + auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer); + consumerImpl.unAckedMessageTrackerPtr_.reset( + new UnAckedMessageTrackerEnabled(10, PulsarFriend::getClientImplPtr(client), consumerImpl)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + producer.send(MessageBuilder().setContent("msg").build()); + + // Wait a while, message should not be send to DLQ + sleep(2); + + Message msg; + ASSERT_EQ(ResultOk, consumer.receive(msg)); + client.close(); +} + +TEST(DeadLetterQueueTest, testAutoSetDLQTopicName) { + Client client(lookupUrl); + const std::string topic = "testAutoSetDLQName-" + std::to_string(time(nullptr)); + const std::string subName = "dlq-sub"; + const std::string dlqTopic = "persistent://public/default/" + topic + "-" + subName + "-DLQ"; + auto dlqPolicy = + DeadLetterPolicyBuilder().maxRedeliverCount(3).initialSubscriptionName("init-sub").build(); + ConsumerConfiguration consumerConfig; + consumerConfig.setDeadLetterPolicy(dlqPolicy); + consumerConfig.setNegativeAckRedeliveryDelayMs(100); + consumerConfig.setConsumerType(ConsumerType::ConsumerShared); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer)); + + auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer); + ASSERT_EQ(consumerImpl.deadLetterPolicy_.getDeadLetterTopic(), dlqTopic); + + client.close(); +} + +class DeadLetterQueueTest : public ::testing::TestWithParam> { + public: + void SetUp() override { + isProducerBatch_ = std::get<0>(GetParam()); + isMultiConsumer_ = std::get<1>(GetParam()); + consumerType_ = std::get<2>(GetParam()); + producerConf_ = ProducerConfiguration().setBatchingEnabled(isProducerBatch_); + } + + void TearDown() override { client_.close(); } + + void initTopic(std::string topicName) { + if (isMultiConsumer_) { + // call admin api to make it partitioned + std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions"; + int res = makePutRequest(url, "5"); + LOG_INFO("res = " << res); + ASSERT_FALSE(res != 204 && res != 409); + } + } + + protected: + Client client_{lookupUrl}; + ProducerConfiguration producerConf_; + bool isMultiConsumer_; + bool isProducerBatch_; + ConsumerType consumerType_; +}; + +TEST_P(DeadLetterQueueTest, testSendDLQTriggerByAckTimeOutAndNeAck) { + Client client(lookupUrl); + const std::string topic = "testSendDLQTriggerByAckTimeOut-" + std::to_string(time(nullptr)) + + std::to_string(isMultiConsumer_) + std::to_string(isProducerBatch_) + + std::to_string(consumerType_); + const std::string subName = "dlq-sub"; + const std::string dlqTopic = topic + "-" + subName + "-DLQ"; + initTopic(topic); + + auto dlqPolicy = DeadLetterPolicyBuilder().maxRedeliverCount(3).deadLetterTopic(dlqTopic).build(); + ConsumerConfiguration consumerConfig; + consumerConfig.setDeadLetterPolicy(dlqPolicy); + consumerConfig.setNegativeAckRedeliveryDelayMs(100); + consumerConfig.setConsumerType(consumerType_); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer)); + + // Reset the unack timeout + long unackTimeOut = 200; + if (isMultiConsumer_) { + auto multiConsumer = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer); + multiConsumer->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled( + unackTimeOut, PulsarFriend::getClientImplPtr(client), *multiConsumer)); + multiConsumer->consumers_.forEachValue([&client, unackTimeOut](ConsumerImplPtr consumer) { + consumer->unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled( + unackTimeOut, PulsarFriend::getClientImplPtr(client), *consumer)); + }); + } else { + auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer); + consumerImpl.unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled( + unackTimeOut, PulsarFriend::getClientImplPtr(client), consumerImpl)); + } + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf_, producer)); + const int num = 100; + Message msg; + for (int i = 0; i < num; ++i) { + msg = MessageBuilder() + .setContent(std::to_string(i)) + .setPartitionKey("p-key") + .setOrderingKey("o-key") + .setProperty("pk-1", "pv-1") + .build(); + producer.sendAsync(msg, [](Result res, const MessageId &msgId) { ASSERT_EQ(res, ResultOk); }); + } + + // receive messages and don't ack. + for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) { + ASSERT_EQ(ResultOk, consumer.receive(msg)); + // Randomly specify some messages manually negativeAcknowledge. + if (rand() % 2 == 0) { + consumer.negativeAcknowledge(msg); + } + } + + // assert dlq msg. + Consumer deadLetterQueueConsumer; + ConsumerConfiguration dlqConsumerConfig; + dlqConsumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest); + ASSERT_EQ(ResultOk, client.subscribe(dlqTopic, "dlq-sub", dlqConsumerConfig, deadLetterQueueConsumer)); + for (int i = 0; i < num; i++) { + ASSERT_EQ(ResultOk, deadLetterQueueConsumer.receive(msg)); + ASSERT_TRUE(!msg.getDataAsString().empty()); + ASSERT_EQ(msg.getPartitionKey(), "p-key"); + ASSERT_EQ(msg.getOrderingKey(), "o-key"); + ASSERT_EQ(msg.getProperty("pk-1"), "pv-1"); + ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic)); + ASSERT_TRUE(!msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty()); + } + + ASSERT_EQ(ResultTimeout, deadLetterQueueConsumer.receive(msg, 200)); +} + +TEST_P(DeadLetterQueueTest, testSendDLQTriggerByNegativeAcknowledge) { + Client client(lookupUrl); + const std::string topic = "testSendDLQTriggerByNegativeAcknowledge-" + std::to_string(time(nullptr)) + + std::to_string(isMultiConsumer_) + std::to_string(isProducerBatch_) + + std::to_string(consumerType_); + const std::string subName = "dlq-sub"; + const std::string dlqTopic = topic + subName + "DLQ"; + initTopic(topic); + + auto dlqPolicy = DeadLetterPolicyBuilder().maxRedeliverCount(3).deadLetterTopic(dlqTopic).build(); + ConsumerConfiguration consumerConfig; + consumerConfig.setDeadLetterPolicy(dlqPolicy); + consumerConfig.setNegativeAckRedeliveryDelayMs(100); + consumerConfig.setConsumerType(consumerType_); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf_, producer)); + + const int num = 10; + Message msg; + for (int i = 0; i < num; ++i) { + msg = MessageBuilder() + .setContent(std::to_string(i)) + .setPartitionKey("p-key") + .setOrderingKey("o-key") + .setProperty("pk-1", "pv-1") + .build(); + producer.sendAsync(msg, [](Result res, const MessageId &msgId) { ASSERT_EQ(res, ResultOk); }); + } + + // nack all msg. + for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) { + ASSERT_EQ(ResultOk, consumer.receive(msg)); + consumer.negativeAcknowledge(msg); + } + + // assert dlq msg. + Consumer deadLetterQueueConsumer; + ConsumerConfiguration dlqConsumerConfig; + dlqConsumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest); + ASSERT_EQ(ResultOk, client.subscribe(dlqTopic, "dlq-sub", dlqConsumerConfig, deadLetterQueueConsumer)); + for (int i = 0; i < num; i++) { + ASSERT_EQ(ResultOk, deadLetterQueueConsumer.receive(msg)); + ASSERT_TRUE(!msg.getDataAsString().empty()); + ASSERT_EQ(msg.getPartitionKey(), "p-key"); + ASSERT_EQ(msg.getOrderingKey(), "o-key"); + ASSERT_EQ(msg.getProperty("pk-1"), "pv-1"); + ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic)); + ASSERT_TRUE(!msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty()); + } + ASSERT_EQ(ResultTimeout, deadLetterQueueConsumer.receive(msg, 200)); +} + +TEST_P(DeadLetterQueueTest, testInitSubscription) { + Client client(lookupUrl); + const std::string topic = "testInitSubscription-" + std::to_string(time(nullptr)) + + std::to_string(isMultiConsumer_) + std::to_string(isProducerBatch_) + + std::to_string(consumerType_); + const std::string subName = "dlq-sub"; + const std::string dlqTopic = topic + subName + "DLQ"; + const std::string dlqInitSub = "dlq-init-sub"; + initTopic(topic); + + auto dlqPolicy = DeadLetterPolicyBuilder() + .maxRedeliverCount(3) + .initialSubscriptionName(dlqInitSub) + .deadLetterTopic(dlqTopic) + .build(); + ConsumerConfiguration consumerConfig; + consumerConfig.setDeadLetterPolicy(dlqPolicy); + consumerConfig.setNegativeAckRedeliveryDelayMs(100); + consumerConfig.setConsumerType(consumerType_); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer)); + + Consumer deadLetterQueueConsumer; + ConsumerConfiguration dlqConsumerConfig; + dlqConsumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest); + ASSERT_EQ(ResultOk, client.subscribe(dlqTopic, "dlq-sub", dlqConsumerConfig, deadLetterQueueConsumer)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf_, producer)); + + const int num = 10; + Message msg; + for (int i = 0; i < num; ++i) { + msg = MessageBuilder().setContent(std::to_string(i)).build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + + // nack all msg. + for (int i = 0; i < dlqPolicy.getMaxRedeliverCount() * num + num; ++i) { + ASSERT_EQ(ResultOk, consumer.receive(msg)); + consumer.negativeAcknowledge(msg); + } + + // Use this subscription to ensure that messages are sent to the DLQ. + for (int i = 0; i < num; i++) { + ASSERT_EQ(ResultOk, deadLetterQueueConsumer.receive(msg)); + ASSERT_TRUE(!msg.getDataAsString().empty()); + ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic)); + ASSERT_TRUE(!msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty()); + } + + // If there is no initial subscription, then the subscription will not receive the DLQ messages sent + // before the subscription. + Consumer initDLQConsumer; + ConsumerConfiguration initDLQConsumerConfig; + dlqConsumerConfig.setSubscriptionInitialPosition(InitialPositionLatest); + ASSERT_EQ(ResultOk, client.subscribe(dlqTopic, dlqInitSub, initDLQConsumerConfig, initDLQConsumer)); + for (int i = 0; i < num; i++) { + ASSERT_EQ(ResultOk, initDLQConsumer.receive(msg, 1000)); + ASSERT_TRUE(!msg.getDataAsString().empty()); + ASSERT_TRUE(msg.getProperty(SYSTEM_PROPERTY_REAL_TOPIC).find(topic)); + ASSERT_TRUE(!msg.getProperty(PROPERTY_ORIGIN_MESSAGE_ID).empty()); + } + ASSERT_EQ(ResultTimeout, initDLQConsumer.receive(msg, 200)); +} + +bool isBatchs[2] = {true, false}; +bool isMultiTopics[2] = {true, false}; +ConsumerType subTypes[2] = {ConsumerType::ConsumerShared, ConsumerType::ConsumerKeyShared}; + +std::vector> getValues() { + std::vector> values; + for (const auto isBatch : isBatchs) { + for (const auto isMultiTopic : isMultiTopics) { + for (const auto subType : subTypes) { + values.emplace_back(std::make_tuple(isBatch, isMultiTopic, subType)); + } + } + } + return values; +} + +INSTANTIATE_TEST_CASE_P(Pulsar, DeadLetterQueueTest, ::testing::ValuesIn(getValues())); + +} // namespace pulsar From 1ded98f7295351f5bb2f38d9cebc91818583e8da Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 6 Dec 2022 20:25:36 +0800 Subject: [PATCH 3/5] add comment. --- tests/DeadLetterQueueTest.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/DeadLetterQueueTest.cc b/tests/DeadLetterQueueTest.cc index 2849b903..7cfcf440 100644 --- a/tests/DeadLetterQueueTest.cc +++ b/tests/DeadLetterQueueTest.cc @@ -35,6 +35,7 @@ DECLARE_LOG_OBJECT() namespace pulsar { +// Original issue: https://github.com/apache/pulsar/pull/9970 // Because isSchemaValidationEnforced config defaults to false. // Therefore, in scenarios where AUTO_PUBLISH schemas are not supported now, the unit test can pass. // When implementing AUTO_PUBLISH schema, we should set isSchemaValidationEnforced to true to revalidate. From 9d803d8de893dbcb4056ceb7dcf4fdb4aa81e506 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Wed, 7 Dec 2022 11:44:27 +0800 Subject: [PATCH 4/5] use boost opt --- lib/ConsumerImpl.cc | 2 +- lib/MultiTopicsConsumerImpl.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index c4aac41c..ed04d18d 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -1558,7 +1558,7 @@ void ConsumerImpl::cancelTimers() noexcept { void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb) { auto messages = possibleSendToDeadLetterTopicMessages_.find(messageId); - if (messages.is_empty()) { + if (!messages) { cb(false); return; } diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 4accac32..51d8ffcc 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -780,7 +780,7 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::setredeliverUnacknowledgedMessages(kv.second); } else { LOG_ERROR("Message of topic: " << kv.first << " not in consumers"); From 326d60d254c2084872c05ddd992a1c8a4e9df9e6 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Wed, 7 Dec 2022 19:59:41 +0800 Subject: [PATCH 5/5] Force set isSchemaValidationEnforced=false. --- test-conf/standalone-ssl.conf | 3 +++ tests/DeadLetterQueueTest.cc | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/test-conf/standalone-ssl.conf b/test-conf/standalone-ssl.conf index 2ee44322..0cb20232 100644 --- a/test-conf/standalone-ssl.conf +++ b/test-conf/standalone-ssl.conf @@ -50,6 +50,9 @@ brokerShutdownTimeoutMs=3000 # Enable backlog quota check. Enforces action on topic when the quota is reached backlogQuotaCheckEnabled=true +# Disable schema validation: If a producer doesn’t carry a schema, the producer is allowed to connect to the topic and produce data. +isSchemaValidationEnforced=false + # How often to check for topics that have reached the quota backlogQuotaCheckIntervalInSeconds=60 diff --git a/tests/DeadLetterQueueTest.cc b/tests/DeadLetterQueueTest.cc index 7cfcf440..aed620e8 100644 --- a/tests/DeadLetterQueueTest.cc +++ b/tests/DeadLetterQueueTest.cc @@ -46,7 +46,7 @@ TEST(DeadLetterQueueTest, testAutoSchema) { static const std::string jsonSchema = R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})"; - SchemaInfo schemaInfo(AUTO_PUBLISH, "test-json", jsonSchema); + SchemaInfo schemaInfo(JSON, "test-json", jsonSchema); auto dlqPolicy = DeadLetterPolicyBuilder() .maxRedeliverCount(3)