diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 828cdaf4..2a63c7ab 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1142,6 +1142,13 @@ void ClientConnection::handleLookupTimeout(const boost::system::error_code& ec, } } +void ClientConnection::handleGetLastMessageIdTimeout(const boost::system::error_code& ec, + ClientConnection::LastMessageIdRequestData data) { + if (!ec) { + data.promise->setFailed(ResultTimeout); + } +} + void ClientConnection::handleKeepAliveTimeout() { if (isClosed()) { return; @@ -1251,7 +1258,7 @@ void ClientConnection::close(Result result) { kv.second.setFailed(result); } for (auto& kv : pendingGetLastMessageIdRequests) { - kv.second.setFailed(result); + kv.second.promise->setFailed(result); } for (auto& kv : pendingGetNamespaceTopicsRequests) { kv.second.setFailed(result); @@ -1299,23 +1306,24 @@ Commands::ChecksumType ClientConnection::getChecksumType() const { Future ClientConnection::newGetLastMessageId(uint64_t consumerId, uint64_t requestId) { Lock lock(mutex_); - Promise promise; + auto promise = std::make_shared(); if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString_ << " Client is not connected to the broker"); - promise.setFailed(ResultNotConnected); - return promise.getFuture(); + promise->setFailed(ResultNotConnected); + return promise->getFuture(); } - pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, promise)); + LastMessageIdRequestData requestData; + requestData.promise = promise; + requestData.timer = executor_->createDeadlineTimer(); + requestData.timer->expires_from_now(operationsTimeout_); + requestData.timer->async_wait(std::bind(&ClientConnection::handleGetLastMessageIdTimeout, + shared_from_this(), std::placeholders::_1, requestData)); + pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, requestData)); lock.unlock(); - sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId) - .addListener([promise](Result result, const ResponseData& data) { - if (result != ResultOk) { - promise.setFailed(result); - } - }); - return promise.getFuture(); + sendCommand(Commands::newGetLastMessageId(consumerId, requestId)); + return promise->getFuture(); } Future ClientConnection::newGetTopicsOfNamespace( @@ -1635,11 +1643,11 @@ void ClientConnection::handleError(const proto::CommandError& error) { PendingGetLastMessageIdRequestsMap::iterator it = pendingGetLastMessageIdRequests_.find(error.request_id()); if (it != pendingGetLastMessageIdRequests_.end()) { - auto getLastMessageIdPromise = it->second; + auto getLastMessageIdPromise = it->second.promise; pendingGetLastMessageIdRequests_.erase(it); lock.unlock(); - getLastMessageIdPromise.setFailed(result); + getLastMessageIdPromise->setFailed(result); } else { PendingGetNamespaceTopicsMap::iterator it = pendingGetNamespaceTopicsRequests_.find(error.request_id()); @@ -1719,16 +1727,16 @@ void ClientConnection::handleGetLastMessageIdResponse( auto it = pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id()); if (it != pendingGetLastMessageIdRequests_.end()) { - auto getLastMessageIdPromise = it->second; + auto getLastMessageIdPromise = it->second.promise; pendingGetLastMessageIdRequests_.erase(it); lock.unlock(); if (getLastMessageIdResponse.has_consumer_mark_delete_position()) { - getLastMessageIdPromise.setValue( + getLastMessageIdPromise->setValue( {toMessageId(getLastMessageIdResponse.last_message_id()), toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())}); } else { - getLastMessageIdPromise.setValue({toMessageId(getLastMessageIdResponse.last_message_id())}); + getLastMessageIdPromise->setValue({toMessageId(getLastMessageIdResponse.last_message_id())}); } } else { lock.unlock(); diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 7de90895..38b814ce 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -201,6 +201,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this @@ -342,7 +349,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this> PendingConsumerStatsMap; PendingConsumerStatsMap pendingConsumerStatsMap_; - typedef std::map> PendingGetLastMessageIdRequestsMap; + typedef std::map PendingGetLastMessageIdRequestsMap; PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_; typedef std::map> PendingGetNamespaceTopicsMap; diff --git a/lib/GetLastMessageIdResponse.h b/lib/GetLastMessageIdResponse.h index cee754b1..05c4af71 100644 --- a/lib/GetLastMessageIdResponse.h +++ b/lib/GetLastMessageIdResponse.h @@ -23,8 +23,14 @@ #include +#include "Future.h" + namespace pulsar { +class GetLastMessageIdResponse; +typedef Promise GetLastMessageIdResponsePromise; +typedef std::shared_ptr GetLastMessageIdResponsePromisePtr; + class GetLastMessageIdResponse { friend std::ostream& operator<<(std::ostream& os, const GetLastMessageIdResponse& response) { os << "lastMessageId: " << response.lastMessageId_; @@ -52,7 +58,7 @@ class GetLastMessageIdResponse { private: MessageId lastMessageId_; MessageId markDeletePosition_; - bool hasMarkDeletePosition_; + bool hasMarkDeletePosition_ = false; }; typedef std::function BrokerGetLastMessageIdCallback;