diff --git a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h index 11bfc43f734fd..2f2c4614618cb 100644 --- a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h @@ -244,6 +244,22 @@ class PULSAR_PUBLIC ClientConfiguration { */ unsigned int getPartitionsUpdateInterval() const; + /** + * Set the duration of time to wait for a connection to a broker to be established. If the duration passes + * without a response from the broker, the connection attempt is dropped. + * + * Default: 10000 + * + * @param timeoutMs the duration in milliseconds + * @return + */ + ClientConfiguration& setConnectionTimeout(int timeoutMs); + + /** + * The getter associated with setConnectionTimeout(). + */ + int getConnectionTimeout() const; + friend class ClientImpl; friend class PulsarWrapper; diff --git a/pulsar-client-cpp/lib/ClientConfiguration.cc b/pulsar-client-cpp/lib/ClientConfiguration.cc index 52072a3bbed3c..188980af8d497 100644 --- a/pulsar-client-cpp/lib/ClientConfiguration.cc +++ b/pulsar-client-cpp/lib/ClientConfiguration.cc @@ -140,4 +140,12 @@ ClientConfiguration& ClientConfiguration::setListenerName(const std::string& lis } const std::string& ClientConfiguration::getListenerName() const { return impl_->listenerName; } + +ClientConfiguration& ClientConfiguration::setConnectionTimeout(int timeoutMs) { + impl_->connectionTimeoutMs = timeoutMs; + return *this; +} + +int ClientConfiguration::getConnectionTimeout() const { return impl_->connectionTimeoutMs; } + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/ClientConfigurationImpl.h b/pulsar-client-cpp/lib/ClientConfigurationImpl.h index 631e8ae59d5cf..887ecf2037851 100644 --- a/pulsar-client-cpp/lib/ClientConfigurationImpl.h +++ b/pulsar-client-cpp/lib/ClientConfigurationImpl.h @@ -39,6 +39,7 @@ struct ClientConfigurationImpl { bool validateHostName{false}; unsigned int 
partitionsUpdateInterval{60}; // 1 minute std::string listenerName; + int connectionTimeoutMs{10000}; // 10 seconds std::unique_ptr takeLogger() { return std::move(loggerFactory); } }; diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 58a3ccee1dc5e..3b071150a431d 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -175,6 +175,8 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: error_(boost::system::error_code()), incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)), incomingCmd_(), + connectTimeoutTask_(std::make_shared(executor_->getIOService(), + clientConfiguration.getConnectionTimeout())), pendingWriteBuffers_(), pendingWriteOperations_(0), outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)), @@ -374,6 +376,7 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_); } state_ = TcpConnected; + connectTimeoutTask_->stop(); socket_->set_option(tcp::no_delay(true)); socket_->set_option(tcp::socket::keep_alive(true)); @@ -414,7 +417,13 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, } } else if (endpointIterator != tcp::resolver::iterator()) { // The connection failed. Try the next endpoint in the list. 
- socket_->close(); + boost::system::error_code err; + socket_->close(err); // ignore the error of close + if (err) { + LOG_WARN(cnxString_ << "Failed to close socket: " << err.message()); + } + connectTimeoutTask_->stop(); + connectTimeoutTask_->start(); tcp::endpoint endpoint = *endpointIterator; socket_->async_connect(endpoint, std::bind(&ClientConnection::handleTcpConnected, shared_from_this(), std::placeholders::_1, ++endpointIterator)); @@ -500,6 +509,18 @@ void ClientConnection::handleResolve(const boost::system::error_code& err, return; } + auto self = shared_from_this(); + connectTimeoutTask_->setCallback([this, self](const PeriodicTask::ErrorCode& ec) { + if (state_ != TcpConnected) { + LOG_ERROR(cnxString_ << "Connection was not established in " << connectTimeoutTask_->getPeriodMs() + << " ms, close the socket"); + PeriodicTask::ErrorCode ignoredError; + socket_->close(ignoredError); + } + connectTimeoutTask_->stop(); + }); + + connectTimeoutTask_->start(); if (endpointIterator != tcp::resolver::iterator()) { LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() // << " to " << endpointIterator->endpoint()); @@ -1412,6 +1433,9 @@ void ClientConnection::close() { state_ = Disconnected; boost::system::error_code err; socket_->close(err); + if (err) { + LOG_WARN(cnxString_ << "Failed to close socket: " << err.message()); + } if (tlsSocket_) { tlsSocket_->lowest_layer().close(); @@ -1442,6 +1466,8 @@ void ClientConnection::close() { consumerStatsRequestTimer_.reset(); } + connectTimeoutTask_->stop(); + lock.unlock(); LOG_INFO(cnxString_ << "Connection closed"); diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h index 71db1adf888d3..a20219cfc751a 100644 --- a/pulsar-client-cpp/lib/ClientConnection.h +++ b/pulsar-client-cpp/lib/ClientConnection.h @@ -45,6 +45,7 @@ #include #include #include +#include "lib/PeriodicTask.h" using namespace pulsar; @@ -283,6 +284,7 @@ class PULSAR_PUBLIC 
ClientConnection : public std::enable_shared_from_this connectPromise_; + std::shared_ptr connectTimeoutTask_; typedef std::map PendingRequestsMap; PendingRequestsMap pendingRequests_; diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc index d9a2b570d8c48..b93ad2dcd9df1 100644 --- a/pulsar-client-cpp/lib/ClientImpl.cc +++ b/pulsar-client-cpp/lib/ClientImpl.cc @@ -480,8 +480,9 @@ void ClientImpl::closeAsync(CloseCallback callback) { state_ = Closing; lock.unlock(); - LOG_INFO("Closing Pulsar client"); SharedInt numberOfOpenHandlers = std::make_shared(producers.size() + consumers.size()); + LOG_INFO("Closing Pulsar client with " << producers.size() << " producers and " << consumers.size() + << " consumers"); for (ProducersList::iterator it = producers.begin(); it != producers.end(); ++it) { ProducerImplBasePtr producer = it->lock(); diff --git a/pulsar-client-cpp/lib/ExecutorService.h b/pulsar-client-cpp/lib/ExecutorService.h index b673b791724fc..d0ffc23c2cb22 100644 --- a/pulsar-client-cpp/lib/ExecutorService.h +++ b/pulsar-client-cpp/lib/ExecutorService.h @@ -47,6 +47,8 @@ class PULSAR_PUBLIC ExecutorService : private boost::noncopyable { void postWork(std::function task); void close(); + boost::asio::io_service &getIOService() { return *io_service_; } + private: /* * only called once and within lock so no need to worry about thread-safety diff --git a/pulsar-client-cpp/lib/PeriodicTask.cc b/pulsar-client-cpp/lib/PeriodicTask.cc new file mode 100644 index 0000000000000..f25a175e4d073 --- /dev/null +++ b/pulsar-client-cpp/lib/PeriodicTask.cc @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "lib/PeriodicTask.h"
+#include <boost/date_time/posix_time/posix_time.hpp>
+
+namespace pulsar {
+
+/**
+ * Arm the timer if the task is currently stopped.
+ *
+ * Does nothing unless the state is `Pending`. Otherwise the state becomes `Ready` and, when
+ * `periodMs_` is non-negative, a wait of `periodMs_` milliseconds is scheduled whose completion
+ * is delivered to `handleTimeout()`. With a negative period the state still becomes `Ready`,
+ * but no wait is scheduled, so the callback is never invoked.
+ */
+void PeriodicTask::start() {
+    if (state_ != Pending) {
+        return;
+    }
+    state_ = Ready;
+    if (periodMs_ >= 0) {
+        // The handler keeps a shared_ptr to this task so it stays alive while the wait is pending.
+        auto self = shared_from_this();
+        timer_.expires_from_now(boost::posix_time::millisec(periodMs_));
+        timer_.async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); });
+    }
+}
+
+/**
+ * Cancel the pending wait and put the task back into the `Pending` state.
+ *
+ * Only acts when the state is `Ready`; in any other state it returns without touching the
+ * timer, so repeated calls are harmless.
+ */
+void PeriodicTask::stop() {
+    State state = Ready;
+    if (!state_.compare_exchange_strong(state, Closing)) {
+        return;
+    }
+    timer_.cancel();
+    state_ = Pending;
+}
+
+/**
+ * Completion handler of the timer wait: runs the callback, then schedules the next wait.
+ *
+ * @param ec the result of the wait as reported by the timer
+ */
+void PeriodicTask::handleTimeout(const ErrorCode& ec) {
+    // The handler of a cancelled wait is still invoked, with operation_aborted. By the time it
+    // runs, start() may already have moved the task back to Ready (stop() directly followed by
+    // start()), so the state check alone cannot tell a cancelled wait from an expired one.
+    // Without the error check the callback would fire immediately after such a restart.
+    if (state_ != Ready || ec == boost::asio::error::operation_aborted) {
+        return;
+    }
+
+    callback_(ec);
+
+    // state_ may have been changed by callback_ (e.g. it called stop()), so we check state_ again
+    if (state_ == Ready) {
+        auto self = shared_from_this();
+        timer_.expires_from_now(boost::posix_time::millisec(periodMs_));
+        timer_.async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); });
+    }
+}
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PeriodicTask.h b/pulsar-client-cpp/lib/PeriodicTask.h
new file mode 100644
index 0000000000000..57d0734859fd1
--- /dev/null
+++ b/pulsar-client-cpp/lib/PeriodicTask.h
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include +#include +#include + +#include + +namespace pulsar { + +/** + * A task that is executed periodically. + * + * After the `start()` method is called, it will trigger `callback_` method periodically whose interval is + * `periodMs` in the constructor. After the `stop()` method is called, the timer will be cancelled and + * `callback()` will never be called again unless `start()` was called again. + * + * If you don't want to execute the task infinitely, you can call `stop()` in the implementation of + * `callback()` method. + * + * NOTE: If the `periodMs` is negative, the `callback()` will never be called. 
+ */ +class PeriodicTask : public std::enable_shared_from_this { + public: + using ErrorCode = boost::system::error_code; + using CallbackType = std::function; + + enum State : std::uint8_t + { + Pending, + Ready, + Closing + }; + + PeriodicTask(boost::asio::io_service& ioService, int periodMs) : timer_(ioService), periodMs_(periodMs) {} + + void start(); + + void stop(); + + void setCallback(CallbackType callback) noexcept { callback_ = callback; } + + State getState() const noexcept { return state_; } + int getPeriodMs() const noexcept { return periodMs_; } + + private: + std::atomic state_{Pending}; + boost::asio::deadline_timer timer_; + const int periodMs_; + CallbackType callback_{trivialCallback}; + + void handleTimeout(const ErrorCode& ec); + + static void trivialCallback(const ErrorCode&) {} +}; + +} // namespace pulsar diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index 93df3b5f90a28..514ca11dc7d0d 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -363,7 +363,8 @@ def __init__(self, service_url, tls_trust_certs_file_path=None, tls_allow_insecure_connection=False, tls_validate_hostname=False, - logger=None + logger=None, + connection_timeout_ms=10000, ): """ Create a new Pulsar client instance. @@ -409,10 +410,13 @@ def __init__(self, service_url, the endpoint. * `logger`: Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`. + * `connection_timeout_ms`: + Set timeout in milliseconds on TCP connections. 
""" _check_type(str, service_url, 'service_url') _check_type_or_none(Authentication, authentication, 'authentication') _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds') + _check_type(int, connection_timeout_ms, 'connection_timeout_ms') _check_type(int, io_threads, 'io_threads') _check_type(int, message_listener_threads, 'message_listener_threads') _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests') @@ -427,6 +431,7 @@ def __init__(self, service_url, if authentication: conf.authentication(authentication.auth) conf.operation_timeout_seconds(operation_timeout_seconds) + conf.connection_timeout(connection_timeout_ms) conf.io_threads(io_threads) conf.message_listener_threads(message_listener_threads) conf.concurrent_lookup_requests(concurrent_lookup_requests) diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py index dd2b228f308b9..fc17c72f03711 100755 --- a/pulsar-client-cpp/python/pulsar_test.py +++ b/pulsar-client-cpp/python/pulsar_test.py @@ -30,7 +30,7 @@ AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition, \ CryptoKeyReader -from _pulsar import ProducerConfiguration, ConsumerConfiguration +from _pulsar import ProducerConfiguration, ConsumerConfiguration, ConnectError from schema_test import * @@ -1148,6 +1148,22 @@ def test_negative_acks(self): consumer.receive(100) client.close() + def test_connect_timeout(self): + client = pulsar.Client( + service_url='pulsar://192.0.2.1:1234', + connection_timeout_ms=1000, # 1 second + ) + t1 = time.time() + try: + producer = client.create_producer('test_connect_timeout') + self.fail('create_producer should not succeed') + except ConnectError as expected: + print('expected error: {} when create producer'.format(expected)) + t2 = time.time() + self.assertGreater(t2 - t1, 1.0) + self.assertLess(t2 - t1, 1.5) # 1.5 seconds is long enough + client.close() + def _check_value_error(self, fun): with 
self.assertRaises(ValueError): fun() diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc index 583820853b426..b665ec7089598 100644 --- a/pulsar-client-cpp/python/src/config.cc +++ b/pulsar-client-cpp/python/src/config.cc @@ -195,6 +195,8 @@ void export_config() { .def("authentication", &ClientConfiguration_setAuthentication, return_self<>()) .def("operation_timeout_seconds", &ClientConfiguration::getOperationTimeoutSeconds) .def("operation_timeout_seconds", &ClientConfiguration::setOperationTimeoutSeconds, return_self<>()) + .def("connection_timeout", &ClientConfiguration::getConnectionTimeout) + .def("connection_timeout", &ClientConfiguration::setConnectionTimeout, return_self<>()) .def("io_threads", &ClientConfiguration::getIOThreads) .def("io_threads", &ClientConfiguration::setIOThreads, return_self<>()) .def("message_listener_threads", &ClientConfiguration::getMessageListenerThreads) diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc index 91232dcf7d6aa..129a322b262e5 100644 --- a/pulsar-client-cpp/tests/ClientTest.cc +++ b/pulsar-client-cpp/tests/ClientTest.cc @@ -18,6 +18,7 @@ */ #include +#include #include #include "../lib/checksum/ChecksumProvider.h" @@ -86,3 +87,30 @@ TEST(ClientTest, testServerConnectError) { ASSERT_EQ(ResultConnectError, client.createReader(topic, MessageId::earliest(), readerConf, reader)); client.close(); } + +TEST(ClientTest, testConnectTimeout) { + // 192.0.2.0/24 is assigned for documentation, should be a deadend + const std::string blackHoleBroker = "pulsar://192.0.2.1:1234"; + const std::string topic = "test-connect-timeout"; + + Client clientLow(blackHoleBroker, ClientConfiguration().setConnectionTimeout(1000)); + Client clientDefault(blackHoleBroker); + + std::promise promiseLow; + clientLow.createProducerAsync( + topic, [&promiseLow](Result result, Producer producer) { promiseLow.set_value(result); }); + + std::promise promiseDefault; + 
clientDefault.createProducerAsync( + topic, [&promiseDefault](Result result, Producer producer) { promiseDefault.set_value(result); }); + + auto futureLow = promiseLow.get_future(); + ASSERT_EQ(futureLow.wait_for(std::chrono::milliseconds(1500)), std::future_status::ready); + ASSERT_EQ(futureLow.get(), ResultConnectError); + + auto futureDefault = promiseDefault.get_future(); + ASSERT_EQ(futureDefault.wait_for(std::chrono::milliseconds(10)), std::future_status::timeout); + + clientLow.close(); + clientDefault.close(); +} diff --git a/pulsar-client-cpp/tests/PeriodicTaskTest.cc b/pulsar-client-cpp/tests/PeriodicTaskTest.cc new file mode 100644 index 0000000000000..11c1c62ec3f2b --- /dev/null +++ b/pulsar-client-cpp/tests/PeriodicTaskTest.cc @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+#include <gtest/gtest.h>
+#include <atomic>
+#include <chrono>
+#include <thread>
+#include "lib/ExecutorService.h"
+#include "lib/LogUtils.h"
+#include "lib/PeriodicTask.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+// A task whose callback calls stop() after five ticks must stop firing, and must be startable again.
+TEST(PeriodicTaskTest, testCountdownTask) {
+    ExecutorService executor;
+
+    std::atomic_int count{5};
+
+    auto task = std::make_shared<PeriodicTask>(executor.getIOService(), 200);
+    // The callback is stored inside the task itself, so capturing the shared_ptr by value would
+    // make the task own itself and never be destroyed. Hold it weakly instead.
+    std::weak_ptr<PeriodicTask> weakTask{task};
+    task->setCallback([weakTask, &count](const PeriodicTask::ErrorCode& ec) {
+        if (--count <= 0) {
+            if (auto lockedTask = weakTask.lock()) {
+                lockedTask->stop();
+            }
+        }
+        LOG_INFO("Now count is " << count << ", error code: " << ec.message());
+    });
+
+    // Wait for 2 seconds to verify callback won't be triggered after 1 second (200 ms * 5)
+    task->start();
+    std::this_thread::sleep_for(std::chrono::seconds(2));
+    LOG_INFO("Now count is " << count);
+    ASSERT_EQ(count.load(), 0);
+    task->stop();  // it's redundant, just to verify multiple stop() is idempotent
+
+    // Test start again
+    count = 1;
+    task->start();
+    std::this_thread::sleep_for(std::chrono::milliseconds(800));
+    LOG_INFO("Now count is " << count);
+    ASSERT_EQ(count.load(), 0);
+    task->stop();
+
+    executor.close();
+}
+
+// A task constructed with a negative period must never invoke its callback, even after start().
+TEST(PeriodicTaskTest, testNegativePeriod) {
+    ExecutorService executor;
+
+    auto task = std::make_shared<PeriodicTask>(executor.getIOService(), -1);
+    std::atomic_bool callbackTriggered{false};
+    task->setCallback([&callbackTriggered](const PeriodicTask::ErrorCode& ec) { callbackTriggered = true; });
+
+    task->start();
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    ASSERT_EQ(callbackTriggered.load(), false);
+    task->stop();
+
+    executor.close();
+}