Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ public Optional<Policies> getPolicies(NamespaceName ns) throws MetadataStoreExce
return get(joinPath(BASE_POLICIES_PATH, ns.toString()));
}

/**
* Get the namespace policy from the metadata cache. This method will not trigger the load of metadata cache.
*
* @deprecated Since this method may introduce inconsistent namespace policies. we should use
* #{@link NamespaceResources#getPoliciesAsync}
*/
@Deprecated
public Optional<Policies> getPoliciesIfCached(NamespaceName ns) {
return getCache().getIfCached(joinPath(BASE_POLICIES_PATH, ns.toString()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2209,9 +2209,10 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su
internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
subscriptionName, targetMessageId, authoritative, replicated, properties);
} else {
boolean allowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);
getPartitionedTopicMetadataAsync(topicName,
authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> {
pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
.thenCompose(allowAutoTopicCreation ->
getPartitionedTopicMetadataAsync(topicName, authoritative, allowAutoTopicCreation))
.thenAccept(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final CompletableFuture<Void> future = new CompletableFuture<>();
Expand Down Expand Up @@ -2307,13 +2308,13 @@ private void internalCreateSubscriptionForNonPartitionedTopic(
MessageIdImpl targetMessageId, boolean authoritative, boolean replicated,
Map<String, String> properties) {

boolean isAllowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);

validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> {
validateTopicOperation(topicName, TopicOperation.SUBSCRIBE, subscriptionName);
return pulsar().getBrokerService().getTopic(topicName.toString(), isAllowAutoTopicCreation);
}).thenApply(optTopic -> {
pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
.thenCompose(isAllowAutoTopicCreation -> validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> {
validateTopicOperation(topicName, TopicOperation.SUBSCRIBE, subscriptionName);
return pulsar().getBrokerService().getTopic(topicName.toString(), isAllowAutoTopicCreation);
}))
.thenApply(optTopic -> {
if (optTopic.isPresent()) {
return optTopic.get();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,11 @@ protected CompletableFuture<LookupData> internalLookupTopicAsync(TopicName topic
// Currently, it's hard to check the non-persistent-non-partitioned topic, because it only exists
// in the broker, it doesn't have metadata. If the topic is non-persistent and non-partitioned,
// we'll return the true flag.
CompletableFuture<Boolean> existFuture = pulsar().getBrokerService()
.isAllowAutoTopicCreation(topicName)
|| (!topicName.isPersistent() && !topicName.isPartitioned())
? CompletableFuture.completedFuture(true)
: pulsar().getNamespaceService().checkTopicExists(topicName);
return existFuture;
return pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
.thenCompose(isAllowAutoTopicCreation ->
isAllowAutoTopicCreation || (!topicName.isPersistent() && !topicName.isPartitioned())
? CompletableFuture.completedFuture(true) :
pulsar().getNamespaceService().checkTopicExists(topicName));
})
.thenCompose(exist -> {
if (!exist) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
Expand All @@ -41,6 +42,7 @@
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.ToLongFunction;
import javax.annotation.Nonnull;
import lombok.Getter;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.commons.collections4.CollectionUtils;
Expand All @@ -58,6 +60,7 @@
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
Expand Down Expand Up @@ -1114,6 +1117,12 @@ public PublishRateLimiter getBrokerPublishRateLimiter() {
return brokerService.getBrokerPublishRateLimiter();
}

/**
* @deprecated Avoid using the deprecated method
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and we can use
* #{@link AbstractTopic#updateResourceGroupLimiter(Policies)} to instead of it.
*/
@Deprecated
public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
Policies policies;
try {
Expand All @@ -1127,17 +1136,20 @@ public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
policies = new Policies();
}
updateResourceGroupLimiter(policies);
}

public void updateResourceGroupLimiter(@Nonnull Policies namespacePolicies) {
requireNonNull(namespacePolicies);
// attach the resource-group level rate limiters, if set
String rgName = policies.resource_group_name;
String rgName = namespacePolicies.resource_group_name;
if (rgName != null) {
final ResourceGroup resourceGroup =
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
if (resourceGroup != null) {
this.resourceGroupRateLimitingEnabled = true;
this.resourceGroupPublishLimiter = resourceGroup.getResourceGroupPublishLimiter();
this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(),
() -> this.enableCnxAutoRead());
this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), this::enableCnxAutoRead);
log.info("Using resource group {} rate limiter for topic {}", rgName, topic);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.apache.commons.collections4.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
Expand Down Expand Up @@ -56,6 +57,7 @@
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
Expand All @@ -72,6 +74,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
Expand Down Expand Up @@ -945,7 +948,15 @@ public CompletableFuture<Optional<Topic>> getTopicIfExists(final String topic) {
}

public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
return getTopic(topic, isAllowAutoTopicCreation(topic)).thenApply(Optional::get);
final TopicName topicName;
try {
topicName = TopicName.get(topic);
} catch (Throwable ex) {
return FutureUtil.failedFuture(ex);
}
return isAllowAutoTopicCreationAsync(topicName)
.thenCompose(isAllowAutoTopicCreation -> getTopic(topic, isAllowAutoTopicCreation)
.thenApply(Optional::get));
}

public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean createIfMissing) {
Expand Down Expand Up @@ -2882,7 +2893,8 @@ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata
if (metadata.partitions == 0
&& !topicExists
&& !topicName.isPartitioned()
&& pulsar.getBrokerService().isAllowAutoTopicCreation(topicName, policies)
&& pulsar.getBrokerService()
.isAllowAutoTopicCreation(topicName, policies)
&& pulsar.getBrokerService()
.isDefaultTopicTypePartitioned(topicName, policies)) {

Expand Down Expand Up @@ -3099,18 +3111,36 @@ public Optional<Integer> getListenPortTls() {
}
}


/**
* @deprecated Avoid using the deprecated method
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
* You can use #{@link BrokerService#isAllowAutoTopicCreationAsync(TopicName)}
*/
@Deprecated
public boolean isAllowAutoTopicCreation(final String topic) {
TopicName topicName = TopicName.get(topic);
return isAllowAutoTopicCreation(topicName);
}

/**
* @deprecated Avoid using the deprecated method
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)}
* You can use #{@link BrokerService#isAllowAutoTopicCreationAsync(TopicName)}
*/
@Deprecated
public boolean isAllowAutoTopicCreation(final TopicName topicName) {
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(topicName.getNamespaceObject());
return isAllowAutoTopicCreation(topicName, policies);
}

public CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final TopicName topicName) {
return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
.thenApply(policies -> isAllowAutoTopicCreation(topicName, policies));
}

public boolean isAllowAutoTopicCreation(final TopicName topicName, final Optional<Policies> policies) {
if (policies.isPresent() && policies.get().deleted) {
log.info("Preventing AutoTopicCreation on a namespace that is being deleted {}",
Expand Down Expand Up @@ -3157,11 +3187,23 @@ private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName t
return null;
}

/**
* @deprecated Avoid using the deprecated method
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking
* call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it.
*/
@Deprecated
public boolean isAllowAutoSubscriptionCreation(final String topic) {
TopicName topicName = TopicName.get(topic);
return isAllowAutoSubscriptionCreation(topicName);
}

/**
* @deprecated Avoid using the deprecated method
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking
* call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it.
*/
@Deprecated
public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) {
AutoSubscriptionCreationOverride autoSubscriptionCreationOverride =
getAutoSubscriptionCreationOverride(topicName);
Expand All @@ -3172,6 +3214,12 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) {
}
}

/**
* @deprecated Avoid using the deprecated method
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking
* call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it.
*/
@Deprecated
private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) {
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources().getPoliciesIfCached(topicName.getNamespaceObject());
Expand All @@ -3183,6 +3231,19 @@ private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(fin
return null;
}

public @Nonnull CompletionStage<Boolean> isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) {
requireNonNull(tpName);
// namespace level policies
return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(tpName.getNamespaceObject())
.thenApply(policies -> {
if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) {
return policies.get().autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation();
}
// broker level policies
return pulsar.getConfiguration().isAllowAutoSubscriptionCreation();
});
}

public boolean isSystemTopic(String topic) {
return isSystemTopic(TopicName.get(topic));
}
Expand Down
Loading