From b8ea69cf505692ef88c6a05ddbbf7df55707f4b6 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 4 Aug 2021 12:56:19 +0800 Subject: [PATCH 1/7] Allow HTTP lookup to create topics automatically --- pulsar-client-cpp/lib/HTTPLookupService.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc b/pulsar-client-cpp/lib/HTTPLookupService.cc index 72b7a443c5d74..e881ac4144a58 100644 --- a/pulsar-client-cpp/lib/HTTPLookupService.cc +++ b/pulsar-client-cpp/lib/HTTPLookupService.cc @@ -106,6 +106,7 @@ Future HTTPLookupService::getPartitionMetadataAsync << '/' << PARTITION_METHOD_NAME; } + completeUrlStream << "?checkAllowAutoCreation=true"; executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleLookupHTTPRequest, shared_from_this(), promise, completeUrlStream.str(), PartitionMetaData)); From 07e5daefacb66219be57bafa9cb5cb2dcb755790 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 4 Aug 2021 15:31:43 +0800 Subject: [PATCH 2/7] Call shutdown() before Client::close() completed --- pulsar-client-cpp/lib/ClientConnection.cc | 5 ++++- pulsar-client-cpp/lib/ClientImpl.cc | 2 +- pulsar-client-cpp/lib/ConnectionPool.cc | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 3471c75c009ea..326414f9370cb 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -1455,7 +1455,10 @@ void ClientConnection::close() { } if (tlsSocket_) { - tlsSocket_->lowest_layer().close(); + tlsSocket_->lowest_layer().close(err); + if (err) { + LOG_WARN(cnxString_ << "Failed to close TLS socket: " << err.message()); + } } if (executor_) { diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc index d6663e834bb50..e051d76ee177e 100644 --- a/pulsar-client-cpp/lib/ClientImpl.cc +++ b/pulsar-client-cpp/lib/ClientImpl.cc @@ -514,7 +514,7 @@ void ClientImpl::closeAsync(CloseCallback callback) { } if (*numberOfOpenHandlers == 0 && callback) { - callback(ResultOk); + handleClose(ResultOk, numberOfOpenHandlers, callback); } } diff --git a/pulsar-client-cpp/lib/ConnectionPool.cc b/pulsar-client-cpp/lib/ConnectionPool.cc index cc5668b64e222..bb4f5c2d46753 100644 --- a/pulsar-client-cpp/lib/ConnectionPool.cc +++ b/pulsar-client-cpp/lib/ConnectionPool.cc @@ -46,7 +46,7 @@ void ConnectionPool::close() { if (poolConnections_) { for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) { ClientConnectionPtr cnx = cnxIt->second.lock(); - if (cnx && !cnx->isClosed()) { + if (cnx) { cnx->close(); } } From b9b0d0172de1db70bf856d08130df0f4be9b5b41 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 4 Aug 2021 17:43:25 +0800 Subject: [PATCH 3/7] Fix testConnectTimeout --- pulsar-client-cpp/lib/ClientConnection.cc | 34 +++++++++++++++-------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 326414f9370cb..47ab4f79bd0dc 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -187,6 +187,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: consumerStatsRequestTimer_(executor_->createDeadlineTimer()), numOfPendingLookupRequest_(0), isTlsAllowInsecureConnection_(false) { + LOG_INFO(cnxString_ << "Create ClientConnection, timeout=" << clientConfiguration.getConnectionTimeout()); if (clientConfiguration.isUseTls()) { #if BOOST_VERSION >= 105400 boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12_client); @@ -433,21 +434,28 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, handleHandshake(boost::system::errc::make_error_code(boost::system::errc::success)); } } else if (endpointIterator != tcp::resolver::iterator()) { + LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message()); // The connection failed. Try the next endpoint in the list. - boost::system::error_code err; - socket_->close(err); // ignore the error of close - if (err) { + boost::system::error_code closeError; + socket_->close(closeError); // ignore the error of close + if (closeError) { LOG_WARN(cnxString_ << "Failed to close socket: " << err.message()); } connectTimeoutTask_->stop(); - connectTimeoutTask_->start(); - tcp::endpoint endpoint = *endpointIterator; - socket_->async_connect(endpoint, std::bind(&ClientConnection::handleTcpConnected, shared_from_this(), - std::placeholders::_1, ++endpointIterator)); + ++endpointIterator; + if (endpointIterator != tcp::resolver::iterator()) { + LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "..."); + connectTimeoutTask_->start(); + tcp::endpoint endpoint = *endpointIterator; + socket_->async_connect(endpoint, + std::bind(&ClientConnection::handleTcpConnected, shared_from_this(), + std::placeholders::_1, ++endpointIterator)); + } else { + close(); + } } else { LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message()); close(); - return; } } @@ -512,7 +520,7 @@ void ClientConnection::tcpConnectAsync() { return; } - LOG_DEBUG(cnxString_ << "Connecting to " << service_url.host() << ":" << service_url.port()); + LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port()); tcp::resolver::query query(service_url.host(), std::to_string(service_url.port())); resolver_->async_resolve(query, std::bind(&ClientConnection::handleResolve, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); @@ -531,12 +539,16 @@ void ClientConnection::handleResolve(const boost::system::error_code& err, if (state_ != TcpConnected) { LOG_ERROR(cnxString_ << "Connection was not established in " << connectTimeoutTask_->getPeriodMs() << " ms, close the socket"); - PeriodicTask::ErrorCode ignoredError; - socket_->close(ignoredError); + PeriodicTask::ErrorCode err; + socket_->close(err); + if (err) { + LOG_WARN(cnxString_ << "Failed to close socket: " << err.message()); + } } connectTimeoutTask_->stop(); }); + LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "..."); connectTimeoutTask_->start(); if (endpointIterator != tcp::resolver::iterator()) { LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() // From 66ae900a49d405ff756e06c39c57d7645f0cbf87 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 4 Aug 2021 18:36:28 +0800 Subject: [PATCH 4/7] Fix other tests --- pulsar-client-cpp/tests/BasicEndToEndTest.cc | 3 +++ pulsar-client-cpp/tests/CustomLoggerTest.cc | 6 ++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index 2f8524e4bfd81..8e7eb6b3300df 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -298,6 +298,7 @@ TEST(BasicEndToEndTest, testLookupThrottling) { std::string topicName = "testLookupThrottling"; ClientConfiguration config; config.setConcurrentLookupRequest(0); + config.setLogger(new ConsoleLoggerFactory(Logger::LEVEL_DEBUG)); Client client(lookupUrl, config); Producer producer; @@ -307,6 +308,8 @@ TEST(BasicEndToEndTest, testLookupThrottling) { Consumer consumer1; result = client.subscribe(topicName, "my-sub-name", consumer1); ASSERT_EQ(ResultTooManyLookupRequestException, result); + + client.close(); } TEST(BasicEndToEndTest, testNonExistingTopic) { diff --git a/pulsar-client-cpp/tests/CustomLoggerTest.cc b/pulsar-client-cpp/tests/CustomLoggerTest.cc index f2a97d1978675..a45dd6cb27120 100644 --- a/pulsar-client-cpp/tests/CustomLoggerTest.cc +++ b/pulsar-client-cpp/tests/CustomLoggerTest.cc @@ -22,6 +22,8 @@ #include #include +DECLARE_LOG_OBJECT() + using namespace pulsar; static std::vector logLines; @@ -56,7 +58,7 @@ TEST(CustomLoggerTest, testCustomLogger) { // reset to previous log factory Client client("pulsar://localhost:6650", clientConfig); client.close(); - ASSERT_EQ(logLines.size(), 2); + ASSERT_EQ(logLines.size(), 3); LogUtils::resetLoggerFactory(); }); testThread.join(); @@ -65,7 +67,7 @@ TEST(CustomLoggerTest, testCustomLogger) { Client client("pulsar://localhost:6650", clientConfig); client.close(); // custom logger didn't get any new lines - ASSERT_EQ(logLines.size(), 2); + ASSERT_EQ(logLines.size(), 3); } TEST(CustomLoggerTest, testConsoleLoggerFactory) { From ab6db1260eb969808355f31de5c7df0a2bc3bf9e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 4 Aug 2021 19:15:55 +0800 Subject: [PATCH 5/7] Remove redundant macro --- pulsar-client-cpp/tests/CustomLoggerTest.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-client-cpp/tests/CustomLoggerTest.cc b/pulsar-client-cpp/tests/CustomLoggerTest.cc index a45dd6cb27120..ec83e42d08614 100644 --- a/pulsar-client-cpp/tests/CustomLoggerTest.cc +++ b/pulsar-client-cpp/tests/CustomLoggerTest.cc @@ -22,8 +22,6 @@ #include #include -DECLARE_LOG_OBJECT() - using namespace pulsar; static std::vector logLines; From e98c323576bffcfdd38ef8bf38a360fc279c9d92 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 7 Aug 2021 02:38:48 +0800 Subject: [PATCH 6/7] Fix failed Python tests of PulsarTest --- pulsar-client-cpp/python/pulsar_test.py | 34 ++++++++++++++++--------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py index fc17c72f03711..58107456f44a0 100755 --- a/pulsar-client-cpp/python/pulsar_test.py +++ b/pulsar-client-cpp/python/pulsar_test.py @@ -23,6 +23,7 @@ from unittest import TestCase, main import time import os +import pulsar import uuid from datetime import timedelta from pulsar import Client, MessageId, \ @@ -30,7 +31,7 @@ AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition, \ CryptoKeyReader -from _pulsar import ProducerConfiguration, ConsumerConfiguration, ConnectError +from _pulsar import ProducerConfiguration, ConsumerConfiguration from schema_test import * @@ -155,6 +156,7 @@ def test_producer_send(self): consumer.acknowledge(msg) print('receive from {}'.format(msg.message_id())) self.assertEqual(msg_id, msg.message_id()) + client.close() def test_producer_consumer(self): client = Client(self.serviceUrl) @@ -292,7 +294,7 @@ def test_message_properties(self): subscription_name='my-subscription', schema=pulsar.schema.StringSchema()) producer = client.create_producer(topic=topic, - schema=StringSchema()) + schema=pulsar.schema.StringSchema()) producer.send('hello', properties={ 'a': '1', @@ -319,10 +321,11 @@ def test_tls_auth(self): tls_allow_insecure_connection=False, authentication=AuthenticationTLS(certs_dir + 'client-cert.pem', certs_dir + 'client-key.pem')) - consumer = client.subscribe('my-python-topic-tls-auth', + topic = 'my-python-topic-tls-auth-' + str(time.time()) + consumer = client.subscribe(topic, 'my-sub', consumer_type=ConsumerType.Shared) - producer = client.create_producer('my-python-topic-tls-auth') + producer = client.create_producer(topic) producer.send(b'hello') msg = consumer.receive(TM) @@ -346,10 +349,11 @@ def test_tls_auth2(self): tls_allow_insecure_connection=False, authentication=Authentication(authPlugin, authParams)) - consumer = client.subscribe('my-python-topic-tls-auth-2', + topic = 'my-python-topic-tls-auth-2-' + str(time.time()) + consumer = client.subscribe(topic, 'my-sub', consumer_type=ConsumerType.Shared) - producer = client.create_producer('my-python-topic-tls-auth-2') + producer = client.create_producer(topic) producer.send(b'hello') msg = consumer.receive(TM) @@ -392,10 +396,11 @@ def test_tls_auth3(self): tls_allow_insecure_connection=False, authentication=Authentication(authPlugin, authParams)) - consumer = client.subscribe('my-python-topic-tls-auth-3', + topic = 'my-python-topic-tls-auth-3-' + str(time.time()) + consumer = client.subscribe(topic, 'my-sub', consumer_type=ConsumerType.Shared) - producer = client.create_producer('my-python-topic-tls-auth-3') + producer = client.create_producer(topic) producer.send(b'hello') msg = consumer.receive(TM) @@ -583,6 +588,8 @@ def test_producer_sequence_after_reconnection(self): producer.send(b'hello-%d' % i) self.assertEqual(producer.last_sequence_id(), i) + client.close() + doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication', 'false') @@ -630,6 +637,8 @@ def test_producer_deduplication(self): with self.assertRaises(pulsar.Timeout): consumer.receive(100) + client.close() + doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication', 'false') @@ -820,10 +829,11 @@ def test_reader_has_message_available(self): def test_seek(self): client = Client(self.serviceUrl) - consumer = client.subscribe('my-python-topic-seek', + topic = 'my-python-topic-seek-' + str(time.time()) + consumer = client.subscribe(topic, 'my-sub', consumer_type=ConsumerType.Shared) - producer = client.create_producer('my-python-topic-seek') + producer = client.create_producer(topic) for i in range(100): if i > 0: @@ -858,7 +868,7 @@ def test_seek(self): self.assertEqual(msg.data(), b'hello-42') # repeat with reader - reader = client.create_reader('my-python-topic-seek', MessageId.latest) + reader = client.create_reader(topic, MessageId.latest) with self.assertRaises(pulsar.Timeout): reader.read_next(100) @@ -1157,7 +1167,7 @@ def test_connect_timeout(self): try: producer = client.create_producer('test_connect_timeout') self.fail('create_producer should not succeed') - except ConnectError as expected: + except pulsar.ConnectError as expected: print('expected error: {} when create producer'.format(expected)) t2 = time.time() self.assertGreater(t2 - t1, 1.0) From 24fa483051d458caab3a1aa01d23e75866775c0d Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 7 Aug 2021 18:46:44 +0800 Subject: [PATCH 7/7] Fix Python2 ComplexRecord equal error --- pulsar-client-cpp/python/pulsar/schema/definition.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py index dcb2d835a8061..41c094dcd215f 100644 --- a/pulsar-client-cpp/python/pulsar/schema/definition.py +++ b/pulsar-client-cpp/python/pulsar/schema/definition.py @@ -139,6 +139,9 @@ def __eq__(self, other): return False return True + def __ne__(self, other): + return not self.__eq__(other) + def __str__(self): return str(self.__dict__)