From 7ef200bab397f54a5bcff0d1c06b1046d0a4dfb9 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Wed, 9 Nov 2022 10:55:02 +0800 Subject: [PATCH] [feat] Support WaitForExclusive producer access mode. --- include/pulsar/ProducerConfiguration.h | 7 ++++- lib/ClientConnection.cc | 37 ++++++++++++++---------- lib/ClientConnection.h | 1 + lib/HandlerBase.cc | 1 + lib/HandlerBase.h | 3 +- lib/ProducerImpl.cc | 13 ++++++++- tests/ProducerTest.cc | 39 +++++++++++++++++++++++++- 7 files changed, 82 insertions(+), 19 deletions(-) diff --git a/include/pulsar/ProducerConfiguration.h b/include/pulsar/ProducerConfiguration.h index 39ecbe09..873e1383 100644 --- a/include/pulsar/ProducerConfiguration.h +++ b/include/pulsar/ProducerConfiguration.h @@ -89,7 +89,12 @@ class PULSAR_PUBLIC ProducerConfiguration { /** * Require exclusive access for producer. Fail immediately if there's already a producer connected. */ - Exclusive = 1 + Exclusive = 1, + + /** + * Producer creation is pending until it can acquire exclusive access. + */ + WaitForExclusive = 2 }; ProducerConfiguration(); diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index b3df8310..154f0c5d 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1063,22 +1063,29 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) { PendingRequestsMap::iterator it = pendingRequests_.find(producerSuccess.request_id()); if (it != pendingRequests_.end()) { PendingRequestData requestData = it->second; - pendingRequests_.erase(it); - lock.unlock(); - - ResponseData data; - data.producerName = producerSuccess.producer_name(); - data.lastSequenceId = producerSuccess.last_sequence_id(); - if (producerSuccess.has_schema_version()) { - data.schemaVersion = producerSuccess.schema_version(); - } - if (producerSuccess.has_topic_epoch()) { - data.topicEpoch = Optional::of(producerSuccess.topic_epoch()); + if (!producerSuccess.producer_ready()) { + LOG_INFO(cnxString_ << " Producer " << producerSuccess.producer_name() + << " has been queued up at broker. req_id: " + << producerSuccess.request_id()); + requestData.hasGotResponse->store(true); + lock.unlock(); } else { - data.topicEpoch = Optional::empty(); + pendingRequests_.erase(it); + lock.unlock(); + ResponseData data; + data.producerName = producerSuccess.producer_name(); + data.lastSequenceId = producerSuccess.last_sequence_id(); + if (producerSuccess.has_schema_version()) { + data.schemaVersion = producerSuccess.schema_version(); + } + if (producerSuccess.has_topic_epoch()) { + data.topicEpoch = Optional::of(producerSuccess.topic_epoch()); + } else { + data.topicEpoch = Optional::empty(); + } + requestData.promise.setValue(data); + requestData.timer->cancel(); } - requestData.promise.setValue(data); - requestData.timer->cancel(); } break; } @@ -1481,7 +1488,7 @@ Future ClientConnection::sendRequestWithId(SharedBuffer cm void ClientConnection::handleRequestTimeout(const boost::system::error_code& ec, PendingRequestData pendingRequestData) { - if (!ec) { + if (!ec && !pendingRequestData.hasGotResponse->load()) { pendingRequestData.promise.setFailed(ResultTimeout); } } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index ad5c3adf..a07e2cd4 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -172,6 +172,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this promise; DeadlineTimerPtr timer; + std::shared_ptr hasGotResponse{std::make_shared(false)}; }; struct LookupRequestData { diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc index 0989eacc..1e13fb1c 100644 --- a/lib/HandlerBase.cc +++ b/lib/HandlerBase.cc @@ -130,6 +130,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con case NotStarted: case Closing: case Closed: + case Producer_Fenced: case Failed: LOG_DEBUG(handler->getName() << "Ignoring connection closed event since the handler is not used anymore"); diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index 4a5df5c7..6faaebd8 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -118,7 +118,8 @@ class HandlerBase { Ready, Closing, Closed, - Failed + Failed, + Producer_Fenced }; std::atomic state_; diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index f3e61204..23b2d96a 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -236,7 +236,15 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId); } - if (producerCreatedPromise_.isComplete()) { + if (result == ResultProducerFenced) { + state_ = Producer_Fenced; + failPendingMessages(result, true); + auto client = client_.lock(); + if (client) { + client->cleanupProducer(this); + } + producerCreatedPromise_.setFailed(result); + } else if (producerCreatedPromise_.isComplete()) { if (result == ResultProducerBlockedQuotaExceededException) { LOG_WARN(getName() << "Backlog is exceeded on topic. Sending exception to producer"); failPendingMessages(ResultProducerBlockedQuotaExceededException, true); @@ -378,6 +386,9 @@ bool ProducerImpl::isValidProducerState(const SendCallback& callback) const { case HandlerBase::Closed: callback(ResultAlreadyClosed, {}); return false; + case HandlerBase::Producer_Fenced: + callback(ResultProducerFenced, {}); + return false; case HandlerBase::NotStarted: case HandlerBase::Failed: default: diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc index 77a79e1a..a0b1e7e2 100644 --- a/tests/ProducerTest.cc +++ b/tests/ProducerTest.cc @@ -275,7 +275,8 @@ TEST(ProducerTest, testChunkingMaxMessageSize) { TEST(ProducerTest, testExclusiveProducer) { Client client(serviceUrl); - std::string topicName = "persistent://public/default/testExclusiveProducer"; + std::string topicName = + "persistent://public/default/testExclusiveProducer" + std::to_string(time(nullptr)); Producer producer1; ProducerConfiguration producerConfiguration1; @@ -296,6 +297,42 @@ TEST(ProducerTest, testExclusiveProducer) { ASSERT_EQ(ResultProducerBusy, client.createProducer(topicName, producerConfiguration3, producer3)); } +TEST(ProducerTest, testWaitForExclusiveProducer) { + Client client(serviceUrl); + + std::string topicName = + "persistent://public/default/testWaitForExclusiveProducer" + std::to_string(time(nullptr)); + + Producer producer1; + ProducerConfiguration producerConfiguration1; + producerConfiguration1.setProducerName("p-name-1"); + producerConfiguration1.setAccessMode(ProducerConfiguration::Exclusive); + + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration1, producer1)); + + ASSERT_EQ(ResultOk, producer1.send(MessageBuilder().setContent("content").build())); + + Producer producer2; + ProducerConfiguration producerConfiguration2; + producerConfiguration2.setProducerName("p-name-2"); + producerConfiguration2.setAccessMode(ProducerConfiguration::WaitForExclusive); + + Latch latch(1); + client.createProducerAsync(topicName, producerConfiguration2, + [&latch, &producer2](Result res, Producer producer) { + ASSERT_EQ(ResultOk, res); + latch.countdown(); + producer2 = producer; + }); + + // when p1 close, p2 success created. + producer1.close(); + latch.wait(); + ASSERT_EQ(ResultOk, producer2.send(MessageBuilder().setContent("content").build())); + + producer2.close(); +} + TEST_P(ProducerTest, testFlushNoBatch) { Client client(serviceUrl);