Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions include/pulsar/ConsumerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <pulsar/InitialPosition.h>
#include <pulsar/KeySharedPolicy.h>
#include <pulsar/Message.h>
#include <pulsar/RegexSubscriptionMode.h>
#include <pulsar/Result.h>
#include <pulsar/Schema.h>
#include <pulsar/TypedMessage.h>
Expand Down Expand Up @@ -383,6 +384,19 @@ class PULSAR_PUBLIC ConsumerConfiguration {
*/
int getPatternAutoDiscoveryPeriod() const;

/**
* Determines which topics this consumer should be subscribed to - Persistent, Non-Persistent, or
* AllTopics. Only used with pattern subscriptions.
*
* @param regexSubscriptionMode The default value is `PersistentOnly`.
*/
ConsumerConfiguration& setRegexSubscriptionMode(RegexSubscriptionMode regexSubscriptionMode);

/**
* @return the regex subscription mode for the pattern consumer.
*/
RegexSubscriptionMode getRegexSubscriptionMode() const;

/**
* The default value is `InitialPositionLatest`.
*
Expand Down
42 changes: 42 additions & 0 deletions include/pulsar/RegexSubscriptionMode.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef PULSAR_CPP_REGEX_SUB_MODE_H
#define PULSAR_CPP_REGEX_SUB_MODE_H

namespace pulsar {
enum RegexSubscriptionMode
{
/**
* Only subscribe to persistent topics.
*/
PersistentOnly = 0,

/**
* Only subscribe to non-persistent topics.
*/
NonPersistentOnly = 1,

/**
* Subscribe to both persistent and non-persistent topics.
*/
AllTopics = 2
};
}

#endif // PULSAR_CPP_REGEX_SUB_MODE_H
11 changes: 6 additions & 5 deletions lib/BinaryProtoLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ uint64_t BinaryProtoLookupService::newRequestId() {
}

Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespaceAsync(
const NamespaceNamePtr& nsName) {
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) {
NamespaceTopicsPromisePtr promise = std::make_shared<Promise<Result, NamespaceTopicsPtr>>();
if (!nsName) {
promise->setFailed(ResultInvalidTopicName);
Expand All @@ -160,7 +160,7 @@ Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespac
std::string namespaceName = nsName->toString();
cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())
.addListener(std::bind(&BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest, this,
namespaceName, std::placeholders::_1, std::placeholders::_2, promise));
namespaceName, mode, std::placeholders::_1, std::placeholders::_2, promise));
return promise->getFuture();
}

Expand Down Expand Up @@ -201,7 +201,9 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName
});
}

void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result result,
void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string& nsName,
CommandGetTopicsOfNamespace_Mode mode,
Result result,
const ClientConnectionWeakPtr& clientCnx,
NamespaceTopicsPromisePtr promise) {
if (result != ResultOk) {
Expand All @@ -212,8 +214,7 @@ void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string
ClientConnectionPtr conn = clientCnx.lock();
uint64_t requestId = newRequestId();
LOG_DEBUG("sendGetTopicsOfNamespaceRequest. requestId: " << requestId << " nsName: " << nsName);

conn->newGetTopicsOfNamespace(nsName, requestId)
conn->newGetTopicsOfNamespace(nsName, mode, requestId)
.addListener(std::bind(&BinaryProtoLookupService::getTopicsOfNamespaceListener, this,
std::placeholders::_1, std::placeholders::_2, promise));
}
Expand Down
7 changes: 4 additions & 3 deletions lib/BinaryProtoLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {

Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override;

Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override;
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override;

Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) override;

Expand All @@ -75,8 +76,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
const ClientConnectionWeakPtr& clientCnx,
LookupDataResultPromisePtr promise);

void sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result result,
const ClientConnectionWeakPtr& clientCnx,
void sendGetTopicsOfNamespaceRequest(const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode,
Result result, const ClientConnectionWeakPtr& clientCnx,
NamespaceTopicsPromisePtr promise);

