From 09be1f6b2e2e5bcd486107d4fbc84f90b2cd06c8 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 11 Nov 2022 17:57:29 +0800 Subject: [PATCH 1/2] Fix getLastMessageId method dead recursion. --- lib/Consumer.cc | 3 ++- lib/ConsumerImpl.cc | 2 +- lib/ConsumerImpl.h | 3 +-- lib/ConsumerImplBase.h | 3 ++- lib/GetLastMessageIdResponse.h | 3 +++ lib/MultiTopicsConsumerImpl.cc | 4 ++++ lib/MultiTopicsConsumerImpl.h | 1 + tests/ConsumerTest.cc | 24 ++++++++++++++++++++++++ 8 files changed, 38 insertions(+), 5 deletions(-) diff --git a/lib/Consumer.cc b/lib/Consumer.cc index 07afcea7..bda7f862 100644 --- a/lib/Consumer.cc +++ b/lib/Consumer.cc @@ -295,7 +295,8 @@ void Consumer::getLastMessageIdAsync(GetLastMessageIdCallback callback) { callback(ResultConsumerNotInitialized, MessageId()); return; } - getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) { + + impl_->getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) { callback(result, response.getLastMessageId()); }); } diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 09667294..7e6099d3 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -1341,7 +1341,7 @@ void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback if (state == Closed || state == Closing) { LOG_ERROR(getName() << "Client connection already closed."); if (callback) { - callback(ResultAlreadyClosed, MessageId()); + callback(ResultAlreadyClosed, GetLastMessageIdResponse()); } return; } diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 9ba65773..7b89135f 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -44,7 +44,6 @@ class BatchAcknowledgementTracker; class MessageCrypto; class GetLastMessageIdResponse; typedef std::shared_ptr MessageCryptoPtr; -typedef std::function BrokerGetLastMessageIdCallback; typedef std::shared_ptr BackoffPtr; class AckGroupingTracker; @@ -124,6 +123,7 @@ class ConsumerImpl : public ConsumerImplBase { const std::string& getName() const override; int getNumOfPrefetchedMessages() const override; void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) override; + void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) override; void seekAsync(const MessageId& msgId, ResultCallback callback) override; void seekAsync(uint64_t timestamp, ResultCallback callback) override; void negativeAcknowledge(const MessageId& msgId) override; @@ -139,7 +139,6 @@ class ConsumerImpl : public ConsumerImplBase { virtual bool isReadCompacted(); virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback); - virtual void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback); void beforeConnectionChange(ClientConnection& cnx) override; protected: diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h index 74a8810e..5bc7e1b8 100644 --- a/lib/ConsumerImplBase.h +++ b/lib/ConsumerImplBase.h @@ -25,12 +25,12 @@ #include #include "Future.h" +#include "GetLastMessageIdResponse.h" #include "HandlerBase.h" namespace pulsar { class ConsumerImplBase; using ConsumerImplBaseWeakPtr = std::weak_ptr; - class OpBatchReceive { public: OpBatchReceive(); @@ -68,6 +68,7 @@ class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this virtual void redeliverUnacknowledgedMessages(const std::set& messageIds) = 0; virtual int getNumOfPrefetchedMessages() const = 0; virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0; + virtual void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) = 0; virtual void seekAsync(const MessageId& msgId, ResultCallback callback) = 0; virtual void seekAsync(uint64_t timestamp, ResultCallback callback) = 0; virtual void negativeAcknowledge(const MessageId& msgId) = 0; diff --git a/lib/GetLastMessageIdResponse.h b/lib/GetLastMessageIdResponse.h index 1ff7933e..cee754b1 100644 --- a/lib/GetLastMessageIdResponse.h +++ b/lib/GetLastMessageIdResponse.h @@ -19,6 +19,7 @@ #pragma once #include +#include #include @@ -54,4 +55,6 @@ class GetLastMessageIdResponse { bool hasMarkDeletePosition_; }; +typedef std::function BrokerGetLastMessageIdCallback; + } // namespace pulsar diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index d14c3cae..b8d55b49 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -803,6 +803,10 @@ void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal }); } +void MultiTopicsConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) { + callback(ResultOperationNotSupported, GetLastMessageIdResponse()); +} + void MultiTopicsConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerStats brokerConsumerStats, LatchPtr latchPtr, MultiTopicsBrokerConsumerStatsPtr statsPtr, size_t index, diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index ac25c840..da42b748 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -82,6 +82,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { const std::string& getName() const override; int getNumOfPrefetchedMessages() const override; void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) override; + void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) override; void seekAsync(const MessageId& msgId, ResultCallback callback) override; void seekAsync(uint64_t timestamp, ResultCallback callback) override; void negativeAcknowledge(const MessageId& msgId) override; diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index a77636ee..d1ad376a 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -841,6 +841,30 @@ TEST(ConsumerTest, testPartitionsWithCloseUnblock) { thread.join(); } +TEST(ConsumerTest, testGetLastMessageId) { + Client client(lookupUrl); + const std::string topic = "testGetLastMessageId-" + std::to_string(time(nullptr)); + + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, "test-sub", consumer)); + + MessageId msgId; + ASSERT_EQ(ResultOk, consumer.getLastMessageId(msgId)); + ASSERT_EQ(msgId, MessageId(-1, -1, -1, -1)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + Message msg = MessageBuilder().setContent("message").build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + + ASSERT_EQ(ResultOk, consumer.getLastMessageId(msgId)); + ASSERT_TRUE(msgId != MessageId(-1, -1, -1, -1)); + + std::cout << msgId << std::endl; + + client.close(); +} + TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) { int operationTimeout = 5; ClientConfiguration clientConfiguration; From f916cddd129cdc4618131f850e5990c4fa762a73 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 17 Nov 2022 00:03:26 +0800 Subject: [PATCH 2/2] Fix code review. --- tests/ConsumerTest.cc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index d1ad376a..a5a25bcb 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -858,9 +858,7 @@ TEST(ConsumerTest, testGetLastMessageId) { ASSERT_EQ(ResultOk, producer.send(msg)); ASSERT_EQ(ResultOk, consumer.getLastMessageId(msgId)); - ASSERT_TRUE(msgId != MessageId(-1, -1, -1, -1)); - - std::cout << msgId << std::endl; + ASSERT_NE(msgId, MessageId(-1, -1, -1, -1)); client.close(); }