Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/pulsar/MessageId.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ class PULSAR_PUBLIC MessageId {
MessageId();

/**
* @deprecated
*
* Construct the MessageId
*
* NOTE: This API still exists for backward compatibility, use MessageIdBuilder instead.
*
* @param partition the partition number of a topic
* @param ledgerId the ledger id
* @param entryId the entry id
Expand Down Expand Up @@ -88,6 +92,7 @@ class PULSAR_PUBLIC MessageId {
int64_t entryId() const;
int32_t batchIndex() const;
int32_t partition() const;
int32_t batchSize() const;

private:
friend class ConsumerImpl;
Expand All @@ -102,11 +107,14 @@ class PULSAR_PUBLIC MessageId {
friend class PulsarWrapper;
friend class PulsarFriend;
friend class NegativeAcksTracker;
friend class MessageIdBuilder;

friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const MessageId& messageId);

typedef std::shared_ptr<MessageIdImpl> MessageIdImplPtr;
MessageIdImplPtr impl_;

explicit MessageId(const MessageIdImplPtr& impl);
};

typedef std::vector<MessageId> MessageIdList;
Expand Down
115 changes: 115 additions & 0 deletions include/pulsar/MessageIdBuilder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once

#include <pulsar/MessageId.h>

#include <memory>

namespace pulsar {

namespace proto {
class MessageIdData;
}

/**
* The builder to build a MessageId.
*
* Example of building a single MessageId:
*
* ```c++
* MessageId msgId = MessageIdBuilder()
* .ledgerId(0L)
* .entryId(0L)
* .build();
* ```
*
* Example of building a batched MessageId:
*
* ```c++
* MessageId msgId = MessageIdBuilder()
* .ledgerId(0L)
* .entryId(0L)
* .batchIndex(0)
* .batchSize(2)
* .build();
* ```
*/
class PULSAR_PUBLIC MessageIdBuilder {
public:
explicit MessageIdBuilder();

/**
* Create an instance that copies the data from messageId.
*/
static MessageIdBuilder from(const MessageId& messageId);

/**
* Create an instance from the proto::MessageIdData instance.
*
* @note It's an internal API that converts the MessageIdData defined by PulsarApi.proto
* @see https://github.com/apache/pulsar-client-cpp/blob/main/proto/PulsarApi.proto
*/
static MessageIdBuilder from(const proto::MessageIdData& messageIdData);

/**
* Build a MessageId.
*/
MessageId build() const;

/**
* Set the ledger ID field.
*
* Default: -1L
*/
MessageIdBuilder& ledgerId(int64_t ledgerId);

/**
* Set the entry ID field.
*
* Default: -1L
*/
MessageIdBuilder& entryId(int64_t entryId);

/**
* Set the partition index.
*
* Default: -1
*/
MessageIdBuilder& partition(int32_t partition);

/**
* Set the batch index.
*
* Default: -1
*/
MessageIdBuilder& batchIndex(int32_t batchIndex);

/**
* Set the batch size.
*
* Default: 0
*/
MessageIdBuilder& batchSize(int32_t batchSize);

private:
std::shared_ptr<MessageIdImpl> impl_;
};

} // namespace pulsar
11 changes: 4 additions & 7 deletions lib/BatchAcknowledgementTracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "BatchAcknowledgementTracker.h"

#include "LogUtils.h"
#include "MessageIdUtil.h"
#include "MessageImpl.h"

