From fa4d831d15208bb89a515442cd833d93fa95bda1 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sun, 6 Nov 2022 00:40:10 +0800 Subject: [PATCH] Add MessageId::batchSize() and the MessageIdBuilder Master issue: https://github.com/apache/pulsar-client-cpp/issues/87 ### Motivation To support batch index acknowledgment, we must provide a method to get the batch size of a batched message ID. ### Modifications Instead of adding another overload constructor to `MessageId`, this PR adds a `MessageIdBuilder` class to construct the `MessageId` in a more elegant way. The original constructor is counterintuitive because the partition index is the 1st argument. https://github.com/apache/pulsar-client-cpp/blob/74ef1a01f5c7a4604d251de6d040c433f9bbf56b/include/pulsar/MessageId.h#L47 Therefore, this PR marks it as deprecated and replace all invocations of it with the `MessageIdBuilder` usages. To verify the `MessageId::batchSize()`, the following tests are modified: - `BatchMessageTest.testBatchSizeInBytes`: the batch size is always 2 because of the `batchingMaxAllowedSizeInBytes` config. - `MessageChunkingTest.testEndToEnd`: the batch size field is not set (default: 0) because batching is disabled. --- include/pulsar/MessageId.h | 8 ++ include/pulsar/MessageIdBuilder.h | 115 ++++++++++++++++++++++++++++ lib/BatchAcknowledgementTracker.cc | 11 +-- lib/ClientConnection.cc | 8 +- lib/Commands.cc | 6 +- lib/Commands.h | 3 +- lib/ConsumerImpl.cc | 25 +++--- lib/Message.cc | 5 +- lib/MessageAndCallbackBatch.cc | 5 +- lib/MessageBatch.cc | 2 +- lib/MessageId.cc | 11 ++- lib/MessageIdBuilder.cc | 74 ++++++++++++++++++ lib/MessageIdImpl.h | 13 ++-- lib/MessageIdUtil.h | 5 ++ lib/NegativeAcksTracker.cc | 4 +- lib/ProducerImpl.cc | 5 +- lib/UnAckedMessageTrackerEnabled.cc | 5 +- tests/BasicEndToEndTest.cc | 4 +- tests/BatchMessageTest.cc | 6 +- tests/ConsumerTest.cc | 4 +- tests/MessageChunkingTest.cc | 2 + tests/MessageIdTest.cc | 12 +-- tests/PulsarFriend.h | 6 -- 23 files changed, 276 insertions(+), 63 deletions(-) create mode 100644 include/pulsar/MessageIdBuilder.h create mode 100644 lib/MessageIdBuilder.cc diff --git a/include/pulsar/MessageId.h b/include/pulsar/MessageId.h index 7c9626c1..28b88c85 100644 --- a/include/pulsar/MessageId.h +++ b/include/pulsar/MessageId.h @@ -37,8 +37,12 @@ class PULSAR_PUBLIC MessageId { MessageId(); /** + * @deprecated + * * Construct the MessageId * + * NOTE: This API still exists for backward compatibility, use MessageIdBuilder instead. + * * @param partition the partition number of a topic * @param ledgerId the ledger id * @param entryId the entry id @@ -88,6 +92,7 @@ class PULSAR_PUBLIC MessageId { int64_t entryId() const; int32_t batchIndex() const; int32_t partition() const; + int32_t batchSize() const; private: friend class ConsumerImpl; @@ -102,11 +107,14 @@ class PULSAR_PUBLIC MessageId { friend class PulsarWrapper; friend class PulsarFriend; friend class NegativeAcksTracker; + friend class MessageIdBuilder; friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const MessageId& messageId); typedef std::shared_ptr MessageIdImplPtr; MessageIdImplPtr impl_; + + explicit MessageId(const MessageIdImplPtr& impl); }; typedef std::vector MessageIdList; diff --git a/include/pulsar/MessageIdBuilder.h b/include/pulsar/MessageIdBuilder.h new file mode 100644 index 00000000..ce2b99ba --- /dev/null +++ b/include/pulsar/MessageIdBuilder.h @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include + +#include + +namespace pulsar { + +namespace proto { +class MessageIdData; +} + +/** + * The builder to build a MessageId. + * + * Example of building a single MessageId: + * + * ```c++ + * MessageId msgId = MessageIdBuilder() + * .ledgerId(0L) + * .entryId(0L) + * .build(); + * ``` + * + * Example of building a batched MessageId: + * + * ```c++ + * MessageId msgId = MessageIdBuilder() + * .ledgerId(0L) + * .entryId(0L) + * .batchIndex(0) + * .batchSize(2) + * .build(); + * ``` + */ +class PULSAR_PUBLIC MessageIdBuilder { + public: + explicit MessageIdBuilder(); + + /** + * Create an instance that copies the data from messageId. + */ + static MessageIdBuilder from(const MessageId& messageId); + + /** + * Create an instance from the proto::MessageIdData instance. + * + * @note It's an internal API that converts the MessageIdData defined by PulsarApi.proto + * @see https://github.com/apache/pulsar-client-cpp/blob/main/proto/PulsarApi.proto + */ + static MessageIdBuilder from(const proto::MessageIdData& messageIdData); + + /** + * Build a MessageId. + */ + MessageId build() const; + + /** + * Set the ledger ID field. + * + * Default: -1L + */ + MessageIdBuilder& ledgerId(int64_t ledgerId); + + /** + * Set the entry ID field. + * + * Default: -1L + */ + MessageIdBuilder& entryId(int64_t entryId); + + /** + * Set the partition index. + * + * Default: -1 + */ + MessageIdBuilder& partition(int32_t partition); + + /** + * Set the batch index. + * + * Default: -1 + */ + MessageIdBuilder& batchIndex(int32_t batchIndex); + + /** + * Set the batch size. + * + * Default: 0 + */ + MessageIdBuilder& batchSize(int32_t batchSize); + + private: + std::shared_ptr impl_; +}; + +} // namespace pulsar diff --git a/lib/BatchAcknowledgementTracker.cc b/lib/BatchAcknowledgementTracker.cc index 1df4984f..d1bb6a89 100644 --- a/lib/BatchAcknowledgementTracker.cc +++ b/lib/BatchAcknowledgementTracker.cc @@ -19,6 +19,7 @@ #include "BatchAcknowledgementTracker.h" #include "LogUtils.h" +#include "MessageIdUtil.h" #include "MessageImpl.h" namespace pulsar { @@ -71,8 +72,7 @@ void BatchAcknowledgementTracker::deleteAckedMessage(const MessageId& messageId, return; } - MessageId batchMessageId = - MessageId(messageId.partition(), messageId.ledgerId(), messageId.entryId(), -1 /* Batch index */); + auto batchMessageId = discardBatch(messageId); Lock lock(mutex_); if (ackType == CommandAck_AckType_Cumulative) { @@ -114,9 +114,7 @@ void BatchAcknowledgementTracker::deleteAckedMessage(const MessageId& messageId, bool BatchAcknowledgementTracker::isBatchReady(const MessageId& msgID, CommandAck_AckType ackType) { Lock lock(mutex_); - // Remove batch index - MessageId batchMessageId = - MessageId(msgID.partition(), msgID.ledgerId(), msgID.entryId(), -1 /* Batch index */); + auto batchMessageId = discardBatch(msgID); TrackerMap::iterator pos = trackerMap_.find(batchMessageId); if (pos == trackerMap_.end() || @@ -154,8 +152,7 @@ const MessageId BatchAcknowledgementTracker::getGreatestCumulativeAckReady(const Lock lock(mutex_); // Remove batch index - MessageId batchMessageId = - MessageId(messageId.partition(), messageId.ledgerId(), messageId.entryId(), -1 /* Batch index */); + auto batchMessageId = discardBatch(messageId); TrackerMap::iterator pos = trackerMap_.find(batchMessageId); // element not found diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index b3df8310..67553899 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -18,6 +18,8 @@ */ #include "ClientConnection.h" +#include + #include #include "Commands.h" @@ -43,8 +45,7 @@ static const uint32_t DefaultBufferSize = 64 * 1024; static const int KeepAliveIntervalInSeconds = 30; static MessageId toMessageId(const proto::MessageIdData& messageIdData) { - return MessageId{messageIdData.partition(), static_cast(messageIdData.ledgerid()), - static_cast(messageIdData.entryid()), messageIdData.batch_index()}; + return MessageIdBuilder::from(messageIdData).build(); } // Convert error codes from protobuf to client API Result @@ -830,8 +831,7 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) { int producerId = sendReceipt.producer_id(); uint64_t sequenceId = sendReceipt.sequence_id(); const proto::MessageIdData& messageIdData = sendReceipt.message_id(); - MessageId messageId = MessageId(messageIdData.partition(), messageIdData.ledgerid(), - messageIdData.entryid(), messageIdData.batch_index()); + auto messageId = toMessageId(messageIdData); LOG_DEBUG(cnxString_ << "Got receipt for producer: " << producerId << " -- msg: " << sequenceId << "-- message id: " << messageId); diff --git a/lib/Commands.cc b/lib/Commands.cc index 69492c6f..f97b0eb8 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -19,6 +19,7 @@ #include "Commands.h" #include +#include #include #include @@ -807,7 +808,8 @@ uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, return msgMetadata.sequence_id(); } -Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex) { +Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex, + int32_t batchSize) { SharedBuffer& uncompressedPayload = batchedMessage.impl_->payload; // Format of batch message @@ -825,7 +827,7 @@ Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32 uncompressedPayload.consume(payloadSize); const MessageId& m = batchedMessage.impl_->messageId; - MessageId singleMessageId(m.partition(), m.ledgerId(), m.entryId(), batchIndex); + auto singleMessageId = MessageIdBuilder::from(m).batchIndex(batchIndex).batchSize(batchSize).build(); Message singleMessage(singleMessageId, batchedMessage.impl_->metadata, payload, metadata, batchedMessage.impl_->getTopicName()); singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_; diff --git a/lib/Commands.h b/lib/Commands.h index 09f6f8be..6681f138 100644 --- a/lib/Commands.h +++ b/lib/Commands.h @@ -132,7 +132,8 @@ class Commands { static PULSAR_PUBLIC uint64_t serializeSingleMessageInBatchWithPayload( const Message& msg, SharedBuffer& batchPayLoad, unsigned long maxMessageSizeInBytes); - static Message deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex); + static Message deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex, + int32_t batchSize); static SharedBuffer newConsumerStats(uint64_t consumerId, uint64_t requestId); diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 5698b46c..6e79f7aa 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -18,6 +18,8 @@ */ #include "ConsumerImpl.h" +#include + #include #include "AckGroupingTracker.h" @@ -424,8 +426,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: // Only a non-batched messages can be a chunk if (!metadata.has_num_messages_in_batch() && isChunkedMessage) { const auto& messageIdData = msg.message_id(); - MessageId messageId(messageIdData.partition(), messageIdData.ledgerid(), messageIdData.entryid(), - messageIdData.batch_index()); + auto messageId = MessageIdBuilder::from(messageIdData).build(); auto optionalPayload = processMessageChunk(payload, metadata, messageId, messageIdData, cnx); if (optionalPayload.is_present()) { payload = optionalPayload.value(); @@ -582,7 +583,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection for (int i = 0; i < batchSize; i++) { // This is a cheap copy since message contains only one shared pointer (impl_) - Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i); + Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize); msg.impl_->setRedeliveryCount(redeliveryCount); msg.impl_->setTopicName(batchedMessage.getTopicName()); msg.impl_->convertPayloadToKeyValue(config_.getSchema()); @@ -882,13 +883,17 @@ Optional ConsumerImpl::clearReceiveQueue() { if (incomingMessages_.peekAndClear(nextMessageInQueue)) { // There was at least one message pending in the queue const MessageId& nextMessageId = nextMessageInQueue.getMessageId(); - MessageId previousMessageId; - if (nextMessageId.batchIndex() >= 0) { - previousMessageId = MessageId(-1, nextMessageId.ledgerId(), nextMessageId.entryId(), - nextMessageId.batchIndex() - 1); - } else { - previousMessageId = MessageId(-1, nextMessageId.ledgerId(), nextMessageId.entryId() - 1, -1); - } + auto previousMessageId = (nextMessageId.batchIndex() >= 0) + ? MessageIdBuilder() + .ledgerId(nextMessageId.ledgerId()) + .entryId(nextMessageId.entryId()) + .batchIndex(nextMessageId.batchIndex() - 1) + .batchSize(nextMessageId.batchSize()) + .build() + : MessageIdBuilder() + .ledgerId(nextMessageId.ledgerId()) + .entryId(nextMessageId.entryId() - 1) + .build(); return Optional::of(previousMessageId); } else if (lastDequedMessageId_ != MessageId::earliest()) { // If the queue was empty we need to restart from the message just after the last one that has been diff --git a/lib/Message.cc b/lib/Message.cc index 84f203f7..561a723d 100644 --- a/lib/Message.cc +++ b/lib/Message.cc @@ -18,6 +18,7 @@ */ #include #include +#include #include #include @@ -63,9 +64,7 @@ Message::Message(MessageImplPtr& impl) : impl_(impl) {} Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& metadata, SharedBuffer& payload, int32_t partition) : impl_(std::make_shared()) { - impl_->messageId = - MessageId(partition, msg.message_id().ledgerid(), msg.message_id().entryid(), /* batchId */ - -1); + impl_->messageId = MessageIdBuilder::from(msg.message_id()).batchIndex(-1).build(); impl_->metadata = metadata; impl_->payload = payload; } diff --git a/lib/MessageAndCallbackBatch.cc b/lib/MessageAndCallbackBatch.cc index 3f50dc02..56725389 100644 --- a/lib/MessageAndCallbackBatch.cc +++ b/lib/MessageAndCallbackBatch.cc @@ -18,6 +18,8 @@ */ #include "MessageAndCallbackBatch.h" +#include + #include "ClientConnection.h" #include "Commands.h" #include "LogUtils.h" @@ -54,8 +56,7 @@ static void completeSendCallbacks(const std::vector& callbacks, Re int32_t numOfMessages = static_cast(callbacks.size()); LOG_DEBUG("Batch complete [Result = " << result << "] [numOfMessages = " << numOfMessages << "]"); for (int32_t i = 0; i < numOfMessages; i++) { - MessageId idInBatch(id.partition(), id.ledgerId(), id.entryId(), i); - callbacks[i](result, idInBatch); + callbacks[i](result, MessageIdBuilder::from(id).batchIndex(i).batchSize(numOfMessages).build()); } } diff --git a/lib/MessageBatch.cc b/lib/MessageBatch.cc index 12144ff5..f61b56ad 100644 --- a/lib/MessageBatch.cc +++ b/lib/MessageBatch.cc @@ -47,7 +47,7 @@ MessageBatch& MessageBatch::parseFrom(const SharedBuffer& payload, uint32_t batc batch_.clear(); for (int i = 0; i < batchSize; ++i) { - batch_.push_back(Commands::deSerializeSingleMessageInBatch(batchMessage_, i)); + batch_.push_back(Commands::deSerializeSingleMessageInBatch(batchMessage_, i, batchSize)); } return *this; } diff --git a/lib/MessageId.cc b/lib/MessageId.cc index 5b133282..9a1a38c8 100644 --- a/lib/MessageId.cc +++ b/lib/MessageId.cc @@ -18,6 +18,7 @@ */ #include +#include #include #include @@ -42,14 +43,16 @@ MessageId& MessageId::operator=(const MessageId& m) { MessageId::MessageId(int32_t partition, int64_t ledgerId, int64_t entryId, int32_t batchIndex) : impl_(std::make_shared(partition, ledgerId, entryId, batchIndex)) {} +MessageId::MessageId(const MessageIdImplPtr& impl) : impl_(impl) {} + const MessageId& MessageId::earliest() { - static const MessageId _earliest(-1, -1, -1, -1); + static const auto _earliest = MessageIdBuilder().build(); return _earliest; } const MessageId& MessageId::latest() { static const int64_t long_max = std::numeric_limits::max(); - static const MessageId _latest(-1, long_max, long_max, -1); + static const auto _latest = MessageIdBuilder().ledgerId(long_max).entryId(long_max).build(); return _latest; } @@ -77,7 +80,7 @@ MessageId MessageId::deserialize(const std::string& serializedMessageId) { throw std::invalid_argument("Failed to parse serialized message id"); } - return MessageId(idData.partition(), idData.ledgerid(), idData.entryid(), idData.batch_index()); + return MessageIdBuilder::from(idData).build(); } int64_t MessageId::ledgerId() const { return impl_->ledgerId_; } @@ -88,6 +91,8 @@ int32_t MessageId::batchIndex() const { return impl_->batchIndex_; } int32_t MessageId::partition() const { return impl_->partition_; } +int32_t MessageId::batchSize() const { return impl_->batchSize_; } + PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) { s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ',' << messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << ')'; diff --git a/lib/MessageIdBuilder.cc b/lib/MessageIdBuilder.cc new file mode 100644 index 00000000..8857daf5 --- /dev/null +++ b/lib/MessageIdBuilder.cc @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include "MessageIdImpl.h" +#include "PulsarApi.pb.h" + +namespace pulsar { + +MessageIdBuilder::MessageIdBuilder() : impl_(std::make_shared()) {} + +MessageIdBuilder MessageIdBuilder::from(const MessageId& messageId) { + MessageIdBuilder builder; + *builder.impl_ = *(messageId.impl_); + return builder; +} + +MessageIdBuilder MessageIdBuilder::from(const proto::MessageIdData& messageIdData) { + return MessageIdBuilder() + .ledgerId(messageIdData.ledgerid()) + .entryId(messageIdData.entryid()) + .partition(messageIdData.partition()) + .batchIndex(messageIdData.batch_index()) + .batchSize(messageIdData.batch_size()); +} + +MessageId MessageIdBuilder::build() const { + assert(impl_->batchIndex_ < 0 || (impl_->batchSize_ > impl_->batchIndex_)); + return MessageId{impl_}; +} + +MessageIdBuilder& MessageIdBuilder::ledgerId(int64_t ledgerId) { + impl_->ledgerId_ = ledgerId; + return *this; +} + +MessageIdBuilder& MessageIdBuilder::entryId(int64_t entryId) { + impl_->entryId_ = entryId; + return *this; +} + +MessageIdBuilder& MessageIdBuilder::partition(int32_t partition) { + impl_->partition_ = partition; + return *this; +} + +MessageIdBuilder& MessageIdBuilder::batchIndex(int32_t batchIndex) { + impl_->batchIndex_ = batchIndex; + return *this; +} + +MessageIdBuilder& MessageIdBuilder::batchSize(int32_t batchSize) { + impl_->batchSize_ = batchSize; + return *this; +} + +} // namespace pulsar diff --git a/lib/MessageIdImpl.h b/lib/MessageIdImpl.h index 9db758c5..57d1c4eb 100644 --- a/lib/MessageIdImpl.h +++ b/lib/MessageIdImpl.h @@ -26,23 +26,24 @@ namespace pulsar { class MessageIdImpl { public: - MessageIdImpl() : ledgerId_(-1), entryId_(-1), partition_(-1), batchIndex_(-1), topicName_() {} + MessageIdImpl() = default; MessageIdImpl(int32_t partition, int64_t ledgerId, int64_t entryId, int32_t batchIndex) : ledgerId_(ledgerId), entryId_(entryId), partition_(partition), batchIndex_(batchIndex), topicName_() {} - const int64_t ledgerId_; - const int64_t entryId_; - const int32_t partition_; - const int32_t batchIndex_; + int64_t ledgerId_ = -1; + int64_t entryId_ = -1; + int32_t partition_ = -1; + int32_t batchIndex_ = -1; + int32_t batchSize_ = 0; const std::string& getTopicName() { return *topicName_; } void setTopicName(const std::string& topicName) { topicName_ = &topicName; } private: - const std::string* topicName_; + const std::string* topicName_ = nullptr; friend class MessageImpl; friend class MultiTopicsConsumerImpl; friend class UnAckedMessageTrackerEnabled; diff --git a/lib/MessageIdUtil.h b/lib/MessageIdUtil.h index 1f4ffd36..70af7fe0 100644 --- a/lib/MessageIdUtil.h +++ b/lib/MessageIdUtil.h @@ -17,6 +17,7 @@ * under the License. */ #include +#include namespace pulsar { @@ -35,4 +36,8 @@ inline int compareLedgerAndEntryId(const MessageId& lhs, const MessageId& rhs) { return internal::compare(lhs.entryId(), rhs.entryId()); } +inline MessageId discardBatch(const MessageId& messageId) { + return MessageIdBuilder::from(messageId).batchIndex(-1).batchSize(0).build(); +} + } // namespace pulsar diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc index 3ccf0bea..6ff322df 100644 --- a/lib/NegativeAcksTracker.cc +++ b/lib/NegativeAcksTracker.cc @@ -26,6 +26,7 @@ #include "ConsumerImpl.h" #include "ExecutorService.h" #include "LogUtils.h" +#include "MessageIdUtil.h" DECLARE_LOG_OBJECT() namespace pulsar { @@ -90,8 +91,7 @@ void NegativeAcksTracker::add(const MessageId &m) { auto now = Clock::now(); // Erase batch id to group all nacks from same batch - MessageId batchMessageId = MessageId(m.partition(), m.ledgerId(), m.entryId(), -1); - nackedMessages_[batchMessageId] = now + nackDelay_; + nackedMessages_[discardBatch(m)] = now + nackDelay_; if (!timer_) { scheduleTimer(); diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index f3e61204..ab1f3448 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -18,6 +18,8 @@ */ #include "ProducerImpl.h" +#include + #include #include "BatchMessageContainer.h" @@ -813,8 +815,7 @@ bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) { } bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { - MessageId messageId(partition_, rawMessageId.ledgerId(), rawMessageId.entryId(), - rawMessageId.batchIndex()); + auto messageId = MessageIdBuilder::from(rawMessageId).partition(partition_).build(); Lock lock(mutex_); if (pendingMessagesQueue_.empty()) { diff --git a/lib/UnAckedMessageTrackerEnabled.cc b/lib/UnAckedMessageTrackerEnabled.cc index 0579777e..ff1b928f 100644 --- a/lib/UnAckedMessageTrackerEnabled.cc +++ b/lib/UnAckedMessageTrackerEnabled.cc @@ -24,6 +24,7 @@ #include "ConsumerImplBase.h" #include "ExecutorService.h" #include "LogUtils.h" +#include "MessageIdUtil.h" DECLARE_LOG_OBJECT(); @@ -96,7 +97,7 @@ UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long bool UnAckedMessageTrackerEnabled::add(const MessageId& msgId) { std::lock_guard acquire(lock_); - MessageId id(msgId.partition(), msgId.ledgerId(), msgId.entryId(), -1); + auto id = discardBatch(msgId); if (messageIdPartitionMap.count(id) == 0) { std::set& partition = timePartitions.back(); bool emplace = messageIdPartitionMap.emplace(id, partition).second; @@ -113,7 +114,7 @@ bool UnAckedMessageTrackerEnabled::isEmpty() { bool UnAckedMessageTrackerEnabled::remove(const MessageId& msgId) { std::lock_guard acquire(lock_); - MessageId id(msgId.partition(), msgId.ledgerId(), msgId.entryId(), -1); + auto id = discardBatch(msgId); bool removed = false; std::map&>::iterator exist = messageIdPartitionMap.find(id); diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc index aee679e4..4b181d0f 100644 --- a/tests/BasicEndToEndTest.cc +++ b/tests/BasicEndToEndTest.cc @@ -230,10 +230,10 @@ TEST(BasicEndToEndTest, testProduceConsume) { // Send synchronously std::string content = "msg-1-content"; Message msg = MessageBuilder().setContent(content).build(); - ASSERT_EQ(MessageId(-1, -1, -1, -1), msg.getMessageId()); + ASSERT_EQ(MessageId::earliest(), msg.getMessageId()); result = producer.send(msg); ASSERT_EQ(ResultOk, result); - ASSERT_NE(MessageId(-1, -1, -1, -1), msg.getMessageId()); + ASSERT_NE(MessageId::earliest(), msg.getMessageId()); Message receivedMsg; consumer.receive(receivedMsg); diff --git a/tests/BatchMessageTest.cc b/tests/BatchMessageTest.cc index 273146ca..e46cb457 100644 --- a/tests/BatchMessageTest.cc +++ b/tests/BatchMessageTest.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -237,7 +238,8 @@ TEST(BatchMessageTest, testBatchSizeInBytes) { std::string expectedMessageContent = prefix + std::to_string(i); LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = " << receivedMsg.getMessageId() << "]"); - ASSERT_LT(pulsar::PulsarFriend::getBatchIndex(receivedMsg.getMessageId()), 2); + ASSERT_LT(receivedMsg.getMessageId().batchIndex(), 2); + ASSERT_EQ(receivedMsg.getMessageId().batchSize(), 2); ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++)); ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString()); ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg)); @@ -970,7 +972,7 @@ TEST(BatchMessageTest, testPraseMessageBatchEntry) { } MessageBatch messageBatch; - MessageId fakeId(0, 5000, 10, -1); + auto fakeId = MessageIdBuilder().ledgerId(5000L).entryId(10L).partition(0).build(); messageBatch.withMessageId(fakeId).parseFrom(payload, static_cast(cases.size())); const std::vector& messages = messageBatch.messages(); diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index a77636ee..a5ef32fe 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -31,6 +31,7 @@ #include "lib/ClientConnection.h" #include "lib/Future.h" #include "lib/LogUtils.h" +#include "lib/MessageIdUtil.h" #include "lib/MultiTopicsConsumerImpl.h" #include "lib/TimeUtils.h" #include "lib/UnAckedMessageTrackerDisabled.h" @@ -683,8 +684,7 @@ TEST(ConsumerTest, testBatchUnAckedMessageTracker) { Message msg; ASSERT_EQ(ResultOk, consumer.receive(msg, 1000)); MessageId msgId = msg.getMessageId(); - MessageId id(msgId.partition(), msgId.ledgerId(), msgId.entryId(), -1); - msgIdInBatchMap[id].emplace_back(msgId); + msgIdInBatchMap[discardBatch(msgId)].emplace_back(msgId); } ASSERT_EQ(batchCount, msgIdInBatchMap.size()); diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc index 61a97144..c68feb78 100644 --- a/tests/MessageChunkingTest.cc +++ b/tests/MessageChunkingTest.cc @@ -121,6 +121,8 @@ TEST_P(MessageChunkingTest, testEndToEnd) { ASSERT_EQ(ResultOk, consumer.receive(msg, 3000)); LOG_INFO("Receive " << msg.getLength() << " bytes from " << msg.getMessageId()); ASSERT_EQ(msg.getDataAsString(), largeMessage); + ASSERT_EQ(msg.getMessageId().batchIndex(), -1); + ASSERT_EQ(msg.getMessageId().batchSize(), 0); receivedMessageIds.emplace_back(msg.getMessageId()); } ASSERT_EQ(receivedMessageIds, sendMessageIds); diff --git a/tests/MessageIdTest.cc b/tests/MessageIdTest.cc index 55257d92..e653aa1c 100644 --- a/tests/MessageIdTest.cc +++ b/tests/MessageIdTest.cc @@ -17,7 +17,7 @@ * under the License. */ #include -#include +#include #include @@ -27,7 +27,7 @@ using namespace pulsar; TEST(MessageIdTest, testSerialization) { - MessageId msgId = PulsarFriend::getMessageId(-1, 1, 2, 3); + auto msgId = MessageIdBuilder().ledgerId(1L).entryId(2L).batchIndex(3L).build(); std::string serialized; msgId.serialize(serialized); @@ -38,10 +38,10 @@ TEST(MessageIdTest, testSerialization) { } TEST(MessageIdTest, testCompareLedgerAndEntryId) { - MessageId id1(-1, 2L, 1L, 0); - MessageId id2(-1, 2L, 1L, 1); - MessageId id3(-1, 2L, 2L, 0); - MessageId id4(-1, 3L, 0L, 0); + auto id1 = MessageIdBuilder().ledgerId(2L).entryId(1L).batchIndex(0).build(); + auto id2 = MessageIdBuilder::from(id1).batchIndex(1).build(); + auto id3 = MessageIdBuilder().ledgerId(2L).entryId(2L).batchIndex(0).build(); + auto id4 = MessageIdBuilder().ledgerId(3L).entryId(0L).batchIndex(0).build(); ASSERT_EQ(compareLedgerAndEntryId(id1, id2), 0); ASSERT_EQ(compareLedgerAndEntryId(id1, id2), 0); diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index 938b2844..138dbb55 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -36,12 +36,6 @@ using std::string; namespace pulsar { class PulsarFriend { public: - static MessageId getMessageId(int32_t partition, int64_t ledgerId, int64_t entryId, int32_t batchIndex) { - return MessageId(partition, ledgerId, entryId, batchIndex); - } - - static int getBatchIndex(const MessageId& mId) { return mId.batchIndex(); } - static ProducerStatsImplPtr getProducerStatsPtr(Producer producer) { ProducerImpl* producerImpl = static_cast(producer.impl_.get()); return std::static_pointer_cast(producerImpl->producerStatsBasePtr_);