Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pulsar-client-cpp/include/pulsar/ClientConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,22 @@ class PULSAR_PUBLIC ClientConfiguration {
*/
unsigned int getPartitionsUpdateInterval() const;

/**
* Set the duration of time to wait for a connection to a broker to be established. If the duration passes
* without a response from the broker, the connection attempt is dropped.
*
* Default: 10000
*
* @param timeoutMs the duration in milliseconds
* @return
*/
ClientConfiguration& setConnectionTimeout(int timeoutMs);

/**
* The getter associated with setConnectionTimeout().
*/
int getConnectionTimeout() const;

friend class ClientImpl;
friend class PulsarWrapper;

Expand Down
8 changes: 8 additions & 0 deletions pulsar-client-cpp/lib/ClientConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,12 @@ ClientConfiguration& ClientConfiguration::setListenerName(const std::string& lis
}

const std::string& ClientConfiguration::getListenerName() const { return impl_->listenerName; }

ClientConfiguration& ClientConfiguration::setConnectionTimeout(int timeoutMs) {
impl_->connectionTimeoutMs = timeoutMs;
return *this;
}

int ClientConfiguration::getConnectionTimeout() const { return impl_->connectionTimeoutMs; }

} // namespace pulsar
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ClientConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ struct ClientConfigurationImpl {
bool validateHostName{false};
unsigned int partitionsUpdateInterval{60}; // 1 minute
std::string listenerName;
int connectionTimeoutMs{10000}; // 10 seconds

std::unique_ptr<LoggerFactory> takeLogger() { return std::move(loggerFactory); }
};
Expand Down
28 changes: 27 additions & 1 deletion pulsar-client-cpp/lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
error_(boost::system::error_code()),
incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
incomingCmd_(),
connectTimeoutTask_(std::make_shared<PeriodicTask>(executor_->getIOService(),
clientConfiguration.getConnectionTimeout())),
pendingWriteBuffers_(),
pendingWriteOperations_(0),
outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
Expand Down Expand Up @@ -374,6 +376,7 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_);
}
state_ = TcpConnected;
connectTimeoutTask_->stop();
socket_->set_option(tcp::no_delay(true));

socket_->set_option(tcp::socket::keep_alive(true));
Expand Down Expand Up @@ -414,7 +417,13 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
}
} else if (endpointIterator != tcp::resolver::iterator()) {
// The connection failed. Try the next endpoint in the list.
socket_->close();
boost::system::error_code err;
socket_->close(err); // ignore the error of close
if (err) {
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
}
connectTimeoutTask_->stop();
connectTimeoutTask_->start();
tcp::endpoint endpoint = *endpointIterator;
socket_->async_connect(endpoint, std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
std::placeholders::_1, ++endpointIterator));
Expand Down Expand Up @@ -500,6 +509,18 @@ void ClientConnection::handleResolve(const boost::system::error_code& err,
return;
}

auto self = shared_from_this();
connectTimeoutTask_->setCallback([this, self](const PeriodicTask::ErrorCode& ec) {
if (state_ != TcpConnected) {
LOG_ERROR(cnxString_ << "Connection was not established in " << connectTimeoutTask_->getPeriodMs()
<< " ms, close the socket");
PeriodicTask::ErrorCode ignoredError;
socket_->close(ignoredError);
}
connectTimeoutTask_->stop();
});

