Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ public Optional<Policies> getPolicies(NamespaceName ns) throws MetadataStoreExce
return get(joinPath(BASE_POLICIES_PATH, ns.toString()));
}

/**
* Get the namespace policy from the metadata cache. This method will not trigger the load of metadata cache.
*
* @deprecated Since this method may introduce inconsistent namespace policies. we should use
* #{@link NamespaceResources#getPoliciesAsync}
*/
@Deprecated
public Optional<Policies> getPoliciesIfCached(NamespaceName ns) {
return getCache().getIfCached(joinPath(BASE_POLICIES_PATH, ns.toString()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import com.google.common.base.MoreObjects;
import java.util.ArrayList;
Expand All @@ -40,6 +41,7 @@
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.ToLongFunction;
import javax.annotation.Nonnull;
import lombok.Getter;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.commons.collections4.CollectionUtils;
Expand All @@ -61,6 +63,7 @@
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
Expand Down Expand Up @@ -1128,6 +1131,12 @@ public PublishRateLimiter getBrokerPublishRateLimiter() {
return brokerService.getBrokerPublishRateLimiter();
}

/**
* @deprecated Avoid using the deprecated method
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and we can use
* #{@link AbstractTopic#updateResourceGroupLimiter(Policies)} to instead of it.
*/
@Deprecated
Comment thread
mattisonchao marked this conversation as resolved.
public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
Policies policies;
try {
Expand All @@ -1141,17 +1150,20 @@ public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
policies = new Policies();
}
updateResourceGroupLimiter(policies);
}

public void updateResourceGroupLimiter(@Nonnull Policies namespacePolicies) {
requireNonNull(namespacePolicies);
// attach the resource-group level rate limiters, if set
String rgName = policies.resource_group_name;
String rgName = namespacePolicies.resource_group_name;
if (rgName != null) {
final ResourceGroup resourceGroup =
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
if (resourceGroup != null) {
this.resourceGroupRateLimitingEnabled = true;
this.resourceGroupPublishLimiter = resourceGroup.getResourceGroupPublishLimiter();
this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(),
() -> this.enableCnxAutoRead());
this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), this::enableCnxAutoRead);
log.info("Using resource group {} rate limiter for topic {}", rgName, topic);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY;
import static org.apache.commons.collections4.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
Expand Down Expand Up @@ -48,11 +49,11 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
Expand All @@ -68,6 +69,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
Expand Down Expand Up @@ -1934,7 +1936,7 @@ private void addTopicToStatsMaps(TopicName topicName, Topic topic) {
}

public void refreshTopicToStatsMaps(NamespaceBundle oldBundle) {
Objects.requireNonNull(oldBundle);
requireNonNull(oldBundle);
try {
// retrieve all topics under existing old bundle
List<Topic> topics = getAllTopicsFromNamespaceBundle(oldBundle.getNamespaceObject().toString(),
Expand Down Expand Up @@ -3267,10 +3269,9 @@ public CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final String top
}

public CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final TopicName topicName) {
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(topicName.getNamespaceObject());
return isAllowAutoTopicCreationAsync(topicName, policies);
return pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesAsync(topicName.getNamespaceObject())
.thenCompose(policies -> isAllowAutoTopicCreationAsync(topicName, policies));
}

private CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final TopicName topicName,
Expand Down Expand Up @@ -3340,11 +3341,23 @@ private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName t
return null;
}

/**
* @deprecated Avoid using the deprecated method
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking
* call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it.
*/
@Deprecated
Comment thread
mattisonchao marked this conversation as resolved.
public boolean isAllowAutoSubscriptionCreation(final String topic) {
TopicName topicName = TopicName.get(topic);
return isAllowAutoSubscriptionCreation(topicName);
}

/**
* @deprecated Avoid using the deprecated method
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking
* call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it.
*/
@Deprecated
public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) {
AutoSubscriptionCreationOverride autoSubscriptionCreationOverride =
getAutoSubscriptionCreationOverride(topicName);
Expand All @@ -3355,6 +3368,12 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) {
}
}

/**
* @deprecated Avoid using the deprecated method
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking
* call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it.
*/
@Deprecated
private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) {
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (topicPolicies.isPresent() && topicPolicies.get().getAutoSubscriptionCreationOverride() != null) {
Expand All @@ -3371,6 +3390,25 @@ private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(fin
return null;
}

public @Nonnull CompletionStage<Boolean> isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) {
requireNonNull(tpName);
// topic level policies
final var topicPolicies = getTopicPolicies(tpName);
if (topicPolicies.isPresent() && topicPolicies.get().getAutoSubscriptionCreationOverride() != null) {
return CompletableFuture.completedFuture(topicPolicies.get().getAutoSubscriptionCreationOverride()
.isAllowAutoSubscriptionCreation());
}
// namespace level policies
return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(tpName.getNamespaceObject())
.thenApply(policies -> {
if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) {
return policies.get().autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation();
}
// broker level policies
return pulsar.getConfiguration().isAllowAutoSubscriptionCreation();
});
}

