diff --git a/include/pulsar/ProducerConfiguration.h b/include/pulsar/ProducerConfiguration.h index 67550cfa..7defcd11 100644 --- a/include/pulsar/ProducerConfiguration.h +++ b/include/pulsar/ProducerConfiguration.h @@ -94,7 +94,13 @@ class PULSAR_PUBLIC ProducerConfiguration { /** * Producer creation is pending until it can acquire exclusive access. */ - WaitForExclusive = 2 + WaitForExclusive = 2, + + /** + * Acquire exclusive access for the producer. Any existing producer will be removed and + * invalidated immediately. + */ + ExclusiveWithFencing = 3 }; ProducerConfiguration(); diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc index 7c997713..103311c0 100644 --- a/tests/ProducerTest.cc +++ b/tests/ProducerTest.cc @@ -334,6 +334,53 @@ TEST(ProducerTest, testWaitForExclusiveProducer) { producer2.close(); } +TEST(ProducerTest, testExclusiveWithFencingProducer) { + Client client(serviceUrl); + + std::string topicName = + "persistent://public/default/testExclusiveWithFencingProducer" + std::to_string(time(nullptr)); + + Producer producer1; + ProducerConfiguration producerConfiguration1; + producerConfiguration1.setProducerName("p-name-1"); + producerConfiguration1.setAccessMode(ProducerConfiguration::Exclusive); + + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration1, producer1)); + producer1.send(MessageBuilder().setContent("content").build()); + + Producer producer2; + ProducerConfiguration producerConfiguration2; + producerConfiguration2.setProducerName("p-name-2"); + producerConfiguration2.setAccessMode(ProducerConfiguration::WaitForExclusive); + + Latch latch(1); + client.createProducerAsync(topicName, producerConfiguration2, + [&latch, &producer2](Result res, Producer producer) { + // producer 2 fenced + ASSERT_EQ(ResultProducerFenced, res); + latch.countdown(); + producer2 = producer; + }); + + Producer producer3; + ProducerConfiguration producerConfiguration3; + producerConfiguration3.setProducerName("p-name-3"); + producerConfiguration3.setAccessMode(ProducerConfiguration::ExclusiveWithFencing); + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration3, producer3)); + ASSERT_EQ(ResultOk, producer3.send(MessageBuilder().setContent("content").build())); + + latch.wait(); + + // producer 1 fenced + ASSERT_EQ(ResultProducerFenced, producer1.send(MessageBuilder().setContent("content").build())); + + ASSERT_EQ(ResultProducerNotInitialized, producer2.send(MessageBuilder().setContent("content").build())); + + producer1.close(); + producer2.close(); + producer3.close(); +} + TEST_P(ProducerTest, testFlushNoBatch) { Client client(serviceUrl);