From a40a1a927dfdec137d5da0c913f7ef14601b74e5 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 22 Jun 2021 22:51:31 +0800 Subject: [PATCH 1/9] Add PeriodicTask and its tests --- pulsar-client-cpp/lib/ExecutorService.h | 2 + pulsar-client-cpp/lib/PeriodicTask.cc | 63 +++++++++++++++++++++ pulsar-client-cpp/lib/PeriodicTask.h | 61 ++++++++++++++++++++ pulsar-client-cpp/tests/PeriodicTaskTest.cc | 63 +++++++++++++++++++++ 4 files changed, 189 insertions(+) create mode 100644 pulsar-client-cpp/lib/PeriodicTask.cc create mode 100644 pulsar-client-cpp/lib/PeriodicTask.h create mode 100644 pulsar-client-cpp/tests/PeriodicTaskTest.cc diff --git a/pulsar-client-cpp/lib/ExecutorService.h b/pulsar-client-cpp/lib/ExecutorService.h index b673b791724fc..d0ffc23c2cb22 100644 --- a/pulsar-client-cpp/lib/ExecutorService.h +++ b/pulsar-client-cpp/lib/ExecutorService.h @@ -47,6 +47,8 @@ class PULSAR_PUBLIC ExecutorService : private boost::noncopyable { void postWork(std::function task); void close(); + boost::asio::io_service &getIOService() { return *io_service_; } + private: /* * only called once and within lock so no need to worry about thread-safety diff --git a/pulsar-client-cpp/lib/PeriodicTask.cc b/pulsar-client-cpp/lib/PeriodicTask.cc new file mode 100644 index 0000000000000..576396cae8dae --- /dev/null +++ b/pulsar-client-cpp/lib/PeriodicTask.cc @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "lib/PeriodicTask.h" +#include + +namespace pulsar { + +PeriodicTask::PeriodicTask(boost::asio::io_service& ioService, int periodMs) + : timer_(ioService), periodMs_(periodMs) {} + +void PeriodicTask::start() { + if (state_ != Pending) { + return; + } + state_ = Ready; + if (periodMs_ >= 0) { + auto self = shared_from_this(); + timer_.expires_from_now(boost::posix_time::millisec(periodMs_)); + timer_.async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); }); + } +} + +void PeriodicTask::stop() { + State state = Ready; + if (!state_.compare_exchange_strong(state, Closing)) { + return; + } + timer_.cancel(); + state_ = Pending; +} + +void PeriodicTask::handleTimeout(const ErrorCode& ec) { + if (state_ != Ready) { + return; + } + + callback(ec); + + // state_ may be changed in handleTimeout, so we check state_ again + if (state_ == Ready) { + auto self = shared_from_this(); + timer_.expires_from_now(boost::posix_time::millisec(periodMs_)); + timer_.async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); }); + } +} + +} // namespace pulsar diff --git a/pulsar-client-cpp/lib/PeriodicTask.h b/pulsar-client-cpp/lib/PeriodicTask.h new file mode 100644 index 0000000000000..a0d08dfcc02cb --- /dev/null +++ b/pulsar-client-cpp/lib/PeriodicTask.h @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include +#include + +#include + +namespace pulsar { + +/** + * A task that is executed periodically. + */ +class PeriodicTask : public std::enable_shared_from_this { + public: + using ErrorCode = boost::system::error_code; + + enum State : std::uint8_t + { + Pending, + Ready, + Closing + }; + + PeriodicTask(boost::asio::io_service& ioService, int periodMs); + + void start(); + + void stop(); + + virtual void callback(const ErrorCode& ec) = 0; + + State getState() const noexcept { return state_; } + + private: + std::atomic state_{Pending}; + boost::asio::deadline_timer timer_; + const int periodMs_; + + void handleTimeout(const ErrorCode& ec); +}; + +} // namespace pulsar diff --git a/pulsar-client-cpp/tests/PeriodicTaskTest.cc b/pulsar-client-cpp/tests/PeriodicTaskTest.cc new file mode 100644 index 0000000000000..2a3f561609d37 --- /dev/null +++ b/pulsar-client-cpp/tests/PeriodicTaskTest.cc @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include +#include +#include +#include "lib/ExecutorService.h" +#include "lib/LogUtils.h" +#include "lib/PeriodicTask.h" + +DECLARE_LOG_OBJECT() + +using namespace pulsar; + +class CountdownTask : public PeriodicTask { + public: + CountdownTask(boost::asio::io_service& ioService, int periodMs, int initialCount) + : PeriodicTask(ioService, periodMs), count_(initialCount) {} + + void callback(const ErrorCode& ec) override { + if (--count_ <= 0) { + stop(); + } + LOG_INFO("Now count is " << count_ << ", error code: " << ec.message()); + } + + int getCount() const noexcept { return count_; } + + private: + std::atomic_int count_; +}; + +TEST(PeriodicTaskTest, testCountdownTask) { + ExecutorService executor; + + auto task = std::make_shared(executor.getIOService(), 200, 5); + + // Wait for 3 seconds to verify callback won't be triggered after 1 second (200 ms * 5) + task->start(); + std::this_thread::sleep_for(std::chrono::seconds(3)); + LOG_INFO("Now count is " << task->getCount()); + ASSERT_EQ(task->getCount(), 0); + + task->stop(); // it's redundant, just to verify multiple stop() is idempotent + + executor.close(); +} From 3905b9c4113069887623e1c8e52f609c1612b2f7 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 22 Jun 2021 23:16:44 +0800 Subject: [PATCH 2/9] Add tests for negative timeout --- pulsar-client-cpp/lib/PeriodicTask.cc | 3 --- pulsar-client-cpp/lib/PeriodicTask.h | 11 +++++++++- pulsar-client-cpp/tests/PeriodicTaskTest.cc | 24 ++++++++++++++++++++- 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/pulsar-client-cpp/lib/PeriodicTask.cc b/pulsar-client-cpp/lib/PeriodicTask.cc index 576396cae8dae..640b690310a8c 100644 --- a/pulsar-client-cpp/lib/PeriodicTask.cc +++ b/pulsar-client-cpp/lib/PeriodicTask.cc @@ -21,9 +21,6 @@ namespace pulsar { -PeriodicTask::PeriodicTask(boost::asio::io_service& ioService, int periodMs) - : timer_(ioService), periodMs_(periodMs) {} - void PeriodicTask::start() { if (state_ != Pending) { return; diff --git a/pulsar-client-cpp/lib/PeriodicTask.h b/pulsar-client-cpp/lib/PeriodicTask.h index a0d08dfcc02cb..6b9f4fbaab4fb 100644 --- a/pulsar-client-cpp/lib/PeriodicTask.h +++ b/pulsar-client-cpp/lib/PeriodicTask.h @@ -28,6 +28,15 @@ namespace pulsar { /** * A task that is executed periodically. + * + * After the `start()` method is called, it will trigger `callback()` method periodically whose interval is + * `periodMs` in the constructor. After the `stop()` method is called, the timer will be cancelled and + * `callback()` will never be called again unless `start()` was called again. + * + * If you don't want to execute the task infinitely, you can call `stop()` in the implementation of + * `callback()` method. + * + * NOTE: If the `periodMs` is negative, the `callback()` will never be called. */ class PeriodicTask : public std::enable_shared_from_this { public: @@ -40,7 +49,7 @@ class PeriodicTask : public std::enable_shared_from_this { Closing }; - PeriodicTask(boost::asio::io_service& ioService, int periodMs); + PeriodicTask(boost::asio::io_service& ioService, int periodMs) : timer_(ioService), periodMs_(periodMs) {} void start(); diff --git a/pulsar-client-cpp/tests/PeriodicTaskTest.cc b/pulsar-client-cpp/tests/PeriodicTaskTest.cc index 2a3f561609d37..65261a0759b9a 100644 --- a/pulsar-client-cpp/tests/PeriodicTaskTest.cc +++ b/pulsar-client-cpp/tests/PeriodicTaskTest.cc @@ -42,6 +42,8 @@ class CountdownTask : public PeriodicTask { int getCount() const noexcept { return count_; } + void setCount(int count) noexcept { count_ = count; } + private: std::atomic_int count_; }; @@ -56,8 +58,28 @@ TEST(PeriodicTaskTest, testCountdownTask) { std::this_thread::sleep_for(std::chrono::seconds(3)); LOG_INFO("Now count is " << task->getCount()); ASSERT_EQ(task->getCount(), 0); - task->stop(); // it's redundant, just to verify multiple stop() is idempotent + // Test start again + task->setCount(1); + ASSERT_EQ(task->getCount(), 1); + task->start(); + std::this_thread::sleep_for(std::chrono::milliseconds(800)); + LOG_INFO("Now count is " << task->getCount()); + ASSERT_EQ(task->getCount(), 0); + task->stop(); + + executor.close(); +} + +TEST(PeriodicTaskTest, testNegativePeriod) { + ExecutorService executor; + + auto task = std::make_shared(executor.getIOService(), -1, 5); + task->start(); + std::this_thread::sleep_for(std::chrono::seconds(1)); + ASSERT_EQ(task->getCount(), 5); // the callback is never called + task->stop(); + executor.close(); } From 1e115ed92b737612e004ea895ac5d5e9f7d3fa77 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 22 Jun 2021 23:32:23 +0800 Subject: [PATCH 3/9] Add connection timeout to ClientConfiguration --- .../include/pulsar/ClientConfiguration.h | 16 ++++++++++++++++ pulsar-client-cpp/lib/ClientConfiguration.cc | 8 ++++++++ pulsar-client-cpp/lib/ClientConfigurationImpl.h | 1 + 3 files changed, 25 insertions(+) diff --git a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h index 11bfc43f734fd..2f2c4614618cb 100644 --- a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h @@ -244,6 +244,22 @@ class PULSAR_PUBLIC ClientConfiguration { */ unsigned int getPartitionsUpdateInterval() const; + /** + * Set the duration of time to wait for a connection to a broker to be established. If the duration passes + * without a response from the broker, the connection attempt is dropped. + * + * Default: 10000 + * + * @param timeoutMs the duration in milliseconds + * @return + */ + ClientConfiguration& setConnectionTimeout(int timeoutMs); + + /** + * The getter associated with setConnectionTimeout(). + */ + int getConnectionTimeout() const; + friend class ClientImpl; friend class PulsarWrapper; diff --git a/pulsar-client-cpp/lib/ClientConfiguration.cc b/pulsar-client-cpp/lib/ClientConfiguration.cc index 52072a3bbed3c..188980af8d497 100644 --- a/pulsar-client-cpp/lib/ClientConfiguration.cc +++ b/pulsar-client-cpp/lib/ClientConfiguration.cc @@ -140,4 +140,12 @@ ClientConfiguration& ClientConfiguration::setListenerName(const std::string& lis } const std::string& ClientConfiguration::getListenerName() const { return impl_->listenerName; } + +ClientConfiguration& ClientConfiguration::setConnectionTimeout(int timeoutMs) { + impl_->connectionTimeoutMs = timeoutMs; + return *this; +} + +int ClientConfiguration::getConnectionTimeout() const { return impl_->connectionTimeoutMs; } + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/ClientConfigurationImpl.h b/pulsar-client-cpp/lib/ClientConfigurationImpl.h index 631e8ae59d5cf..887ecf2037851 100644 --- a/pulsar-client-cpp/lib/ClientConfigurationImpl.h +++ b/pulsar-client-cpp/lib/ClientConfigurationImpl.h @@ -39,6 +39,7 @@ struct ClientConfigurationImpl { bool validateHostName{false}; unsigned int partitionsUpdateInterval{60}; // 1 minute std::string listenerName; + int connectionTimeoutMs{10000}; // 10 seconds std::unique_ptr takeLogger() { return std::move(loggerFactory); } }; From ef3ba3189f274c7a7ff3e0f78e9350201c93433a Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 22 Jun 2021 23:52:07 +0800 Subject: [PATCH 4/9] Use setCallback instead of virtual function --- pulsar-client-cpp/lib/PeriodicTask.cc | 2 +- pulsar-client-cpp/lib/PeriodicTask.h | 9 +++- pulsar-client-cpp/tests/PeriodicTaskTest.cc | 52 +++++++++------------ 3 files changed, 29 insertions(+), 34 deletions(-) diff --git a/pulsar-client-cpp/lib/PeriodicTask.cc b/pulsar-client-cpp/lib/PeriodicTask.cc index 640b690310a8c..f25a175e4d073 100644 --- a/pulsar-client-cpp/lib/PeriodicTask.cc +++ b/pulsar-client-cpp/lib/PeriodicTask.cc @@ -47,7 +47,7 @@ void PeriodicTask::handleTimeout(const ErrorCode& ec) { return; } - callback(ec); + callback_(ec); // state_ may be changed in handleTimeout, so we check state_ again if (state_ == Ready) { diff --git a/pulsar-client-cpp/lib/PeriodicTask.h b/pulsar-client-cpp/lib/PeriodicTask.h index 6b9f4fbaab4fb..eeadde555230a 100644 --- a/pulsar-client-cpp/lib/PeriodicTask.h +++ b/pulsar-client-cpp/lib/PeriodicTask.h @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -29,7 +30,7 @@ namespace pulsar { /** * A task that is executed periodically. * - * After the `start()` method is called, it will trigger `callback()` method periodically whose interval is + * After the `start()` method is called, it will trigger `callback_` method periodically whose interval is * `periodMs` in the constructor. After the `stop()` method is called, the timer will be cancelled and * `callback()` will never be called again unless `start()` was called again. * @@ -41,6 +42,7 @@ namespace pulsar { class PeriodicTask : public std::enable_shared_from_this { public: using ErrorCode = boost::system::error_code; + using CallbackType = std::function; enum State : std::uint8_t { @@ -55,7 +57,7 @@ class PeriodicTask : public std::enable_shared_from_this { void stop(); - virtual void callback(const ErrorCode& ec) = 0; + void setCallback(CallbackType callback) noexcept { callback_ = callback; } State getState() const noexcept { return state_; } @@ -63,8 +65,11 @@ class PeriodicTask : public std::enable_shared_from_this { std::atomic state_{Pending}; boost::asio::deadline_timer timer_; const int periodMs_; + CallbackType callback_{trivialCallback}; void handleTimeout(const ErrorCode& ec); + + static void trivialCallback(const ErrorCode&) {} }; } // namespace pulsar diff --git a/pulsar-client-cpp/tests/PeriodicTaskTest.cc b/pulsar-client-cpp/tests/PeriodicTaskTest.cc index 65261a0759b9a..11c1c62ec3f2b 100644 --- a/pulsar-client-cpp/tests/PeriodicTaskTest.cc +++ b/pulsar-client-cpp/tests/PeriodicTaskTest.cc @@ -28,45 +28,32 @@ DECLARE_LOG_OBJECT() using namespace pulsar; -class CountdownTask : public PeriodicTask { - public: - CountdownTask(boost::asio::io_service& ioService, int periodMs, int initialCount) - : PeriodicTask(ioService, periodMs), count_(initialCount) {} - - void callback(const ErrorCode& ec) override { - if (--count_ <= 0) { - stop(); - } - LOG_INFO("Now count is " << count_ << ", error code: " << ec.message()); - } - - int getCount() const noexcept { return count_; } - - void setCount(int count) noexcept { count_ = count; } - - private: - std::atomic_int count_; -}; - TEST(PeriodicTaskTest, testCountdownTask) { ExecutorService executor; - auto task = std::make_shared(executor.getIOService(), 200, 5); + std::atomic_int count{5}; + + auto task = std::make_shared(executor.getIOService(), 200); + task->setCallback([task, &count](const PeriodicTask::ErrorCode& ec) { + if (--count <= 0) { + task->stop(); + } + LOG_INFO("Now count is " << count << ", error code: " << ec.message()); + }); - // Wait for 3 seconds to verify callback won't be triggered after 1 second (200 ms * 5) + // Wait for 2 seconds to verify callback won't be triggered after 1 second (200 ms * 5) task->start(); - std::this_thread::sleep_for(std::chrono::seconds(3)); - LOG_INFO("Now count is " << task->getCount()); - ASSERT_EQ(task->getCount(), 0); + std::this_thread::sleep_for(std::chrono::seconds(2)); + LOG_INFO("Now count is " << count); + ASSERT_EQ(count.load(), 0); task->stop(); // it's redundant, just to verify multiple stop() is idempotent // Test start again - task->setCount(1); - ASSERT_EQ(task->getCount(), 1); + count = 1; task->start(); std::this_thread::sleep_for(std::chrono::milliseconds(800)); - LOG_INFO("Now count is " << task->getCount()); - ASSERT_EQ(task->getCount(), 0); + LOG_INFO("Now count is " << count); + ASSERT_EQ(count.load(), 0); task->stop(); executor.close(); @@ -75,10 +62,13 @@ TEST(PeriodicTaskTest, testCountdownTask) { TEST(PeriodicTaskTest, testNegativePeriod) { ExecutorService executor; - auto task = std::make_shared(executor.getIOService(), -1, 5); + auto task = std::make_shared(executor.getIOService(), -1); + std::atomic_bool callbackTriggered{false}; + task->setCallback([&callbackTriggered](const PeriodicTask::ErrorCode& ec) { callbackTriggered = true; }); + task->start(); std::this_thread::sleep_for(std::chrono::seconds(1)); - ASSERT_EQ(task->getCount(), 5); // the callback is never called + ASSERT_EQ(callbackTriggered.load(), false); task->stop(); executor.close(); From 214ec970476272313c45db1eb6fd2e318f77b32c Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 23 Jun 2021 01:24:43 +0800 Subject: [PATCH 5/9] Add connection timeout and tests --- pulsar-client-cpp/lib/ClientConnection.cc | 17 ++++++++++++++ pulsar-client-cpp/lib/ClientConnection.h | 2 ++ pulsar-client-cpp/lib/ClientImpl.cc | 3 ++- pulsar-client-cpp/lib/PeriodicTask.h | 1 + pulsar-client-cpp/tests/ClientTest.cc | 28 +++++++++++++++++++++++ 5 files changed, 50 insertions(+), 1 deletion(-) diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 58a3ccee1dc5e..660088da955c6 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -175,6 +175,8 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: error_(boost::system::error_code()), incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)), incomingCmd_(), + connectTimeoutTask_(std::make_shared(executor_->getIOService(), + clientConfiguration.getConnectionTimeout())), pendingWriteBuffers_(), pendingWriteOperations_(0), outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)), @@ -374,6 +376,7 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_); } state_ = TcpConnected; + connectTimeoutTask_->stop(); socket_->set_option(tcp::no_delay(true)); socket_->set_option(tcp::socket::keep_alive(true)); @@ -500,6 +503,18 @@ void ClientConnection::handleResolve(const boost::system::error_code& err, return; } + auto self = shared_from_this(); + connectTimeoutTask_->setCallback([this, self](const PeriodicTask::ErrorCode& ec) { + if (state_ != TcpConnected) { + LOG_ERROR(cnxString_ << "Connection was not established in " << connectTimeoutTask_->getPeriodMs() + << " ms, close the socket"); + PeriodicTask::ErrorCode ignoredError; + socket_->close(ignoredError); + } + connectTimeoutTask_->stop(); + }); + + connectTimeoutTask_->start(); if (endpointIterator != tcp::resolver::iterator()) { LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() // << " to " << endpointIterator->endpoint()); @@ -1442,6 +1457,8 @@ void ClientConnection::close() { consumerStatsRequestTimer_.reset(); } + connectTimeoutTask_->stop(); + lock.unlock(); LOG_INFO(cnxString_ << "Connection closed"); diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h index 71db1adf888d3..a20219cfc751a 100644 --- a/pulsar-client-cpp/lib/ClientConnection.h +++ b/pulsar-client-cpp/lib/ClientConnection.h @@ -45,6 +45,7 @@ #include #include #include +#include "lib/PeriodicTask.h" using namespace pulsar; @@ -283,6 +284,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this connectPromise_; + std::shared_ptr connectTimeoutTask_; typedef std::map PendingRequestsMap; PendingRequestsMap pendingRequests_; diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc index d9a2b570d8c48..b93ad2dcd9df1 100644 --- a/pulsar-client-cpp/lib/ClientImpl.cc +++ b/pulsar-client-cpp/lib/ClientImpl.cc @@ -480,8 +480,9 @@ void ClientImpl::closeAsync(CloseCallback callback) { state_ = Closing; lock.unlock(); - LOG_INFO("Closing Pulsar client"); SharedInt numberOfOpenHandlers = std::make_shared(producers.size() + consumers.size()); + LOG_INFO("Closing Pulsar client with " << producers.size() << " producers and " << consumers.size() + << " consumers"); for (ProducersList::iterator it = producers.begin(); it != producers.end(); ++it) { ProducerImplBasePtr producer = it->lock(); diff --git a/pulsar-client-cpp/lib/PeriodicTask.h b/pulsar-client-cpp/lib/PeriodicTask.h index eeadde555230a..57d0734859fd1 100644 --- a/pulsar-client-cpp/lib/PeriodicTask.h +++ b/pulsar-client-cpp/lib/PeriodicTask.h @@ -60,6 +60,7 @@ class PeriodicTask : public std::enable_shared_from_this { void setCallback(CallbackType callback) noexcept { callback_ = callback; } State getState() const noexcept { return state_; } + int getPeriodMs() const noexcept { return periodMs_; } private: std::atomic state_{Pending}; diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc index 91232dcf7d6aa..129a322b262e5 100644 --- a/pulsar-client-cpp/tests/ClientTest.cc +++ b/pulsar-client-cpp/tests/ClientTest.cc @@ -18,6 +18,7 @@ */ #include +#include #include #include "../lib/checksum/ChecksumProvider.h" @@ -86,3 +87,30 @@ TEST(ClientTest, testServerConnectError) { ASSERT_EQ(ResultConnectError, client.createReader(topic, MessageId::earliest(), readerConf, reader)); client.close(); } + +TEST(ClientTest, testConnectTimeout) { + // 192.0.2.0/24 is assigned for documentation, should be a deadend + const std::string blackHoleBroker = "pulsar://192.0.2.1:1234"; + const std::string topic = "test-connect-timeout"; + + Client clientLow(blackHoleBroker, ClientConfiguration().setConnectionTimeout(1000)); + Client clientDefault(blackHoleBroker); + + std::promise promiseLow; + clientLow.createProducerAsync( + topic, [&promiseLow](Result result, Producer producer) { promiseLow.set_value(result); }); + + std::promise promiseDefault; + clientDefault.createProducerAsync( + topic, [&promiseDefault](Result result, Producer producer) { promiseDefault.set_value(result); }); + + auto futureLow = promiseLow.get_future(); + ASSERT_EQ(futureLow.wait_for(std::chrono::milliseconds(1500)), std::future_status::ready); + ASSERT_EQ(futureLow.get(), ResultConnectError); + + auto futureDefault = promiseDefault.get_future(); + ASSERT_EQ(futureDefault.wait_for(std::chrono::milliseconds(10)), std::future_status::timeout); + + clientLow.close(); + clientDefault.close(); +} From 384be1ac5b9d389a0c24bfdbb2bd6b13c5042870 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 23 Jun 2021 11:45:55 +0800 Subject: [PATCH 6/9] Add connection timeout to Python client --- pulsar-client-cpp/python/pulsar/__init__.py | 5 +++++ pulsar-client-cpp/python/pulsar_test.py | 18 +++++++++++++++++- pulsar-client-cpp/python/src/config.cc | 2 ++ 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index 93df3b5f90a28..0aabeab7ece73 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -355,6 +355,7 @@ class Client: def __init__(self, service_url, authentication=None, operation_timeout_seconds=30, + connection_timeout_ms=10000, io_threads=1, message_listener_threads=1, concurrent_lookup_requests=50000, @@ -380,6 +381,8 @@ def __init__(self, service_url, * `operation_timeout_seconds`: Set timeout on client operations (subscribe, create producer, close, unsubscribe). + * `connection_timeout_ms`: + Set timeout in milliseconds on TCP connections. * `io_threads`: Set the number of IO threads to be used by the Pulsar client. * `message_listener_threads`: @@ -413,6 +416,7 @@ def __init__(self, service_url, _check_type(str, service_url, 'service_url') _check_type_or_none(Authentication, authentication, 'authentication') _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds') + _check_type(int, connection_timeout_ms, 'connection_timeout_ms') _check_type(int, io_threads, 'io_threads') _check_type(int, message_listener_threads, 'message_listener_threads') _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests') @@ -427,6 +431,7 @@ def __init__(self, service_url, if authentication: conf.authentication(authentication.auth) conf.operation_timeout_seconds(operation_timeout_seconds) + conf.connection_timeout(connection_timeout_ms) conf.io_threads(io_threads) conf.message_listener_threads(message_listener_threads) conf.concurrent_lookup_requests(concurrent_lookup_requests) diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py index dd2b228f308b9..fc17c72f03711 100755 --- a/pulsar-client-cpp/python/pulsar_test.py +++ b/pulsar-client-cpp/python/pulsar_test.py @@ -30,7 +30,7 @@ AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition, \ CryptoKeyReader -from _pulsar import ProducerConfiguration, ConsumerConfiguration +from _pulsar import ProducerConfiguration, ConsumerConfiguration, ConnectError from schema_test import * @@ -1148,6 +1148,22 @@ def test_negative_acks(self): consumer.receive(100) client.close() + def test_connect_timeout(self): + client = pulsar.Client( + service_url='pulsar://192.0.2.1:1234', + connection_timeout_ms=1000, # 1 second + ) + t1 = time.time() + try: + producer = client.create_producer('test_connect_timeout') + self.fail('create_producer should not succeed') + except ConnectError as expected: + print('expected error: {} when create producer'.format(expected)) + t2 = time.time() + self.assertGreater(t2 - t1, 1.0) + self.assertLess(t2 - t1, 1.5) # 1.5 seconds is long enough + client.close() + def _check_value_error(self, fun): with self.assertRaises(ValueError): fun() diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc index 583820853b426..b665ec7089598 100644 --- a/pulsar-client-cpp/python/src/config.cc +++ b/pulsar-client-cpp/python/src/config.cc @@ -195,6 +195,8 @@ void export_config() { .def("authentication", &ClientConfiguration_setAuthentication, return_self<>()) .def("operation_timeout_seconds", &ClientConfiguration::getOperationTimeoutSeconds) .def("operation_timeout_seconds", &ClientConfiguration::setOperationTimeoutSeconds, return_self<>()) + .def("connection_timeout", &ClientConfiguration::getConnectionTimeout) + .def("connection_timeout", &ClientConfiguration::setConnectionTimeout, return_self<>()) .def("io_threads", &ClientConfiguration::getIOThreads) .def("io_threads", &ClientConfiguration::setIOThreads, return_self<>()) .def("message_listener_threads", &ClientConfiguration::getMessageListenerThreads) From fe7ef8bcf043bf0f30de31d1d79008dfa65fec2d Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 23 Jun 2021 12:06:57 +0800 Subject: [PATCH 7/9] Register timer before another async_connect --- pulsar-client-cpp/lib/ClientConnection.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 660088da955c6..8ac1945030182 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -418,6 +418,8 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, } else if (endpointIterator != tcp::resolver::iterator()) { // The connection failed. Try the next endpoint in the list. socket_->close(); + connectTimeoutTask_->stop(); + connectTimeoutTask_->start(); tcp::endpoint endpoint = *endpointIterator; socket_->async_connect(endpoint, std::bind(&ClientConnection::handleTcpConnected, shared_from_this(), std::placeholders::_1, ++endpointIterator)); From 31eff3ab4bd351219874832a0db2c337b3947f8b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 23 Jun 2021 12:17:29 +0800 Subject: [PATCH 8/9] Avoid exception thrown when socket is closed --- pulsar-client-cpp/lib/ClientConnection.cc | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 8ac1945030182..3b071150a431d 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -417,7 +417,11 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, } } else if (endpointIterator != tcp::resolver::iterator()) { // The connection failed. Try the next endpoint in the list. - socket_->close(); + boost::system::error_code err; + socket_->close(err); // ignore the error of close + if (err) { + LOG_WARN(cnxString_ << "Failed to close socket: " << err.message()); + } connectTimeoutTask_->stop(); connectTimeoutTask_->start(); tcp::endpoint endpoint = *endpointIterator; @@ -1429,6 +1433,9 @@ void ClientConnection::close() { state_ = Disconnected; boost::system::error_code err; socket_->close(err); + if (err) { + LOG_WARN(cnxString_ << "Failed to close socket: " << err.message()); + } if (tlsSocket_) { tlsSocket_->lowest_layer().close(); From 96324ba595524a5dfc1e4e1ec73d5e7371bb72cc Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 24 Jun 2021 23:10:36 +0800 Subject: [PATCH 9/9] Fix Python Function that uses positional argument --- pulsar-client-cpp/python/pulsar/__init__.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index 0aabeab7ece73..514ca11dc7d0d 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -355,7 +355,6 @@ class Client: def __init__(self, service_url, authentication=None, operation_timeout_seconds=30, - connection_timeout_ms=10000, io_threads=1, message_listener_threads=1, concurrent_lookup_requests=50000, @@ -364,7 +363,8 @@ def __init__(self, service_url, tls_trust_certs_file_path=None, tls_allow_insecure_connection=False, tls_validate_hostname=False, - logger=None + logger=None, + connection_timeout_ms=10000, ): """ Create a new Pulsar client instance. @@ -381,8 +381,6 @@ def __init__(self, service_url, * `operation_timeout_seconds`: Set timeout on client operations (subscribe, create producer, close, unsubscribe). - * `connection_timeout_ms`: - Set timeout in milliseconds on TCP connections. * `io_threads`: Set the number of IO threads to be used by the Pulsar client. * `message_listener_threads`: @@ -412,6 +410,8 @@ def __init__(self, service_url, the endpoint. * `logger`: Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`. + * `connection_timeout_ms`: + Set timeout in milliseconds on TCP connections. """ _check_type(str, service_url, 'service_url') _check_type_or_none(Authentication, authentication, 'authentication')