Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f5e1350
Adding configuration to disable auto-topic creation
ConcurrencyPractitioner Jan 27, 2019
81f5e8a
Adding config
ConcurrencyPractitioner Jan 28, 2019
ab4325c
Starting to add backbone
ConcurrencyPractitioner Jan 29, 2019
01ff186
Adding work for completableFuture
ConcurrencyPractitioner Jan 29, 2019
80dd5f9
Fixing issue
ConcurrencyPractitioner Jan 29, 2019
5dc6920
Adding detail for PersistentTopicBases
ConcurrencyPractitioner Jan 29, 2019
d2eed96
Simplifying call structure
ConcurrencyPractitioner Jan 30, 2019
d0094a4
Fixing compilation error
ConcurrencyPractitioner Jan 31, 2019
1c72664
Fixing compilation errror
ConcurrencyPractitioner Jan 31, 2019
36abfed
Modifying config location
ConcurrencyPractitioner Feb 2, 2019
8302804
Fixing config error
ConcurrencyPractitioner Feb 3, 2019
b493cf8
Adding test for PersistentTopicsBAse
ConcurrencyPractitioner Feb 6, 2019
52faeaa
Fixing extra lines
ConcurrencyPractitioner Feb 6, 2019
5aec8dd
Reversing unneccesary test
ConcurrencyPractitioner Feb 6, 2019
ed97106
Tentative test addition
ConcurrencyPractitioner Feb 7, 2019
4e5705a
Simplifying call structure
ConcurrencyPractitioner Feb 7, 2019
e071ef6
method call
ConcurrencyPractitioner Feb 7, 2019
42fd77b
Modifying test to except error
ConcurrencyPractitioner Feb 9, 2019
e355dd4
Removing accessory lines
ConcurrencyPractitioner Feb 9, 2019
b834a83
Fixing failing tests
ConcurrencyPractitioner Feb 9, 2019
9afd12e
Temporarily add Ignore annotation
ConcurrencyPractitioner Feb 9, 2019
c409248
Removing ignore annotation
ConcurrencyPractitioner Feb 9, 2019
145a7c7
Fixing curr test error
ConcurrencyPractitioner Feb 10, 2019
bbd59cb
Fixing nit comments
ConcurrencyPractitioner Feb 12, 2019
c795fab
Fixing false value
ConcurrencyPractitioner Feb 12, 2019
632c43a
Fixing silly comment
ConcurrencyPractitioner Feb 13, 2019
c6b9bfa
Removing excess comment
ConcurrencyPractitioner Feb 13, 2019
b28e5c9
Fixing tests and addressing comment
ConcurrencyPractitioner Feb 14, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Rate limit the amount of writes per second generated by consumer acking the messages"
)
private double managedLedgerDefaultMarkDeleteRateLimit = 1.0;

@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Allow automated creation of non-partition topics if set to true (default value)."
)
private boolean allowAutoTopicCreation = true;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Number of threads to be used for managed ledger tasks dispatching"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -67,6 +68,7 @@
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
Expand Down Expand Up @@ -1296,7 +1298,13 @@ private Topic getTopicReference(TopicName topicName) {
TopicName partitionTopicName = TopicName.get(topicName.getPartitionedTopicName());
PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(partitionTopicName, false);
if (partitionedTopicMetadata == null || partitionedTopicMetadata.partitions == 0) {
return new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
final String errSrc;
if (partitionedTopicMetadata != null) {
errSrc = " has zero partitions";
} else {
errSrc = " has no metadata";
}
return new RestException(Status.NOT_FOUND, "Partitioned Topic not found: " + topicName.toString() + errSrc);
} else if (!internalGetList().contains(topicName.toString())) {
return new RestException(Status.NOT_FOUND, "Topic partitions were not yet created");
}
Expand All @@ -1306,7 +1314,7 @@ private Topic getTopicReference(TopicName topicName) {
}

private Topic getOrCreateTopic(TopicName topicName) {
return pulsar().getBrokerService().getOrCreateTopic(topicName.toString()).join();
return pulsar().getBrokerService().getTopic(topicName.toString(), true).thenApply(Optional::get).join();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,11 +462,11 @@ public CompletableFuture<Optional<Topic>> getTopicIfExists(final String topic) {
return getTopic(topic, false /* createIfMissing */);
}

public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
return getTopic(topic, true /* createIfMissing */).thenApply(Optional::get);
public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
return getTopic(topic, pulsar.getConfiguration().isAllowAutoTopicCreation()).thenApply(Optional::get);
}

private CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean createIfMissing) {
public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean createIfMissing) {
try {
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topic);
if (topicFuture != null) {
Expand Down Expand Up @@ -618,7 +618,8 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
return topicFuture;
}

private void createPersistentTopic(final String topic, boolean createIfMissing, CompletableFuture<Optional<Topic>> topicFuture) {
private void createPersistentTopic(final String topic, boolean createIfMissing,
CompletableFuture<Optional<Topic>> topicFuture) {

final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
TopicName topicName = TopicName.get(topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -98,7 +99,7 @@ public void testGetSubscriptions() {
try {
persistentTopics.getSubscriptions(testTenant, testNamespace, testLocalTopicName + "-partition-0", true);
} catch (Exception e) {
Assert.assertEquals("Partitioned Topic not found", e.getMessage());
Assert.assertEquals("Partitioned Topic not found: persistent://my-tenant/my-namespace/topic-not-found-partition-0 has zero partitions", e.getMessage());
}
persistentTopics.createPartitionedTopic(testTenant, testNamespace, testLocalTopicName, 3, true);
try {
Expand All @@ -115,4 +116,15 @@ public void testGetSubscriptions() {
persistentTopics.deletePartitionedTopic(testTenant, testNamespace, testLocalTopicName, true, true);
}

@Test
public void testGetSubscriptionsWithAutoTopicCreationDisabled() {
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
final String nonPartitionTopic = "non-partitioned-topic";
persistentTopics.createSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true, (MessageIdImpl) MessageId.latest);
try {
persistentTopics.getSubscriptions(testTenant, testNamespace, nonPartitionTopic + "-partition-0", true);
} catch (RestException exc) {
Assert.assertTrue(exc.getMessage().contains("zero partitions"));
}
}
}