namespace pulsar {
Expand Down Expand Up @@ -71,8 +72,7 @@ void BatchAcknowledgementTracker::deleteAckedMessage(const MessageId& messageId,
return;
}

MessageId batchMessageId =
MessageId(messageId.partition(), messageId.ledgerId(), messageId.entryId(), -1 /* Batch index */);
auto batchMessageId = discardBatch(messageId);

Lock lock(mutex_);
if (ackType == CommandAck_AckType_Cumulative) {
Expand Down Expand Up @@ -114,9 +114,7 @@ void BatchAcknowledgementTracker::deleteAckedMessage(const MessageId& messageId,

bool BatchAcknowledgementTracker::isBatchReady(const MessageId& msgID, CommandAck_AckType ackType) {
Lock lock(mutex_);
// Remove batch index
MessageId batchMessageId =
MessageId(msgID.partition(), msgID.ledgerId(), msgID.entryId(), -1 /* Batch index */);
auto batchMessageId = discardBatch(msgID);

TrackerMap::iterator pos = trackerMap_.find(batchMessageId);
if (pos == trackerMap_.end() ||
Expand Down Expand Up @@ -154,8 +152,7 @@ const MessageId BatchAcknowledgementTracker::getGreatestCumulativeAckReady(const
Lock lock(mutex_);

// Remove batch index
MessageId batchMessageId =
MessageId(messageId.partition(), messageId.ledgerId(), messageId.entryId(), -1 /* Batch index */);
auto batchMessageId = discardBatch(messageId);
TrackerMap::iterator pos = trackerMap_.find(batchMessageId);

// element not found
Expand Down
8 changes: 4 additions & 4 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
#include "ClientConnection.h"

#include <pulsar/MessageIdBuilder.h>

#include <fstream>

#include "Commands.h"
Expand All @@ -43,8 +45,7 @@ static const uint32_t DefaultBufferSize = 64 * 1024;
static const int KeepAliveIntervalInSeconds = 30;

static MessageId toMessageId(const proto::MessageIdData& messageIdData) {
return MessageId{messageIdData.partition(), static_cast<int64_t>(messageIdData.ledgerid()),
static_cast<int64_t>(messageIdData.entryid()), messageIdData.batch_index()};
return MessageIdBuilder::from(messageIdData).build();
}

// Convert error codes from protobuf to client API Result
Expand Down Expand Up @@ -830,8 +831,7 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
int producerId = sendReceipt.producer_id();
uint64_t sequenceId = sendReceipt.sequence_id();
const proto::MessageIdData& messageIdData = sendReceipt.message_id();
MessageId messageId = MessageId(messageIdData.partition(), messageIdData.ledgerid(),
messageIdData.entryid(), messageIdData.batch_index());
auto messageId = toMessageId(messageIdData);

LOG_DEBUG(cnxString_ << "Got receipt for producer: " << producerId
<< " -- msg: " << sequenceId << "-- message id: " << messageId);
Expand Down
6 changes: 4 additions & 2 deletions lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "Commands.h"

#include <pulsar/MessageBuilder.h>
#include <pulsar/MessageIdBuilder.h>
#include <pulsar/Schema.h>
#include <pulsar/Version.h>

Expand Down Expand Up @@ -807,7 +808,8 @@ uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg,
return msgMetadata.sequence_id();
}

Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex) {
Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex,
int32_t batchSize) {
SharedBuffer& uncompressedPayload = batchedMessage.impl_->payload;

// Format of batch message
Expand All @@ -825,7 +827,7 @@ Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32
uncompressedPayload.consume(payloadSize);

const MessageId& m = batchedMessage.impl_->messageId;
MessageId singleMessageId(m.partition(), m.ledgerId(), m.entryId(), batchIndex);
auto singleMessageId = MessageIdBuilder::from(m).batchIndex(batchIndex).batchSize(batchSize).build();
Message singleMessage(singleMessageId, batchedMessage.impl_->metadata, payload, metadata,
batchedMessage.impl_->getTopicName());
singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_;
Expand Down
3 changes: 2 additions & 1 deletion lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ class Commands {
static PULSAR_PUBLIC uint64_t serializeSingleMessageInBatchWithPayload(
const Message& msg, SharedBuffer& batchPayLoad, unsigned long maxMessageSizeInBytes);

static Message deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex);
static Message deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex,
int32_t batchSize);

static SharedBuffer newConsumerStats(uint64_t consumerId, uint64_t requestId);

Expand Down
25 changes: 15 additions & 10 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
#include "ConsumerImpl.h"

#include <pulsar/MessageIdBuilder.h>

#include <algorithm>

#include "AckGroupingTracker.h"
Expand Down Expand Up @@ -424,8 +426,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
// Only a non-batched messages can be a chunk
if (!metadata.has_num_messages_in_batch() && isChunkedMessage) {
const auto& messageIdData = msg.message_id();
MessageId messageId(messageIdData.partition(), messageIdData.ledgerid(), messageIdData.entryid(),
messageIdData.batch_index());
auto messageId = MessageIdBuilder::from(messageIdData).build();
auto optionalPayload = processMessageChunk(payload, metadata, messageId, messageIdData, cnx);
if (optionalPayload.is_present()) {
payload = optionalPayload.value();
Expand Down Expand Up @@ -582,7 +583,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection

for (int i = 0; i < batchSize; i++) {
// This is a cheap copy since message contains only one shared pointer (impl_)
Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i);
Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize);
msg.impl_->setRedeliveryCount(redeliveryCount);
msg.impl_->setTopicName(batchedMessage.getTopicName());
msg.impl_->convertPayloadToKeyValue(config_.getSchema());
Expand Down Expand Up @@ -882,13 +883,17 @@ Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
if (incomingMessages_.peekAndClear(nextMessageInQueue)) {
// There was at least one message pending in the queue
const MessageId& nextMessageId = nextMessageInQueue.getMessageId();
MessageId previousMessageId;
if (nextMessageId.batchIndex() >= 0) {
previousMessageId = MessageId(-1, nextMessageId.ledgerId(), nextMessageId.entryId(),
nextMessageId.batchIndex() - 1);
} else {
previousMessageId = MessageId(-1, nextMessageId.ledgerId(), nextMessageId.entryId() - 1, -1);
}
auto previousMessageId = (nextMessageId.batchIndex() >= 0)
? MessageIdBuilder()
.ledgerId(nextMessageId.ledgerId())
.entryId(nextMessageId.entryId())
.batchIndex(nextMessageId.batchIndex() - 1)
.batchSize(nextMessageId.batchSize())
.build()
: MessageIdBuilder()
.ledgerId(nextMessageId.ledgerId())
.entryId(nextMessageId.entryId() - 1)
.build();
return Optional<MessageId>::of(previousMessageId);
} else if (lastDequedMessageId_ != MessageId::earliest()) {
// If the queue was empty we need to restart from the message just after the last one that has been
Expand Down
5 changes: 2 additions & 3 deletions lib/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
#include <pulsar/Message.h>
#include <pulsar/MessageBuilder.h>
#include <pulsar/MessageIdBuilder.h>
#include <pulsar/defines.h>

#include <iostream>
Expand Down Expand Up @@ -63,9 +64,7 @@ Message::Message(MessageImplPtr& impl) : impl_(impl) {}
Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& metadata, SharedBuffer& payload,
int32_t partition)
: impl_(std::make_shared<MessageImpl>()) {
impl_->messageId =
MessageId(partition, msg.message_id().ledgerid(), msg.message_id().entryid(), /* batchId */
-1);
impl_->messageId = MessageIdBuilder::from(msg.message_id()).batchIndex(-1).build();
impl_->metadata = metadata;
impl_->payload = payload;
}
Expand Down
5 changes: 3 additions & 2 deletions lib/MessageAndCallbackBatch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
#include "MessageAndCallbackBatch.h"

#include <pulsar/MessageIdBuilder.h>

#include "ClientConnection.h"
#include "Commands.h"
#include "LogUtils.h"
Expand Down Expand Up @@ -54,8 +56,7 @@ static void completeSendCallbacks(const std::vector<SendCallback>& callbacks, Re
int32_t numOfMessages = static_cast<int32_t>(callbacks.size());
LOG_DEBUG("Batch complete [Result = " << result << "] [numOfMessages = " << numOfMessages << "]");
for (int32_t i = 0; i < numOfMessages; i++) {
MessageId idInBatch(id.partition(), id.ledgerId(), id.entryId(), i);
callbacks[i](result, idInBatch);
callbacks[i](result, MessageIdBuilder::from(id).batchIndex(i).batchSize(numOfMessages).build());
}
}

Expand Down
2 changes: 1 addition & 1 deletion lib/MessageBatch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ MessageBatch& MessageBatch::parseFrom(const SharedBuffer& payload, uint32_t batc
batch_.clear();

for (int i = 0; i < batchSize; ++i) {
batch_.push_back(Commands::deSerializeSingleMessageInBatch(batchMessage_, i));
batch_.push_back(Commands::deSerializeSingleMessageInBatch(batchMessage_, i, batchSize));
}
return *this;
}
Expand Down
Loading