Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions pulsar-client-cpp/lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
consumerStatsRequestTimer_(executor_->createDeadlineTimer()),
numOfPendingLookupRequest_(0),
isTlsAllowInsecureConnection_(false) {
LOG_INFO(cnxString_ << "Create ClientConnection, timeout=" << clientConfiguration.getConnectionTimeout());
if (clientConfiguration.isUseTls()) {
#if BOOST_VERSION >= 105400
boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12_client);
Expand Down Expand Up @@ -433,21 +434,28 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
handleHandshake(boost::system::errc::make_error_code(boost::system::errc::success));
}
} else if (endpointIterator != tcp::resolver::iterator()) {
LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message());
// The connection failed. Try the next endpoint in the list.
boost::system::error_code err;
socket_->close(err); // ignore the error of close
if (err) {
boost::system::error_code closeError;
socket_->close(closeError); // ignore the error of close
if (closeError) {
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
}
connectTimeoutTask_->stop();
connectTimeoutTask_->start();
tcp::endpoint endpoint = *endpointIterator;
socket_->async_connect(endpoint, std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
std::placeholders::_1, ++endpointIterator));
++endpointIterator;
if (endpointIterator != tcp::resolver::iterator()) {
LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
connectTimeoutTask_->start();
tcp::endpoint endpoint = *endpointIterator;
socket_->async_connect(endpoint,
std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
std::placeholders::_1, ++endpointIterator));
} else {
close();
}
} else {
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
close();
return;
}
}

Expand Down Expand Up @@ -512,7 +520,7 @@ void ClientConnection::tcpConnectAsync() {
return;
}

LOG_DEBUG(cnxString_ << "Connecting to " << service_url.host() << ":" << service_url.port());
LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port());
tcp::resolver::query query(service_url.host(), std::to_string(service_url.port()));
resolver_->async_resolve(query, std::bind(&ClientConnection::handleResolve, shared_from_this(),
std::placeholders::_1, std::placeholders::_2));
Expand All @@ -531,12 +539,16 @@ void ClientConnection::handleResolve(const boost::system::error_code& err,
if (state_ != TcpConnected) {
LOG_ERROR(cnxString_ << "Connection was not established in " << connectTimeoutTask_->getPeriodMs()
<< " ms, close the socket");
PeriodicTask::ErrorCode ignoredError;
socket_->close(ignoredError);
PeriodicTask::ErrorCode err;
socket_->close(err);
if (err) {
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
}
}
connectTimeoutTask_->stop();
});

LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
connectTimeoutTask_->start();
if (endpointIterator != tcp::resolver::iterator()) {
LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() //
Expand Down Expand Up @@ -1455,7 +1467,10 @@ void ClientConnection::close() {
}

if (tlsSocket_) {
tlsSocket_->lowest_layer().close();
tlsSocket_->lowest_layer().close(err);
if (err) {
LOG_WARN(cnxString_ << "Failed to close TLS socket: " << err.message());
}
}

if (executor_) {
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ void ClientImpl::closeAsync(CloseCallback callback) {
}

if (*numberOfOpenHandlers == 0 && callback) {
callback(ResultOk);
handleClose(ResultOk, numberOfOpenHandlers, callback);
}
}

Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ConnectionPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void ConnectionPool::close() {
if (poolConnections_) {
for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
ClientConnectionPtr cnx = cnxIt->second.lock();
if (cnx && !cnx->isClosed()) {
if (cnx) {
cnx->close();
}
}
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/HTTPLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ Future<Result, LookupDataResultPtr> HTTPLookupService::getPartitionMetadataAsync
<< '/' << PARTITION_METHOD_NAME;
}

completeUrlStream << "?checkAllowAutoCreation=true";
executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleLookupHTTPRequest,
shared_from_this(), promise, completeUrlStream.str(),
PartitionMetaData));
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-cpp/python/pulsar/schema/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ def __eq__(self, other):
return False
return True

def __ne__(self, other):
return not self.__eq__(other)

def __str__(self):
return str(self.__dict__)

Expand Down
34 changes: 22 additions & 12 deletions pulsar-client-cpp/python/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
from unittest import TestCase, main
import time
import os
import pulsar
import uuid
from datetime import timedelta
from pulsar import Client, MessageId, \
CompressionType, ConsumerType, PartitionsRoutingMode, \
AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition, \
CryptoKeyReader

from _pulsar import ProducerConfiguration, ConsumerConfiguration, ConnectError
from _pulsar import ProducerConfiguration, ConsumerConfiguration

from schema_test import *

Expand Down Expand Up @@ -155,6 +156,7 @@ def test_producer_send(self):
consumer.acknowledge(msg)
print('receive from {}'.format(msg.message_id()))
self.assertEqual(msg_id, msg.message_id())
client.close()

