From f5e1350e03284cb0bf0f17d96b9e6fdb2d6fc9df Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Sat, 26 Jan 2019 17:06:58 -0800 Subject: [PATCH 01/28] Adding configuration to disable auto-topic creation --- .../org/apache/pulsar/client/impl/PulsarClientImpl.java | 9 +++++++-- .../pulsar/client/impl/conf/ClientConfigurationData.java | 2 ++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 42189d886509e..b041d38538b86 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -79,6 +79,7 @@ public class PulsarClientImpl implements PulsarClient { private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class); + private static final String errMessage = "Creation of new non-partition topic not allowed because of client configuration."; private final ClientConfigurationData conf; private LookupService lookup; @@ -226,8 +227,10 @@ private CompletableFuture> createProducerAsync(String topic, if (metadata.partitions > 1) { producer = new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, metadata.partitions, producerCreatedFuture, schema, interceptors); - } else { + } else if (getConfiguration().getAllowAutoTopicCreation()) { producer = new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, -1, schema, interceptors); + } else { + throw new PulsarClientException.InvalidConfigurationException(errMessage); } synchronized (producers) { @@ -331,9 +334,11 @@ private CompletableFuture> doSingleTopicSubscribeAsync(ConsumerC if (metadata.partitions > 1) { consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf, listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors); - } else { + } else if (getConfiguration().getAllowAutoTopicCreation()) { consumer = new ConsumerImpl<>(PulsarClientImpl.this, topic, conf, listenerThread, -1, consumerSubscribedFuture, schema, interceptors); + } else { + throw new PulsarClientException.InvalidConfigurationException(errMessage); } synchronized (consumers) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index f7bec63c5a800..505e2ebd85f9b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -59,6 +59,8 @@ public class ClientConfigurationData implements Serializable, Cloneable { private int maxNumberOfRejectedRequestPerConnection = 50; private int keepAliveIntervalSeconds = 30; private int connectionTimeoutMs = 10000; + private boolean allowAutoTopicCreation = true; + public ClientConfigurationData clone() { try { From 81f5e8a3b3c5655dd28f8e32f1500aa9c4620e8a Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Sun, 27 Jan 2019 20:17:55 -0800 Subject: [PATCH 02/28] Adding config --- .../apache/bookkeeper/mledger/ManagedLedgerConfig.java | 10 ++++++++++ .../apache/pulsar/client/impl/PulsarClientImpl.java | 9 ++------- .../client/impl/conf/ClientConfigurationData.java | 2 -- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 8f890505167fe..e67c370c6d034 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -36,6 +36,7 @@ public class ManagedLedgerConfig { private boolean createIfMissing = true; + private boolean allowAutoTopicCreation = true; private int maxUnackedRangesToPersist = 10000; private int maxUnackedRangesToPersistInZk = 1000; private int maxEntriesPerLedger = 50000; @@ -73,6 +74,15 @@ public ManagedLedgerConfig setCreateIfMissing(boolean createIfMissing) { return this; } + public boolean allowAutoTopicCreation() { + return allowAutoTopicCreation; + } + + public ManagedLedgerConfig setAllowAutoTopicCreation(boolean allowAutoTopicCreation) { + this.allowAutoTopicCreation = allowAutoTopicCreation; + return this; + } + /** * @return the maxEntriesPerLedger */ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index b041d38538b86..42189d886509e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -79,7 +79,6 @@ public class PulsarClientImpl implements PulsarClient { private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class); - private static final String errMessage = "Creation of new non-partition topic not allowed because of client configuration."; private final ClientConfigurationData conf; private LookupService lookup; @@ -227,10 +226,8 @@ private CompletableFuture> createProducerAsync(String topic, if (metadata.partitions > 1) { producer = new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, metadata.partitions, producerCreatedFuture, schema, interceptors); - } else if (getConfiguration().getAllowAutoTopicCreation()) { - producer = new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, -1, schema, interceptors); } else { - throw new PulsarClientException.InvalidConfigurationException(errMessage); + producer = new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, -1, schema, interceptors); } synchronized (producers) { @@ -334,11 +331,9 @@ private CompletableFuture> doSingleTopicSubscribeAsync(ConsumerC if (metadata.partitions > 1) { consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf, listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors); - } else if (getConfiguration().getAllowAutoTopicCreation()) { + } else { consumer = new ConsumerImpl<>(PulsarClientImpl.this, topic, conf, listenerThread, -1, consumerSubscribedFuture, schema, interceptors); - } else { - throw new PulsarClientException.InvalidConfigurationException(errMessage); } synchronized (consumers) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index 505e2ebd85f9b..f7bec63c5a800 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -59,8 +59,6 @@ public class ClientConfigurationData implements Serializable, Cloneable { private int maxNumberOfRejectedRequestPerConnection = 50; private int keepAliveIntervalSeconds = 30; private int connectionTimeoutMs = 10000; - private boolean allowAutoTopicCreation = true; - public ClientConfigurationData clone() { try { From ab4325c451934834344006270401bc2cd736d69b Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Mon, 28 Jan 2019 19:51:11 -0800 Subject: [PATCH 03/28] Starting to add backbone --- .../org/apache/pulsar/broker/service/BrokerService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index ff2a8dcce7954..a6236689e5322 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -459,11 +459,11 @@ public void unloadNamespaceBundlesGracefully() { } public CompletableFuture> getTopicIfExists(final String topic) { - return getTopic(topic, false /* createIfMissing */); + return getTopic(topic, false /* allowAutoTopicCreation */); } - public CompletableFuture getOrCreateTopic(final String topic) { - return getTopic(topic, true /* createIfMissing */).thenApply(Optional::get); + public CompletableFuture getOrCreateTopic(final String topic, boolean allowAutoTopicCreation) { + return getTopic(topic, allowAutoTopicCreation).thenApply(Optional::get); } private CompletableFuture> getTopic(final String topic, boolean createIfMissing) { From 01ff186b47d734e8e897cedd45a3739bdf823307 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Mon, 28 Jan 2019 20:12:49 -0800 Subject: [PATCH 04/28] Adding work for completableFuture --- .../java/org/apache/pulsar/broker/PulsarService.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 417ec215577a4..b0409c2afdbe3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -49,6 +49,7 @@ import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.mledger.offload.OffloaderUtils; @@ -597,7 +598,7 @@ public void loadNamespaceTopics(NamespaceBundle bundle) { try { TopicName topicName = TopicName.get(topic); if (bundle.includes(topicName)) { - CompletableFuture future = brokerService.getOrCreateTopic(topic); + CompletableFuture future = getOrCreateTopic(topic, topicName); if (future != null) { persistentTopics.add(future); } @@ -618,7 +619,12 @@ public void loadNamespaceTopics(NamespaceBundle bundle) { return null; }); } - + + private CompletableFuture getOrCreateTopic(String topic, TopicName topicName) { + final CompletableFuture configFuture = brokerService.getManagedLedgerConfig(topicName); + return configFuture.thenApplyAsync(config -> brokerService.getOrCreateTopic(topic, config.allowAutoTopicCreation()).get()); + } + // No need to synchronize since config is only init once // We only read this from memory later public String getStatusFilePath() { From 80dd5f96d2c61ab66b6f276540caa229b22c6ea5 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Mon, 28 Jan 2019 20:23:58 -0800 Subject: [PATCH 05/28] Fixing issue --- .../main/java/org/apache/pulsar/broker/PulsarService.java | 6 ++++-- .../java/org/apache/pulsar/broker/service/ServerCnx.java | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index b0409c2afdbe3..e100beadbac29 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -598,7 +598,7 @@ public void loadNamespaceTopics(NamespaceBundle bundle) { try { TopicName topicName = TopicName.get(topic); if (bundle.includes(topicName)) { - CompletableFuture future = getOrCreateTopic(topic, topicName); + CompletableFuture future = getOrCreateTopic(brokerService, topic, topicName); if (future != null) { persistentTopics.add(future); } @@ -620,7 +620,9 @@ public void loadNamespaceTopics(NamespaceBundle bundle) { }); } - private CompletableFuture getOrCreateTopic(String topic, TopicName topicName) { + public static CompletableFuture getOrCreateTopic(BrokerService brokerService, + String topic, + TopicName topicName) { final CompletableFuture configFuture = brokerService.getManagedLedgerConfig(topicName); return configFuture.thenApplyAsync(config -> brokerService.getOrCreateTopic(topic, config.allowAutoTopicCreation()).get()); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index e6819bf57913e..52ffb4d6752e0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -587,7 +587,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { } } - service.getOrCreateTopic(topicName.toString()) + PulsarService.getOrCreateTopic(service, topicName.toString(), topicName) .thenCompose(topic -> { if (schema != null) { return topic.addSchemaIfIdleOrCheckCompatible(schema) @@ -797,7 +797,7 @@ protected void handleProducer(final CommandProducer cmdProducer) { log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId); - service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> { + PulsarService.getOrCreateTopic(service, topicName.toString(), topicName).thenAccept((Topic topic) -> { // Before creating producer, check if backlog quota exceeded // on topic if (topic.isBacklogQuotaExceeded(producerName)) { From 5dc69202b5d01247fd0a03198a963a44d6203135 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Mon, 28 Jan 2019 20:33:29 -0800 Subject: [PATCH 06/28] Adding detail for PersistentTopicBases --- .../org/apache/pulsar/broker/PulsarService.java | 13 +++++++++++-- .../broker/admin/impl/PersistentTopicsBase.java | 4 +++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index e100beadbac29..e0ac17ad9e668 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -620,11 +620,20 @@ public void loadNamespaceTopics(NamespaceBundle bundle) { }); } + public static CompletableFuture getOrCreateTopic(BrokerService brokerService, + String topic, + TopicName topicName) { + return getOrCreateTopic(brokerService, topic, topicName, true); + } + public static CompletableFuture getOrCreateTopic(BrokerService brokerService, String topic, - TopicName topicName) { + TopicName topicName, + boolean defaultConfig) { final CompletableFuture configFuture = brokerService.getManagedLedgerConfig(topicName); - return configFuture.thenApplyAsync(config -> brokerService.getOrCreateTopic(topic, config.allowAutoTopicCreation()).get()); + return configFuture.thenApplyAsync( + config -> brokerService.getOrCreateTopic(topic,defaultConfig ? config.allowAutoTopicCreation() + : config.isCreateIfMissing()).get()); } // No need to synchronize since config is only init once diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index e6df76df6047a..c9a862c5a3ec5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -67,6 +67,7 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.admin.ZkAdminPaths; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; @@ -1306,7 +1307,8 @@ private Topic getTopicReference(TopicName topicName) { } private Topic getOrCreateTopic(TopicName topicName) { - return pulsar().getBrokerService().getOrCreateTopic(topicName.toString()).join(); + final BrokerService service = pulsar().getBrokerService(); + return PulsarService.getOrCreateTopic(service, topicName.toString(), topicName, false).join(); } /** From d2eed9699ae30d371b09907363ea3d1393e1d6a1 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Tue, 29 Jan 2019 19:20:07 -0800 Subject: [PATCH 07/28] Simplifying call structure --- .../apache/pulsar/broker/PulsarService.java | 18 +----------------- .../admin/impl/PersistentTopicsBase.java | 3 +-- .../pulsar/broker/service/BrokerService.java | 11 +++++++++-- .../pulsar/broker/service/ServerCnx.java | 4 ++-- 4 files changed, 13 insertions(+), 23 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index e0ac17ad9e668..ab5f4fe97b6e0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -598,7 +598,7 @@ public void loadNamespaceTopics(NamespaceBundle bundle) { try { TopicName topicName = TopicName.get(topic); if (bundle.includes(topicName)) { - CompletableFuture future = getOrCreateTopic(brokerService, topic, topicName); + CompletableFuture future = brokerService.getOrCreateTopic(topic, topicName, false); if (future != null) { persistentTopics.add(future); } @@ -620,22 +620,6 @@ public void loadNamespaceTopics(NamespaceBundle bundle) { }); } - public static CompletableFuture getOrCreateTopic(BrokerService brokerService, - String topic, - TopicName topicName) { - return getOrCreateTopic(brokerService, topic, topicName, true); - } - - public static CompletableFuture getOrCreateTopic(BrokerService brokerService, - String topic, - TopicName topicName, - boolean defaultConfig) { - final CompletableFuture configFuture = brokerService.getManagedLedgerConfig(topicName); - return configFuture.thenApplyAsync( - config -> brokerService.getOrCreateTopic(topic,defaultConfig ? config.allowAutoTopicCreation() - : config.isCreateIfMissing()).get()); - } - // No need to synchronize since config is only init once // We only read this from memory later public String getStatusFilePath() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index c9a862c5a3ec5..f4557c2275081 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1307,8 +1307,7 @@ private Topic getTopicReference(TopicName topicName) { } private Topic getOrCreateTopic(TopicName topicName) { - final BrokerService service = pulsar().getBrokerService(); - return PulsarService.getOrCreateTopic(service, topicName.toString(), topicName, false).join(); + return pulsar().getBrokerService().getOrCreateTopic(topicName.toString()).join(); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index a6236689e5322..3280c2e23f939 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -462,8 +462,15 @@ public CompletableFuture> getTopicIfExists(final String topic) { return getTopic(topic, false /* allowAutoTopicCreation */); } - public CompletableFuture getOrCreateTopic(final String topic, boolean allowAutoTopicCreation) { - return getTopic(topic, allowAutoTopicCreation).thenApply(Optional::get); + public CompletableFuture getOrCreateTopic(final String topic) { + return getOrCreateTopic(topic, TopicName.get(topic), false); + } + + public CompletableFuture getOrCreateTopic(String topic, TopicName topicName, boolean defaultConfig) { + final CompletableFuture configFuture = getManagedLedgerConfig(topicName); + return configFuture.thenApplyAsync( + config -> getTopic(topic, defaultConfig ? config.allowAutoTopicCreation() + : config.isCreateIfMissing()).thenApply(Optional::get).get()); } private CompletableFuture> getTopic(final String topic, boolean createIfMissing) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 52ffb4d6752e0..f63c46ceeae4f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -587,7 +587,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { } } - PulsarService.getOrCreateTopic(service, topicName.toString(), topicName) + service.getOrCreateTopic(topicName.toString(), topicName, true) .thenCompose(topic -> { if (schema != null) { return topic.addSchemaIfIdleOrCheckCompatible(schema) @@ -797,7 +797,7 @@ protected void handleProducer(final CommandProducer cmdProducer) { log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId); - PulsarService.getOrCreateTopic(service, topicName.toString(), topicName).thenAccept((Topic topic) -> { + service.getOrCreateTopic(topicName.toString(), topicName, true).thenAccept((Topic topic) -> { // Before creating producer, check if backlog quota exceeded // on topic if (topic.isBacklogQuotaExceeded(producerName)) { From d0094a41273c32d44c86de3f622c4918a2e585df Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Wed, 30 Jan 2019 19:48:10 -0800 Subject: [PATCH 08/28] Fixing compilation error --- .../pulsar/broker/service/BrokerService.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 3280c2e23f939..dcf39bf68a076 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -467,10 +467,16 @@ public CompletableFuture getOrCreateTopic(final String topic) { } public CompletableFuture getOrCreateTopic(String topic, TopicName topicName, boolean defaultConfig) { - final CompletableFuture configFuture = getManagedLedgerConfig(topicName); - return configFuture.thenApplyAsync( - config -> getTopic(topic, defaultConfig ? config.allowAutoTopicCreation() - : config.isCreateIfMissing()).thenApply(Optional::get).get()); + final CompletableFuture configFuture = getManagedLedgerConfig(topicName); + try { + configFuture.thenApplyAsync( + future -> getTopic(topic, defaultConfig ? future.allowAutoTopicCreation() + : true).thenApply(Optional::get) + .get()); + } catch (InterruptedException exc) { + log.warn("[{}] Unexpected exception when loading topic: {}", topic, exc); + return failedFuture(exc); + } } private CompletableFuture> getTopic(final String topic, boolean createIfMissing) { From 1c72664d6c6e823657973d4543e63c43790fae60 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Wed, 30 Jan 2019 20:27:47 -0800 Subject: [PATCH 09/28] Fixing compilation errror --- .../apache/pulsar/broker/PulsarService.java | 2 +- .../pulsar/broker/service/BrokerService.java | 37 ++++++++----------- .../pulsar/broker/service/ServerCnx.java | 4 +- .../broker/service/BrokerServiceTest.java | 2 +- 4 files changed, 19 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index ab5f4fe97b6e0..36b8dcc366ec7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -598,7 +598,7 @@ public void loadNamespaceTopics(NamespaceBundle bundle) { try { TopicName topicName = TopicName.get(topic); if (bundle.includes(topicName)) { - CompletableFuture future = brokerService.getOrCreateTopic(topic, topicName, false); + CompletableFuture future = brokerService.getOrCreateTopic(topic, false); if (future != null) { persistentTopics.add(future); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index dcf39bf68a076..c42de4141b6a0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -459,27 +459,18 @@ public void unloadNamespaceBundlesGracefully() { } public CompletableFuture> getTopicIfExists(final String topic) { - return getTopic(topic, false /* allowAutoTopicCreation */); + return getTopic(topic, false /* allowAutoTopicCreation */, false); } public CompletableFuture getOrCreateTopic(final String topic) { - return getOrCreateTopic(topic, TopicName.get(topic), false); + return getOrCreateTopic(topic, false); } - public CompletableFuture getOrCreateTopic(String topic, TopicName topicName, boolean defaultConfig) { - final CompletableFuture configFuture = getManagedLedgerConfig(topicName); - try { - configFuture.thenApplyAsync( - future -> getTopic(topic, defaultConfig ? future.allowAutoTopicCreation() - : true).thenApply(Optional::get) - .get()); - } catch (InterruptedException exc) { - log.warn("[{}] Unexpected exception when loading topic: {}", topic, exc); - return failedFuture(exc); - } - } - - private CompletableFuture> getTopic(final String topic, boolean createIfMissing) { + public CompletableFuture getOrCreateTopic(String topic, boolean defaultConfig) { + return getTopic(topic, true, defaultConfig).thenApply(Optional::get); + } + + private CompletableFuture> getTopic(final String topic, boolean createIfMissing, boolean defaultConfig) { try { CompletableFuture> topicFuture = topics.get(topic); if (topicFuture != null) { @@ -493,7 +484,7 @@ private CompletableFuture> getTopic(final String topic, boolean } final boolean isPersistentTopic = TopicName.get(topic).getDomain().equals(TopicDomain.persistent); return topics.computeIfAbsent(topic, (topicName) -> { - return isPersistentTopic ? this.loadOrCreatePersistentTopic(topicName, createIfMissing) + return isPersistentTopic ? this.loadOrCreatePersistentTopic(topicName, createIfMissing, defaultConfig) : createNonPersistentTopic(topicName); }); } catch (IllegalArgumentException e) { @@ -600,7 +591,7 @@ public PulsarClient getReplicationClient(String cluster) { * @throws RuntimeException */ protected CompletableFuture> loadOrCreatePersistentTopic(final String topic, - boolean createIfMissing) throws RuntimeException { + boolean createIfMissing, boolean useDefaultConfig) throws RuntimeException { checkTopicNsOwnership(topic); final CompletableFuture> topicFuture = new CompletableFuture<>(); @@ -615,7 +606,7 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); if (topicLoadSemaphore.tryAcquire()) { - createPersistentTopic(topic, createIfMissing, topicFuture); + createPersistentTopic(topic, createIfMissing, useDefaultConfig, topicFuture); topicFuture.handle((persistentTopic, ex) -> { // release permit and process pending topic topicLoadSemaphore.release(); @@ -631,7 +622,8 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S return topicFuture; } - private void createPersistentTopic(final String topic, boolean createIfMissing, CompletableFuture> topicFuture) { + private void createPersistentTopic(final String topic, boolean createIfMissing, boolean useDefaultConfig, + CompletableFuture> topicFuture) { final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); TopicName topicName = TopicName.get(topic); @@ -645,7 +637,8 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, } getManagedLedgerConfig(topicName).thenAccept(managedLedgerConfig -> { - managedLedgerConfig.setCreateIfMissing(createIfMissing); + managedLedgerConfig.setCreateIfMissing(useDefaultConfig ? + managedLedgerConfig.allowAutoTopicCreation() : createIfMissing); // Once we have the configuration, we can proceed with the async open operation managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), managedLedgerConfig, @@ -1397,7 +1390,7 @@ private void createPendingLoadTopic() { CompletableFuture> pendingFuture = pendingTopic.getRight(); final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); final boolean acquiredPermit = topicLoadSemaphore.tryAcquire(); - createPersistentTopic(topic, true, pendingFuture); + createPersistentTopic(topic, true, false, pendingFuture); pendingFuture.handle((persistentTopic, ex) -> { // release permit and process next pending topic if (acquiredPermit) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index f63c46ceeae4f..9d6bc987687ee 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -587,7 +587,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { } } - service.getOrCreateTopic(topicName.toString(), topicName, true) + service.getOrCreateTopic(topicName.toString(), true) .thenCompose(topic -> { if (schema != null) { return topic.addSchemaIfIdleOrCheckCompatible(schema) @@ -797,7 +797,7 @@ protected void handleProducer(final CommandProducer cmdProducer) { log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId); - service.getOrCreateTopic(topicName.toString(), topicName, true).thenAccept((Topic topic) -> { + service.getOrCreateTopic(topicName.toString(), true).thenAccept((Topic topic) -> { // Before creating producer, check if backlog quota exceeded // on topic if (topic.isBacklogQuotaExceeded(producerName)) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 4e9f4e986f33b..648994369a020 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -742,7 +742,7 @@ public void testTopicLoadingOnDisableNamespaceBundle() throws Exception { // try to create topic which should fail as bundle is disable CompletableFuture> futureResult = pulsar.getBrokerService() - .loadOrCreatePersistentTopic(topicName, true); + .loadOrCreatePersistentTopic(topicName, true, false); try { futureResult.get(); From 36abfed7e3a48c5f0f689b47cd68ccbd286153b3 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Fri, 1 Feb 2019 17:23:55 -0800 Subject: [PATCH 10/28] Modifying config location --- .../apache/bookkeeper/mledger/ManagedLedgerConfig.java | 10 ---------- .../org/apache/pulsar/broker/ServiceConfiguration.java | 7 +++++++ .../apache/pulsar/broker/service/BrokerService.java | 3 ++- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index e67c370c6d034..28a52c52c61d4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -36,7 +36,6 @@ public class ManagedLedgerConfig { private boolean createIfMissing = true; - private boolean allowAutoTopicCreation = true; private int maxUnackedRangesToPersist = 10000; private int maxUnackedRangesToPersistInZk = 1000; private int maxEntriesPerLedger = 50000; @@ -73,15 +72,6 @@ public ManagedLedgerConfig setCreateIfMissing(boolean createIfMissing) { this.createIfMissing = createIfMissing; return this; } - - public boolean allowAutoTopicCreation() { - return allowAutoTopicCreation; - } - - public ManagedLedgerConfig setAllowAutoTopicCreation(boolean allowAutoTopicCreation) { - this.allowAutoTopicCreation = allowAutoTopicCreation; - return this; - } /** * @return the maxEntriesPerLedger diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 048aecd2ddd74..f64e0609e03cd 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -246,6 +246,13 @@ public class ServiceConfiguration implements PulsarConfiguration { + " messages that were already stored in the topic" ) private boolean brokerDeduplicationEnabled = false; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Allow automated creation of non-partition topics if set to true (default value)." + ) + private boolean allowAutoTopicCreation = true; + @FieldContext( category = CATEGORY_POLICIES, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index c42de4141b6a0..ca704983a627d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -638,7 +638,8 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, getManagedLedgerConfig(topicName).thenAccept(managedLedgerConfig -> { managedLedgerConfig.setCreateIfMissing(useDefaultConfig ? - managedLedgerConfig.allowAutoTopicCreation() : createIfMissing); + pulsar.getConfiguration().getAllowAutoTopicCreation() + : createIfMissing); // Once we have the configuration, we can proceed with the async open operation managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), managedLedgerConfig, From 830280430f5088bedc78163f1bfc1ca0efca25fb Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Sun, 3 Feb 2019 13:37:55 -0800 Subject: [PATCH 11/28] Fixing config error --- .../apache/pulsar/broker/ServiceConfiguration.java | 13 +++++-------- .../apache/pulsar/broker/service/BrokerService.java | 2 +- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index f64e0609e03cd..296ef49759f74 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -246,13 +246,6 @@ public class ServiceConfiguration implements PulsarConfiguration { + " messages that were already stored in the topic" ) private boolean brokerDeduplicationEnabled = false; - - @FieldContext( - category = CATEGORY_POLICIES, - doc = "Allow automated creation of non-partition topics if set to true (default value)." - ) - private boolean allowAutoTopicCreation = true; - @FieldContext( category = CATEGORY_POLICIES, @@ -691,7 +684,11 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Rate limit the amount of writes per second generated by consumer acking the messages" ) private double managedLedgerDefaultMarkDeleteRateLimit = 1.0; - + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Allow automated creation of non-partition topics if set to true (default value)." + ) + private boolean allowAutoTopicCreation = true; @FieldContext( category = CATEGORY_STORAGE_ML, doc = "Number of threads to be used for managed ledger tasks dispatching" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index ca704983a627d..477c64d055896 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -638,7 +638,7 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, getManagedLedgerConfig(topicName).thenAccept(managedLedgerConfig -> { managedLedgerConfig.setCreateIfMissing(useDefaultConfig ? - pulsar.getConfiguration().getAllowAutoTopicCreation() + pulsar.getConfiguration().isAllowAutoTopicCreation() : createIfMissing); // Once we have the configuration, we can proceed with the async open operation From b493cf884b9f38e5dd5ac41368553c00a3cdb4dc Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Tue, 5 Feb 2019 20:22:35 -0800 Subject: [PATCH 12/28] Adding test for PersistentTopicsBAse --- .../mledger/ManagedLedgerConfig.java | 2 +- .../apache/pulsar/broker/PulsarService.java | 2 +- .../broker/admin/PersistentTopicsTest.java | 21 +++++++++++++++++-- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 28a52c52c61d4..8f890505167fe 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -72,7 +72,7 @@ public ManagedLedgerConfig setCreateIfMissing(boolean createIfMissing) { this.createIfMissing = createIfMissing; return this; } - + /** * @return the maxEntriesPerLedger */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 36b8dcc366ec7..40e55afe96b6a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -619,7 +619,7 @@ public void loadNamespaceTopics(NamespaceBundle bundle) { return null; }); } - + // No need to synchronize since config is only init once // We only read this from memory later public String getStatusFilePath() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 6dfc5b0d211a7..2710b37b4e813 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -61,10 +61,12 @@ public void initPersistentTopics() throws Exception { @Override @BeforeMethod protected void setup() throws Exception { + pulsar.getConfiguration().setAllowAutoTopicCreation(false); super.internalSetup(); persistentTopics = spy(new PersistentTopics()); persistentTopics.setServletContext(new MockServletContext()); persistentTopics.setPulsar(pulsar); + pulsar.getConfiguration(); doReturn(mockZookKeeper).when(persistentTopics).globalZk(); doReturn(mockZookKeeper).when(persistentTopics).localZk(); doReturn(pulsar.getConfigurationCache().propertiesCache()).when(persistentTopics).tenantsCache(); @@ -80,7 +82,7 @@ protected void setup() throws Exception { new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet(testLocalCluster, "test"))); admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet(testLocalCluster, "test")); } - + @Override @AfterMethod protected void cleanup() throws Exception { @@ -88,7 +90,12 @@ protected void cleanup() throws Exception { } @Test - public void testGetSubscriptions() { + public void testGetSubscriptions() throws Exception { + try { + setup(); + } catch (Exception exc) { + throw exc; + } String testLocalTopicName = "topic-not-found"; try { persistentTopics.getSubscriptions(testTenant, testNamespace, testLocalTopicName, true); @@ -115,4 +122,14 @@ public void testGetSubscriptions() { persistentTopics.deletePartitionedTopic(testTenant, testNamespace, testLocalTopicName, true, true); } + @Test + public void testGetSubscriptionsWithAutoTopicCreationDisabled() { + final String nonPartitionTopic = "nonPartitionedTopic"; + persistentTopics.createSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true, (MessageIdImpl) MessageId.latest); + List subscriptions = persistentTopics.getSubscriptions(testTenant, testNamespace, nonPartitionTopic + "-partition-0", true); + Assert.assertTrue(subscriptions.contains("test")); + persistentTopics.deleteSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true); + subscriptions = persistentTopics.getSubscriptions(testTenant, testNamespace, nonPartitionTopic + "-partition-0", true); + Assert.assertTrue(subscriptions.isEmpty()); + } } From 52faeaa11ad13b19c6fc30737b373d76c6bd772c Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Tue, 5 Feb 2019 20:23:58 -0800 Subject: [PATCH 13/28] Fixing extra lines --- .../apache/pulsar/broker/admin/PersistentTopicsTest.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 2710b37b4e813..75bb8f91af933 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -66,7 +66,6 @@ protected void setup() throws Exception { persistentTopics = spy(new PersistentTopics()); persistentTopics.setServletContext(new MockServletContext()); persistentTopics.setPulsar(pulsar); - pulsar.getConfiguration(); doReturn(mockZookKeeper).when(persistentTopics).globalZk(); doReturn(mockZookKeeper).when(persistentTopics).localZk(); doReturn(pulsar.getConfigurationCache().propertiesCache()).when(persistentTopics).tenantsCache(); @@ -91,11 +90,6 @@ protected void cleanup() throws Exception { @Test public void testGetSubscriptions() throws Exception { - try { - setup(); - } catch (Exception exc) { - throw exc; - } String testLocalTopicName = "topic-not-found"; try { persistentTopics.getSubscriptions(testTenant, testNamespace, testLocalTopicName, true); From 5aec8ddaf48781fbf4371a6c1597f311e2508eeb Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Tue, 5 Feb 2019 21:00:47 -0800 Subject: [PATCH 14/28] Reversing unneccesary test --- .../pulsar/broker/admin/PersistentTopicsTest.java | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 75bb8f91af933..6dfc5b0d211a7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -61,7 +61,6 @@ public void initPersistentTopics() throws Exception { @Override @BeforeMethod protected void setup() throws Exception { - pulsar.getConfiguration().setAllowAutoTopicCreation(false); super.internalSetup(); persistentTopics = spy(new PersistentTopics()); persistentTopics.setServletContext(new MockServletContext()); @@ -81,7 +80,7 @@ protected void setup() throws Exception { new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet(testLocalCluster, "test"))); admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet(testLocalCluster, "test")); } - + @Override @AfterMethod protected void cleanup() throws Exception { @@ -89,7 +88,7 @@ protected void cleanup() throws Exception { } @Test - public void testGetSubscriptions() throws Exception { + public void testGetSubscriptions() { String testLocalTopicName = "topic-not-found"; try { persistentTopics.getSubscriptions(testTenant, testNamespace, testLocalTopicName, true); @@ -116,14 +115,4 @@ public void testGetSubscriptions() throws Exception { persistentTopics.deletePartitionedTopic(testTenant, testNamespace, testLocalTopicName, true, true); } - @Test - public void testGetSubscriptionsWithAutoTopicCreationDisabled() { - final String nonPartitionTopic = "nonPartitionedTopic"; - persistentTopics.createSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true, (MessageIdImpl) MessageId.latest); - List subscriptions = persistentTopics.getSubscriptions(testTenant, testNamespace, nonPartitionTopic + "-partition-0", true); - Assert.assertTrue(subscriptions.contains("test")); - persistentTopics.deleteSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true); - subscriptions = persistentTopics.getSubscriptions(testTenant, testNamespace, nonPartitionTopic + "-partition-0", true); - Assert.assertTrue(subscriptions.isEmpty()); - } } From ed971065476fc81fe9de6d1d6ea327f1f9760c17 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Wed, 6 Feb 2019 18:38:16 -0800 Subject: [PATCH 15/28] Tentative test addition --- .../org/apache/pulsar/broker/admin/AdminApiTest.java | 2 ++ .../pulsar/broker/admin/PersistentTopicsTest.java | 11 +++++++++++ 2 files changed, 13 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 837db22100ffd..6d52a3d4b3a65 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -432,6 +432,7 @@ public void brokers() throws Exception { public void testUpdateDynamicConfigurationWithZkWatch() throws Exception { final int initValue = 30000; pulsar.getConfiguration().setBrokerShutdownTimeoutMs(initValue); + pulsar.getConfiguration().setAllowAutoTopicCreation(false); // (1) try to update dynamic field final long shutdownTime = 10; // update configuration @@ -524,6 +525,7 @@ public void testUpdateDynamicLocalConfiguration() throws Exception { final long initValue = 30000; final long shutdownTime = 10; pulsar.getConfiguration().setBrokerShutdownTimeoutMs(initValue); + pulsar.getConfiguration().setAllowAutoTopicCreation(false); // update configuration admin.brokers().updateDynamicConfiguration("brokerShutdownTimeoutMs", Long.toString(shutdownTime)); // sleep incrementally as zk-watch notification is async and may take some time diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 6dfc5b0d211a7..c234ffdb822b4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -115,4 +115,15 @@ public void testGetSubscriptions() { persistentTopics.deletePartitionedTopic(testTenant, testNamespace, testLocalTopicName, true, true); } + @Test + public void testGetSubscriptionsWithAutoTopicCreationDisabled() { + pulsar.getConfiguration().setAllowAutoTopicCreation(false); + final String nonPartitionTopic = "non-partitioned-topic"; + persistentTopics.createSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true, (MessageIdImpl) MessageId.latest); + List subscriptions = persistentTopics.getSubscriptions(testTenant, testNamespace, nonPartitionTopic + "-partition-0", true); + Assert.assertTrue(subscriptions.contains("test")); + persistentTopics.deleteSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true); + subscriptions = persistentTopics.getSubscriptions(testTenant, testNamespace, nonPartitionTopic + "-partition-0", true); + Assert.assertTrue(subscriptions.isEmpty()); + } } From 4e5705aab9d286252c8c7e2d09e96e5af35ff683 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Thu, 7 Feb 2019 09:22:07 -0800 Subject: [PATCH 16/28] Simplifying call structure --- .../apache/pulsar/broker/PulsarService.java | 2 +- .../pulsar/broker/service/BrokerService.java | 22 +++++++++---------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 40e55afe96b6a..a73df948cf7b6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -598,7 +598,7 @@ public void loadNamespaceTopics(NamespaceBundle bundle) { try { TopicName topicName = TopicName.get(topic); if (bundle.includes(topicName)) { - CompletableFuture future = brokerService.getOrCreateTopic(topic, false); + CompletableFuture future = brokerService.getOrCreateTopic(topic, true); if (future != null) { persistentTopics.add(future); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 477c64d055896..9cbea19c10ba1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -459,18 +459,18 @@ public void unloadNamespaceBundlesGracefully() { } public CompletableFuture> getTopicIfExists(final String topic) { - return getTopic(topic, false /* allowAutoTopicCreation */, false); + return getTopic(topic, false /* use allowAutoTopicCreation */); } public CompletableFuture getOrCreateTopic(final String topic) { - return getOrCreateTopic(topic, false); + return getOrCreateTopic(topic, false /* use allowAutoTopicCreation */); } public CompletableFuture getOrCreateTopic(String topic, boolean defaultConfig) { - return getTopic(topic, true, defaultConfig).thenApply(Optional::get); + return getTopic(topic, defaultConfig ? pulsar.getConfiguration().isAllowAutoTopicCreation() : true).thenApply(Optional::get); } - private CompletableFuture> getTopic(final String topic, boolean createIfMissing, boolean defaultConfig) { + private CompletableFuture> getTopic(final String topic, boolean createIfMissing) { try { CompletableFuture> topicFuture = topics.get(topic); if (topicFuture != null) { @@ -484,7 +484,7 @@ private CompletableFuture> getTopic(final String topic, boolean } final boolean isPersistentTopic = TopicName.get(topic).getDomain().equals(TopicDomain.persistent); return topics.computeIfAbsent(topic, (topicName) -> { - return isPersistentTopic ? this.loadOrCreatePersistentTopic(topicName, createIfMissing, defaultConfig) + return isPersistentTopic ? this.loadOrCreatePersistentTopic(topicName, createIfMissing) : createNonPersistentTopic(topicName); }); } catch (IllegalArgumentException e) { @@ -591,7 +591,7 @@ public PulsarClient getReplicationClient(String cluster) { * @throws RuntimeException */ protected CompletableFuture> loadOrCreatePersistentTopic(final String topic, - boolean createIfMissing, boolean useDefaultConfig) throws RuntimeException { + boolean createIfMissing) throws RuntimeException { checkTopicNsOwnership(topic); final CompletableFuture> topicFuture = new CompletableFuture<>(); @@ -606,7 +606,7 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); if (topicLoadSemaphore.tryAcquire()) { - createPersistentTopic(topic, createIfMissing, useDefaultConfig, topicFuture); + createPersistentTopic(topic, createIfMissing, topicFuture); topicFuture.handle((persistentTopic, ex) -> { // release permit and process pending topic topicLoadSemaphore.release(); @@ -622,7 +622,7 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S return topicFuture; } - private void createPersistentTopic(final String topic, boolean createIfMissing, boolean useDefaultConfig, + private void createPersistentTopic(final String topic, boolean createIfMissing, CompletableFuture> topicFuture) { final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); @@ -637,9 +637,7 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, } getManagedLedgerConfig(topicName).thenAccept(managedLedgerConfig -> { - managedLedgerConfig.setCreateIfMissing(useDefaultConfig ? - pulsar.getConfiguration().isAllowAutoTopicCreation() - : createIfMissing); + managedLedgerConfig.setCreateIfMissing(createIfMissing); // Once we have the configuration, we can proceed with the async open operation managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), managedLedgerConfig, @@ -1391,7 +1389,7 @@ private void createPendingLoadTopic() { CompletableFuture> pendingFuture = pendingTopic.getRight(); final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); final boolean acquiredPermit = topicLoadSemaphore.tryAcquire(); - createPersistentTopic(topic, true, false, pendingFuture); + createPersistentTopic(topic, false, pendingFuture); pendingFuture.handle((persistentTopic, ex) -> { // release permit and process next pending topic if (acquiredPermit) { From e071ef6cb4d99b1f7f3970c4e576574e803c4e3a Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Thu, 7 Feb 2019 09:32:39 -0800 Subject: [PATCH 17/28] method call --- .../org/apache/pulsar/broker/service/BrokerServiceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 648994369a020..4e9f4e986f33b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -742,7 +742,7 @@ public void testTopicLoadingOnDisableNamespaceBundle() throws Exception { // try to create topic which should fail as bundle is disable CompletableFuture> futureResult = pulsar.getBrokerService() - .loadOrCreatePersistentTopic(topicName, true, false); + .loadOrCreatePersistentTopic(topicName, true); try { futureResult.get(); From 42fd77ba452acafc7e094e518b802a44feb1b709 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Sat, 9 Feb 2019 11:29:23 -0800 Subject: [PATCH 18/28] Modifying test to except error --- .../broker/admin/impl/PersistentTopicsBase.java | 8 +++++++- .../pulsar/broker/admin/PersistentTopicsTest.java | 11 ++++++----- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index f4557c2275081..212a8222c3cc3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1297,7 +1297,13 @@ private Topic getTopicReference(TopicName topicName) { TopicName partitionTopicName = TopicName.get(topicName.getPartitionedTopicName()); PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(partitionTopicName, false); if (partitionedTopicMetadata == null || partitionedTopicMetadata.partitions == 0) { - return new RestException(Status.NOT_FOUND, "Partitioned Topic not found"); + final String errSrc; + if (partitionedTopicMetadata != null) { + errSrc = " has zero partitions"; + } else { + errSrc = " has no metadata"; + } + return new RestException(Status.NOT_FOUND, "Partitioned Topic not found: " + topicName.getLocalName() + errSrc); } else if (!internalGetList().contains(topicName.toString())) { return new RestException(Status.NOT_FOUND, "Topic partitions were not yet created"); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index c234ffdb822b4..2b3956b7a680d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -23,6 +23,7 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; import org.apache.pulsar.broker.web.PulsarWebResource; +import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.policies.data.ClusterData; @@ -120,10 +121,10 @@ public void testGetSubscriptionsWithAutoTopicCreationDisabled() { pulsar.getConfiguration().setAllowAutoTopicCreation(false); final String nonPartitionTopic = "non-partitioned-topic"; persistentTopics.createSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true, (MessageIdImpl) MessageId.latest); - List subscriptions = persistentTopics.getSubscriptions(testTenant, testNamespace, nonPartitionTopic + "-partition-0", true); - Assert.assertTrue(subscriptions.contains("test")); - persistentTopics.deleteSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true); - subscriptions = persistentTopics.getSubscriptions(testTenant, testNamespace, nonPartitionTopic + "-partition-0", true); - Assert.assertTrue(subscriptions.isEmpty()); + try { + persistentTopics.getSubscriptions(testTenant, testNamespace, nonPartitionTopic + "-partition-0", true); + } catch (RestException exc) { + Assert.assertTrue(exc.getMessage().contains("zero partitions")); + } } } From e355dd446028f00836403bb47b56435383a58f62 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Sat, 9 Feb 2019 11:33:33 -0800 Subject: [PATCH 19/28] Removing accessory lines --- .../java/org/apache/pulsar/broker/service/BrokerService.java | 2 +- .../test/java/org/apache/pulsar/broker/admin/AdminApiTest.java | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 9cbea19c10ba1..727f3655e75fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1389,7 +1389,7 @@ private void createPendingLoadTopic() { CompletableFuture> pendingFuture = pendingTopic.getRight(); final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); final boolean acquiredPermit = topicLoadSemaphore.tryAcquire(); - createPersistentTopic(topic, false, pendingFuture); + createPersistentTopic(topic, true, pendingFuture); pendingFuture.handle((persistentTopic, ex) -> { // release permit and process next pending topic if (acquiredPermit) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 6d52a3d4b3a65..837db22100ffd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -432,7 +432,6 @@ public void brokers() throws Exception { public void testUpdateDynamicConfigurationWithZkWatch() throws Exception { final int initValue = 30000; pulsar.getConfiguration().setBrokerShutdownTimeoutMs(initValue); - pulsar.getConfiguration().setAllowAutoTopicCreation(false); // (1) try to update dynamic field final long shutdownTime = 10; // update configuration @@ -525,7 +524,6 @@ public void testUpdateDynamicLocalConfiguration() throws Exception { final long initValue = 30000; final long shutdownTime = 10; pulsar.getConfiguration().setBrokerShutdownTimeoutMs(initValue); - pulsar.getConfiguration().setAllowAutoTopicCreation(false); // update configuration admin.brokers().updateDynamicConfiguration("brokerShutdownTimeoutMs", Long.toString(shutdownTime)); // sleep incrementally as zk-watch notification is async and may take some time From b834a8327714a7a03882afdfddc1c5546ea0c2f2 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Sat, 9 Feb 2019 12:53:38 -0800 Subject: [PATCH 20/28] Fixing failing tests --- .../java/org/apache/pulsar/broker/service/BrokerService.java | 2 +- .../org/apache/pulsar/broker/admin/PersistentTopicsTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 727f3655e75fa..9cbea19c10ba1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1389,7 +1389,7 @@ private void createPendingLoadTopic() { CompletableFuture> pendingFuture = pendingTopic.getRight(); final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); final boolean acquiredPermit = topicLoadSemaphore.tryAcquire(); - createPersistentTopic(topic, true, pendingFuture); + createPersistentTopic(topic, false, pendingFuture); pendingFuture.handle((persistentTopic, ex) -> { // release permit and process next pending topic if (acquiredPermit) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 2b3956b7a680d..9ed112ec2d2e2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -99,7 +99,7 @@ public void testGetSubscriptions() { try { persistentTopics.getSubscriptions(testTenant, testNamespace, testLocalTopicName + "-partition-0", true); } catch (Exception e) { - Assert.assertEquals("Partitioned Topic not found", e.getMessage()); + Assert.assertEquals("Partitioned Topic not found: topic-not-found-partition-0 has zero partitions", e.getMessage()); } persistentTopics.createPartitionedTopic(testTenant, testNamespace, testLocalTopicName, 3, true); try { From 9afd12e39319f5f8e3faa76d4eff92ab13d5778d Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Sat, 9 Feb 2019 14:18:26 -0800 Subject: [PATCH 21/28] Temporarily add Ignore annotation --- .../apache/pulsar/client/impl/BrokerClientIntegrationTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java index feb5b9e4f38d4..53c22df99c177 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -90,6 +90,7 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; +import org.testng.annotations.Ignore; import org.testng.annotations.Test; import com.google.common.collect.Lists; @@ -571,6 +572,7 @@ public void testCloseConnectionOnBrokerRejectedRequest() throws Exception { * * @throws Exception */ + @Ignore @Test(timeOut = 5000) public void testMaxConcurrentTopicLoading() throws Exception { From c409248ef6219e64d63c4d6cc7fd88bd707413bd Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Sat, 9 Feb 2019 15:42:06 -0800 Subject: [PATCH 22/28] Removing ignore annotation --- .../apache/pulsar/client/impl/BrokerClientIntegrationTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java index 53c22df99c177..dce18c0a42a6d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -572,7 +572,6 @@ public void testCloseConnectionOnBrokerRejectedRequest() throws Exception { * * @throws Exception */ - @Ignore @Test(timeOut = 5000) public void testMaxConcurrentTopicLoading() throws Exception { From 145a7c78bdf37b6f61c6b77c48b687ae79e613c3 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Sun, 10 Feb 2019 13:35:24 -0800 Subject: [PATCH 23/28] Fixing curr test error --- .../bookkeeper/mledger/impl/MetaStoreImplZookeeper.java | 1 + .../org/apache/pulsar/broker/service/BrokerService.java | 7 +++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java index 4d90b913711e7..04f3ccf3b732a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java @@ -141,6 +141,7 @@ public void getManagedLedgerInfo(final String ledgerName, boolean createIfMissin } } else if (rc == Code.NONODE.intValue()) { // Z-node doesn't exist + log.warn("createIfMissing has value of {}", createIfMissing); if (createIfMissing) { log.info("Creating '{}{}'", prefix, ledgerName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 9cbea19c10ba1..7c0b3e1547f21 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -467,7 +467,9 @@ public CompletableFuture getOrCreateTopic(final String topic) { } public CompletableFuture getOrCreateTopic(String topic, boolean defaultConfig) { - return getTopic(topic, defaultConfig ? pulsar.getConfiguration().isAllowAutoTopicCreation() : true).thenApply(Optional::get); + boolean createIfMissing = defaultConfig ? pulsar.getConfiguration().isAllowAutoTopicCreation() : true; + log.warn("The createIfMissing value had been set to {}", createIfMissing); + return getTopic(topic, createIfMissing).thenApply(Optional::get); } private CompletableFuture> getTopic(final String topic, boolean createIfMissing) { @@ -637,6 +639,7 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, } getManagedLedgerConfig(topicName).thenAccept(managedLedgerConfig -> { + log.warn("createIfMissing has been set to {}", createIfMissing); managedLedgerConfig.setCreateIfMissing(createIfMissing); // Once we have the configuration, we can proceed with the async open operation @@ -1389,7 +1392,7 @@ private void createPendingLoadTopic() { CompletableFuture> pendingFuture = pendingTopic.getRight(); final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); final boolean acquiredPermit = topicLoadSemaphore.tryAcquire(); - createPersistentTopic(topic, false, pendingFuture); + createPersistentTopic(topic, true, pendingFuture); pendingFuture.handle((persistentTopic, ex) -> { // release permit and process next pending topic if (acquiredPermit) { From bbd59cbf609380c7fce82e5f1f356539c411fd60 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Mon, 11 Feb 2019 18:24:04 -0800 Subject: [PATCH 24/28] Fixing nit comments --- .../bookkeeper/mledger/impl/MetaStoreImplZookeeper.java | 1 - .../org/apache/pulsar/broker/service/BrokerService.java | 7 +++---- .../pulsar/client/impl/BrokerClientIntegrationTest.java | 1 - 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java index 04f3ccf3b732a..4d90b913711e7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java @@ -141,7 +141,6 @@ public void getManagedLedgerInfo(final String ledgerName, boolean createIfMissin } } else if (rc == Code.NONODE.intValue()) { // Z-node doesn't exist - log.warn("createIfMissing has value of {}", createIfMissing); if (createIfMissing) { log.info("Creating '{}{}'", prefix, ledgerName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 7c0b3e1547f21..9d5c227d67746 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -463,12 +463,11 @@ public CompletableFuture> getTopicIfExists(final String topic) { } public CompletableFuture getOrCreateTopic(final String topic) { - return getOrCreateTopic(topic, false /* use allowAutoTopicCreation */); + return getOrCreateTopic(topic, true /* use allowAutoTopicCreation */); } - public CompletableFuture getOrCreateTopic(String topic, boolean defaultConfig) { - boolean createIfMissing = defaultConfig ? pulsar.getConfiguration().isAllowAutoTopicCreation() : true; - log.warn("The createIfMissing value had been set to {}", createIfMissing); + public CompletableFuture getOrCreateTopic(String topic, boolean useAllowAutoTopicCreationSetting) { + boolean createIfMissing = useAllowAutoTopicCreationSetting ? pulsar.getConfiguration().isAllowAutoTopicCreation() : true; return getTopic(topic, createIfMissing).thenApply(Optional::get); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java index dce18c0a42a6d..feb5b9e4f38d4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -90,7 +90,6 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; -import org.testng.annotations.Ignore; import org.testng.annotations.Test; import com.google.common.collect.Lists; From c795fabaeabb8277182c9388cbeea7dfd775d42a Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Mon, 11 Feb 2019 20:07:13 -0800 Subject: [PATCH 25/28] Fixing false value --- .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 212a8222c3cc3..9b0ce841fdf6d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1313,7 +1313,7 @@ private Topic getTopicReference(TopicName topicName) { } private Topic getOrCreateTopic(TopicName topicName) { - return pulsar().getBrokerService().getOrCreateTopic(topicName.toString()).join(); + return pulsar().getBrokerService().getOrCreateTopic(topicName.toString(), false).join(); } /** From 632c43ab66adfb9de20bcb068b5c2f62ed1c34a3 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Tue, 12 Feb 2019 18:35:57 -0800 Subject: [PATCH 26/28] Fixing silly comment --- .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 2 +- .../java/org/apache/pulsar/broker/service/BrokerService.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 9b0ce841fdf6d..95a21ade2d5b4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1303,7 +1303,7 @@ private Topic getTopicReference(TopicName topicName) { } else { errSrc = " has no metadata"; } - return new RestException(Status.NOT_FOUND, "Partitioned Topic not found: " + topicName.getLocalName() + errSrc); + return new RestException(Status.NOT_FOUND, "Partitioned Topic not found: " + topicName.toString() + errSrc); } else if (!internalGetList().contains(topicName.toString())) { return new RestException(Status.NOT_FOUND, "Topic partitions were not yet created"); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 9d5c227d67746..7c80c468437dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -459,7 +459,7 @@ public void unloadNamespaceBundlesGracefully() { } public CompletableFuture> getTopicIfExists(final String topic) { - return getTopic(topic, false /* use allowAutoTopicCreation */); + return getTopic(topic, false /* createIfMissing */); } public CompletableFuture getOrCreateTopic(final String topic) { From c6b9bfa6af5184f31a58eb577ed4816d60fdb799 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Tue, 12 Feb 2019 21:13:02 -0800 Subject: [PATCH 27/28] Removing excess comment --- .../java/org/apache/pulsar/broker/service/BrokerService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 7c80c468437dd..fd03733b8e4c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -638,7 +638,6 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, } getManagedLedgerConfig(topicName).thenAccept(managedLedgerConfig -> { - log.warn("createIfMissing has been set to {}", createIfMissing); managedLedgerConfig.setCreateIfMissing(createIfMissing); // Once we have the configuration, we can proceed with the async open operation From b28e5c9650853e8939d20bf6004bc342db16f007 Mon Sep 17 00:00:00 2001 From: ConcurrencyPractitioner Date: Wed, 13 Feb 2019 17:48:18 -0800 Subject: [PATCH 28/28] Fixing tests and addressing comment --- .../java/org/apache/pulsar/broker/PulsarService.java | 2 +- .../broker/admin/impl/PersistentTopicsBase.java | 3 ++- .../apache/pulsar/broker/service/BrokerService.java | 11 +++-------- .../org/apache/pulsar/broker/service/ServerCnx.java | 4 ++-- .../pulsar/broker/admin/PersistentTopicsTest.java | 2 +- 5 files changed, 9 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index a73df948cf7b6..0dccc8d698362 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -598,7 +598,7 @@ public void loadNamespaceTopics(NamespaceBundle bundle) { try { TopicName topicName = TopicName.get(topic); if (bundle.includes(topicName)) { - CompletableFuture future = brokerService.getOrCreateTopic(topic, true); + CompletableFuture future = brokerService.getOrCreateTopic(topic); if (future != null) { persistentTopics.add(future); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 95a21ade2d5b4..be5badb19b1d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; @@ -1313,7 +1314,7 @@ private Topic getTopicReference(TopicName topicName) { } private Topic getOrCreateTopic(TopicName topicName) { - return pulsar().getBrokerService().getOrCreateTopic(topicName.toString(), false).join(); + return pulsar().getBrokerService().getTopic(topicName.toString(), true).thenApply(Optional::get).join(); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index fd03733b8e4c7..efb84c127b904 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -462,16 +462,11 @@ public CompletableFuture> getTopicIfExists(final String topic) { return getTopic(topic, false /* createIfMissing */); } - public CompletableFuture getOrCreateTopic(final String topic) { - return getOrCreateTopic(topic, true /* use allowAutoTopicCreation */); - } - - public CompletableFuture getOrCreateTopic(String topic, boolean useAllowAutoTopicCreationSetting) { - boolean createIfMissing = useAllowAutoTopicCreationSetting ? pulsar.getConfiguration().isAllowAutoTopicCreation() : true; - return getTopic(topic, createIfMissing).thenApply(Optional::get); + public CompletableFuture getOrCreateTopic(final String topic) { + return getTopic(topic, pulsar.getConfiguration().isAllowAutoTopicCreation()).thenApply(Optional::get); } - private CompletableFuture> getTopic(final String topic, boolean createIfMissing) { + public CompletableFuture> getTopic(final String topic, boolean createIfMissing) { try { CompletableFuture> topicFuture = topics.get(topic); if (topicFuture != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 9d6bc987687ee..e6819bf57913e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -587,7 +587,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { } } - service.getOrCreateTopic(topicName.toString(), true) + service.getOrCreateTopic(topicName.toString()) .thenCompose(topic -> { if (schema != null) { return topic.addSchemaIfIdleOrCheckCompatible(schema) @@ -797,7 +797,7 @@ protected void handleProducer(final CommandProducer cmdProducer) { log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId); - service.getOrCreateTopic(topicName.toString(), true).thenAccept((Topic topic) -> { + service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> { // Before creating producer, check if backlog quota exceeded // on topic if (topic.isBacklogQuotaExceeded(producerName)) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 9ed112ec2d2e2..2d23f50a3932c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -99,7 +99,7 @@ public void testGetSubscriptions() { try { persistentTopics.getSubscriptions(testTenant, testNamespace, testLocalTopicName + "-partition-0", true); } catch (Exception e) { - Assert.assertEquals("Partitioned Topic not found: topic-not-found-partition-0 has zero partitions", e.getMessage()); + Assert.assertEquals("Partitioned Topic not found: persistent://my-tenant/my-namespace/topic-not-found-partition-0 has zero partitions", e.getMessage()); } persistentTopics.createPartitionedTopic(testTenant, testNamespace, testLocalTopicName, 3, true); try {