Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/Consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ void Consumer::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
callback(ResultConsumerNotInitialized, MessageId());
return;
}
getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) {

impl_->getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) {
callback(result, response.getLastMessageId());
});
}
Expand Down
2 changes: 1 addition & 1 deletion lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1341,7 +1341,7 @@ void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback
if (state == Closed || state == Closing) {
LOG_ERROR(getName() << "Client connection already closed.");
if (callback) {
callback(ResultAlreadyClosed, MessageId());
callback(ResultAlreadyClosed, GetLastMessageIdResponse());
}
return;
}
Expand Down
3 changes: 1 addition & 2 deletions lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ class BatchAcknowledgementTracker;
class MessageCrypto;
class GetLastMessageIdResponse;
typedef std::shared_ptr<MessageCrypto> MessageCryptoPtr;
typedef std::function<void(Result, const GetLastMessageIdResponse&)> BrokerGetLastMessageIdCallback;
typedef std::shared_ptr<Backoff> BackoffPtr;

class AckGroupingTracker;
Expand Down Expand Up @@ -124,6 +123,7 @@ class ConsumerImpl : public ConsumerImplBase {
const std::string& getName() const override;
int getNumOfPrefetchedMessages() const override;
void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) override;
void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) override;
void seekAsync(const MessageId& msgId, ResultCallback callback) override;
void seekAsync(uint64_t timestamp, ResultCallback callback) override;
void negativeAcknowledge(const MessageId& msgId) override;
Expand All @@ -139,7 +139,6 @@ class ConsumerImpl : public ConsumerImplBase {

virtual bool isReadCompacted();
virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
virtual void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback);
void beforeConnectionChange(ClientConnection& cnx) override;

protected:
Expand Down
3 changes: 2 additions & 1 deletion lib/ConsumerImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
#include <set>

#include "Future.h"
#include "GetLastMessageIdResponse.h"
#include "HandlerBase.h"

namespace pulsar {
class ConsumerImplBase;
using ConsumerImplBaseWeakPtr = std::weak_ptr<ConsumerImplBase>;

class OpBatchReceive {
public:
OpBatchReceive();
Expand Down Expand Up @@ -68,6 +68,7 @@ class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this
virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) = 0;
virtual int getNumOfPrefetchedMessages() const = 0;
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0;
virtual void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) = 0;
virtual void seekAsync(const MessageId& msgId, ResultCallback callback) = 0;
virtual void seekAsync(uint64_t timestamp, ResultCallback callback) = 0;
virtual void negativeAcknowledge(const MessageId& msgId) = 0;
Expand Down
3 changes: 3 additions & 0 deletions lib/GetLastMessageIdResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#pragma once

#include <pulsar/MessageId.h>
#include <pulsar/Result.h>

#include <iostream>

Expand Down Expand Up @@ -54,4 +55,6 @@ class GetLastMessageIdResponse {
bool hasMarkDeletePosition_;
};

typedef std::function<void(Result, const GetLastMessageIdResponse&)> BrokerGetLastMessageIdCallback;

} // namespace pulsar
4 changes: 4 additions & 0 deletions lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,10 @@ void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal
});
}

void MultiTopicsConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) {
callback(ResultOperationNotSupported, GetLastMessageIdResponse());
}

void MultiTopicsConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerStats brokerConsumerStats,
LatchPtr latchPtr,
MultiTopicsBrokerConsumerStatsPtr statsPtr, size_t index,
Expand Down
1 change: 1 addition & 0 deletions lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
const std::string& getName() const override;
int getNumOfPrefetchedMessages() const override;
void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) override;
void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) override;
void seekAsync(const MessageId& msgId, ResultCallback callback) override;
void seekAsync(uint64_t timestamp, ResultCallback callback) override;
void negativeAcknowledge(const MessageId& msgId) override;
Expand Down
22 changes: 22 additions & 0 deletions tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,28 @@ TEST(ConsumerTest, testPartitionsWithCloseUnblock) {
thread.join();
}

TEST(ConsumerTest, testGetLastMessageId) {
Client client(lookupUrl);
const std::string topic = "testGetLastMessageId-" + std::to_string(time(nullptr));

Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topic, "test-sub", consumer));

MessageId msgId;
ASSERT_EQ(ResultOk, consumer.getLastMessageId(msgId));
ASSERT_EQ(msgId, MessageId(-1, -1, -1, -1));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
Message msg = MessageBuilder().setContent("message").build();
ASSERT_EQ(ResultOk, producer.send(msg));

ASSERT_EQ(ResultOk, consumer.getLastMessageId(msgId));
ASSERT_NE(msgId, MessageId(-1, -1, -1, -1));

client.close();
}

TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
int operationTimeout = 5;
ClientConfiguration clientConfiguration;
Expand Down