Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions include/pulsar/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ typedef std::function<void(Result, Consumer)> SubscribeCallback;
typedef std::function<void(Result, Reader)> ReaderCallback;
typedef std::function<void(Result, const std::vector<std::string>&)> GetPartitionsCallback;
typedef std::function<void(Result)> CloseCallback;
typedef std::function<const std::string&()> ServiceUrlProvider;

class ClientImpl;
class PulsarFriend;
Expand Down Expand Up @@ -66,6 +67,31 @@ class PULSAR_PUBLIC Client {
*/
Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration);

/**
* Create a Pulsar client object connecting to the specified cluster address and using the default
* configuration.
*
* <p>Instead of specifying a static service URL string (with {@link #serviceUrl(String)}), an application
* can pass a {@link ServiceUrlProvider} function that dynamically provide a service URL.
*
* @param serviceUrlProvider The serviceUrlProvider used to generate ServiceUrl.
* @throw std::invalid_argument if `serviceUrlProvider()` return a invalid url.
*/
Client(ServiceUrlProvider serviceUrlProvider);
Comment on lines +70 to +80
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/**
* Create a Pulsar client object connecting to the specified cluster address and using the default
* configuration.
*
* <p>Instead of specifying a static service URL string (with {@link #serviceUrl(String)}), an application
* can pass a {@link ServiceUrlProvider} function that dynamically provide a service URL.
*
* @param serviceUrlProvider The serviceUrlProvider used to generate ServiceUrl.
* @throw std::invalid_argument if `serviceUrlProvider()` return a invalid url.
*/
Client(ServiceUrlProvider serviceUrlProvider);
/**
* @see Client(ServiceUrlProvider, const ClientConfiguration&)
*/
Client(ServiceUrlProvider serviceUrlProvider) : Client(serviceUrlProvider, ClientConfiguration{}) {}

Avoid unnecessary duplicated code and API docs.


/**
* Create a Pulsar client object connecting to the specified cluster address and using the specified
* configuration.
*
* <p>Instead of specifying a static service URL string (with {@link #serviceUrl(String)}), an application
* can pass a {@link ServiceUrlProvider} instance that dynamically provide a service URL.
*
* @param serviceUrlProvider The serviceUrlProvider used to generate ServiceUrl.
* @param clientConfiguration the client configuration to use
* @throw std::invalid_argument if `serviceUrlProvider()` return a invalid url.
*/
Client(ServiceUrlProvider serviceUrlProvider, const ClientConfiguration& clientConfiguration);

/**
* Create a producer with default configuration
*
Expand Down Expand Up @@ -333,6 +359,17 @@ class PULSAR_PUBLIC Client {
*/
void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);

/**
* Update the service URL this client is using.
*
* <p>This will force the client close all existing connections and to restart service discovery to the
* new service endpoint.
*
* @param serviceUrl
* the new service URL this client should connect to
*/
Result updateServiceUrl(const std::string& serviceUrl);

/**
*
* @return
Expand Down
7 changes: 7 additions & 0 deletions lib/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ Client::Client(const std::string& serviceUrl)
Client::Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration)
: impl_(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration, true)) {}

Client::Client(ServiceUrlProvider serviceUrlProvider) : Client(serviceUrlProvider()) {}

Client::Client(ServiceUrlProvider serviceUrlProvider, const ClientConfiguration& clientConfiguration)
: Client(serviceUrlProvider(), clientConfiguration) {}

Client::Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
bool poolConnections)
: impl_(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration, poolConnections)) {}
Expand Down Expand Up @@ -171,6 +176,8 @@ Result Client::close() {
return result;
}

Result Client::updateServiceUrl(const std::string& serviceUrl) { return impl_->updateServiceUrl(serviceUrl); }

void Client::closeAsync(CloseCallback callback) { impl_->closeAsync(callback); }

void Client::shutdown() { impl_->shutdown(); }
Expand Down
18 changes: 18 additions & 0 deletions lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,24 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, GetPartiti
std::placeholders::_2, topicName, callback));
}

Result ClientImpl::updateServiceUrl(const std::string& serviceUrl) {
Lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
return ResultAlreadyClosed;
}

LOG_INFO("Updating service URL to " << serviceUrl);
try {
serviceNameResolver_.updateServiceUrl(serviceUrl);
} catch (const std::invalid_argument& e) {
LOG_ERROR("Invalid service-url " << serviceUrl << "provided " << e.what());
return ResultInvalidUrl;
}
pool_.disconnect();
return ResultOk;
}

void ClientImpl::closeAsync(CloseCallback callback) {
if (state_ != Open) {
if (callback) {
Expand Down
2 changes: 2 additions & 0 deletions lib/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {

Future<Result, ClientConnectionWeakPtr> getConnection(const std::string& topic);

Result updateServiceUrl(const std::string& serviceUrl);

void closeAsync(CloseCallback callback);
void shutdown();

Expand Down
5 changes: 4 additions & 1 deletion lib/ConnectionPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ bool ConnectionPool::close() {
if (!closed_.compare_exchange_strong(expectedState, true)) {
return false;
}
disconnect();
return true;
}

void ConnectionPool::disconnect() {
std::unique_lock<std::mutex> lock(mutex_);
if (poolConnections_) {
for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
Expand All @@ -58,7 +62,6 @@ bool ConnectionPool::close() {
}
pool_.clear();
}
return true;
}

Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
Expand Down
5 changes: 5 additions & 0 deletions lib/ConnectionPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ class PULSAR_PUBLIC ConnectionPool {
*/
bool close();

