Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci-pr-validation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ jobs:
sudo curl -o /gtest-parallel https://raw.githubusercontent.com/google/gtest-parallel/master/gtest_parallel.py

- name: CMake
run: cmake . -DBUILD_PERF_TOOLS=ON
run: cmake . -DCMAKE_BUILD_TYPE=Debug -DBUILD_PERF_TOOLS=ON

- name: Check formatting
run: make check-format
Expand Down
41 changes: 28 additions & 13 deletions lib/BatchMessageAcker.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,40 +31,55 @@ class BatchMessageAcker;
using BatchMessageAckerPtr = std::shared_ptr<BatchMessageAcker>;

class BatchMessageAcker {
public:
virtual ~BatchMessageAcker() {}
// Return false for these methods so that batch index ACK will be falled back to if the acker is created
// by deserializing from raw bytes.
virtual bool ackIndividual(int32_t) { return false; }
virtual bool ackCumulative(int32_t) { return false; }

bool shouldAckPreviousMessageId() noexcept {
bool expectedValue = false;
return prevBatchCumulativelyAcked_.compare_exchange_strong(expectedValue, true);
}

private:
// When a batched message is acknowledged cumulatively, the previous message id will be acknowledged
// without batch index ACK enabled. However, it should be acknowledged only once. Use this flag to
// determine whether to acknowledge the previous message id.
std::atomic_bool prevBatchCumulativelyAcked_{false};
};