void sendGetSchemaRequest(const std::string& topiName, Result result,
Expand Down
6 changes: 3 additions & 3 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1293,8 +1293,8 @@ Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(u
return promise.getFuture();
}

Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(const std::string& nsName,
uint64_t requestId) {
Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(
const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId) {
Lock lock(mutex_);
Promise<Result, NamespaceTopicsPtr> promise;
if (isClosed()) {
Expand All @@ -1306,7 +1306,7 @@ Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(con

pendingGetNamespaceTopicsRequests_.insert(std::make_pair(requestId, promise));
lock.unlock();
sendCommand(Commands::newGetTopicsOfNamespace(nsName, requestId));
sendCommand(Commands::newGetTopicsOfNamespace(nsName, mode, requestId));
return promise.getFuture();
}

Expand Down
4 changes: 3 additions & 1 deletion lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien

Future<Result, GetLastMessageIdResponse> newGetLastMessageId(uint64_t consumerId, uint64_t requestId);

Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId);
Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const std::string& nsName,
CommandGetTopicsOfNamespace_Mode mode,
uint64_t requestId);

Future<Result, boost::optional<SchemaInfo>> newGetSchema(const std::string& topicName,
uint64_t requestId);
Expand Down
36 changes: 30 additions & 6 deletions lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,29 +338,53 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const
}
}

NamespaceNamePtr nsName = topicNamePtr->getNamespaceName();
if (TopicName::containsDomain(regexPattern)) {
LOG_WARN("Ignore invalid domain: "
<< topicNamePtr->getDomain()
<< ", use the RegexSubscriptionMode parameter to set the topic type");
}

CommandGetTopicsOfNamespace_Mode mode;
auto regexSubscriptionMode = conf.getRegexSubscriptionMode();
switch (regexSubscriptionMode) {
case PersistentOnly:
mode = CommandGetTopicsOfNamespace_Mode_PERSISTENT;
break;
case NonPersistentOnly:
mode = CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT;
break;
case AllTopics:
mode = CommandGetTopicsOfNamespace_Mode_ALL;
break;
default:
LOG_ERROR("RegexSubscriptionMode not valid: " << regexSubscriptionMode);
callback(ResultInvalidConfiguration, Consumer());
return;
}

lookupServicePtr_->getTopicsOfNamespaceAsync(nsName).addListener(
std::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, regexPattern, subscriptionName, conf, callback));
lookupServicePtr_->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode)
.addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(),
std::placeholders::_1, std::placeholders::_2, regexPattern, mode,
subscriptionName, conf, callback));
}

