diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index c189a200..7909c4fa 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -39,6 +39,7 @@ typedef std::function SubscribeCallback; typedef std::function ReaderCallback; typedef std::function&)> GetPartitionsCallback; typedef std::function CloseCallback; +typedef std::function ServiceUrlProvider; class ClientImpl; class PulsarFriend; @@ -66,6 +67,31 @@ class PULSAR_PUBLIC Client { */ Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration); + /** + * Create a Pulsar client object connecting to the specified cluster address and using the default + * configuration. + * + *

Instead of specifying a static service URL string (with {@link #serviceUrl(String)}), an application + * can pass a {@link ServiceUrlProvider} function that dynamically provide a service URL. + * + * @param serviceUrlProvider The serviceUrlProvider used to generate ServiceUrl. + * @throw std::invalid_argument if `serviceUrlProvider()` return a invalid url. + */ + Client(ServiceUrlProvider serviceUrlProvider); + + /** + * Create a Pulsar client object connecting to the specified cluster address and using the specified + * configuration. + * + *

Instead of specifying a static service URL string (with {@link #serviceUrl(String)}), an application + * can pass a {@link ServiceUrlProvider} instance that dynamically provide a service URL. + * + * @param serviceUrlProvider The serviceUrlProvider used to generate ServiceUrl. + * @param clientConfiguration the client configuration to use + * @throw std::invalid_argument if `serviceUrlProvider()` return a invalid url. + */ + Client(ServiceUrlProvider serviceUrlProvider, const ClientConfiguration& clientConfiguration); + /** * Create a producer with default configuration * @@ -333,6 +359,17 @@ class PULSAR_PUBLIC Client { */ void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback); + /** + * Update the service URL this client is using. + * + *

This will force the client close all existing connections and to restart service discovery to the + * new service endpoint. + * + * @param serviceUrl + * the new service URL this client should connect to + */ + Result updateServiceUrl(const std::string& serviceUrl); + /** * * @return diff --git a/lib/Client.cc b/lib/Client.cc index 03823fb6..c4417f26 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -38,6 +38,11 @@ Client::Client(const std::string& serviceUrl) Client::Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration) : impl_(std::make_shared(serviceUrl, clientConfiguration, true)) {} +Client::Client(ServiceUrlProvider serviceUrlProvider) : Client(serviceUrlProvider()) {} + +Client::Client(ServiceUrlProvider serviceUrlProvider, const ClientConfiguration& clientConfiguration) + : Client(serviceUrlProvider(), clientConfiguration) {} + Client::Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, bool poolConnections) : impl_(std::make_shared(serviceUrl, clientConfiguration, poolConnections)) {} @@ -171,6 +176,8 @@ Result Client::close() { return result; } +Result Client::updateServiceUrl(const std::string& serviceUrl) { return impl_->updateServiceUrl(serviceUrl); } + void Client::closeAsync(CloseCallback callback) { impl_->closeAsync(callback); } void Client::shutdown() { impl_->shutdown(); } diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index a9c16536..eceae448 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -497,6 +497,24 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, GetPartiti std::placeholders::_2, topicName, callback)); } +Result ClientImpl::updateServiceUrl(const std::string& serviceUrl) { + Lock lock(mutex_); + if (state_ != Open) { + lock.unlock(); + return ResultAlreadyClosed; + } + + LOG_INFO("Updating service URL to " << serviceUrl); + try { + serviceNameResolver_.updateServiceUrl(serviceUrl); + } catch (const std::invalid_argument& e) { + LOG_ERROR("Invalid service-url " << serviceUrl << "provided " << e.what()); + return ResultInvalidUrl; + } + pool_.disconnect(); + return ResultOk; +} + void ClientImpl::closeAsync(CloseCallback callback) { if (state_ != Open) { if (callback) { diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 8a393960..34ddd469 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -87,6 +87,8 @@ class ClientImpl : public std::enable_shared_from_this { Future getConnection(const std::string& topic); + Result updateServiceUrl(const std::string& serviceUrl); + void closeAsync(CloseCallback callback); void shutdown(); diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc index 37c72b9e..d32fc04b 100644 --- a/lib/ConnectionPool.cc +++ b/lib/ConnectionPool.cc @@ -47,7 +47,11 @@ bool ConnectionPool::close() { if (!closed_.compare_exchange_strong(expectedState, true)) { return false; } + disconnect(); + return true; +} +void ConnectionPool::disconnect() { std::unique_lock lock(mutex_); if (poolConnections_) { for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) { @@ -58,7 +62,6 @@ bool ConnectionPool::close() { } pool_.clear(); } - return true; } Future ConnectionPool::getConnectionAsync( diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h index cd55fc2a..8a02131a 100644 --- a/lib/ConnectionPool.h +++ b/lib/ConnectionPool.h @@ -50,6 +50,11 @@ class PULSAR_PUBLIC ConnectionPool { */ bool close(); + /** + * Disconnect and clear all connect. + */ + void disconnect(); + /** * Get a connection from the pool. *

diff --git a/lib/ServiceNameResolver.h b/lib/ServiceNameResolver.h index 8457d0e1..87c0146a 100644 --- a/lib/ServiceNameResolver.h +++ b/lib/ServiceNameResolver.h @@ -21,6 +21,7 @@ #include #include +#include #include "ServiceURI.h" @@ -29,30 +30,32 @@ namespace pulsar { class ServiceNameResolver { public: ServiceNameResolver(const std::string& uriString) - : serviceUri_(uriString), numAddresses_(serviceUri_.getServiceHosts().size()) { - assert(numAddresses_ > 0); // the validation has been done in ServiceURI - } + : serviceUri_(std::make_shared(uriString)) {} ServiceNameResolver(const ServiceNameResolver&) = delete; ServiceNameResolver& operator=(const ServiceNameResolver&) = delete; bool useTls() const noexcept { - return serviceUri_.getScheme() == PulsarScheme::PULSAR_SSL || - serviceUri_.getScheme() == PulsarScheme::HTTPS; + return serviceUri_->getScheme() == PulsarScheme::PULSAR_SSL || + serviceUri_->getScheme() == PulsarScheme::HTTPS; } bool useHttp() const noexcept { - return serviceUri_.getScheme() == PulsarScheme::HTTP || - serviceUri_.getScheme() == PulsarScheme::HTTPS; + return serviceUri_->getScheme() == PulsarScheme::HTTP || + serviceUri_->getScheme() == PulsarScheme::HTTPS; } const std::string& resolveHost() { - return serviceUri_.getServiceHosts()[(numAddresses_ == 1) ? 0 : (index_++ % numAddresses_)]; + return serviceUri_->getServiceHosts()[(serviceUri_->getNumAddresses() == 1) + ? 0 + : (index_++ % serviceUri_->getNumAddresses())]; } + void updateServiceUrl(const std::string& urlString) { serviceUri_.reset(new ServiceURI(urlString)); } + private: - const ServiceURI serviceUri_; - const size_t numAddresses_; + typedef std::shared_ptr ServiceURIPtr; + ServiceURIPtr serviceUri_; std::atomic_size_t index_{0}; friend class PulsarFriend; diff --git a/lib/ServiceURI.h b/lib/ServiceURI.h index 6ee26a78..991cae2c 100644 --- a/lib/ServiceURI.h +++ b/lib/ServiceURI.h @@ -38,6 +38,8 @@ class ServiceURI { const std::vector& getServiceHosts() const noexcept { return data_.second; } + size_t getNumAddresses() const noexcept { return data_.second.size(); }; + private: // The 2 elements of the pair are: // 1. The Scheme of the lookup protocol diff --git a/tests/ServiceUrlProviderTest.cc b/tests/ServiceUrlProviderTest.cc new file mode 100644 index 00000000..8c290317 --- /dev/null +++ b/tests/ServiceUrlProviderTest.cc @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +using namespace pulsar; + +static const std::string lookupUrl = "pulsar://localhost:6650"; +static const std::string adminUrl = "http://localhost:8080/"; + +class ServiceUrlProviderTest : public ::testing::TestWithParam { + public: + void SetUp() override { serviceUrl = GetParam(); } + + std::string serviceUrl; +}; + +TEST(ServiceUrlProviderTest, testClientClose) { + const std::string topicName = "testClientClose-" + std::to_string(time(nullptr)); + Client client([]() -> const std::string& { return lookupUrl; }); + client.close(); + ASSERT_EQ(ResultAlreadyClosed, client.updateServiceUrl(lookupUrl)); + std::map testMap; +} + +TEST_P(ServiceUrlProviderTest, testBasicUpdateUrl) { + const std::string topicName = "basicUpdateUrl-" + std::to_string(time(nullptr)); + Client client([this]() -> const std::string& { return serviceUrl; }); + + Producer producer1; + ASSERT_EQ(ResultOk, client.createProducer(topicName, producer1)); + + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topicName, "test-sub", consumer)); + + // Update service url. + ASSERT_EQ(ResultOk, client.updateServiceUrl(serviceUrl)); + Producer producer2; + ASSERT_EQ(ResultOk, client.createProducer(topicName, producer2)); + + // Assert that both producer1 and producer2 are available + int sendNum = 10; + for (int i = 0; i < sendNum; ++i) { + ASSERT_EQ(ResultOk, producer1.send(MessageBuilder().setContent("test").build())); + ASSERT_EQ(ResultOk, producer2.send(MessageBuilder().setContent("test").build())); + } + + Message msg; + for (int i = 0; i < 2 * sendNum; ++i) { + ASSERT_EQ(ResultOk, consumer.receive(msg)); + } + + client.close(); +} + +TEST(ServiceUrlProviderTest, testInvalidServiceUrl) { + const std::string invalidServiceUrl = "invalid://localhost:6650"; + + // Assert invalid url throw exception when create client. + { + ASSERT_THROW( + Client client([&invalidServiceUrl]() -> const std::string& { return invalidServiceUrl; }), + std::invalid_argument); + } + + // Assert return ResultInvalidUrl when client.updateServiceUrl(invalidServiceUrl); + { + Client client([]() -> const std::string& { return lookupUrl; }); + ASSERT_EQ(ResultInvalidUrl, client.updateServiceUrl(invalidServiceUrl)); + } +} + +INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, ServiceUrlProviderTest, testing::Values(lookupUrl, adminUrl));