Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions include/pulsar/ConsumerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <memory>

#include "BatchReceivePolicy.h"
#include "DeadLetterPolicy.h"

namespace pulsar {

Expand Down Expand Up @@ -398,6 +399,20 @@ class PULSAR_PUBLIC ConsumerConfiguration {
*/
const BatchReceivePolicy& getBatchReceivePolicy() const;

/**
* Set dead letter policy.
*
* @param deadLetterPolicy thd default is empty
*/
void setDeadLetterPolicy(const DeadLetterPolicy& deadLetterPolicy);

/**
* Get dead letter policy.
*
* @return dead letter policy
*/
const DeadLetterPolicy& getDeadLetterPolicy() const;

/**
* Set whether the subscription status should be replicated.
* The default value is `false`.
Expand Down
71 changes: 71 additions & 0 deletions include/pulsar/DeadLetterPolicy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef DEAD_LETTER_POLICY_HPP_
#define DEAD_LETTER_POLICY_HPP_

#include <pulsar/defines.h>

#include <memory>
#include <string>

namespace pulsar {

struct DeadLetterPolicyImpl;

/**
* Configuration for the "dead letter queue" feature in consumer.
*
* see @DeadLetterPolicyBuilder
*/
class PULSAR_PUBLIC DeadLetterPolicy {
public:
DeadLetterPolicy();

/**
* Get dead letter topic
*
* @return
*/
std::string getDeadLetterTopic() const;

/**
* Get max redeliver count
*
* @return
*/
int getMaxRedeliverCount() const;

/**
* Get initial subscription name
*
* @return
*/
std::string getInitialSubscriptionName() const;

private:
friend class DeadLetterPolicyBuilder;

typedef std::shared_ptr<DeadLetterPolicyImpl> DeadLetterPolicyImplPtr;
DeadLetterPolicyImplPtr impl_;

explicit DeadLetterPolicy(const DeadLetterPolicyImplPtr& impl);
};
} // namespace pulsar

#endif /* DEAD_LETTER_POLICY_HPP_ */
81 changes: 81 additions & 0 deletions include/pulsar/DeadLetterPolicyBuilder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef DEAD_LETTER_POLICY_BUILD_HPP_
#define DEAD_LETTER_POLICY_BUILD_HPP_

#include <pulsar/DeadLetterPolicy.h>
#include <pulsar/defines.h>

#include <memory>

namespace pulsar {

struct DeadLetterPolicyImpl;

/**
* The builder to build a DeadLetterPolicyBuilder
*
* Example of building DeadLetterPolicy:
*
* ```c++
* DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder()
* .deadLetterTopic("dlq-topic")
* .maxRedeliverCount(10)
* .initialSubscriptionName("init-sub-name")
* .build();
* ```
*/
class PULSAR_PUBLIC DeadLetterPolicyBuilder {
public:
DeadLetterPolicyBuilder();

/**
* Set dead letter topic
*
* @return
*/
DeadLetterPolicyBuilder& deadLetterTopic(const std::string& deadLetterTopic);

/**
* Set max redeliver count
*
* @return
*/
DeadLetterPolicyBuilder& maxRedeliverCount(int maxRedeliverCount);

/**
* Set initial subscription name
*
* @return
*/
DeadLetterPolicyBuilder& initialSubscriptionName(const std::string& initialSubscriptionName);

/**
* Build DeadLetterPolicy.
*
* @return
*/
DeadLetterPolicy build();

private:
std::shared_ptr<DeadLetterPolicyImpl> impl_;
};
} // namespace pulsar

#endif /* DEAD_LETTER_POLICY_BUILD_HPP_ */
12 changes: 12 additions & 0 deletions include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,18 @@ class PULSAR_PUBLIC ProducerConfiguration {
*/
ProducerAccessMode getAccessMode() const;

/**
* Use this configuration to automatically create an initial subscription when creating a topic.
*
* If this field is not set, the initial subscription is not created.
*/
ProducerConfiguration& setInitialSubscriptionName(const std::string& initialSubscriptionName);

/**
* Get initial subscription name.
*/
const std::string& getInitialSubscriptionName() const;

friend class PulsarWrapper;

private:
Expand Down
5 changes: 3 additions & 2 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <pulsar/MessageIdBuilder.h>

#include <boost/optional.hpp>
#include <fstream>

#include "Commands.h"
Expand Down Expand Up @@ -1093,9 +1094,9 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
data.schemaVersion = producerSuccess.schema_version();
}
if (producerSuccess.has_topic_epoch()) {
data.topicEpoch = Optional<uint64_t>::of(producerSuccess.topic_epoch());
data.topicEpoch = boost::make_optional(producerSuccess.topic_epoch());
} else {
data.topicEpoch = Optional<uint64_t>::empty();
data.topicEpoch = boost::none;
}
requestData.promise.setValue(data);
requestData.timer->cancel();
Expand Down
4 changes: 2 additions & 2 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/strand.hpp>
#include <boost/optional.hpp>
#include <deque>
#include <functional>
#include <memory>
Expand All @@ -40,7 +41,6 @@
#include "LookupDataResult.h"
#include "SharedBuffer.h"
#include "UtilAllocator.h"
#include "Utils.h"

namespace pulsar {

Expand Down Expand Up @@ -83,7 +83,7 @@ struct ResponseData {
std::string producerName;
int64_t lastSequenceId;
std::string schemaVersion;
Optional<uint64_t> topicEpoch;
boost::optional<uint64_t> topicEpoch;
};

typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
Expand Down
12 changes: 8 additions & 4 deletions lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication,
SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription,
uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
const std::string& consumerName, SubscriptionMode subscriptionMode,
Optional<MessageId> startMessageId, bool readCompacted,
boost::optional<MessageId> startMessageId, bool readCompacted,
const std::map<std::string, std::string>& metadata,
const std::map<std::string, std::string>& subscriptionProperties,
const SchemaInfo& schemaInfo,
Expand All @@ -323,7 +323,7 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
subscribe->set_allocated_schema(getSchema(schemaInfo));
}

if (startMessageId.is_present()) {
if (startMessageId) {
MessageIdData& messageIdData = *subscribe->mutable_start_message_id();
messageIdData.set_ledgerid(startMessageId.value().ledgerId());
messageIdData.set_entryid(startMessageId.value().entryId());
Expand Down Expand Up @@ -383,7 +383,8 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo, uint64_t epoch,
bool userProvidedProducerName, bool encrypted,
ProducerAccessMode accessMode, Optional<uint64_t> topicEpoch) {
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch,
std::string initialSubscriptionName) {
BaseCommand cmd;
cmd.set_type(BaseCommand::PRODUCER);
CommandProducer* producer = cmd.mutable_producer();
Expand All @@ -394,9 +395,12 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
producer->set_user_provided_producer_name(userProvidedProducerName);
producer->set_encrypted(encrypted);
producer->set_producer_access_mode(static_cast<proto::ProducerAccessMode>(accessMode));
if (topicEpoch.is_present()) {
if (topicEpoch) {
producer->set_topic_epoch(topicEpoch.value());
}
if (!initialSubscriptionName.empty()) {
producer->set_initial_subscription_name(initialSubscriptionName);
}

for (std::map<std::string, std::string>::const_iterator it = metadata.begin(); it != metadata.end();
it++) {
Expand Down
23 changes: 11 additions & 12 deletions lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
#include <pulsar/Schema.h>
#include <pulsar/defines.h>

#include <boost/optional.hpp>
#include <set>

#include "ProtoApiEnums.h"
#include "SharedBuffer.h"
#include "Utils.h"

using namespace pulsar;

Expand Down Expand Up @@ -89,16 +89,14 @@ class Commands {
uint64_t sequenceId, ChecksumType checksumType,
const proto::MessageMetadata& metadata, const SharedBuffer& payload);

static SharedBuffer newSubscribe(const std::string& topic, const std::string& subscription,
uint64_t consumerId, uint64_t requestId,
CommandSubscribe_SubType subType, const std::string& consumerName,
SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId,
bool readCompacted, const std::map<std::string, std::string>& metadata,
const std::map<std::string, std::string>& subscriptionProperties,
const SchemaInfo& schemaInfo,
CommandSubscribe_InitialPosition subscriptionInitialPosition,
bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy,
int priorityLevel = 0);
static SharedBuffer newSubscribe(
const std::string& topic, const std::string& subscription, uint64_t consumerId, uint64_t requestId,
CommandSubscribe_SubType subType, const std::string& consumerName, SubscriptionMode subscriptionMode,
boost::optional<MessageId> startMessageId, bool readCompacted,
const std::map<std::string, std::string>& metadata,
const std::map<std::string, std::string>& subscriptionProperties, const SchemaInfo& schemaInfo,
CommandSubscribe_InitialPosition subscriptionInitialPosition, bool replicateSubscriptionState,
KeySharedPolicy keySharedPolicy, int priorityLevel = 0);

static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);

Expand All @@ -107,7 +105,8 @@ class Commands {
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo, uint64_t epoch,
bool userProvidedProducerName, bool encrypted,
ProducerAccessMode accessMode, Optional<uint64_t> topicEpoch);
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch,
std::string initialSubscriptionName);

static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId,
CommandAck_AckType ackType, CommandAck_ValidationError validationError);
Expand Down
6 changes: 6 additions & 0 deletions lib/ConsumerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -287,4 +287,10 @@ const BatchReceivePolicy& ConsumerConfiguration::getBatchReceivePolicy() const {
return impl_->batchReceivePolicy;
}

void ConsumerConfiguration::setDeadLetterPolicy(const DeadLetterPolicy& deadLetterPolicy) {
impl_->deadLetterPolicy = deadLetterPolicy;
}

const DeadLetterPolicy& ConsumerConfiguration::getDeadLetterPolicy() const { return impl_->deadLetterPolicy; }

} // namespace pulsar
1 change: 1 addition & 0 deletions lib/ConsumerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct ConsumerConfigurationImpl {
bool readCompacted{false};
InitialPosition subscriptionInitialPosition{InitialPosition::InitialPositionLatest};
BatchReceivePolicy batchReceivePolicy{};
DeadLetterPolicy deadLetterPolicy;
int patternAutoDiscoveryPeriod{60};
bool replicateSubscriptionStateEnabled{false};
std::map<std::string, std::string> properties;
Expand Down
Loading