Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@ class PULSAR_PUBLIC ProducerConfiguration {
/**
* Require exclusive access for producer. Fail immediately if there's already a producer connected.
*/
Exclusive = 1
Exclusive = 1,

/**
* Producer creation is pending until it can acquire exclusive access.
*/
WaitForExclusive = 2
};

ProducerConfiguration();
Expand Down
37 changes: 22 additions & 15 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1063,22 +1063,29 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
PendingRequestsMap::iterator it = pendingRequests_.find(producerSuccess.request_id());
if (it != pendingRequests_.end()) {
PendingRequestData requestData = it->second;
pendingRequests_.erase(it);
lock.unlock();

ResponseData data;
data.producerName = producerSuccess.producer_name();
data.lastSequenceId = producerSuccess.last_sequence_id();
if (producerSuccess.has_schema_version()) {
data.schemaVersion = producerSuccess.schema_version();
}
if (producerSuccess.has_topic_epoch()) {
data.topicEpoch = Optional<uint64_t>::of(producerSuccess.topic_epoch());
if (!producerSuccess.producer_ready()) {
LOG_INFO(cnxString_ << " Producer " << producerSuccess.producer_name()
<< " has been queued up at broker. req_id: "
<< producerSuccess.request_id());
requestData.hasGotResponse->store(true);
lock.unlock();
} else {
data.topicEpoch = Optional<uint64_t>::empty();
pendingRequests_.erase(it);
lock.unlock();
ResponseData data;
data.producerName = producerSuccess.producer_name();
data.lastSequenceId = producerSuccess.last_sequence_id();
if (producerSuccess.has_schema_version()) {
data.schemaVersion = producerSuccess.schema_version();
}
if (producerSuccess.has_topic_epoch()) {
data.topicEpoch = Optional<uint64_t>::of(producerSuccess.topic_epoch());
} else {
data.topicEpoch = Optional<uint64_t>::empty();
}
requestData.promise.setValue(data);
requestData.timer->cancel();
}
requestData.promise.setValue(data);
requestData.timer->cancel();
}
break;
}
Expand Down Expand Up @@ -1481,7 +1488,7 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(SharedBuffer cm

void ClientConnection::handleRequestTimeout(const boost::system::error_code& ec,
PendingRequestData pendingRequestData) {
if (!ec) {
if (!ec && !pendingRequestData.hasGotResponse->load()) {
pendingRequestData.promise.setFailed(ResultTimeout);
}
}
Expand Down
1 change: 1 addition & 0 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
struct PendingRequestData {
Promise<Result, ResponseData> promise;
DeadlineTimerPtr timer;
std::shared_ptr<std::atomic_bool> hasGotResponse{std::make_shared<std::atomic_bool>(false)};
};

struct LookupRequestData {
Expand Down
1 change: 1 addition & 0 deletions lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
case NotStarted:
case Closing:
case Closed:
case Producer_Fenced:
case Failed:
LOG_DEBUG(handler->getName()
<< "Ignoring connection closed event since the handler is not used anymore");
Expand Down
3 changes: 2 additions & 1 deletion lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ class HandlerBase {
Ready,
Closing,
Closed,
Failed
Failed,
Producer_Fenced
};

std::atomic<State> state_;
Expand Down
13 changes: 12 additions & 1 deletion lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,15 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
}

if (producerCreatedPromise_.isComplete()) {
if (result == ResultProducerFenced) {
state_ = Producer_Fenced;
failPendingMessages(result, true);
auto client = client_.lock();
if (client) {
client->cleanupProducer(this);
}
producerCreatedPromise_.setFailed(result);
} else if (producerCreatedPromise_.isComplete()) {
if (result == ResultProducerBlockedQuotaExceededException) {
LOG_WARN(getName() << "Backlog is exceeded on topic. Sending exception to producer");
failPendingMessages(ResultProducerBlockedQuotaExceededException, true);
Expand Down Expand Up @@ -378,6 +386,9 @@ bool ProducerImpl::isValidProducerState(const SendCallback& callback) const {
case HandlerBase::Closed:
callback(ResultAlreadyClosed, {});
return false;
case HandlerBase::Producer_Fenced:
callback(ResultProducerFenced, {});
return false;
case HandlerBase::NotStarted:
case HandlerBase::Failed:
default:
Expand Down
39 changes: 38 additions & 1 deletion tests/ProducerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,8 @@ TEST(ProducerTest, testChunkingMaxMessageSize) {
TEST(ProducerTest, testExclusiveProducer) {
Client client(serviceUrl);

std::string topicName = "persistent://public/default/testExclusiveProducer";
std::string topicName =
"persistent://public/default/testExclusiveProducer" + std::to_string(time(nullptr));

Producer producer1;
ProducerConfiguration producerConfiguration1;
Expand All @@ -296,6 +297,42 @@ TEST(ProducerTest, testExclusiveProducer) {
ASSERT_EQ(ResultProducerBusy, client.createProducer(topicName, producerConfiguration3, producer3));
}

TEST(ProducerTest, testWaitForExclusiveProducer) {
Client client(serviceUrl);

std::string topicName =
"persistent://public/default/testWaitForExclusiveProducer" + std::to_string(time(nullptr));

Producer producer1;
ProducerConfiguration producerConfiguration1;
producerConfiguration1.setProducerName("p-name-1");
producerConfiguration1.setAccessMode(ProducerConfiguration::Exclusive);

ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration1, producer1));

ASSERT_EQ(ResultOk, producer1.send(MessageBuilder().setContent("content").build()));

Producer producer2;
ProducerConfiguration producerConfiguration2;
producerConfiguration2.setProducerName("p-name-2");
producerConfiguration2.setAccessMode(ProducerConfiguration::WaitForExclusive);

Latch latch(1);
client.createProducerAsync(topicName, producerConfiguration2,
[&latch, &producer2](Result res, Producer producer) {
ASSERT_EQ(ResultOk, res);
latch.countdown();
producer2 = producer;
});

// when p1 close, p2 success created.
producer1.close();
latch.wait();
ASSERT_EQ(ResultOk, producer2.send(MessageBuilder().setContent("content").build()));

producer2.close();
}

TEST_P(ProducerTest, testFlushNoBatch) {
Client client(serviceUrl);

Expand Down