From 83f022dc7e18a14f0535000bc013bbee017f049d Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Mon, 27 Mar 2023 22:38:51 +0800 Subject: [PATCH 1/2] [feat] Support pattern subscribe non-persistent topic. --- include/pulsar/ConsumerConfiguration.h | 14 ++++ include/pulsar/RegexSubscriptionMode.h | 42 ++++++++++ lib/BinaryProtoLookupService.cc | 11 +-- lib/BinaryProtoLookupService.h | 8 +- lib/ClientConnection.cc | 6 +- lib/ClientConnection.h | 4 +- lib/ClientImpl.cc | 36 ++++++-- lib/ClientImpl.h | 7 +- lib/Commands.cc | 4 +- lib/Commands.h | 3 +- lib/ConsumerConfiguration.cc | 10 +++ lib/ConsumerConfigurationImpl.h | 2 + lib/HTTPLookupService.cc | 19 ++++- lib/HTTPLookupService.h | 4 +- lib/LookupService.h | 5 +- lib/MultiTopicsConsumerImpl.h | 1 + lib/PatternMultiTopicsConsumerImpl.cc | 20 +++-- lib/PatternMultiTopicsConsumerImpl.h | 4 +- lib/ProtoApiEnums.h | 5 ++ lib/RetryableLookupService.h | 6 +- lib/TopicName.cc | 12 +++ lib/TopicName.h | 2 + test-conf/standalone-ssl.conf | 4 + tests/ConsumerTest.cc | 97 ++++++++++++++++++++++ tests/LookupServiceTest.cc | 109 ++++++++++--------------- tests/TopicNameTest.cc | 17 ++++ 26 files changed, 348 insertions(+), 104 deletions(-) create mode 100644 include/pulsar/RegexSubscriptionMode.h diff --git a/include/pulsar/ConsumerConfiguration.h b/include/pulsar/ConsumerConfiguration.h index 0e6634df..8bd95dad 100644 --- a/include/pulsar/ConsumerConfiguration.h +++ b/include/pulsar/ConsumerConfiguration.h @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -383,6 +384,19 @@ class PULSAR_PUBLIC ConsumerConfiguration { */ int getPatternAutoDiscoveryPeriod() const; + /** + * Determines which topics this consumer should be subscribed to - Persistent, Non-Persistent, or + * AllTopics. Only used with pattern subscriptions. + * + * @param regexSubscriptionMode The default value is `PersistentOnly`. + */ + ConsumerConfiguration& setRegexSubscriptionMode(RegexSubscriptionMode regexSubscriptionMode); + + /** + * @return the regex subscription mode for the pattern consumer. + */ + RegexSubscriptionMode getRegexSubscriptionMode() const; + /** * The default value is `InitialPositionLatest`. * diff --git a/include/pulsar/RegexSubscriptionMode.h b/include/pulsar/RegexSubscriptionMode.h new file mode 100644 index 00000000..2143c1c9 --- /dev/null +++ b/include/pulsar/RegexSubscriptionMode.h @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef PULSAR_CPP_REGEX_SUB_MODE_H +#define PULSAR_CPP_REGEX_SUB_MODE_H + +namespace pulsar { +enum RegexSubscriptionMode +{ + /** + * Only subscribe to persistent topics. + */ + PersistentOnly = 0, + + /** + * Only subscribe to non-persistent topics. + */ + NonPersistentOnly = 1, + + /** + * Subscribe to both persistent and non-persistent topics. + */ + AllTopics = 2 +}; +} + +#endif // PULSAR_CPP_REGEX_SUB_MODE_H diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc index 502f7a8d..0b23493b 100644 --- a/lib/BinaryProtoLookupService.cc +++ b/lib/BinaryProtoLookupService.cc @@ -151,7 +151,7 @@ uint64_t BinaryProtoLookupService::newRequestId() { } Future BinaryProtoLookupService::getTopicsOfNamespaceAsync( - const NamespaceNamePtr& nsName) { + const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) { NamespaceTopicsPromisePtr promise = std::make_shared>(); if (!nsName) { promise->setFailed(ResultInvalidTopicName); @@ -160,7 +160,7 @@ Future BinaryProtoLookupService::getTopicsOfNamespac std::string namespaceName = nsName->toString(); cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost()) .addListener(std::bind(&BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest, this, - namespaceName, std::placeholders::_1, std::placeholders::_2, promise)); + namespaceName, mode, std::placeholders::_1, std::placeholders::_2, promise)); return promise->getFuture(); } @@ -201,7 +201,9 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName }); } -void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result result, +void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string& nsName, + CommandGetTopicsOfNamespace_Mode mode, + Result result, const ClientConnectionWeakPtr& clientCnx, NamespaceTopicsPromisePtr promise) { if (result != ResultOk) { @@ -212,8 +214,7 @@ void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string ClientConnectionPtr conn = clientCnx.lock(); uint64_t requestId = newRequestId(); LOG_DEBUG("sendGetTopicsOfNamespaceRequest. requestId: " << requestId << " nsName: " << nsName); - - conn->newGetTopicsOfNamespace(nsName, requestId) + conn->newGetTopicsOfNamespace(nsName, mode, requestId) .addListener(std::bind(&BinaryProtoLookupService::getTopicsOfNamespaceListener, this, std::placeholders::_1, std::placeholders::_2, promise)); } diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h index cfac023c..5d795cb1 100644 --- a/lib/BinaryProtoLookupService.h +++ b/lib/BinaryProtoLookupService.h @@ -49,7 +49,9 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override; - Future getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override; + Future getTopicsOfNamespaceAsync( + const NamespaceNamePtr& nsName, + CommandGetTopicsOfNamespace_Mode mode = CommandGetTopicsOfNamespace_Mode_PERSISTENT) override; Future> getSchema(const TopicNamePtr& topicName) override; @@ -75,8 +77,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { const ClientConnectionWeakPtr& clientCnx, LookupDataResultPromisePtr promise); - void sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result result, - const ClientConnectionWeakPtr& clientCnx, + void sendGetTopicsOfNamespaceRequest(const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode, + Result result, const ClientConnectionWeakPtr& clientCnx, NamespaceTopicsPromisePtr promise); void sendGetSchemaRequest(const std::string& topiName, Result result, diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 5837c502..81eb36dc 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1293,8 +1293,8 @@ Future ClientConnection::newGetLastMessageId(u return promise.getFuture(); } -Future ClientConnection::newGetTopicsOfNamespace(const std::string& nsName, - uint64_t requestId) { +Future ClientConnection::newGetTopicsOfNamespace( + const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId) { Lock lock(mutex_); Promise promise; if (isClosed()) { @@ -1306,7 +1306,7 @@ Future ClientConnection::newGetTopicsOfNamespace(con pendingGetNamespaceTopicsRequests_.insert(std::make_pair(requestId, promise)); lock.unlock(); - sendCommand(Commands::newGetTopicsOfNamespace(nsName, requestId)); + sendCommand(Commands::newGetTopicsOfNamespace(nsName, mode, requestId)); return promise.getFuture(); } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 95bb1ab9..04763162 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -181,7 +181,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this newGetLastMessageId(uint64_t consumerId, uint64_t requestId); - Future newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId); + Future newGetTopicsOfNamespace(const std::string& nsName, + CommandGetTopicsOfNamespace_Mode mode, + uint64_t requestId); Future> newGetSchema(const std::string& topicName, uint64_t requestId); diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 3b9606a0..38ac689a 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -338,29 +338,53 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const } } - NamespaceNamePtr nsName = topicNamePtr->getNamespaceName(); + if (TopicName::containsDomain(regexPattern)) { + LOG_WARN("Ignore invalid domain: " + << topicNamePtr->getDomain() + << ", use the RegexSubscriptionMode parameter to set the topic type"); + } + + CommandGetTopicsOfNamespace_Mode mode; + auto regexSubscriptionMode = conf.getRegexSubscriptionMode(); + switch (regexSubscriptionMode) { + case PersistentOnly: + mode = CommandGetTopicsOfNamespace_Mode_PERSISTENT; + break; + case NonPersistentOnly: + mode = CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT; + break; + case AllTopics: + mode = CommandGetTopicsOfNamespace_Mode_ALL; + break; + default: + LOG_ERROR("RegexSubscriptionMode not valid: " << regexSubscriptionMode); + callback(ResultInvalidConfiguration, Consumer()); + return; + } - lookupServicePtr_->getTopicsOfNamespaceAsync(nsName).addListener( - std::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(), std::placeholders::_1, - std::placeholders::_2, regexPattern, subscriptionName, conf, callback)); + lookupServicePtr_->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode) + .addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(), + std::placeholders::_1, std::placeholders::_2, regexPattern, mode, + subscriptionName, conf, callback)); } void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const NamespaceTopicsPtr topics, const std::string& regexPattern, + CommandGetTopicsOfNamespace_Mode mode, const std::string& subscriptionName, const ConsumerConfiguration& conf, SubscribeCallback callback) { if (result == ResultOk) { ConsumerImplBasePtr consumer; - PULSAR_REGEX_NAMESPACE::regex pattern(regexPattern); + PULSAR_REGEX_NAMESPACE::regex pattern(TopicName::removeDomain(regexPattern)); NamespaceTopicsPtr matchTopics = PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics, pattern); auto interceptors = std::make_shared(conf.getInterceptors()); - consumer = std::make_shared(shared_from_this(), regexPattern, + consumer = std::make_shared(shared_from_this(), regexPattern, mode, *matchTopics, subscriptionName, conf, lookupServicePtr_, interceptors); diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index dd7ffdd9..a7b36ffb 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -28,6 +28,7 @@ #include "Future.h" #include "LookupDataResult.h" #include "MemoryLimitController.h" +#include "ProtoApiEnums.h" #include "ServiceNameResolver.h" #include "SynchronizedHashMap.h" @@ -151,8 +152,10 @@ class ClientImpl : public std::enable_shared_from_this { void handleClose(Result result, SharedInt remaining, ResultCallback callback); void createPatternMultiTopicsConsumer(const Result result, const NamespaceTopicsPtr topics, - const std::string& regexPattern, const std::string& consumerName, - const ConsumerConfiguration& conf, SubscribeCallback callback); + const std::string& regexPattern, + CommandGetTopicsOfNamespace_Mode mode, + const std::string& consumerName, const ConsumerConfiguration& conf, + SubscribeCallback callback); enum State { diff --git a/lib/Commands.cc b/lib/Commands.cc index 829be579..a260b29c 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -609,12 +609,14 @@ SharedBuffer Commands::newGetLastMessageId(uint64_t consumerId, uint64_t request return buffer; } -SharedBuffer Commands::newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId) { +SharedBuffer Commands::newGetTopicsOfNamespace(const std::string& nsName, + CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId) { BaseCommand cmd; cmd.set_type(BaseCommand::GET_TOPICS_OF_NAMESPACE); CommandGetTopicsOfNamespace* getTopics = cmd.mutable_gettopicsofnamespace(); getTopics->set_request_id(requestId); getTopics->set_namespace_(nsName); + getTopics->set_mode(static_cast(mode)); const SharedBuffer buffer = writeMessageWithSize(cmd); cmd.clear_gettopicsofnamespace(); diff --git a/lib/Commands.h b/lib/Commands.h index 0331173d..adf4fe0d 100644 --- a/lib/Commands.h +++ b/lib/Commands.h @@ -156,7 +156,8 @@ class Commands { static SharedBuffer newSeek(uint64_t consumerId, uint64_t requestId, const MessageId& messageId); static SharedBuffer newSeek(uint64_t consumerId, uint64_t requestId, uint64_t timestamp); static SharedBuffer newGetLastMessageId(uint64_t consumerId, uint64_t requestId); - static SharedBuffer newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId); + static SharedBuffer newGetTopicsOfNamespace(const std::string& nsName, + CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId); static bool peerSupportsGetLastMessageId(int32_t peerVersion); static bool peerSupportsActiveConsumerListener(int32_t peerVersion); diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc index 4764b469..044518a5 100644 --- a/lib/ConsumerConfiguration.cc +++ b/lib/ConsumerConfiguration.cc @@ -317,4 +317,14 @@ ConsumerConfiguration& ConsumerConfiguration::setAckReceiptEnabled(bool ackRecei bool ConsumerConfiguration::isAckReceiptEnabled() const { return impl_->ackReceiptEnabled; } +ConsumerConfiguration& ConsumerConfiguration::setRegexSubscriptionMode( + RegexSubscriptionMode regexSubscriptionMode) { + impl_->regexSubscriptionMode = regexSubscriptionMode; + return *this; +} + +RegexSubscriptionMode ConsumerConfiguration::getRegexSubscriptionMode() const { + return impl_->regexSubscriptionMode; +} + } // namespace pulsar diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h index e84aa0ac..20a1daed 100644 --- a/lib/ConsumerConfigurationImpl.h +++ b/lib/ConsumerConfigurationImpl.h @@ -48,6 +48,8 @@ struct ConsumerConfigurationImpl { BatchReceivePolicy batchReceivePolicy{}; DeadLetterPolicy deadLetterPolicy; int patternAutoDiscoveryPeriod{60}; + RegexSubscriptionMode regexSubscriptionMode{RegexSubscriptionMode::PersistentOnly}; + bool replicateSubscriptionStateEnabled{false}; std::map properties; std::map subscriptionProperties; diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc index 0ce68391..920592d0 100644 --- a/lib/HTTPLookupService.cc +++ b/lib/HTTPLookupService.cc @@ -126,17 +126,30 @@ Future HTTPLookupService::getPartitionMetadataAsync } Future HTTPLookupService::getTopicsOfNamespaceAsync( - const NamespaceNamePtr &nsName) { + const NamespaceNamePtr &nsName, CommandGetTopicsOfNamespace_Mode mode) { NamespaceTopicsPromise promise; std::stringstream completeUrlStream; + auto convertRegexSubMode = [](CommandGetTopicsOfNamespace_Mode mode) { + switch (mode) { + case CommandGetTopicsOfNamespace_Mode_PERSISTENT: + return "PERSISTENT"; + case CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT: + return "NON_PERSISTENT"; + case CommandGetTopicsOfNamespace_Mode_ALL: + return "ALL"; + default: + return "PERSISTENT"; + } + }; + const auto &url = serviceNameResolver_.resolveHost(); if (nsName->isV2()) { completeUrlStream << url << ADMIN_PATH_V2 << "namespaces" << '/' << nsName->toString() << '/' - << "topics"; + << "topics?mode=" << convertRegexSubMode(mode); } else { completeUrlStream << url << ADMIN_PATH_V1 << "namespaces" << '/' << nsName->toString() << '/' - << "destinations"; + << "destinations?mode=" << convertRegexSubMode(mode); } executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleNamespaceTopicsHTTPRequest, diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h index 91356773..b1a209eb 100644 --- a/lib/HTTPLookupService.h +++ b/lib/HTTPLookupService.h @@ -79,7 +79,9 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t Future> getSchema(const TopicNamePtr& topicName) override; - Future getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override; + Future getTopicsOfNamespaceAsync( + const NamespaceNamePtr& nsName, + CommandGetTopicsOfNamespace_Mode mode = CommandGetTopicsOfNamespace_Mode_PERSISTENT) override; }; } // namespace pulsar diff --git a/lib/LookupService.h b/lib/LookupService.h index f1c5de83..6e460f6c 100644 --- a/lib/LookupService.h +++ b/lib/LookupService.h @@ -29,6 +29,7 @@ #include "Future.h" #include "LookupDataResult.h" +#include "ProtoApiEnums.h" namespace pulsar { using NamespaceTopicsPtr = std::shared_ptr>; @@ -72,7 +73,9 @@ class LookupService { * * Returns all the topics name for a given namespace. */ - virtual Future getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) = 0; + virtual Future getTopicsOfNamespaceAsync( + const NamespaceNamePtr& nsName, + CommandGetTopicsOfNamespace_Mode mode = CommandGetTopicsOfNamespace_Mode_PERSISTENT) = 0; /** * returns current SchemaInfo {@link SchemaInfo} for a given topic. diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index 35e15047..b00b0f23 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -180,6 +180,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition); + FRIEND_TEST(ConsumerTest, testPatternSubscribeTopic); }; typedef std::shared_ptr MultiTopicsConsumerImplPtr; diff --git a/lib/PatternMultiTopicsConsumerImpl.cc b/lib/PatternMultiTopicsConsumerImpl.cc index 02a77037..4c5cc028 100644 --- a/lib/PatternMultiTopicsConsumerImpl.cc +++ b/lib/PatternMultiTopicsConsumerImpl.cc @@ -28,13 +28,15 @@ DECLARE_LOG_OBJECT() using namespace pulsar; PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl( - ClientImplPtr client, const std::string pattern, const std::vector& topics, - const std::string& subscriptionName, const ConsumerConfiguration& conf, - const LookupServicePtr lookupServicePtr_, const ConsumerInterceptorsPtr interceptors) + ClientImplPtr client, const std::string pattern, CommandGetTopicsOfNamespace_Mode getTopicsMode, + const std::vector& topics, const std::string& subscriptionName, + const ConsumerConfiguration& conf, const LookupServicePtr lookupServicePtr_, + const ConsumerInterceptorsPtr interceptors) : MultiTopicsConsumerImpl(client, topics, subscriptionName, TopicName::get(pattern), conf, lookupServicePtr_, interceptors), patternString_(pattern), - pattern_(PULSAR_REGEX_NAMESPACE::regex(pattern)), + pattern_(PULSAR_REGEX_NAMESPACE::regex(TopicName::removeDomain(pattern))), + getTopicsMode_(getTopicsMode), autoDiscoveryTimer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()), autoDiscoveryRunning_(false) { namespaceName_ = TopicName::get(pattern)->getNamespaceName(); @@ -75,7 +77,7 @@ void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system: // already get namespace from pattern. assert(namespaceName_); - lookupServicePtr_->getTopicsOfNamespaceAsync(namespaceName_) + lookupServicePtr_->getTopicsOfNamespaceAsync(namespaceName_, getTopicsMode_) .addListener(std::bind(&PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace, this, std::placeholders::_1, std::placeholders::_2)); } @@ -193,10 +195,10 @@ void PatternMultiTopicsConsumerImpl::onTopicsRemoved(NamespaceTopicsPtr removedT NamespaceTopicsPtr PatternMultiTopicsConsumerImpl::topicsPatternFilter( const std::vector& topics, const PULSAR_REGEX_NAMESPACE::regex& pattern) { NamespaceTopicsPtr topicsResultPtr = std::make_shared>(); - - for (std::vector::const_iterator itr = topics.begin(); itr != topics.end(); itr++) { - if (PULSAR_REGEX_NAMESPACE::regex_match(*itr, pattern)) { - topicsResultPtr->push_back(*itr); + for (const auto& it : topics) { + auto topic = TopicName::removeDomain(it); + if (PULSAR_REGEX_NAMESPACE::regex_match(topic, pattern)) { + topicsResultPtr->push_back(std::move(it)); } } return topicsResultPtr; diff --git a/lib/PatternMultiTopicsConsumerImpl.h b/lib/PatternMultiTopicsConsumerImpl.h index 87e301e7..f13750a9 100644 --- a/lib/PatternMultiTopicsConsumerImpl.h +++ b/lib/PatternMultiTopicsConsumerImpl.h @@ -48,6 +48,7 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl { // when subscribe, client will first get all topics that match given pattern. // `topics` contains the topics that match `patternString`. PatternMultiTopicsConsumerImpl(ClientImplPtr client, const std::string patternString, + CommandGetTopicsOfNamespace_Mode getTopicsMode, const std::vector& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, const LookupServicePtr lookupServicePtr_, @@ -57,7 +58,7 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl { void autoDiscoveryTimerTask(const boost::system::error_code& err); - // filter input `topics` with given `pattern`, return matched topics + // filter input `topics` with given `pattern`, return matched topics. Do not match topic domain. static NamespaceTopicsPtr topicsPatternFilter(const std::vector& topics, const PULSAR_REGEX_NAMESPACE::regex& pattern); @@ -72,6 +73,7 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl { private: const std::string patternString_; const PULSAR_REGEX_NAMESPACE::regex pattern_; + const CommandGetTopicsOfNamespace_Mode getTopicsMode_; typedef std::shared_ptr TimerPtr; TimerPtr autoDiscoveryTimer_; bool autoDiscoveryRunning_; diff --git a/lib/ProtoApiEnums.h b/lib/ProtoApiEnums.h index 1f1a79fd..5f1876bd 100644 --- a/lib/ProtoApiEnums.h +++ b/lib/ProtoApiEnums.h @@ -49,6 +49,11 @@ constexpr int CommandSubscribe_SubType_Shared = 1; constexpr int CommandSubscribe_SubType_Failover = 2; constexpr int CommandSubscribe_SubType_Key_Shared = 3; +using CommandGetTopicsOfNamespace_Mode = int; +constexpr int CommandGetTopicsOfNamespace_Mode_PERSISTENT = 0; +constexpr int CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT = 1; +constexpr int CommandGetTopicsOfNamespace_Mode_ALL = 2; + using CommandAck_ValidationError = int; constexpr CommandAck_ValidationError CommandAck_ValidationError_UncompressedSizeCorruption = 0; constexpr CommandAck_ValidationError CommandAck_ValidationError_DecompressionError = 1; diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h index 4fc9a733..b417247f 100644 --- a/lib/RetryableLookupService.h +++ b/lib/RetryableLookupService.h @@ -60,10 +60,12 @@ class RetryableLookupService : public LookupService, [this, topicName] { return lookupService_->getPartitionMetadataAsync(topicName); }); } - Future getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override { + Future getTopicsOfNamespaceAsync( + const NamespaceNamePtr& nsName, + CommandGetTopicsOfNamespace_Mode mode = CommandGetTopicsOfNamespace_Mode_PERSISTENT) override { return executeAsync( "get-topics-of-namespace-" + nsName->toString(), - [this, nsName] { return lookupService_->getTopicsOfNamespaceAsync(nsName); }); + [this, nsName, mode] { return lookupService_->getTopicsOfNamespaceAsync(nsName, mode); }); } Future> getSchema(const TopicNamePtr& topicName) override { diff --git a/lib/TopicName.cc b/lib/TopicName.cc index 48c52c4a..5b892fc8 100644 --- a/lib/TopicName.cc +++ b/lib/TopicName.cc @@ -256,4 +256,16 @@ int TopicName::getPartitionIndex(const std::string& topic) { NamespaceNamePtr TopicName::getNamespaceName() { return namespaceName_; } +std::string TopicName::removeDomain(const std::string& topicName) { + auto index = topicName.find("://"); + if (index != std::string::npos) { + return topicName.substr(index + 3, topicName.length()); + } + return topicName; +} + +bool TopicName::containsDomain(const std::string& topicName) { + return topicName.find("://") != std::string::npos; +} + } // namespace pulsar diff --git a/lib/TopicName.h b/lib/TopicName.h index 51f701fb..8cc9cb53 100644 --- a/lib/TopicName.h +++ b/lib/TopicName.h @@ -67,6 +67,8 @@ class PULSAR_PUBLIC TopicName : public ServiceUnitId { static std::shared_ptr get(const std::string& topicName); bool operator==(const TopicName& other); static std::string getEncodedName(const std::string& nameBeforeEncoding); + static std::string removeDomain(const std::string& topicName); + static bool containsDomain(const std::string& topicName); std::string getTopicPartitionName(unsigned int partition) const; static int getPartitionIndex(const std::string& topic); diff --git a/test-conf/standalone-ssl.conf b/test-conf/standalone-ssl.conf index 1e543604..4b150076 100644 --- a/test-conf/standalone-ssl.conf +++ b/test-conf/standalone-ssl.conf @@ -19,6 +19,10 @@ ### --- General broker settings --- ### +# Disable system topic +systemTopicEnabled=false +topicLevelPoliciesEnabled=false + # Zookeeper quorum connection string zookeeperServers= diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index 0ecf82cc..23ea6879 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -1025,6 +1025,103 @@ TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) { ASSERT_EQ(ResultOk, client.close()); } +TEST(ConsumerTest, testPatternSubscribeTopic) { + Client client(lookupUrl); + auto topicName = "testPatternSubscribeTopic" + std::to_string(time(nullptr)); + std::string topicName1 = "persistent://public/default/" + topicName + "1"; + std::string topicName2 = "persistent://public/default/" + topicName + "2"; + std::string topicName3 = "non-persistent://public/default/" + topicName + "3np"; + // This will not match pattern + std::string topicName4 = "persistent://public/default/noMatch" + topicName; + + // 0. trigger create topic + Producer producer1; + Result result = client.createProducer(topicName1, producer1); + ASSERT_EQ(ResultOk, result); + Producer producer2; + result = client.createProducer(topicName2, producer2); + ASSERT_EQ(ResultOk, result); + Producer producer3; + result = client.createProducer(topicName3, producer3); + ASSERT_EQ(ResultOk, result); + Producer producer4; + result = client.createProducer(topicName4, producer4); + ASSERT_EQ(ResultOk, result); + + // verify sub persistent and non-persistent topic + { + // 1. Use pattern to sub topic1, topic2, topic3 + ConsumerConfiguration consConfig; + consConfig.setConsumerType(ConsumerShared); + consConfig.setRegexSubscriptionMode(RegexSubscriptionMode::AllTopics); + Consumer consumer; + std::string pattern = "public/default/" + topicName + ".*"; + ASSERT_EQ(ResultOk, client.subscribeWithRegex(pattern, "sub-all", consConfig, consumer)); + auto multiConsumerImplPtr = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer); + ASSERT_EQ(multiConsumerImplPtr->consumers_.size(), 3); + ASSERT_TRUE(multiConsumerImplPtr->consumers_.find(topicName1)); + ASSERT_TRUE(multiConsumerImplPtr->consumers_.find(topicName2)); + ASSERT_TRUE(multiConsumerImplPtr->consumers_.find(topicName3)); + ASSERT_FALSE(multiConsumerImplPtr->consumers_.find(topicName4)); + + // 2. send msg to topic1, topic2, topic3, topic4 + int messageNumber = 10; + for (int msgNum = 0; msgNum < messageNumber; msgNum++) { + auto content = "msg-content" + std::to_string(msgNum); + ASSERT_EQ(ResultOk, producer1.send(MessageBuilder().setContent(content).build())); + ASSERT_EQ(ResultOk, producer2.send(MessageBuilder().setContent(content).build())); + ASSERT_EQ(ResultOk, producer3.send(MessageBuilder().setContent(content).build())); + ASSERT_EQ(ResultOk, producer4.send(MessageBuilder().setContent(content).build())); + } + + // 3. receive msg from topic1, topic2, topic3 + Message m; + for (int i = 0; i < 3 * messageNumber; i++) { + ASSERT_EQ(ResultOk, consumer.receive(m, 1000)); + ASSERT_EQ(ResultOk, consumer.acknowledge(m)); + } + // verify no more to receive, because producer4 not match pattern + ASSERT_EQ(ResultTimeout, consumer.receive(m, 1000)); + ASSERT_EQ(ResultOk, consumer.unsubscribe()); + } + + // verify only sub persistent topic + { + ConsumerConfiguration consConfig; + consConfig.setConsumerType(ConsumerShared); + consConfig.setRegexSubscriptionMode(RegexSubscriptionMode::PersistentOnly); + Consumer consumer; + std::string pattern = "public/default/" + topicName + ".*"; + ASSERT_EQ(ResultOk, client.subscribeWithRegex(pattern, "sub-persistent", consConfig, consumer)); + auto multiConsumerImplPtr = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer); + ASSERT_EQ(multiConsumerImplPtr->consumers_.size(), 2); + ASSERT_TRUE(multiConsumerImplPtr->consumers_.find(topicName1)); + ASSERT_TRUE(multiConsumerImplPtr->consumers_.find(topicName2)); + ASSERT_FALSE(multiConsumerImplPtr->consumers_.find(topicName3)); + ASSERT_FALSE(multiConsumerImplPtr->consumers_.find(topicName4)); + ASSERT_EQ(ResultOk, consumer.unsubscribe()); + } + + // verify only sub non-persistent topic + { + ConsumerConfiguration consConfig; + consConfig.setConsumerType(ConsumerShared); + consConfig.setRegexSubscriptionMode(RegexSubscriptionMode::NonPersistentOnly); + Consumer consumer; + std::string pattern = "public/default/" + topicName + ".*"; + ASSERT_EQ(ResultOk, client.subscribeWithRegex(pattern, "sub-non-persistent", consConfig, consumer)); + auto multiConsumerImplPtr = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer); + ASSERT_EQ(multiConsumerImplPtr->consumers_.size(), 1); + ASSERT_FALSE(multiConsumerImplPtr->consumers_.find(topicName1)); + ASSERT_FALSE(multiConsumerImplPtr->consumers_.find(topicName2)); + ASSERT_TRUE(multiConsumerImplPtr->consumers_.find(topicName3)); + ASSERT_FALSE(multiConsumerImplPtr->consumers_.find(topicName4)); + ASSERT_EQ(ResultOk, consumer.unsubscribe()); + } + + client.close(); +} + class ConsumerSeekTest : public ::testing::TestWithParam { public: void SetUp() override { producerConf_ = ProducerConfiguration().setBatchingEnabled(GetParam()); } diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index 9467727f..4a38054b 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -70,71 +70,6 @@ TEST(LookupServiceTest, basicLookup) { ASSERT_EQ(url, lookupResult.physicalAddress); } -TEST(LookupServiceTest, basicGetNamespaceTopics) { - std::string url = "pulsar://localhost:6650"; - std::string adminUrl = "http://localhost:8080/"; - Result result; - // 1. create some topics under same namespace - Client client(url); - - std::string topicName1 = "persistent://public/default/basicGetNamespaceTopics1"; - std::string topicName2 = "persistent://public/default/basicGetNamespaceTopics2"; - std::string topicName3 = "persistent://public/default/basicGetNamespaceTopics3"; - // This is not in same namespace. - std::string topicName4 = "persistent://public/default-2/basicGetNamespaceTopics4"; - - // call admin api to make topics partitioned - std::string url1 = adminUrl + "admin/v2/persistent/public/default/basicGetNamespaceTopics1/partitions"; - std::string url2 = adminUrl + "admin/v2/persistent/public/default/basicGetNamespaceTopics2/partitions"; - std::string url3 = adminUrl + "admin/v2/persistent/public/default/basicGetNamespaceTopics3/partitions"; - - int res = makePutRequest(url1, "2"); - ASSERT_FALSE(res != 204 && res != 409); - res = makePutRequest(url2, "3"); - ASSERT_FALSE(res != 204 && res != 409); - res = makePutRequest(url3, "4"); - ASSERT_FALSE(res != 204 && res != 409); - - Producer producer1; - result = client.createProducer(topicName1, producer1); - ASSERT_EQ(ResultOk, result); - Producer producer2; - result = client.createProducer(topicName2, producer2); - ASSERT_EQ(ResultOk, result); - Producer producer3; - result = client.createProducer(topicName3, producer3); - ASSERT_EQ(ResultOk, result); - Producer producer4; - result = client.createProducer(topicName4, producer4); - ASSERT_EQ(ResultOk, result); - - // 2. call getTopicsOfNamespaceAsync - ExecutorServiceProviderPtr service = std::make_shared(1); - AuthenticationPtr authData = AuthFactory::Disabled(); - ClientConfiguration conf; - ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared(1)); - ConnectionPool pool_(conf, ioExecutorProvider_, authData, true); - ServiceNameResolver serviceNameResolver(url); - BinaryProtoLookupService lookupService(serviceNameResolver, pool_, conf); - - TopicNamePtr topicName = TopicName::get(topicName1); - NamespaceNamePtr nsName = topicName->getNamespaceName(); - - Future getTopicsFuture = lookupService.getTopicsOfNamespaceAsync(nsName); - NamespaceTopicsPtr topicsData; - result = getTopicsFuture.get(topicsData); - ASSERT_EQ(ResultOk, result); - ASSERT_TRUE(topicsData != NULL); - - // 3. verify result contains first 3 topic - ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName1) != topicsData->end()); - ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName2) != topicsData->end()); - ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName3) != topicsData->end()); - ASSERT_FALSE(std::find(topicsData->begin(), topicsData->end(), topicName4) != topicsData->end()); - - client.shutdown(); -} - static void testMultiAddresses(LookupService& lookupService) { std::vector results; constexpr int numRequests = 6; @@ -289,6 +224,50 @@ class LookupServiceTest : public ::testing::TestWithParam { Client client_{GetParam()}; }; +TEST_P(LookupServiceTest, basicGetNamespaceTopics) { + Result result; + + auto nsName = NamespaceName::get("public", GetParam().substr(0, 4) + std::to_string(time(nullptr))); + std::string topicName1 = "persistent://" + nsName->toString() + "/basicGetNamespaceTopics1"; + std::string topicName2 = "persistent://" + nsName->toString() + "/basicGetNamespaceTopics2"; + std::string topicName3 = "non-persistent://" + nsName->toString() + "/basicGetNamespaceTopics3"; + + // 0. create a namespace + auto createNsUrl = httpLookupUrl + "/admin/v2/namespaces/" + nsName->toString(); + auto res = makePutRequest(createNsUrl, ""); + ASSERT_FALSE(res != 204 && res != 409); + + // 1. trigger auto create topic + Producer producer1; + result = client_.createProducer(topicName1, producer1); + ASSERT_EQ(ResultOk, result); + Producer producer2; + result = client_.createProducer(topicName2, producer2); + ASSERT_EQ(ResultOk, result); + Producer producer3; + result = client_.createProducer(topicName3, producer3); + ASSERT_EQ(ResultOk, result); + + // 2. verify getTopicsOfNamespace by regex mode. + auto lookupServicePtr = PulsarFriend::getClientImplPtr(client_)->getLookup(); + auto verifyGetTopics = [&](CommandGetTopicsOfNamespace_Mode mode, + const std::set& expectedTopics) { + Future getTopicsFuture = + lookupServicePtr->getTopicsOfNamespaceAsync(nsName, mode); + NamespaceTopicsPtr topicsData; + result = getTopicsFuture.get(topicsData); + ASSERT_EQ(ResultOk, result); + ASSERT_TRUE(topicsData != NULL); + std::set actualTopics(topicsData->begin(), topicsData->end()); + ASSERT_EQ(expectedTopics, actualTopics); + }; + verifyGetTopics(CommandGetTopicsOfNamespace_Mode_PERSISTENT, {topicName1, topicName2}); + verifyGetTopics(CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT, {topicName3}); + verifyGetTopics(CommandGetTopicsOfNamespace_Mode_ALL, {topicName1, topicName2, topicName3}); + + client_.close(); +} + TEST_P(LookupServiceTest, testGetSchema) { const std::string topic = "testGetSchema" + std::to_string(time(nullptr)) + GetParam().substr(0, 4); std::string jsonSchema = diff --git a/tests/TopicNameTest.cc b/tests/TopicNameTest.cc index 41838f5d..74dc605e 100644 --- a/tests/TopicNameTest.cc +++ b/tests/TopicNameTest.cc @@ -191,3 +191,20 @@ TEST(TopicNameTest, testPartitionIndex) { ASSERT_EQ(topicName->getPartitionIndex(), partition); } } + +TEST(TopicNameTest, testRemoveDomain) { + auto topicName1 = "persistent://public/default/test-topic"; + ASSERT_EQ("public/default/test-topic", TopicName::removeDomain(topicName1)); + + auto topicName2 = "non-persistent://public/default/test-topic"; + ASSERT_EQ("public/default/test-topic", TopicName::removeDomain(topicName2)); + + auto topicName3 = "public/default/test-topic"; + ASSERT_EQ(topicName3, TopicName::removeDomain(topicName2)); +} + +TEST(TopicNameTest, testContainsDomain) { + ASSERT_TRUE(TopicName::containsDomain("persistent://public/default/test-topic")); + ASSERT_TRUE(TopicName::containsDomain("non-persistent://public/default/test-topic")); + ASSERT_FALSE(TopicName::containsDomain("public/default/test-topic")); +} From 86076530c78818170936ef8dd056df45925cf2fb Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Wed, 29 Mar 2023 17:39:18 +0800 Subject: [PATCH 2/2] Fix code reviews. --- lib/BinaryProtoLookupService.h | 3 +-- lib/HTTPLookupService.h | 3 +-- lib/LookupService.h | 3 +-- lib/PatternMultiTopicsConsumerImpl.cc | 6 +++--- lib/RetryableLookupService.h | 3 +-- tests/LookupServiceTest.cc | 12 ++++++++---- 6 files changed, 15 insertions(+), 15 deletions(-) diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h index 5d795cb1..f8c91e6f 100644 --- a/lib/BinaryProtoLookupService.h +++ b/lib/BinaryProtoLookupService.h @@ -50,8 +50,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override; Future getTopicsOfNamespaceAsync( - const NamespaceNamePtr& nsName, - CommandGetTopicsOfNamespace_Mode mode = CommandGetTopicsOfNamespace_Mode_PERSISTENT) override; + const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override; Future> getSchema(const TopicNamePtr& topicName) override; diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h index b1a209eb..8dc195c1 100644 --- a/lib/HTTPLookupService.h +++ b/lib/HTTPLookupService.h @@ -80,8 +80,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t Future> getSchema(const TopicNamePtr& topicName) override; Future getTopicsOfNamespaceAsync( - const NamespaceNamePtr& nsName, - CommandGetTopicsOfNamespace_Mode mode = CommandGetTopicsOfNamespace_Mode_PERSISTENT) override; + const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override; }; } // namespace pulsar diff --git a/lib/LookupService.h b/lib/LookupService.h index 6e460f6c..84dc37ca 100644 --- a/lib/LookupService.h +++ b/lib/LookupService.h @@ -74,8 +74,7 @@ class LookupService { * Returns all the topics name for a given namespace. */ virtual Future getTopicsOfNamespaceAsync( - const NamespaceNamePtr& nsName, - CommandGetTopicsOfNamespace_Mode mode = CommandGetTopicsOfNamespace_Mode_PERSISTENT) = 0; + const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) = 0; /** * returns current SchemaInfo {@link SchemaInfo} for a given topic. diff --git a/lib/PatternMultiTopicsConsumerImpl.cc b/lib/PatternMultiTopicsConsumerImpl.cc index 4c5cc028..e100a1c3 100644 --- a/lib/PatternMultiTopicsConsumerImpl.cc +++ b/lib/PatternMultiTopicsConsumerImpl.cc @@ -195,10 +195,10 @@ void PatternMultiTopicsConsumerImpl::onTopicsRemoved(NamespaceTopicsPtr removedT NamespaceTopicsPtr PatternMultiTopicsConsumerImpl::topicsPatternFilter( const std::vector& topics, const PULSAR_REGEX_NAMESPACE::regex& pattern) { NamespaceTopicsPtr topicsResultPtr = std::make_shared>(); - for (const auto& it : topics) { - auto topic = TopicName::removeDomain(it); + for (const auto& topicStr : topics) { + auto topic = TopicName::removeDomain(topicStr); if (PULSAR_REGEX_NAMESPACE::regex_match(topic, pattern)) { - topicsResultPtr->push_back(std::move(it)); + topicsResultPtr->push_back(std::move(topicStr)); } } return topicsResultPtr; diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h index b417247f..01dd82bb 100644 --- a/lib/RetryableLookupService.h +++ b/lib/RetryableLookupService.h @@ -61,8 +61,7 @@ class RetryableLookupService : public LookupService, } Future getTopicsOfNamespaceAsync( - const NamespaceNamePtr& nsName, - CommandGetTopicsOfNamespace_Mode mode = CommandGetTopicsOfNamespace_Mode_PERSISTENT) override { + const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override { return executeAsync( "get-topics-of-namespace-" + nsName->toString(), [this, nsName, mode] { return lookupService_->getTopicsOfNamespaceAsync(nsName, mode); }); diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index 4a38054b..61311f46 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -101,8 +101,10 @@ static void testMultiAddresses(LookupService& lookupService) { results.clear(); for (int i = 0; i < numRequests; i++) { NamespaceTopicsPtr data; - const auto result = - lookupService.getTopicsOfNamespaceAsync(TopicName::get("topic")->getNamespaceName()).get(data); + const auto result = lookupService + .getTopicsOfNamespaceAsync(TopicName::get("topic")->getNamespaceName(), + CommandGetTopicsOfNamespace_Mode_PERSISTENT) + .get(data); LOG_INFO("getTopicsOfNamespaceAsync [" << i << "] " << result); results.emplace_back(result); } @@ -147,7 +149,8 @@ TEST(LookupServiceTest, testRetry) { LOG_INFO("getPartitionMetadataAsync returns " << lookupDataResultPtr->getPartitions() << " partitions"); PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0); - auto future3 = lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName()); + auto future3 = lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), + CommandGetTopicsOfNamespace_Mode_PERSISTENT); NamespaceTopicsPtr namespaceTopicsPtr; ASSERT_EQ(ResultOk, future3.get(namespaceTopicsPtr)); LOG_INFO("getTopicPartitionName Async returns " << namespaceTopicsPtr->size() << " topics"); @@ -208,7 +211,8 @@ TEST(LookupServiceTest, testTimeout) { afterMethod("getPartitionMetadataAsync"); beforeMethod(); - auto future3 = lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName()); + auto future3 = lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), + CommandGetTopicsOfNamespace_Mode_PERSISTENT); NamespaceTopicsPtr namespaceTopicsPtr; ASSERT_EQ(ResultTimeout, future3.get(namespaceTopicsPtr)); afterMethod("getTopicsOfNamespaceAsync");