connectTimeoutTask_->start();
if (endpointIterator != tcp::resolver::iterator()) {
LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() //
<< " to " << endpointIterator->endpoint());
Expand Down Expand Up @@ -1412,6 +1433,9 @@ void ClientConnection::close() {
state_ = Disconnected;
boost::system::error_code err;
socket_->close(err);
if (err) {
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
}

if (tlsSocket_) {
tlsSocket_->lowest_layer().close();
Expand Down Expand Up @@ -1442,6 +1466,8 @@ void ClientConnection::close() {
consumerStatsRequestTimer_.reset();
}

connectTimeoutTask_->stop();

lock.unlock();
LOG_INFO(cnxString_ << "Connection closed");

Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include <pulsar/Client.h>
#include <set>
#include <lib/BrokerConsumerStatsImpl.h>
#include "lib/PeriodicTask.h"

using namespace pulsar;

Expand Down Expand Up @@ -283,6 +284,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
proto::BaseCommand incomingCmd_;

Promise<Result, ClientConnectionWeakPtr> connectPromise_;
std::shared_ptr<PeriodicTask> connectTimeoutTask_;

typedef std::map<long, PendingRequestData> PendingRequestsMap;
PendingRequestsMap pendingRequests_;
Expand Down
3 changes: 2 additions & 1 deletion pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,9 @@ void ClientImpl::closeAsync(CloseCallback callback) {
state_ = Closing;
lock.unlock();

LOG_INFO("Closing Pulsar client");
SharedInt numberOfOpenHandlers = std::make_shared<int>(producers.size() + consumers.size());
LOG_INFO("Closing Pulsar client with " << producers.size() << " producers and " << consumers.size()
<< " consumers");

for (ProducersList::iterator it = producers.begin(); it != producers.end(); ++it) {
ProducerImplBasePtr producer = it->lock();
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/ExecutorService.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class PULSAR_PUBLIC ExecutorService : private boost::noncopyable {
void postWork(std::function<void(void)> task);
void close();

boost::asio::io_service &getIOService() { return *io_service_; }

private:
/*
* only called once and within lock so no need to worry about thread-safety
Expand Down
60 changes: 60 additions & 0 deletions pulsar-client-cpp/lib/PeriodicTask.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "lib/PeriodicTask.h"
#include <boost/date_time/posix_time/posix_time.hpp>

namespace pulsar {

void PeriodicTask::start() {
if (state_ != Pending) {
return;
}
state_ = Ready;
if (periodMs_ >= 0) {
auto self = shared_from_this();
timer_.expires_from_now(boost::posix_time::millisec(periodMs_));
timer_.async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); });
}
}

void PeriodicTask::stop() {
State state = Ready;
if (!state_.compare_exchange_strong(state, Closing)) {
return;
}
timer_.cancel();
state_ = Pending;
}

void PeriodicTask::handleTimeout(const ErrorCode& ec) {
if (state_ != Ready) {
return;
}

callback_(ec);

// state_ may be changed in handleTimeout, so we check state_ again
if (state_ == Ready) {
auto self = shared_from_this();
timer_.expires_from_now(boost::posix_time::millisec(periodMs_));
timer_.async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); });
}
}

} // namespace pulsar
76 changes: 76 additions & 0 deletions pulsar-client-cpp/lib/PeriodicTask.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>

#include <boost/asio.hpp>

namespace pulsar {

/**
* A task that is executed periodically.
*
* After the `start()` method is called, it will trigger `callback_` method periodically whose interval is
* `periodMs` in the constructor. After the `stop()` method is called, the timer will be cancelled and
* `callback()` will never be called again unless `start()` was called again.
*
* If you don't want to execute the task infinitely, you can call `stop()` in the implementation of
* `callback()` method.
*
* NOTE: If the `periodMs` is negative, the `callback()` will never be called.
*/
class PeriodicTask : public std::enable_shared_from_this<PeriodicTask> {
public:
using ErrorCode = boost::system::error_code;
using CallbackType = std::function<void(const ErrorCode&)>;

enum State : std::uint8_t
{
Pending,
Ready,
Closing
};

PeriodicTask(boost::asio::io_service& ioService, int periodMs) : timer_(ioService), periodMs_(periodMs) {}

void start();

void stop();

void setCallback(CallbackType callback) noexcept { callback_ = callback; }

State getState() const noexcept { return state_; }
int getPeriodMs() const noexcept { return periodMs_; }

private:
std::atomic<State> state_{Pending};
boost::asio::deadline_timer timer_;
const int periodMs_;
CallbackType callback_{trivialCallback};

void handleTimeout(const ErrorCode& ec);

static void trivialCallback(const ErrorCode&) {}
};

} // namespace pulsar
7 changes: 6 additions & 1 deletion pulsar-client-cpp/python/pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,8 @@ def __init__(self, service_url,
tls_trust_certs_file_path=None,
tls_allow_insecure_connection=False,
tls_validate_hostname=False,
logger=None
logger=None,
connection_timeout_ms=10000,
):
"""
Create a new Pulsar client instance.
Expand Down Expand Up @@ -409,10 +410,13 @@ def __init__(self, service_url,
the endpoint.
* `logger`:
Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`.
* `connection_timeout_ms`:
Set timeout in milliseconds on TCP connections.
"""
_check_type(str, service_url, 'service_url')
_check_type_or_none(Authentication, authentication, 'authentication')
_check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
_check_type(int, connection_timeout_ms, 'connection_timeout_ms')
_check_type(int, io_threads, 'io_threads')
_check_type(int, message_listener_threads, 'message_listener_threads')
_check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
Expand All @@ -427,6 +431,7 @@ def __init__(self, service_url,
if authentication:
conf.authentication(authentication.auth)
conf.operation_timeout_seconds(operation_timeout_seconds)
conf.connection_timeout(connection_timeout_ms)
conf.io_threads(io_threads)
conf.message_listener_threads(message_listener_threads)
conf.concurrent_lookup_requests(concurrent_lookup_requests)
Expand Down
18 changes: 17 additions & 1 deletion pulsar-client-cpp/python/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition, \
CryptoKeyReader

from _pulsar import ProducerConfiguration, ConsumerConfiguration
from _pulsar import ProducerConfiguration, ConsumerConfiguration, ConnectError

from schema_test import *

Expand Down Expand Up @@ -1148,6 +1148,22 @@ def test_negative_acks(self):
consumer.receive(100)
client.close()

def test_connect_timeout(self):
client = pulsar.Client(
service_url='pulsar://192.0.2.1:1234',
connection_timeout_ms=1000, # 1 second
)
t1 = time.time()
try:
producer = client.create_producer('test_connect_timeout')
self.fail('create_producer should not succeed')
except ConnectError as expected:
print('expected error: {} when create producer'.format(expected))
t2 = time.time()
self.assertGreater(t2 - t1, 1.0)
self.assertLess(t2 - t1, 1.5) # 1.5 seconds is long enough
client.close()

def _check_value_error(self, fun):
with self.assertRaises(ValueError):
fun()
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/python/src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ void export_config() {
.def("authentication", &ClientConfiguration_setAuthentication, return_self<>())
.def("operation_timeout_seconds", &ClientConfiguration::getOperationTimeoutSeconds)
.def("operation_timeout_seconds", &ClientConfiguration::setOperationTimeoutSeconds, return_self<>())
.def("connection_timeout", &ClientConfiguration::getConnectionTimeout)
.def("connection_timeout", &ClientConfiguration::setConnectionTimeout, return_self<>())
.def("io_threads", &ClientConfiguration::getIOThreads)
.def("io_threads", &ClientConfiguration::setIOThreads, return_self<>())
.def("message_listener_threads", &ClientConfiguration::getMessageListenerThreads)
Expand Down
28 changes: 28 additions & 0 deletions pulsar-client-cpp/tests/ClientTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
#include <gtest/gtest.h>

#include <future>
#include <pulsar/Client.h>
#include "../lib/checksum/ChecksumProvider.h"

Expand Down Expand Up @@ -86,3 +87,30 @@ TEST(ClientTest, testServerConnectError) {
ASSERT_EQ(ResultConnectError, client.createReader(topic, MessageId::earliest(), readerConf, reader));
client.close();
}

TEST(ClientTest, testConnectTimeout) {
// 192.0.2.0/24 is assigned for documentation, should be a deadend
const std::string blackHoleBroker = "pulsar://192.0.2.1:1234";
const std::string topic = "test-connect-timeout";

Client clientLow(blackHoleBroker, ClientConfiguration().setConnectionTimeout(1000));
Client clientDefault(blackHoleBroker);

std::promise<Result> promiseLow;
clientLow.createProducerAsync(
topic, [&promiseLow](Result result, Producer producer) { promiseLow.set_value(result); });

std::promise<Result> promiseDefault;
clientDefault.createProducerAsync(
topic, [&promiseDefault](Result result, Producer producer) { promiseDefault.set_value(result); });

auto futureLow = promiseLow.get_future();
ASSERT_EQ(futureLow.wait_for(std::chrono::milliseconds(1500)), std::future_status::ready);
ASSERT_EQ(futureLow.get(), ResultConnectError);

auto futureDefault = promiseDefault.get_future();
ASSERT_EQ(futureDefault.wait_for(std::chrono::milliseconds(10)), std::future_status::timeout);

clientLow.close();
clientDefault.close();
}
Loading