public boolean isSystemTopic(String topic) {
return isSystemTopic(TopicName.get(topic));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -1210,39 +1211,43 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
.failedFuture(new TopicNotFoundException(
"Topic " + topicName + " does not exist"));
}
final Topic topic = optTopic.get();
return service.isAllowAutoSubscriptionCreationAsync(topicName)
.thenCompose(isAllowedAutoSubscriptionCreation -> {
boolean rejectSubscriptionIfDoesNotExist = isDurable
&& !isAllowedAutoSubscriptionCreation
&& !topic.getSubscriptions().containsKey(subscriptionName)
&& topic.isPersistent();

if (rejectSubscriptionIfDoesNotExist) {
return FutureUtil
.failedFuture(
new SubscriptionNotFoundException(
"Subscription does not exist"));
}

Topic topic = optTopic.get();

boolean rejectSubscriptionIfDoesNotExist = isDurable
&& !service.isAllowAutoSubscriptionCreation(topicName.toString())
&& !topic.getSubscriptions().containsKey(subscriptionName)
&& topic.isPersistent();

if (rejectSubscriptionIfDoesNotExist) {
return FutureUtil
.failedFuture(
new SubscriptionNotFoundException(
"Subscription does not exist"));
}

SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this)
.subscriptionName(subscriptionName)
.consumerId(consumerId).subType(subType).priorityLevel(priorityLevel)
.consumerName(consumerName).isDurable(isDurable)
.startMessageId(startMessageId).metadata(metadata).readCompacted(readCompacted)
.initialPosition(initialPosition)
.startMessageRollbackDurationSec(startMessageRollbackDurationSec)
.replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
.subscriptionProperties(subscriptionProperties)
.consumerEpoch(consumerEpoch)
.schemaType(schema == null ? null : schema.getType())
.build();
if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(v -> topic.subscribe(option));
} else {
return topic.subscribe(option);
}
SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this)
.subscriptionName(subscriptionName)
.consumerId(consumerId).subType(subType)
.priorityLevel(priorityLevel)
.consumerName(consumerName).isDurable(isDurable)
.startMessageId(startMessageId).metadata(metadata)
.readCompacted(readCompacted)
.initialPosition(initialPosition)
.startMessageRollbackDurationSec(startMessageRollbackDurationSec)
.replicatedSubscriptionStateArg(isReplicated)
.keySharedMeta(keySharedMeta)
.subscriptionProperties(subscriptionProperties)
.consumerEpoch(consumerEpoch)
.schemaType(schema == null ? null : schema.getType())
.build();
if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(v -> topic.subscribe(option));
} else {
return topic.subscribe(option);
}
});
})
.thenAccept(consumer -> {
if (consumerFuture.complete(consumer)) {
Expand Down Expand Up @@ -1461,33 +1466,38 @@ protected void handleProducer(final CommandProducer cmdProducer) {

schemaVersionFuture.thenAccept(schemaVersion -> {
topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> {
CompletableFuture<Subscription> createInitSubFuture;
CompletionStage<Subscription> createInitSubFuture;
if (!Strings.isNullOrEmpty(initialSubscriptionName)
&& topic.isPersistent()
&& !topic.getSubscriptions().containsKey(initialSubscriptionName)) {
if (!this.getBrokerService().isAllowAutoSubscriptionCreation(topicName)) {
String msg =
"Could not create the initial subscription due to the auto subscription "
+ "creation is not allowed.";
if (producerFuture.completeExceptionally(
new BrokerServiceException.NotAllowedException(msg))) {
log.warn("[{}] {} initialSubscriptionName: {}, topic: {}",
remoteAddress, msg, initialSubscriptionName, topicName);
commandSender.sendErrorResponse(requestId,
ServerError.NotAllowedError, msg);
}
producers.remove(producerId, producerFuture);
return;
}
createInitSubFuture =
topic.createSubscription(initialSubscriptionName, InitialPosition.Earliest,
false, null);
createInitSubFuture = service.isAllowAutoSubscriptionCreationAsync(topicName)
.thenCompose(isAllowAutoSubscriptionCreation -> {
if (!isAllowAutoSubscriptionCreation) {
return CompletableFuture.failedFuture(
new BrokerServiceException.NotAllowedException(
"Could not create the initial subscription due to"
+ " the auto subscription creation is not allowed."));
}
return topic.createSubscription(initialSubscriptionName,
InitialPosition.Earliest, false, null);
});
} else {
createInitSubFuture = CompletableFuture.completedFuture(null);
}

createInitSubFuture.whenComplete((sub, ex) -> {
if (ex != null) {
final Throwable rc = FutureUtil.unwrapCompletionException(ex);
if (rc instanceof BrokerServiceException.NotAllowedException) {
log.warn("[{}] {} initialSubscriptionName: {}, topic: {}",
remoteAddress, rc.getMessage(), initialSubscriptionName, topicName);
if (producerFuture.completeExceptionally(rc)) {
commandSender.sendErrorResponse(requestId,
ServerError.NotAllowedError, rc.getMessage());
}
producers.remove(producerId, producerFuture);
return;
}
String msg =
"Failed to create the initial subscription: " + ex.getCause().getMessage();
log.warn("[{}] {} initialSubscriptionName: {}, topic: {}",
Expand Down
Loading