From 283658796c15b5d1882dda61905534f6568d2519 Mon Sep 17 00:00:00 2001 From: Sergei Fundaev Date: Tue, 11 Jul 2023 15:56:25 +0400 Subject: [PATCH 1/3] Fix memory leak during getLastMessageId request processing. ## Motivation `ClientConnection::newGetLastMessageId` method uses `ClientConnection::sendRequestWithId` one to send request. It adds a new node into `ClientConnection::pendingRequests_` map. However `ClientConnection::handleGetLastMessageIdResponse` method does not affect this map. As a result the size of mentioned map is increased every time when `Reader::getLastMessageIdAsync` method is called. ## Modification - `ClientConnection::newGetLastMessageId` method use now `ClientConnection::sendCommand` one to send request. --- lib/ClientConnection.cc | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 828cdaf4..a2eeb6a6 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1304,17 +1304,11 @@ Future ClientConnection::newGetLastMessageId(u lock.unlock(); LOG_ERROR(cnxString_ << " Client is not connected to the broker"); promise.setFailed(ResultNotConnected); - return promise.getFuture(); } pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, promise)); lock.unlock(); - sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId) - .addListener([promise](Result result, const ResponseData& data) { - if (result != ResultOk) { - promise.setFailed(result); - } - }); + sendCommand(Commands::newGetLastMessageId(consumerId, requestId)); return promise.getFuture(); } From d7be7f1736c84ec0c555bd76babe6560f468b76e Mon Sep 17 00:00:00 2001 From: Sergei Fundaev Date: Fri, 21 Jul 2023 21:37:37 +0400 Subject: [PATCH 2/3] Fix memory leak during getLastMessageId request processing. ## Motivation `ClientConnection::newGetLastMessageId` method uses `ClientConnection::sendRequestWithId` one to send request. It adds a new node into `ClientConnection::pendingRequests_` map. However `ClientConnection::handleGetLastMessageIdResponse` method does not affect this map. As a result the size of mentioned map is increased every time when `Reader::getLastMessageIdAsync` method is called. ## Modification - `ClientConnection::newGetLastMessageId` method use now `ClientConnection::sendCommand` one to send request. --- lib/ClientConnection.cc | 34 ++++++++++++++++++++++++---------- lib/ClientConnection.h | 9 ++++++++- lib/GetLastMessageIdResponse.h | 7 ++++++- 3 files changed, 38 insertions(+), 12 deletions(-) diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index a2eeb6a6..2a63c7ab 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1142,6 +1142,13 @@ void ClientConnection::handleLookupTimeout(const boost::system::error_code& ec, } } +void ClientConnection::handleGetLastMessageIdTimeout(const boost::system::error_code& ec, + ClientConnection::LastMessageIdRequestData data) { + if (!ec) { + data.promise->setFailed(ResultTimeout); + } +} + void ClientConnection::handleKeepAliveTimeout() { if (isClosed()) { return; @@ -1251,7 +1258,7 @@ void ClientConnection::close(Result result) { kv.second.setFailed(result); } for (auto& kv : pendingGetLastMessageIdRequests) { - kv.second.setFailed(result); + kv.second.promise->setFailed(result); } for (auto& kv : pendingGetNamespaceTopicsRequests) { kv.second.setFailed(result); @@ -1299,17 +1306,24 @@ Commands::ChecksumType ClientConnection::getChecksumType() const { Future ClientConnection::newGetLastMessageId(uint64_t consumerId, uint64_t requestId) { Lock lock(mutex_); - Promise promise; + auto promise = std::make_shared(); if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString_ << " Client is not connected to the broker"); - promise.setFailed(ResultNotConnected); + promise->setFailed(ResultNotConnected); + return promise->getFuture(); } - pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, promise)); + LastMessageIdRequestData requestData; + requestData.promise = promise; + requestData.timer = executor_->createDeadlineTimer(); + requestData.timer->expires_from_now(operationsTimeout_); + requestData.timer->async_wait(std::bind(&ClientConnection::handleGetLastMessageIdTimeout, + shared_from_this(), std::placeholders::_1, requestData)); + pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, requestData)); lock.unlock(); sendCommand(Commands::newGetLastMessageId(consumerId, requestId)); - return promise.getFuture(); + return promise->getFuture(); } Future ClientConnection::newGetTopicsOfNamespace( @@ -1629,11 +1643,11 @@ void ClientConnection::handleError(const proto::CommandError& error) { PendingGetLastMessageIdRequestsMap::iterator it = pendingGetLastMessageIdRequests_.find(error.request_id()); if (it != pendingGetLastMessageIdRequests_.end()) { - auto getLastMessageIdPromise = it->second; + auto getLastMessageIdPromise = it->second.promise; pendingGetLastMessageIdRequests_.erase(it); lock.unlock(); - getLastMessageIdPromise.setFailed(result); + getLastMessageIdPromise->setFailed(result); } else { PendingGetNamespaceTopicsMap::iterator it = pendingGetNamespaceTopicsRequests_.find(error.request_id()); @@ -1713,16 +1727,16 @@ void ClientConnection::handleGetLastMessageIdResponse( auto it = pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id()); if (it != pendingGetLastMessageIdRequests_.end()) { - auto getLastMessageIdPromise = it->second; + auto getLastMessageIdPromise = it->second.promise; pendingGetLastMessageIdRequests_.erase(it); lock.unlock(); if (getLastMessageIdResponse.has_consumer_mark_delete_position()) { - getLastMessageIdPromise.setValue( + getLastMessageIdPromise->setValue( {toMessageId(getLastMessageIdResponse.last_message_id()), toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())}); } else { - getLastMessageIdPromise.setValue({toMessageId(getLastMessageIdResponse.last_message_id())}); + getLastMessageIdPromise->setValue({toMessageId(getLastMessageIdResponse.last_message_id())}); } } else { lock.unlock(); diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 7de90895..38b814ce 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -201,6 +201,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this @@ -342,7 +349,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this> PendingConsumerStatsMap; PendingConsumerStatsMap pendingConsumerStatsMap_; - typedef std::map> PendingGetLastMessageIdRequestsMap; + typedef std::map PendingGetLastMessageIdRequestsMap; PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_; typedef std::map> PendingGetNamespaceTopicsMap; diff --git a/lib/GetLastMessageIdResponse.h b/lib/GetLastMessageIdResponse.h index cee754b1..af73bd70 100644 --- a/lib/GetLastMessageIdResponse.h +++ b/lib/GetLastMessageIdResponse.h @@ -20,11 +20,16 @@ #include #include +#include "Future.h" #include namespace pulsar { +class GetLastMessageIdResponse; +typedef Promise GetLastMessageIdResponsePromise; +typedef std::shared_ptr GetLastMessageIdResponsePromisePtr; + class GetLastMessageIdResponse { friend std::ostream& operator<<(std::ostream& os, const GetLastMessageIdResponse& response) { os << "lastMessageId: " << response.lastMessageId_; @@ -52,7 +57,7 @@ class GetLastMessageIdResponse { private: MessageId lastMessageId_; MessageId markDeletePosition_; - bool hasMarkDeletePosition_; + bool hasMarkDeletePosition_ = false; }; typedef std::function BrokerGetLastMessageIdCallback; From b755adf7697f072a9c507bb9fb2c00993790cff5 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 24 Jul 2023 10:36:10 +0800 Subject: [PATCH 3/3] Fix format --- lib/GetLastMessageIdResponse.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/GetLastMessageIdResponse.h b/lib/GetLastMessageIdResponse.h index af73bd70..05c4af71 100644 --- a/lib/GetLastMessageIdResponse.h +++ b/lib/GetLastMessageIdResponse.h @@ -20,10 +20,11 @@ #include #include -#include "Future.h" #include +#include "Future.h" + namespace pulsar { class GetLastMessageIdResponse;