diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index e5dd13c32eb23..f70d3f31d6e2f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -117,6 +117,13 @@ public Optional getPolicies(NamespaceName ns) throws MetadataStoreExce return get(joinPath(BASE_POLICIES_PATH, ns.toString())); } + /** + * Get the namespace policy from the metadata cache. This method will not trigger the load of metadata cache. + * + * @deprecated Since this method may introduce inconsistent namespace policies. we should use + * #{@link NamespaceResources#getPoliciesAsync} + */ + @Deprecated public Optional getPoliciesIfCached(NamespaceName ns) { return getCache().getIfCached(joinPath(BASE_POLICIES_PATH, ns.toString())); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 90693091c89b8..ec1593b076b44 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC; import com.google.common.base.MoreObjects; import java.util.ArrayList; @@ -40,6 +41,7 @@ import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.ToLongFunction; +import javax.annotation.Nonnull; import lombok.Getter; import org.apache.bookkeeper.mledger.util.StatsBuckets; import org.apache.commons.collections4.CollectionUtils; @@ -61,6 +63,7 @@ import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; @@ -1128,6 +1131,12 @@ public PublishRateLimiter getBrokerPublishRateLimiter() { return brokerService.getBrokerPublishRateLimiter(); } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and we can use + * #{@link AbstractTopic#updateResourceGroupLimiter(Policies)} to instead of it. + */ + @Deprecated public void updateResourceGroupLimiter(Optional optPolicies) { Policies policies; try { @@ -1141,17 +1150,20 @@ public void updateResourceGroupLimiter(Optional optPolicies) { log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage()); policies = new Policies(); } + updateResourceGroupLimiter(policies); + } + public void updateResourceGroupLimiter(@Nonnull Policies namespacePolicies) { + requireNonNull(namespacePolicies); // attach the resource-group level rate limiters, if set - String rgName = policies.resource_group_name; + String rgName = namespacePolicies.resource_group_name; if (rgName != null) { final ResourceGroup resourceGroup = - brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName); + brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName); if (resourceGroup != null) { this.resourceGroupRateLimitingEnabled = true; this.resourceGroupPublishLimiter = resourceGroup.getResourceGroupPublishLimiter(); - this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), - () -> this.enableCnxAutoRead()); + this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), this::enableCnxAutoRead); log.info("Using resource group {} rate limiter for topic {}", rgName, topic); return; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8dc94ca874094..6c48e4d9ae89f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; import static org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY; import static org.apache.commons.collections4.CollectionUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; @@ -48,11 +49,11 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; @@ -68,6 +69,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.function.Predicate; +import javax.annotation.Nonnull; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; @@ -1934,7 +1936,7 @@ private void addTopicToStatsMaps(TopicName topicName, Topic topic) { } public void refreshTopicToStatsMaps(NamespaceBundle oldBundle) { - Objects.requireNonNull(oldBundle); + requireNonNull(oldBundle); try { // retrieve all topics under existing old bundle List topics = getAllTopicsFromNamespaceBundle(oldBundle.getNamespaceObject().toString(), @@ -3267,10 +3269,9 @@ public CompletableFuture isAllowAutoTopicCreationAsync(final String top } public CompletableFuture isAllowAutoTopicCreationAsync(final TopicName topicName) { - Optional policies = - pulsar.getPulsarResources().getNamespaceResources() - .getPoliciesIfCached(topicName.getNamespaceObject()); - return isAllowAutoTopicCreationAsync(topicName, policies); + return pulsar.getPulsarResources().getNamespaceResources() + .getPoliciesAsync(topicName.getNamespaceObject()) + .thenCompose(policies -> isAllowAutoTopicCreationAsync(topicName, policies)); } private CompletableFuture isAllowAutoTopicCreationAsync(final TopicName topicName, @@ -3340,11 +3341,23 @@ private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName t return null; } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it. + */ + @Deprecated public boolean isAllowAutoSubscriptionCreation(final String topic) { TopicName topicName = TopicName.get(topic); return isAllowAutoSubscriptionCreation(topicName); } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it. + */ + @Deprecated public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) { AutoSubscriptionCreationOverride autoSubscriptionCreationOverride = getAutoSubscriptionCreationOverride(topicName); @@ -3355,6 +3368,12 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) { } } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it. + */ + @Deprecated private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) { Optional topicPolicies = getTopicPolicies(topicName); if (topicPolicies.isPresent() && topicPolicies.get().getAutoSubscriptionCreationOverride() != null) { @@ -3371,6 +3390,25 @@ private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(fin return null; } + public @Nonnull CompletionStage isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) { + requireNonNull(tpName); + // topic level policies + final var topicPolicies = getTopicPolicies(tpName); + if (topicPolicies.isPresent() && topicPolicies.get().getAutoSubscriptionCreationOverride() != null) { + return CompletableFuture.completedFuture(topicPolicies.get().getAutoSubscriptionCreationOverride() + .isAllowAutoSubscriptionCreation()); + } + // namespace level policies + return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(tpName.getNamespaceObject()) + .thenApply(policies -> { + if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) { + return policies.get().autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation(); + } + // broker level policies + return pulsar.getConfiguration().isAllowAutoSubscriptionCreation(); + }); + } + public boolean isSystemTopic(String topic) { return isSystemTopic(TopicName.get(topic)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 8dfeb54fd6a32..f3b355d63325f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -55,6 +55,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -1210,39 +1211,43 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { .failedFuture(new TopicNotFoundException( "Topic " + topicName + " does not exist")); } + final Topic topic = optTopic.get(); + return service.isAllowAutoSubscriptionCreationAsync(topicName) + .thenCompose(isAllowedAutoSubscriptionCreation -> { + boolean rejectSubscriptionIfDoesNotExist = isDurable + && !isAllowedAutoSubscriptionCreation + && !topic.getSubscriptions().containsKey(subscriptionName) + && topic.isPersistent(); + + if (rejectSubscriptionIfDoesNotExist) { + return FutureUtil + .failedFuture( + new SubscriptionNotFoundException( + "Subscription does not exist")); + } - Topic topic = optTopic.get(); - - boolean rejectSubscriptionIfDoesNotExist = isDurable - && !service.isAllowAutoSubscriptionCreation(topicName.toString()) - && !topic.getSubscriptions().containsKey(subscriptionName) - && topic.isPersistent(); - - if (rejectSubscriptionIfDoesNotExist) { - return FutureUtil - .failedFuture( - new SubscriptionNotFoundException( - "Subscription does not exist")); - } - - SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this) - .subscriptionName(subscriptionName) - .consumerId(consumerId).subType(subType).priorityLevel(priorityLevel) - .consumerName(consumerName).isDurable(isDurable) - .startMessageId(startMessageId).metadata(metadata).readCompacted(readCompacted) - .initialPosition(initialPosition) - .startMessageRollbackDurationSec(startMessageRollbackDurationSec) - .replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta) - .subscriptionProperties(subscriptionProperties) - .consumerEpoch(consumerEpoch) - .schemaType(schema == null ? null : schema.getType()) - .build(); - if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) { - return topic.addSchemaIfIdleOrCheckCompatible(schema) - .thenCompose(v -> topic.subscribe(option)); - } else { - return topic.subscribe(option); - } + SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this) + .subscriptionName(subscriptionName) + .consumerId(consumerId).subType(subType) + .priorityLevel(priorityLevel) + .consumerName(consumerName).isDurable(isDurable) + .startMessageId(startMessageId).metadata(metadata) + .readCompacted(readCompacted) + .initialPosition(initialPosition) + .startMessageRollbackDurationSec(startMessageRollbackDurationSec) + .replicatedSubscriptionStateArg(isReplicated) + .keySharedMeta(keySharedMeta) + .subscriptionProperties(subscriptionProperties) + .consumerEpoch(consumerEpoch) + .schemaType(schema == null ? null : schema.getType()) + .build(); + if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) { + return topic.addSchemaIfIdleOrCheckCompatible(schema) + .thenCompose(v -> topic.subscribe(option)); + } else { + return topic.subscribe(option); + } + }); }) .thenAccept(consumer -> { if (consumerFuture.complete(consumer)) { @@ -1461,33 +1466,38 @@ protected void handleProducer(final CommandProducer cmdProducer) { schemaVersionFuture.thenAccept(schemaVersion -> { topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> { - CompletableFuture createInitSubFuture; + CompletionStage createInitSubFuture; if (!Strings.isNullOrEmpty(initialSubscriptionName) && topic.isPersistent() && !topic.getSubscriptions().containsKey(initialSubscriptionName)) { - if (!this.getBrokerService().isAllowAutoSubscriptionCreation(topicName)) { - String msg = - "Could not create the initial subscription due to the auto subscription " - + "creation is not allowed."; - if (producerFuture.completeExceptionally( - new BrokerServiceException.NotAllowedException(msg))) { - log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", - remoteAddress, msg, initialSubscriptionName, topicName); - commandSender.sendErrorResponse(requestId, - ServerError.NotAllowedError, msg); - } - producers.remove(producerId, producerFuture); - return; - } - createInitSubFuture = - topic.createSubscription(initialSubscriptionName, InitialPosition.Earliest, - false, null); + createInitSubFuture = service.isAllowAutoSubscriptionCreationAsync(topicName) + .thenCompose(isAllowAutoSubscriptionCreation -> { + if (!isAllowAutoSubscriptionCreation) { + return CompletableFuture.failedFuture( + new BrokerServiceException.NotAllowedException( + "Could not create the initial subscription due to" + + " the auto subscription creation is not allowed.")); + } + return topic.createSubscription(initialSubscriptionName, + InitialPosition.Earliest, false, null); + }); } else { createInitSubFuture = CompletableFuture.completedFuture(null); } createInitSubFuture.whenComplete((sub, ex) -> { if (ex != null) { + final Throwable rc = FutureUtil.unwrapCompletionException(ex); + if (rc instanceof BrokerServiceException.NotAllowedException) { + log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", + remoteAddress, rc.getMessage(), initialSubscriptionName, topicName); + if (producerFuture.completeExceptionally(rc)) { + commandSender.sendErrorResponse(requestId, + ServerError.NotAllowedError, rc.getMessage()); + } + producers.remove(producerId, producerFuture); + return; + } String msg = "Failed to create the initial subscription: " + ex.getCause().getMessage(); log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 9fe0a735c90d9..c764283cb4459 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -167,17 +167,19 @@ public CompletableFuture initialize() { return brokerService.pulsar().getPulsarResources().getNamespaceResources() .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()) .thenCompose(optPolicies -> { + final Policies policies; if (!optPolicies.isPresent()) { log.warn("[{}] Policies not present and isEncryptionRequired will be set to false", topic); isEncryptionRequired = false; + policies = new Policies(); } else { - Policies policies = optPolicies.get(); + policies = optPolicies.get(); updateTopicPolicyByNamespacePolicy(policies); isEncryptionRequired = policies.encryption_required; isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema; } updatePublishDispatcher(); - updateResourceGroupLimiter(optPolicies); + updateResourceGroupLimiter(policies); return updateClusterMigrated(); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index 0650d5bc1cf38..233972452e97f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -204,6 +204,12 @@ public static CompletableFuture> getPoliciesAsync(BrokerServi return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace); } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. we can use #{@link DispatchRateLimiter#getPoliciesAsync(BrokerService, String)} to instead of it. + */ + @Deprecated public static Optional getPolicies(BrokerService brokerService, String topicName) { final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject(); return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index e907caa8c3000..039f576b04420 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service.persistent; +import static java.util.Objects.requireNonNull; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter.isSubscribeRateEnabled; import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic; @@ -340,7 +341,7 @@ public CompletableFuture initialize() { if (!optPolicies.isPresent()) { isEncryptionRequired = false; updatePublishDispatcher(); - updateResourceGroupLimiter(optPolicies); + updateResourceGroupLimiter(new Policies()); initializeDispatchRateLimiterIfNeeded(); updateSubscribeRateLimiter(); return; @@ -356,7 +357,7 @@ public CompletableFuture initialize() { updatePublishDispatcher(); - updateResourceGroupLimiter(optPolicies); + updateResourceGroupLimiter(policies); this.isEncryptionRequired = policies.encryption_required; @@ -487,7 +488,7 @@ public CompletableFuture unloadSubscription(@Nonnull String subName) { private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, boolean replicated, Map subscriptionProperties) { - Objects.requireNonNull(topicCompactionService); + requireNonNull(topicCompactionService); if (isCompactionSubscription(subscriptionName) && topicCompactionService instanceof PulsarTopicCompactionService pulsarTopicCompactionService) { CompactedTopicImpl compactedTopic = pulsarTopicCompactionService.getCompactedTopic(); @@ -2804,7 +2805,8 @@ public void updateDispatchRateLimiter() { } @Override - public CompletableFuture onPoliciesUpdate(Policies data) { + public CompletableFuture onPoliciesUpdate(@Nonnull Policies data) { + requireNonNull(data); if (log.isDebugEnabled()) { log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required); @@ -2826,7 +2828,7 @@ public CompletableFuture onPoliciesUpdate(Policies data) { updatePublishDispatcher(); - this.updateResourceGroupLimiter(Optional.of(data)); + updateResourceGroupLimiter(data); List> producerCheckFutures = new ArrayList<>(producers.size()); producers.values().forEach(producer -> producerCheckFutures.add( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java index a052f1f6abf33..58f099a9dab80 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java @@ -28,6 +28,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.util.RateLimiter; @@ -159,14 +160,21 @@ public void onSubscribeRateUpdate(SubscribeRate subscribeRate) { } /** - * Gets configured subscribe-rate from namespace policies. Returns null if subscribe-rate is not configured - * - * @return + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. */ + @Deprecated public SubscribeRate getPoliciesSubscribeRate() { return getPoliciesSubscribeRate(brokerService, topicName); } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. + */ + @Deprecated public static SubscribeRate getPoliciesSubscribeRate(BrokerService brokerService, final String topicName) { final String cluster = brokerService.pulsar().getConfiguration().getClusterName(); final Optional policies = DispatchRateLimiter.getPolicies(brokerService, topicName);