From 3fadf3b46845162e2d8a4483f45dbb62ee1c80fb Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Sat, 3 Oct 2020 09:59:36 +0800 Subject: [PATCH 1/4] add epoch for C++ client HandleBase to handle create producer timeout --- pulsar-client-cpp/lib/Commands.cc | 5 ++++- pulsar-client-cpp/lib/Commands.h | 2 +- pulsar-client-cpp/lib/HandlerBase.cc | 2 ++ pulsar-client-cpp/lib/HandlerBase.h | 1 + pulsar-client-cpp/lib/Producer.cc | 4 ++++ pulsar-client-cpp/lib/ProducerImpl.cc | 7 ++++++- pulsar-client-cpp/lib/ProducerImpl.h | 1 + pulsar-client-cpp/tests/ProducerTest.cc | 24 ++++++++++++++++++++++++ 8 files changed, 43 insertions(+), 3 deletions(-) diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc index 2a85b3704fd79..69de82a3cf445 100644 --- a/pulsar-client-cpp/lib/Commands.cc +++ b/pulsar-client-cpp/lib/Commands.cc @@ -304,13 +304,16 @@ SharedBuffer Commands::newUnsubscribe(uint64_t consumerId, uint64_t requestId) { SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId, const std::string& producerName, uint64_t requestId, const std::map& metadata, - const SchemaInfo& schemaInfo) { + const SchemaInfo& schemaInfo, uint64_t epoch, bool userProvidedProducerName) { BaseCommand cmd; cmd.set_type(BaseCommand::PRODUCER); CommandProducer* producer = cmd.mutable_producer(); producer->set_topic(topic); producer->set_producer_id(producerId); producer->set_request_id(requestId); + producer->set_epoch(epoch); + producer->set_user_provided_producer_name(userProvidedProducerName); + for (std::map::const_iterator it = metadata.begin(); it != metadata.end(); it++) { proto::KeyValue* keyValue = proto::KeyValue().New(); diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h index 6ece1536c7812..55d764ccc305c 100644 --- a/pulsar-client-cpp/lib/Commands.h +++ b/pulsar-client-cpp/lib/Commands.h @@ -93,7 +93,7 @@ class Commands { static SharedBuffer newProducer(const std::string& topic, uint64_t producerId, const std::string& producerName, uint64_t requestId, const std::map& metadata, - const SchemaInfo& schemaInfo); + const SchemaInfo& schemaInfo, uint64_t epoch, bool userProvidedProducerName); static SharedBuffer newAck(uint64_t consumerId, const proto::MessageIdData& messageId, proto::CommandAck_AckType ackType, int validationError); diff --git a/pulsar-client-cpp/lib/HandlerBase.cc b/pulsar-client-cpp/lib/HandlerBase.cc index 60cdc89c89fef..9ecedcaa7b9c9 100644 --- a/pulsar-client-cpp/lib/HandlerBase.cc +++ b/pulsar-client-cpp/lib/HandlerBase.cc @@ -36,6 +36,7 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())), state_(Pending), backoff_(backoff), + epoch_(0), timer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()) {} HandlerBase::~HandlerBase() { timer_->cancel(); } @@ -140,6 +141,7 @@ void HandlerBase::handleTimeout(const boost::system::error_code& ec, HandlerBase LOG_DEBUG(handler->getName() << "Ignoring timer cancelled event, code[" << ec << "]"); return; } else { + handler->epoch_++; handler->grabCnx(); } } diff --git a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h index 26c428e18e1c1..7f232013cf7be 100644 --- a/pulsar-client-cpp/lib/HandlerBase.h +++ b/pulsar-client-cpp/lib/HandlerBase.h @@ -105,6 +105,7 @@ class HandlerBase { State state_; Backoff backoff_; + uint64_t epoch_; private: DeadlineTimerPtr timer_; diff --git a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc index 2c34d3c74c243..27d84acc266c1 100644 --- a/pulsar-client-cpp/lib/Producer.cc +++ b/pulsar-client-cpp/lib/Producer.cc @@ -106,4 +106,8 @@ void Producer::producerFailMessages(Result result) { producerImpl->failPendingMessages(result); } } + +HandlerBase Producer::getHandlerBase() { + if (impl_) +} } // namespace pulsar diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index b4692e92557ca..51bae92955623 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -52,6 +52,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const pendingMessagesQueue_(conf_.getMaxPendingMessages()), partition_(partition), producerName_(conf_.getProducerName()), + userProvidedProducerName_(false), producerStr_("[" + topic_ + ", " + producerName_ + "] "), producerId_(client->newProducerId()), msgSequenceGenerator_(0), @@ -63,6 +64,10 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const lastSequenceIdPublished_ = initialSequenceId; msgSequenceGenerator_ = initialSequenceId + 1; + if (!producerName_.empty()) { + userProvidedProducerName_ = true; + } + unsigned int statsIntervalInSeconds = client->getClientConfig().getStatsIntervalInSeconds(); if (statsIntervalInSeconds) { producerStatsBasePtr_ = @@ -138,7 +143,7 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) { int requestId = client->newRequestId(); SharedBuffer cmd = Commands::newProducer(topic_, producerId_, producerName_, requestId, - conf_.getProperties(), conf_.getSchema()); + conf_.getProperties(), conf_.getSchema() , epoch_, userProvidedProducerName_); cnx->sendRequestWithId(cmd, requestId) .addListener(std::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx, std::placeholders::_1, std::placeholders::_2)); diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h index 1293fb1ecf8a7..e2346ef0de02f 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.h +++ b/pulsar-client-cpp/lib/ProducerImpl.h @@ -141,6 +141,7 @@ class ProducerImpl : public HandlerBase, int32_t partition_; // -1 if topic is non-partitioned std::string producerName_; + bool userProvidedProducerName_; std::string producerStr_; uint64_t producerId_; int64_t msgSequenceGenerator_; diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc index c440c2f20d1a0..7528258541c64 100644 --- a/pulsar-client-cpp/tests/ProducerTest.cc +++ b/pulsar-client-cpp/tests/ProducerTest.cc @@ -18,12 +18,15 @@ */ #include #include +#include #include "../lib/Future.h" #include "../lib/Utils.h" using namespace pulsar; +static std::string serviceUrl = "pulsar://localhost:6650"; + TEST(ProducerTest, producerNotInitialized) { Producer producer; @@ -48,3 +51,24 @@ TEST(ProducerTest, producerNotInitialized) { ASSERT_TRUE(producer.getTopic().empty()); } + +TEST(ProducerTest, exactlyOnceWithProducerNameSpecified) { + Client client(serviceUrl); + + std::string topicName = "persistent://public/default/exactlyOnceWithProducerNameSpecified"; + + Producer producer1; + ProducerConfiguration producerConfiguration1; + producerConfiguration1.setProducerName("p-name-1"); + + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration1, producer1)); + + Producer producer2; + ProducerConfiguration producerConfiguration2; + producerConfiguration2.setProducerName("p-name-2"); + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration2, producer2)); + + Producer producer3; + Result result = client.createProducer(topicName, producerConfiguration2, producer3); + ASSERT_EQ(ResultProducerBusy, result); +} From 20c7326a3890fc9443849e248b23fe61eabaaf07 Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Sat, 3 Oct 2020 10:03:13 +0800 Subject: [PATCH 2/4] format code --- pulsar-client-cpp/tests/ProducerTest.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc index 7528258541c64..676935345ca07 100644 --- a/pulsar-client-cpp/tests/ProducerTest.cc +++ b/pulsar-client-cpp/tests/ProducerTest.cc @@ -18,7 +18,6 @@ */ #include #include -#include #include "../lib/Future.h" #include "../lib/Utils.h" From 566325f76f403bdba6570c156ceeabc2c9dc3e21 Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Sat, 3 Oct 2020 11:50:14 +0800 Subject: [PATCH 3/4] format code --- pulsar-client-cpp/lib/Commands.cc | 3 ++- pulsar-client-cpp/lib/Commands.h | 3 ++- pulsar-client-cpp/lib/Producer.cc | 3 --- pulsar-client-cpp/lib/ProducerImpl.cc | 3 ++- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc index 69de82a3cf445..232790615f963 100644 --- a/pulsar-client-cpp/lib/Commands.cc +++ b/pulsar-client-cpp/lib/Commands.cc @@ -304,7 +304,8 @@ SharedBuffer Commands::newUnsubscribe(uint64_t consumerId, uint64_t requestId) { SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId, const std::string& producerName, uint64_t requestId, const std::map& metadata, - const SchemaInfo& schemaInfo, uint64_t epoch, bool userProvidedProducerName) { + const SchemaInfo& schemaInfo, uint64_t epoch, + bool userProvidedProducerName) { BaseCommand cmd; cmd.set_type(BaseCommand::PRODUCER); CommandProducer* producer = cmd.mutable_producer(); diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h index 55d764ccc305c..429e04674f183 100644 --- a/pulsar-client-cpp/lib/Commands.h +++ b/pulsar-client-cpp/lib/Commands.h @@ -93,7 +93,8 @@ class Commands { static SharedBuffer newProducer(const std::string& topic, uint64_t producerId, const std::string& producerName, uint64_t requestId, const std::map& metadata, - const SchemaInfo& schemaInfo, uint64_t epoch, bool userProvidedProducerName); + const SchemaInfo& schemaInfo, uint64_t epoch, + bool userProvidedProducerName); static SharedBuffer newAck(uint64_t consumerId, const proto::MessageIdData& messageId, proto::CommandAck_AckType ackType, int validationError); diff --git a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc index 27d84acc266c1..7c7e53ca4707f 100644 --- a/pulsar-client-cpp/lib/Producer.cc +++ b/pulsar-client-cpp/lib/Producer.cc @@ -107,7 +107,4 @@ void Producer::producerFailMessages(Result result) { } } -HandlerBase Producer::getHandlerBase() { - if (impl_) -} } // namespace pulsar diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index 51bae92955623..ecf0a6cc8296a 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -143,7 +143,8 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) { int requestId = client->newRequestId(); SharedBuffer cmd = Commands::newProducer(topic_, producerId_, producerName_, requestId, - conf_.getProperties(), conf_.getSchema() , epoch_, userProvidedProducerName_); + conf_.getProperties(), + conf_.getSchema(), epoch_, userProvidedProducerName_); cnx->sendRequestWithId(cmd, requestId) .addListener(std::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx, std::placeholders::_1, std::placeholders::_2)); From 591c1768b75426b35d35a969c3a66c11f62ffde5 Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Sat, 3 Oct 2020 16:23:05 +0800 Subject: [PATCH 4/4] fformat code --- pulsar-client-cpp/lib/ProducerImpl.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index ecf0a6cc8296a..fe31ed5055206 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -142,9 +142,9 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) { ClientImplPtr client = client_.lock(); int requestId = client->newRequestId(); - SharedBuffer cmd = Commands::newProducer(topic_, producerId_, producerName_, requestId, - conf_.getProperties(), - conf_.getSchema(), epoch_, userProvidedProducerName_); + SharedBuffer cmd = + Commands::newProducer(topic_, producerId_, producerName_, requestId, conf_.getProperties(), + conf_.getSchema(), epoch_, userProvidedProducerName_); cnx->sendRequestWithId(cmd, requestId) .addListener(std::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx, std::placeholders::_1, std::placeholders::_2));