/**
* Disconnect and clear all connect.
*/
void disconnect();

/**
* Get a connection from the pool.
* <p>
Expand Down
23 changes: 13 additions & 10 deletions lib/ServiceNameResolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <assert.h>

#include <atomic>
#include <memory>

#include "ServiceURI.h"

Expand All @@ -29,30 +30,32 @@ namespace pulsar {
class ServiceNameResolver {
public:
ServiceNameResolver(const std::string& uriString)
: serviceUri_(uriString), numAddresses_(serviceUri_.getServiceHosts().size()) {
assert(numAddresses_ > 0); // the validation has been done in ServiceURI
}
: serviceUri_(std::make_shared<ServiceURI>(uriString)) {}

ServiceNameResolver(const ServiceNameResolver&) = delete;
ServiceNameResolver& operator=(const ServiceNameResolver&) = delete;

bool useTls() const noexcept {
return serviceUri_.getScheme() == PulsarScheme::PULSAR_SSL ||
serviceUri_.getScheme() == PulsarScheme::HTTPS;
return serviceUri_->getScheme() == PulsarScheme::PULSAR_SSL ||
serviceUri_->getScheme() == PulsarScheme::HTTPS;
}

bool useHttp() const noexcept {
return serviceUri_.getScheme() == PulsarScheme::HTTP ||
serviceUri_.getScheme() == PulsarScheme::HTTPS;
return serviceUri_->getScheme() == PulsarScheme::HTTP ||
serviceUri_->getScheme() == PulsarScheme::HTTPS;
}

const std::string& resolveHost() {
return serviceUri_.getServiceHosts()[(numAddresses_ == 1) ? 0 : (index_++ % numAddresses_)];
return serviceUri_->getServiceHosts()[(serviceUri_->getNumAddresses() == 1)
? 0
: (index_++ % serviceUri_->getNumAddresses())];
}

void updateServiceUrl(const std::string& urlString) { serviceUri_.reset(new ServiceURI(urlString)); }

private:
const ServiceURI serviceUri_;
const size_t numAddresses_;
typedef std::shared_ptr<ServiceURI> ServiceURIPtr;
ServiceURIPtr serviceUri_;
std::atomic_size_t index_{0};

friend class PulsarFriend;
Expand Down
2 changes: 2 additions & 0 deletions lib/ServiceURI.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class ServiceURI {

const std::vector<std::string>& getServiceHosts() const noexcept { return data_.second; }

size_t getNumAddresses() const noexcept { return data_.second.size(); };

private:
// The 2 elements of the pair are:
// 1. The Scheme of the lookup protocol
Expand Down
89 changes: 89 additions & 0 deletions tests/ServiceUrlProviderTest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>

using namespace pulsar;

static const std::string lookupUrl = "pulsar://localhost:6650";
static const std::string adminUrl = "http://localhost:8080/";

class ServiceUrlProviderTest : public ::testing::TestWithParam<std::string> {
public:
void SetUp() override { serviceUrl = GetParam(); }

std::string serviceUrl;
};

TEST(ServiceUrlProviderTest, testClientClose) {
const std::string topicName = "testClientClose-" + std::to_string(time(nullptr));
Client client([]() -> const std::string& { return lookupUrl; });
client.close();
ASSERT_EQ(ResultAlreadyClosed, client.updateServiceUrl(lookupUrl));
std::map<std::string, std::string> testMap;
}

TEST_P(ServiceUrlProviderTest, testBasicUpdateUrl) {
const std::string topicName = "basicUpdateUrl-" + std::to_string(time(nullptr));
Client client([this]() -> const std::string& { return serviceUrl; });

Producer producer1;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer1));

Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, "test-sub", consumer));

// Update service url.
ASSERT_EQ(ResultOk, client.updateServiceUrl(serviceUrl));
Producer producer2;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer2));

// Assert that both producer1 and producer2 are available
int sendNum = 10;
for (int i = 0; i < sendNum; ++i) {
ASSERT_EQ(ResultOk, producer1.send(MessageBuilder().setContent("test").build()));
ASSERT_EQ(ResultOk, producer2.send(MessageBuilder().setContent("test").build()));
}

Message msg;
for (int i = 0; i < 2 * sendNum; ++i) {
ASSERT_EQ(ResultOk, consumer.receive(msg));
}

client.close();
}

TEST(ServiceUrlProviderTest, testInvalidServiceUrl) {
const std::string invalidServiceUrl = "invalid://localhost:6650";

// Assert invalid url throw exception when create client.
{
ASSERT_THROW(
Client client([&invalidServiceUrl]() -> const std::string& { return invalidServiceUrl; }),
std::invalid_argument);
}

// Assert return ResultInvalidUrl when client.updateServiceUrl(invalidServiceUrl);
{
Client client([]() -> const std::string& { return lookupUrl; });
ASSERT_EQ(ResultInvalidUrl, client.updateServiceUrl(invalidServiceUrl));
}
}

INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, ServiceUrlProviderTest, testing::Values(lookupUrl, adminUrl));