void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const NamespaceTopicsPtr topics,
const std::string& regexPattern,
CommandGetTopicsOfNamespace_Mode mode,
const std::string& subscriptionName,
const ConsumerConfiguration& conf,
SubscribeCallback callback) {
if (result == ResultOk) {
ConsumerImplBasePtr consumer;

PULSAR_REGEX_NAMESPACE::regex pattern(regexPattern);
PULSAR_REGEX_NAMESPACE::regex pattern(TopicName::removeDomain(regexPattern));

NamespaceTopicsPtr matchTopics =
PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics, pattern);

auto interceptors = std::make_shared<ConsumerInterceptors>(conf.getInterceptors());

consumer = std::make_shared<PatternMultiTopicsConsumerImpl>(shared_from_this(), regexPattern,
consumer = std::make_shared<PatternMultiTopicsConsumerImpl>(shared_from_this(), regexPattern, mode,
*matchTopics, subscriptionName, conf,
lookupServicePtr_, interceptors);

Expand Down
7 changes: 5 additions & 2 deletions lib/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "Future.h"
#include "LookupDataResult.h"
#include "MemoryLimitController.h"
#include "ProtoApiEnums.h"
#include "ServiceNameResolver.h"
#include "SynchronizedHashMap.h"

Expand Down Expand Up @@ -151,8 +152,10 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
void handleClose(Result result, SharedInt remaining, ResultCallback callback);

void createPatternMultiTopicsConsumer(const Result result, const NamespaceTopicsPtr topics,
const std::string& regexPattern, const std::string& consumerName,
const ConsumerConfiguration& conf, SubscribeCallback callback);
const std::string& regexPattern,
CommandGetTopicsOfNamespace_Mode mode,
const std::string& consumerName, const ConsumerConfiguration& conf,
SubscribeCallback callback);

enum State
{
Expand Down
4 changes: 3 additions & 1 deletion lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -609,12 +609,14 @@ SharedBuffer Commands::newGetLastMessageId(uint64_t consumerId, uint64_t request
return buffer;
}

SharedBuffer Commands::newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId) {
SharedBuffer Commands::newGetTopicsOfNamespace(const std::string& nsName,
CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId) {
BaseCommand cmd;
cmd.set_type(BaseCommand::GET_TOPICS_OF_NAMESPACE);
CommandGetTopicsOfNamespace* getTopics = cmd.mutable_gettopicsofnamespace();
getTopics->set_request_id(requestId);
getTopics->set_namespace_(nsName);
getTopics->set_mode(static_cast<proto::CommandGetTopicsOfNamespace_Mode>(mode));

const SharedBuffer buffer = writeMessageWithSize(cmd);
cmd.clear_gettopicsofnamespace();
Expand Down
3 changes: 2 additions & 1 deletion lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ class Commands {
static SharedBuffer newSeek(uint64_t consumerId, uint64_t requestId, const MessageId& messageId);
static SharedBuffer newSeek(uint64_t consumerId, uint64_t requestId, uint64_t timestamp);
static SharedBuffer newGetLastMessageId(uint64_t consumerId, uint64_t requestId);
static SharedBuffer newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId);
static SharedBuffer newGetTopicsOfNamespace(const std::string& nsName,
CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId);

static bool peerSupportsGetLastMessageId(int32_t peerVersion);
static bool peerSupportsActiveConsumerListener(int32_t peerVersion);
Expand Down
10 changes: 10 additions & 0 deletions lib/ConsumerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -317,4 +317,14 @@ ConsumerConfiguration& ConsumerConfiguration::setAckReceiptEnabled(bool ackRecei

bool ConsumerConfiguration::isAckReceiptEnabled() const { return impl_->ackReceiptEnabled; }

ConsumerConfiguration& ConsumerConfiguration::setRegexSubscriptionMode(
RegexSubscriptionMode regexSubscriptionMode) {
impl_->regexSubscriptionMode = regexSubscriptionMode;
return *this;
}

RegexSubscriptionMode ConsumerConfiguration::getRegexSubscriptionMode() const {
return impl_->regexSubscriptionMode;
}

} // namespace pulsar
2 changes: 2 additions & 0 deletions lib/ConsumerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ struct ConsumerConfigurationImpl {
BatchReceivePolicy batchReceivePolicy{};
DeadLetterPolicy deadLetterPolicy;
int patternAutoDiscoveryPeriod{60};
RegexSubscriptionMode regexSubscriptionMode{RegexSubscriptionMode::PersistentOnly};

bool replicateSubscriptionStateEnabled{false};
std::map<std::string, std::string> properties;
std::map<std::string, std::string> subscriptionProperties;
Expand Down
19 changes: 16 additions & 3 deletions lib/HTTPLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,30 @@ Future<Result, LookupDataResultPtr> HTTPLookupService::getPartitionMetadataAsync
}

Future<Result, NamespaceTopicsPtr> HTTPLookupService::getTopicsOfNamespaceAsync(
const NamespaceNamePtr &nsName) {
const NamespaceNamePtr &nsName, CommandGetTopicsOfNamespace_Mode mode) {
NamespaceTopicsPromise promise;
std::stringstream completeUrlStream;

auto convertRegexSubMode = [](CommandGetTopicsOfNamespace_Mode mode) {
switch (mode) {
case CommandGetTopicsOfNamespace_Mode_PERSISTENT:
return "PERSISTENT";
case CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT:
return "NON_PERSISTENT";
case CommandGetTopicsOfNamespace_Mode_ALL:
return "ALL";
default:
return "PERSISTENT";
}
};

const auto &url = serviceNameResolver_.resolveHost();
if (nsName->isV2()) {
completeUrlStream << url << ADMIN_PATH_V2 << "namespaces" << '/' << nsName->toString() << '/'
<< "topics";
<< "topics?mode=" << convertRegexSubMode(mode);
} else {
completeUrlStream << url << ADMIN_PATH_V1 << "namespaces" << '/' << nsName->toString() << '/'
<< "destinations";
<< "destinations?mode=" << convertRegexSubMode(mode);
}

executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleNamespaceTopicsHTTPRequest,
Expand Down
3 changes: 2 additions & 1 deletion lib/HTTPLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t

Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) override;

Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override;
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override;
};
} // namespace pulsar

Expand Down
4 changes: 3 additions & 1 deletion lib/LookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "Future.h"
#include "LookupDataResult.h"
#include "ProtoApiEnums.h"

namespace pulsar {
using NamespaceTopicsPtr = std::shared_ptr<std::vector<std::string>>;
Expand Down Expand Up @@ -72,7 +73,8 @@ class LookupService {
*
* Returns all the topics name for a given namespace.
*/
virtual Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) = 0;
virtual Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) = 0;

/**
* returns current SchemaInfo {@link SchemaInfo} for a given topic.
Expand Down
1 change: 1 addition & 0 deletions lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
FRIEND_TEST(ConsumerTest, testPatternSubscribeTopic);
};

typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;
Expand Down
Loading