From 25bd34bc479eacd20f076450c285b050fc7ad525 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 8 Jan 2024 21:06:12 +0800 Subject: [PATCH] Fix the operation timeout is not honored for GetSchema requests Fixes https://github.com/apache/pulsar-client-cpp/issues/357 ### Motivation In `Client::getSchemaInfoAsync`, the underlying `LookupService::getSchema` is called, which does not register a timer according to the operation timeout. ### Modifications Register a timer and cache it and the GetSchema promise in `ClientConnection::pendingGetSchemaRequests_`, then remove the entry and complete the promise with `ResultTimeout`. To verify the timeout is honored, modify the timeout type from `int` to `chrono::nanoseconds` so that a smaller timeout can be configured. Then test a small enough timeout (1 ns) in `LookupServiceTest.testGetSchemaTimeout`. --- lib/ClientConfiguration.cc | 7 +++-- lib/ClientConfigurationImpl.h | 4 ++- lib/ClientConnection.cc | 31 ++++++++++++++++++--- lib/ClientConnection.h | 8 +++++- lib/ClientImpl.cc | 7 ++++- lib/ClientImpl.h | 2 ++ lib/Future.h | 2 +- lib/RetryableLookupService.h | 12 +++++---- lib/RetryableOperation.h | 7 ++--- lib/RetryableOperationCache.h | 9 ++++--- tests/LookupServiceTest.cc | 40 +++++++++++++++++++++++++--- tests/PulsarWrapper.h | 31 +++++++++++++++++++++ tests/RetryableOperationCacheTest.cc | 9 ++++--- 13 files changed, 140 insertions(+), 29 deletions(-) create mode 100644 tests/PulsarWrapper.h diff --git a/lib/ClientConfiguration.cc b/lib/ClientConfiguration.cc index 6e7c7456..6a91ec1d 100644 --- a/lib/ClientConfiguration.cc +++ b/lib/ClientConfiguration.cc @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ +#include #include #include "ClientConfigurationImpl.h" @@ -61,11 +62,13 @@ Authentication& ClientConfiguration::getAuth() const { return *impl_->authentica const AuthenticationPtr& ClientConfiguration::getAuthPtr() const { return impl_->authenticationPtr; } ClientConfiguration& ClientConfiguration::setOperationTimeoutSeconds(int timeout) { - impl_->operationTimeoutSeconds = timeout; + impl_->operationTimeout = std::chrono::seconds(timeout); return *this; } -int ClientConfiguration::getOperationTimeoutSeconds() const { return impl_->operationTimeoutSeconds; } +int ClientConfiguration::getOperationTimeoutSeconds() const { + return std::chrono::duration_cast(impl_->operationTimeout).count(); +} ClientConfiguration& ClientConfiguration::setIOThreads(int threads) { impl_->ioThreads = threads; diff --git a/lib/ClientConfigurationImpl.h b/lib/ClientConfigurationImpl.h index b62b97c6..08d4b6e4 100644 --- a/lib/ClientConfigurationImpl.h +++ b/lib/ClientConfigurationImpl.h @@ -21,6 +21,8 @@ #include +#include + namespace pulsar { struct ClientConfigurationImpl { @@ -28,7 +30,7 @@ struct ClientConfigurationImpl { uint64_t memoryLimit{0ull}; int ioThreads{1}; int connectionsPerBroker{1}; - int operationTimeoutSeconds{30}; + std::chrono::nanoseconds operationTimeout{30L * 1000 * 1000 * 1000}; int messageListenerThreads{1}; int concurrentLookupRequest{50000}; int maxLookupRedirects{20}; diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 82ab4928..b2916bdb 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -24,6 +24,7 @@ #include #include "AsioDefines.h" +#include "ClientImpl.h" #include "Commands.h" #include "ConnectionPool.h" #include "ConsumerImpl.h" @@ -163,7 +164,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: const ClientConfiguration& clientConfiguration, const AuthenticationPtr& authentication, const std::string& clientVersion, ConnectionPool& pool, size_t poolIndex) - : operationsTimeout_(std::chrono::seconds(clientConfiguration.getOperationTimeoutSeconds())), + : operationsTimeout_(ClientImpl::getOperationTimeout(clientConfiguration)), authentication_(authentication), serverProtocolVersion_(proto::ProtocolVersion_MIN), executor_(executor), @@ -1278,6 +1279,7 @@ void ClientConnection::close(Result result, bool detach) { auto pendingConsumerStatsMap = std::move(pendingConsumerStatsMap_); auto pendingGetLastMessageIdRequests = std::move(pendingGetLastMessageIdRequests_); auto pendingGetNamespaceTopicsRequests = std::move(pendingGetNamespaceTopicsRequests_); + auto pendingGetSchemaRequests = std::move(pendingGetSchemaRequests_); numOfPendingLookupRequest_ = 0; @@ -1342,6 +1344,9 @@ void ClientConnection::close(Result result, bool detach) { for (auto& kv : pendingGetNamespaceTopicsRequests) { kv.second.setFailed(result); } + for (auto& kv : pendingGetSchemaRequests) { + kv.second.promise.setFailed(result); + } } bool ClientConnection::isClosed() const { return state_ == Disconnected; } @@ -1430,6 +1435,7 @@ Future ClientConnection::newGetTopicsOfNamespace( Future ClientConnection::newGetSchema(const std::string& topicName, const std::string& version, uint64_t requestId) { Lock lock(mutex_); + Promise promise; if (isClosed()) { lock.unlock(); @@ -1438,8 +1444,27 @@ Future ClientConnection::newGetSchema(const std::string& top return promise.getFuture(); } - pendingGetSchemaRequests_.insert(std::make_pair(requestId, promise)); + auto timer = executor_->createDeadlineTimer(); + pendingGetSchemaRequests_.emplace(requestId, GetSchemaRequest{promise, timer}); lock.unlock(); + + auto weakSelf = weak_from_this(); + timer->expires_from_now(operationsTimeout_); + timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) { + auto self = weakSelf.lock(); + if (!self) { + return; + } + Lock lock(mutex_); + auto it = pendingGetSchemaRequests_.find(requestId); + if (it != pendingGetSchemaRequests_.end()) { + auto promise = std::move(it->second.promise); + pendingGetSchemaRequests_.erase(it); + lock.unlock(); + promise.setFailed(ResultTimeout); + } + }); + sendCommand(Commands::newGetSchema(topicName, version, requestId)); return promise.getFuture(); } @@ -1867,7 +1892,7 @@ void ClientConnection::handleGetSchemaResponse(const proto::CommandGetSchemaResp Lock lock(mutex_); auto it = pendingGetSchemaRequests_.find(response.request_id()); if (it != pendingGetSchemaRequests_.end()) { - Promise getSchemaPromise = it->second; + Promise getSchemaPromise = it->second.promise; pendingGetSchemaRequests_.erase(it); lock.unlock(); diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 69155fdd..1d44f058 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -42,6 +42,7 @@ #include #include #include +#include #include #include "AsioTimer.h" @@ -224,6 +225,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this promise; + DeadlineTimerPtr timer; + }; + /* * handler for connectAsync * creates a ConnectionPtr which has a valid ClientConnection object @@ -363,7 +369,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this> PendingGetNamespaceTopicsMap; PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_; - typedef std::map> PendingGetSchemaMap; + typedef std::unordered_map PendingGetSchemaMap; PendingGetSchemaMap pendingGetSchemaRequests_; mutable std::mutex mutex_; diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 2cfbd387..76d4389e 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include @@ -109,7 +110,7 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& } lookupServicePtr_ = RetryableLookupService::create( - underlyingLookupServicePtr, clientConfiguration_.getOperationTimeoutSeconds(), ioExecutorProvider_); + underlyingLookupServicePtr, clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_); } ClientImpl::~ClientImpl() { shutdown(); } @@ -768,4 +769,8 @@ std::string ClientImpl::getClientVersion(const ClientConfiguration& clientConfig return oss.str(); } +std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfiguration& clientConfiguration) { + return clientConfiguration.impl_->operationTimeout; +} + } /* namespace pulsar */ diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index f7b5a891..762aa60f 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -125,6 +125,8 @@ class ClientImpl : public std::enable_shared_from_this { ConnectionPool& getConnectionPool() noexcept { return pool_; } + static std::chrono::nanoseconds getOperationTimeout(const ClientConfiguration& clientConfiguration); + friend class PulsarFriend; private: diff --git a/lib/Future.h b/lib/Future.h index 290ebc6f..69db74a3 100644 --- a/lib/Future.h +++ b/lib/Future.h @@ -141,7 +141,7 @@ class Promise { Future getFuture() const { return Future{state_}; } private: - const InternalStatePtr state_; + InternalStatePtr state_; }; } // namespace pulsar diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h index b8e3e0d7..561855f9 100644 --- a/lib/RetryableLookupService.h +++ b/lib/RetryableLookupService.h @@ -18,6 +18,8 @@ */ #pragma once +#include + #include "LookupDataResult.h" #include "LookupService.h" #include "NamespaceName.h" @@ -81,15 +83,15 @@ class RetryableLookupService : public LookupService { RetryableOperationCachePtr namespaceLookupCache_; RetryableOperationCachePtr getSchemaCache_; - RetryableLookupService(std::shared_ptr lookupService, int timeoutSeconds, + RetryableLookupService(std::shared_ptr lookupService, TimeDuration timeout, ExecutorServiceProviderPtr executorProvider) : lookupService_(lookupService), - lookupCache_(RetryableOperationCache::create(executorProvider, timeoutSeconds)), + lookupCache_(RetryableOperationCache::create(executorProvider, timeout)), partitionLookupCache_( - RetryableOperationCache::create(executorProvider, timeoutSeconds)), + RetryableOperationCache::create(executorProvider, timeout)), namespaceLookupCache_( - RetryableOperationCache::create(executorProvider, timeoutSeconds)), - getSchemaCache_(RetryableOperationCache::create(executorProvider, timeoutSeconds)) {} + RetryableOperationCache::create(executorProvider, timeout)), + getSchemaCache_(RetryableOperationCache::create(executorProvider, timeout)) {} }; } // namespace pulsar diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h index 9c920da1..dba190f4 100644 --- a/lib/RetryableOperation.h +++ b/lib/RetryableOperation.h @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -40,11 +41,11 @@ class RetryableOperation : public std::enable_shared_from_this()>&& func, int timeoutSeconds, - DeadlineTimerPtr timer) + RetryableOperation(const std::string& name, std::function()>&& func, + TimeDuration timeout, DeadlineTimerPtr timer) : name_(name), func_(std::move(func)), - timeout_(std::chrono::seconds(timeoutSeconds)), + timeout_(timeout), backoff_(std::chrono::milliseconds(100), timeout_ + timeout_, std::chrono::milliseconds(0)), timer_(timer) {} diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h index 70fa9140..e42460dd 100644 --- a/lib/RetryableOperationCache.h +++ b/lib/RetryableOperationCache.h @@ -18,6 +18,7 @@ */ #pragma once +#include #include #include @@ -40,8 +41,8 @@ class RetryableOperationCache : public std::enable_shared_from_this; @@ -69,7 +70,7 @@ class RetryableOperationCache : public std::enable_shared_from_this::create(key, std::move(func), timeoutSeconds_, timer); + auto operation = RetryableOperation::create(key, std::move(func), timeout_, timer); auto future = operation->run(); operations_[key] = operation; lock.unlock(); @@ -106,7 +107,7 @@ class RetryableOperationCache : public std::enable_shared_from_this>> operations_; mutable std::mutex mutex_; diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index 3457bd4d..0fe13851 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -22,11 +22,14 @@ #include #include +#include #include +#include #include #include "HttpHelper.h" #include "PulsarFriend.h" +#include "PulsarWrapper.h" #include "lib/BinaryProtoLookupService.h" #include "lib/ClientConnection.h" #include "lib/ConnectionPool.h" @@ -48,7 +51,10 @@ namespace pulsar { class LookupServiceTest : public ::testing::TestWithParam { public: - void SetUp() override { client_ = Client{GetParam()}; } + void SetUp() override { + serviceUrl_ = GetParam(); + client_ = Client{serviceUrl_}; + } void TearDown() override { client_.close(); } template @@ -63,6 +69,7 @@ class LookupServiceTest : public ::testing::TestWithParam { } protected: + std::string serviceUrl_; Client client_{httpLookupUrl}; }; @@ -159,7 +166,7 @@ TEST(LookupServiceTest, testRetry) { ClientConfiguration conf; auto lookupService = RetryableLookupService::create( - std::make_shared(serviceNameResolver, pool, conf), 30 /* seconds */, + std::make_shared(serviceNameResolver, pool, conf), std::chrono::seconds(30), executorProvider); PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0); @@ -194,8 +201,8 @@ TEST(LookupServiceTest, testTimeout) { constexpr int timeoutInSeconds = 2; auto lookupService = RetryableLookupService::create( - std::make_shared(serviceNameResolver, pool, conf), timeoutInSeconds, - executorProvider); + std::make_shared(serviceNameResolver, pool, conf), + std::chrono::seconds(timeoutInSeconds), executorProvider); auto topicNamePtr = TopicName::get("lookup-service-test-retry"); decltype(std::chrono::high_resolution_clock::now()) startTime; @@ -431,6 +438,31 @@ TEST_P(LookupServiceTest, testGetSchemaByVersion) { producer2.close(); } +TEST_P(LookupServiceTest, testGetSchemaTimeout) { + if (serviceUrl_.find("pulsar://") == std::string::npos) { + // HTTP request timeout can only be configured with seconds + return; + } + const auto topic = "lookup-service-test-get-schema-timeout"; + Producer producer; + ProducerConfiguration producerConf; + producerConf.setSchema(SchemaInfo(STRING, "", "")); + ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConf, producer)); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build())); + client_.close(); + + ClientConfiguration conf; + PulsarWrapper::deref(conf).operationTimeout = std::chrono::nanoseconds(1); + client_ = Client{serviceUrl_, conf}; + auto promise = std::make_shared>(); + client_.getSchemaInfoAsync(topic, 0L, + [promise](Result result, const SchemaInfo&) { promise->set_value(result); }); + auto future = promise->get_future(); + ASSERT_EQ(future.wait_for(std::chrono::milliseconds(100)), std::future_status::ready); + ASSERT_EQ(future.get(), ResultTimeout); + client_.close(); +} + INSTANTIATE_TEST_SUITE_P(Pulsar, LookupServiceTest, ::testing::Values(binaryLookupUrl, httpLookupUrl)); class BinaryProtoLookupServiceRedirectTestHelper : public BinaryProtoLookupService { diff --git a/tests/PulsarWrapper.h b/tests/PulsarWrapper.h new file mode 100644 index 00000000..87626e1c --- /dev/null +++ b/tests/PulsarWrapper.h @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include "lib/ClientConfigurationImpl.h" +#include "pulsar/ClientConfiguration.h" + +namespace pulsar { + +class PulsarWrapper { + public: + static ClientConfigurationImpl& deref(ClientConfiguration conf) { return *conf.impl_; } +}; + +} // namespace pulsar diff --git a/tests/RetryableOperationCacheTest.cc b/tests/RetryableOperationCacheTest.cc index ea1eb695..2a6948e3 100644 --- a/tests/RetryableOperationCacheTest.cc +++ b/tests/RetryableOperationCacheTest.cc @@ -19,6 +19,7 @@ #include #include +#include #include #include "lib/RetryableOperationCache.h" @@ -82,7 +83,7 @@ class RetryableOperationCacheTest : public ::testing::Test { using namespace pulsar; TEST_F(RetryableOperationCacheTest, testRetry) { - auto cache = RetryableOperationCache::create(provider_, 30 /* seconds */); + auto cache = RetryableOperationCache::create(provider_, std::chrono::seconds(30)); for (int i = 0; i < 10; i++) { futures_.emplace_back(cache->run("key-" + std::to_string(i), CountdownFunc{i * 100})); } @@ -94,7 +95,7 @@ TEST_F(RetryableOperationCacheTest, testRetry) { } TEST_F(RetryableOperationCacheTest, testCache) { - auto cache = RetryableOperationCache::create(provider_, 30 /* seconds */); + auto cache = RetryableOperationCache::create(provider_, std::chrono::seconds(30)); constexpr int numKeys = 5; for (int i = 0; i < 100; i++) { futures_.emplace_back(cache->run("key-" + std::to_string(i % numKeys), CountdownFunc{i * 100})); @@ -107,7 +108,7 @@ TEST_F(RetryableOperationCacheTest, testCache) { } TEST_F(RetryableOperationCacheTest, testTimeout) { - auto cache = RetryableOperationCache::create(provider_, 1 /* seconds */); + auto cache = RetryableOperationCache::create(provider_, std::chrono::seconds(1)); auto future = cache->run("key", CountdownFunc{0, 1000 /* retry count */}); try { wait(future); @@ -118,7 +119,7 @@ TEST_F(RetryableOperationCacheTest, testTimeout) { } TEST_F(RetryableOperationCacheTest, testClear) { - auto cache = RetryableOperationCache::create(provider_, 30 /* seconds */); + auto cache = RetryableOperationCache::create(provider_, std::chrono::seconds(30)); for (int i = 0; i < 10; i++) { futures_.emplace_back(cache->run("key-" + std::to_string(i), CountdownFunc{100})); }