diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 048aecd2ddd74..296ef49759f74 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -684,7 +684,11 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Rate limit the amount of writes per second generated by consumer acking the messages" ) private double managedLedgerDefaultMarkDeleteRateLimit = 1.0; - + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Allow automated creation of non-partition topics if set to true (default value)." + ) + private boolean allowAutoTopicCreation = true; @FieldContext( category = CATEGORY_STORAGE_ML, doc = "Number of threads to be used for managed ledger tasks dispatching" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 417ec215577a4..0dccc8d698362 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -49,6 +49,7 @@ import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.mledger.offload.OffloaderUtils; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index e6df76df6047a..be5badb19b1d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; @@ -67,6 +68,7 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.admin.ZkAdminPaths; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; @@ -1296,7 +1298,13 @@ private Topic getTopicReference(TopicName topicName) { TopicName partitionTopicName = TopicName.get(topicName.getPartitionedTopicName()); PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(partitionTopicName, false); if (partitionedTopicMetadata == null || partitionedTopicMetadata.partitions == 0) { - return new RestException(Status.NOT_FOUND, "Partitioned Topic not found"); + final String errSrc; + if (partitionedTopicMetadata != null) { + errSrc = " has zero partitions"; + } else { + errSrc = " has no metadata"; + } + return new RestException(Status.NOT_FOUND, "Partitioned Topic not found: " + topicName.toString() + errSrc); } else if (!internalGetList().contains(topicName.toString())) { return new RestException(Status.NOT_FOUND, "Topic partitions were not yet created"); } @@ -1306,7 +1314,7 @@ private Topic getTopicReference(TopicName topicName) { } private Topic getOrCreateTopic(TopicName topicName) { - return pulsar().getBrokerService().getOrCreateTopic(topicName.toString()).join(); + return pulsar().getBrokerService().getTopic(topicName.toString(), true).thenApply(Optional::get).join(); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index ff2a8dcce7954..efb84c127b904 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -462,11 +462,11 @@ public CompletableFuture> getTopicIfExists(final String topic) { return getTopic(topic, false /* createIfMissing */); } - public CompletableFuture getOrCreateTopic(final String topic) { - return getTopic(topic, true /* createIfMissing */).thenApply(Optional::get); + public CompletableFuture getOrCreateTopic(final String topic) { + return getTopic(topic, pulsar.getConfiguration().isAllowAutoTopicCreation()).thenApply(Optional::get); } - private CompletableFuture> getTopic(final String topic, boolean createIfMissing) { + public CompletableFuture> getTopic(final String topic, boolean createIfMissing) { try { CompletableFuture> topicFuture = topics.get(topic); if (topicFuture != null) { @@ -618,7 +618,8 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S return topicFuture; } - private void createPersistentTopic(final String topic, boolean createIfMissing, CompletableFuture> topicFuture) { + private void createPersistentTopic(final String topic, boolean createIfMissing, + CompletableFuture> topicFuture) { final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); TopicName topicName = TopicName.get(topic); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 6dfc5b0d211a7..2d23f50a3932c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -23,6 +23,7 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; import org.apache.pulsar.broker.web.PulsarWebResource; +import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.policies.data.ClusterData; @@ -98,7 +99,7 @@ public void testGetSubscriptions() { try { persistentTopics.getSubscriptions(testTenant, testNamespace, testLocalTopicName + "-partition-0", true); } catch (Exception e) { - Assert.assertEquals("Partitioned Topic not found", e.getMessage()); + Assert.assertEquals("Partitioned Topic not found: persistent://my-tenant/my-namespace/topic-not-found-partition-0 has zero partitions", e.getMessage()); } persistentTopics.createPartitionedTopic(testTenant, testNamespace, testLocalTopicName, 3, true); try { @@ -115,4 +116,15 @@ public void testGetSubscriptions() { persistentTopics.deletePartitionedTopic(testTenant, testNamespace, testLocalTopicName, true, true); } + @Test + public void testGetSubscriptionsWithAutoTopicCreationDisabled() { + pulsar.getConfiguration().setAllowAutoTopicCreation(false); + final String nonPartitionTopic = "non-partitioned-topic"; + persistentTopics.createSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true, (MessageIdImpl) MessageId.latest); + try { + persistentTopics.getSubscriptions(testTenant, testNamespace, nonPartitionTopic + "-partition-0", true); + } catch (RestException exc) { + Assert.assertTrue(exc.getMessage().contains("zero partitions")); + } + } }