def test_producer_consumer(self):
client = Client(self.serviceUrl)
Expand Down Expand Up @@ -292,7 +294,7 @@ def test_message_properties(self):
subscription_name='my-subscription',
schema=pulsar.schema.StringSchema())
producer = client.create_producer(topic=topic,
schema=StringSchema())
schema=pulsar.schema.StringSchema())
producer.send('hello',
properties={
'a': '1',
Expand All @@ -319,10 +321,11 @@ def test_tls_auth(self):
tls_allow_insecure_connection=False,
authentication=AuthenticationTLS(certs_dir + 'client-cert.pem', certs_dir + 'client-key.pem'))

consumer = client.subscribe('my-python-topic-tls-auth',
topic = 'my-python-topic-tls-auth-' + str(time.time())
consumer = client.subscribe(topic,
'my-sub',
consumer_type=ConsumerType.Shared)
producer = client.create_producer('my-python-topic-tls-auth')
producer = client.create_producer(topic)
producer.send(b'hello')

msg = consumer.receive(TM)
Expand All @@ -346,10 +349,11 @@ def test_tls_auth2(self):
tls_allow_insecure_connection=False,
authentication=Authentication(authPlugin, authParams))

consumer = client.subscribe('my-python-topic-tls-auth-2',
topic = 'my-python-topic-tls-auth-2-' + str(time.time())
consumer = client.subscribe(topic,
'my-sub',
consumer_type=ConsumerType.Shared)
producer = client.create_producer('my-python-topic-tls-auth-2')
producer = client.create_producer(topic)
producer.send(b'hello')

msg = consumer.receive(TM)
Expand Down Expand Up @@ -392,10 +396,11 @@ def test_tls_auth3(self):
tls_allow_insecure_connection=False,
authentication=Authentication(authPlugin, authParams))

consumer = client.subscribe('my-python-topic-tls-auth-3',
topic = 'my-python-topic-tls-auth-3-' + str(time.time())
consumer = client.subscribe(topic,
'my-sub',
consumer_type=ConsumerType.Shared)
producer = client.create_producer('my-python-topic-tls-auth-3')
producer = client.create_producer(topic)
producer.send(b'hello')

msg = consumer.receive(TM)
Expand Down Expand Up @@ -583,6 +588,8 @@ def test_producer_sequence_after_reconnection(self):
producer.send(b'hello-%d' % i)
self.assertEqual(producer.last_sequence_id(), i)

client.close()

doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication',
'false')

Expand Down Expand Up @@ -630,6 +637,8 @@ def test_producer_deduplication(self):
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)

client.close()

doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication',
'false')

Expand Down Expand Up @@ -820,10 +829,11 @@ def test_reader_has_message_available(self):

def test_seek(self):
client = Client(self.serviceUrl)
consumer = client.subscribe('my-python-topic-seek',
topic = 'my-python-topic-seek-' + str(time.time())
consumer = client.subscribe(topic,
'my-sub',
consumer_type=ConsumerType.Shared)
producer = client.create_producer('my-python-topic-seek')
producer = client.create_producer(topic)

for i in range(100):
if i > 0:
Expand Down Expand Up @@ -858,7 +868,7 @@ def test_seek(self):
self.assertEqual(msg.data(), b'hello-42')

# repeat with reader
reader = client.create_reader('my-python-topic-seek', MessageId.latest)
reader = client.create_reader(topic, MessageId.latest)
with self.assertRaises(pulsar.Timeout):
reader.read_next(100)

Expand Down Expand Up @@ -1157,7 +1167,7 @@ def test_connect_timeout(self):
try:
producer = client.create_producer('test_connect_timeout')
self.fail('create_producer should not succeed')
except ConnectError as expected:
except pulsar.ConnectError as expected:
print('expected error: {} when create producer'.format(expected))
t2 = time.time()
self.assertGreater(t2 - t1, 1.0)
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ TEST(BasicEndToEndTest, testLookupThrottling) {
std::string topicName = "testLookupThrottling";
ClientConfiguration config;
config.setConcurrentLookupRequest(0);
config.setLogger(new ConsoleLoggerFactory(Logger::LEVEL_DEBUG));
Client client(lookupUrl, config);

Producer producer;
Expand All @@ -307,6 +308,8 @@ TEST(BasicEndToEndTest, testLookupThrottling) {
Consumer consumer1;
result = client.subscribe(topicName, "my-sub-name", consumer1);
ASSERT_EQ(ResultTooManyLookupRequestException, result);

client.close();
}

TEST(BasicEndToEndTest, testNonExistingTopic) {
Expand Down
4 changes: 2 additions & 2 deletions pulsar-client-cpp/tests/CustomLoggerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
// reset to previous log factory
Client client("pulsar://localhost:6650", clientConfig);
client.close();
ASSERT_EQ(logLines.size(), 2);
ASSERT_EQ(logLines.size(), 3);
LogUtils::resetLoggerFactory();
});
testThread.join();
Expand All @@ -65,7 +65,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
Client client("pulsar://localhost:6650", clientConfig);
client.close();
// custom logger didn't get any new lines
ASSERT_EQ(logLines.size(), 2);
ASSERT_EQ(logLines.size(), 3);
}

TEST(CustomLoggerTest, testConsoleLoggerFactory) {
Expand Down