Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pulsar-client-cpp/lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -304,13 +304,17 @@ SharedBuffer Commands::newUnsubscribe(uint64_t consumerId, uint64_t requestId) {
SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId,
const std::string& producerName, uint64_t requestId,
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo) {
const SchemaInfo& schemaInfo, uint64_t epoch,
bool userProvidedProducerName) {
BaseCommand cmd;
cmd.set_type(BaseCommand::PRODUCER);
CommandProducer* producer = cmd.mutable_producer();
producer->set_topic(topic);
producer->set_producer_id(producerId);
producer->set_request_id(requestId);
producer->set_epoch(epoch);
producer->set_user_provided_producer_name(userProvidedProducerName);

for (std::map<std::string, std::string>::const_iterator it = metadata.begin(); it != metadata.end();
it++) {
proto::KeyValue* keyValue = proto::KeyValue().New();
Expand Down
3 changes: 2 additions & 1 deletion pulsar-client-cpp/lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ class Commands {
static SharedBuffer newProducer(const std::string& topic, uint64_t producerId,
const std::string& producerName, uint64_t requestId,
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo);
const SchemaInfo& schemaInfo, uint64_t epoch,
bool userProvidedProducerName);

static SharedBuffer newAck(uint64_t consumerId, const proto::MessageIdData& messageId,
proto::CommandAck_AckType ackType, int validationError);
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())),
state_(Pending),
backoff_(backoff),
epoch_(0),
timer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()) {}

HandlerBase::~HandlerBase() { timer_->cancel(); }
Expand Down Expand Up @@ -140,6 +141,7 @@ void HandlerBase::handleTimeout(const boost::system::error_code& ec, HandlerBase
LOG_DEBUG(handler->getName() << "Ignoring timer cancelled event, code[" << ec << "]");
return;
} else {
handler->epoch_++;
handler->grabCnx();
}
}
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class HandlerBase {

State state_;
Backoff backoff_;
uint64_t epoch_;

private:
DeadlineTimerPtr timer_;
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,5 @@ void Producer::producerFailMessages(Result result) {
producerImpl->failPendingMessages(result);
}
}

} // namespace pulsar
10 changes: 8 additions & 2 deletions pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const
pendingMessagesQueue_(conf_.getMaxPendingMessages()),
partition_(partition),
producerName_(conf_.getProducerName()),
userProvidedProducerName_(false),
producerStr_("[" + topic_ + ", " + producerName_ + "] "),
producerId_(client->newProducerId()),
msgSequenceGenerator_(0),
Expand All @@ -63,6 +64,10 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const
lastSequenceIdPublished_ = initialSequenceId;
msgSequenceGenerator_ = initialSequenceId + 1;

if (!producerName_.empty()) {
userProvidedProducerName_ = true;
}

unsigned int statsIntervalInSeconds = client->getClientConfig().getStatsIntervalInSeconds();
if (statsIntervalInSeconds) {
producerStatsBasePtr_ =
Expand Down Expand Up @@ -137,8 +142,9 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
ClientImplPtr client = client_.lock();
int requestId = client->newRequestId();

SharedBuffer cmd = Commands::newProducer(topic_, producerId_, producerName_, requestId,
conf_.getProperties(), conf_.getSchema());
SharedBuffer cmd =
Commands::newProducer(topic_, producerId_, producerName_, requestId, conf_.getProperties(),
conf_.getSchema(), epoch_, userProvidedProducerName_);
cnx->sendRequestWithId(cmd, requestId)
.addListener(std::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx,
std::placeholders::_1, std::placeholders::_2));
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ class ProducerImpl : public HandlerBase,

int32_t partition_; // -1 if topic is non-partitioned
std::string producerName_;
bool userProvidedProducerName_;
std::string producerStr_;
uint64_t producerId_;
int64_t msgSequenceGenerator_;
Expand Down
23 changes: 23 additions & 0 deletions pulsar-client-cpp/tests/ProducerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

using namespace pulsar;

static std::string serviceUrl = "pulsar://localhost:6650";

TEST(ProducerTest, producerNotInitialized) {
Producer producer;

Expand All @@ -48,3 +50,24 @@ TEST(ProducerTest, producerNotInitialized) {

ASSERT_TRUE(producer.getTopic().empty());
}

TEST(ProducerTest, exactlyOnceWithProducerNameSpecified) {
Client client(serviceUrl);

std::string topicName = "persistent://public/default/exactlyOnceWithProducerNameSpecified";

Producer producer1;
ProducerConfiguration producerConfiguration1;
producerConfiguration1.setProducerName("p-name-1");

ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration1, producer1));

Producer producer2;
ProducerConfiguration producerConfiguration2;
producerConfiguration2.setProducerName("p-name-2");
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration2, producer2));

Producer producer3;
Result result = client.createProducer(topicName, producerConfiguration2, producer3);
ASSERT_EQ(ResultProducerBusy, result);
}