class BatchMessageAckerImpl : public BatchMessageAcker {
public:
using Lock = std::lock_guard<std::mutex>;

static BatchMessageAckerPtr create(int32_t batchSize) {
return std::make_shared<BatchMessageAcker>(batchSize);
if (batchSize > 0) {
return std::make_shared<BatchMessageAckerImpl>(batchSize);
} else {
return std::make_shared<BatchMessageAcker>();
}
}

BatchMessageAcker(int32_t batchSize) : bitSet_(batchSize) { bitSet_.set(0, batchSize); }
BatchMessageAckerImpl(int32_t batchSize) : bitSet_(batchSize) { bitSet_.set(0, batchSize); }

bool ackIndividual(int32_t batchIndex) {
bool ackIndividual(int32_t batchIndex) override {
Lock lock{mutex_};
bitSet_.clear(batchIndex);
return bitSet_.isEmpty();
}

bool ackCumulative(int32_t batchIndex) {
bool ackCumulative(int32_t batchIndex) override {
Lock lock{mutex_};
// The range of cumulative acknowledgment is closed while BitSet::clear accepts a left-closed
// right-open range.
bitSet_.clear(0, batchIndex + 1);
return bitSet_.isEmpty();
}

bool shouldAckPreviousMessageId() noexcept {
bool expectedValue = false;
return prevBatchCumulativelyAcked_.compare_exchange_strong(expectedValue, true);
}

private:
BitSet bitSet_;
// When a batched message is acknowledged cumulatively, the previous message id will be acknowledged
// without batch index ACK enabled. However, it should be acknowledged only once. Use this flag to
// determine whether to acknowledge the previous message id.
std::atomic_bool prevBatchCumulativelyAcked_{false};
mutable std::mutex mutex_;
};

Expand Down
2 changes: 1 addition & 1 deletion lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection

int skippedMessages = 0;

auto acker = BatchMessageAcker::create(batchSize);
auto acker = BatchMessageAckerImpl::create(batchSize);
for (int i = 0; i < batchSize; i++) {
// This is a cheap copy since message contains only one shared pointer (impl_)
Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize, acker);
Expand Down
2 changes: 1 addition & 1 deletion lib/MessageBatch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ MessageBatch& MessageBatch::parseFrom(const SharedBuffer& payload, uint32_t batc
impl_->metadata.set_num_messages_in_batch(batchSize);
batch_.clear();

auto acker = BatchMessageAcker::create(batchSize);
auto acker = BatchMessageAckerImpl::create(batchSize);
for (int i = 0; i < batchSize; ++i) {
batch_.push_back(Commands::deSerializeSingleMessageInBatch(batchMessage_, i, batchSize, acker));
}
Expand Down
4 changes: 4 additions & 0 deletions lib/MessageId.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ void MessageId::serialize(std::string& result) const {
idData.set_batch_index(impl_->batchIndex_);
}

if (impl_->batchSize_ != 0) {
idData.set_batch_size(impl_->batchSize_);
}

auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_);
if (chunkMsgId) {
proto::MessageIdData& firstChunkIdData = *idData.mutable_first_chunk_message_id();
Expand Down
9 changes: 6 additions & 3 deletions lib/MessageIdBuilder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
#include <assert.h>
#include <pulsar/MessageIdBuilder.h>

#include "BatchedMessageIdImpl.h"
#include "MessageIdImpl.h"
#include "PulsarApi.pb.h"

Expand All @@ -42,8 +42,11 @@ MessageIdBuilder MessageIdBuilder::from(const proto::MessageIdData& messageIdDat
}

MessageId MessageIdBuilder::build() const {
assert(impl_->batchIndex_ < 0 || (impl_->batchSize_ > impl_->batchIndex_));
return MessageId{impl_};
if (impl_->batchIndex_ >= 0 && impl_->batchSize_ > 0) {
return MessageId{std::make_shared<BatchedMessageIdImpl>(*impl_, BatchMessageAckerImpl::create(0))};
} else {
return MessageId{impl_};
}
}

MessageIdBuilder& MessageIdBuilder::ledgerId(int64_t ledgerId) {
Expand Down
28 changes: 26 additions & 2 deletions tests/MessageIdTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,43 @@
#include <string>

#include "PulsarFriend.h"
#include "lib/BatchedMessageIdImpl.h"
#include "lib/Commands.h"
#include "lib/MessageIdUtil.h"

using namespace pulsar;

TEST(MessageIdTest, testSerialization) {
auto msgId = MessageIdBuilder().ledgerId(1L).entryId(2L).batchIndex(3L).build();
auto msgId = MessageIdBuilder().ledgerId(1L).entryId(2L).partition(10).batchIndex(3).build();

std::string serialized;
msgId.serialize(serialized);

MessageId deserialized = MessageId::deserialize(serialized);
ASSERT_FALSE(std::dynamic_pointer_cast<BatchedMessageIdImpl>(Commands::getMessageIdImpl(deserialized)));
ASSERT_EQ(deserialized.ledgerId(), 1L);
ASSERT_EQ(deserialized.entryId(), 2L);
ASSERT_EQ(deserialized.partition(), 10);
ASSERT_EQ(deserialized.batchIndex(), 3);
ASSERT_EQ(deserialized.batchSize(), 0);

ASSERT_EQ(msgId, deserialized);
// Only a MessageId whose batch index and batch size are both valid can be deserialized as a batched
// message id.
msgId = MessageIdBuilder().ledgerId(3L).entryId(1L).batchIndex(0).batchSize(1).build();
msgId.serialize(serialized);
deserialized = MessageId::deserialize(serialized);
auto batchedMessageId =
std::dynamic_pointer_cast<BatchedMessageIdImpl>(Commands::getMessageIdImpl(deserialized));
ASSERT_TRUE(batchedMessageId);
// The BatchMessageAcker object created from deserialization is a fake implementation that all acknowledge
// methods return false.
ASSERT_FALSE(batchedMessageId->ackIndividual(0));
ASSERT_FALSE(batchedMessageId->ackCumulative(0));
ASSERT_EQ(deserialized.ledgerId(), 3L);
ASSERT_EQ(deserialized.entryId(), 1L);
ASSERT_EQ(deserialized.partition(), -1);
ASSERT_EQ(deserialized.batchIndex(), 0);
ASSERT_EQ(deserialized.batchSize(), 1);
}

TEST(MessageIdTest, testCompareLedgerAndEntryId) {
Expand Down