From 95ddebaedcdbda4419f4be35ceb1b8f947280698 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Wed, 26 Jul 2023 10:19:14 +0800 Subject: [PATCH 1/3] [branch-2.10][fix][broker] Fix inconsensus namespace policies by getPoliciesIfCached (#20873) --- .../broker/resources/NamespaceResources.java | 7 + .../admin/impl/PersistentTopicsBase.java | 21 +-- .../pulsar/broker/service/AbstractTopic.java | 20 ++- .../pulsar/broker/service/BrokerService.java | 65 +++++++++- .../pulsar/broker/service/ServerCnx.java | 120 ++++++++++-------- .../nonpersistent/NonPersistentTopic.java | 6 +- .../persistent/DispatchRateLimiter.java | 6 + .../service/persistent/PersistentTopic.java | 11 +- .../persistent/SubscribeRateLimiter.java | 14 +- .../lookup/http/HttpTopicLookupv2Test.java | 3 + 10 files changed, 194 insertions(+), 79 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 2223e951f6628..327e6a784dd5c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -126,6 +126,13 @@ public Optional getPolicies(NamespaceName ns) throws MetadataStoreExce return get(joinPath(BASE_POLICIES_PATH, ns.toString())); } + /** + * Get the namespace policy from the metadata cache. This method will not trigger the load of metadata cache. + * + * @deprecated Since this method may introduce inconsistent namespace policies. we should use + * #{@link NamespaceResources#getPoliciesAsync} + */ + @Deprecated public Optional getPoliciesIfCached(NamespaceName ns) { return getCache().getIfCached(joinPath(BASE_POLICIES_PATH, ns.toString())); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 8d6534e95d181..465be8b01c51e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2209,9 +2209,10 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, subscriptionName, targetMessageId, authoritative, replicated, properties); } else { - boolean allowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName); - getPartitionedTopicMetadataAsync(topicName, - authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> { + pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName) + .thenCompose(allowAutoTopicCreation -> + getPartitionedTopicMetadataAsync(topicName, authoritative, allowAutoTopicCreation)) + .thenAccept(partitionMetadata -> { final int numPartitions = partitionMetadata.partitions; if (numPartitions > 0) { final CompletableFuture future = new CompletableFuture<>(); @@ -2307,13 +2308,13 @@ private void internalCreateSubscriptionForNonPartitionedTopic( MessageIdImpl targetMessageId, boolean authoritative, boolean replicated, Map properties) { - boolean isAllowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName); - - validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(__ -> { - validateTopicOperation(topicName, TopicOperation.SUBSCRIBE, subscriptionName); - return pulsar().getBrokerService().getTopic(topicName.toString(), isAllowAutoTopicCreation); - }).thenApply(optTopic -> { + pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName) + .thenCompose(isAllowAutoTopicCreation -> validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(__ -> { + validateTopicOperation(topicName, TopicOperation.SUBSCRIBE, subscriptionName); + return pulsar().getBrokerService().getTopic(topicName.toString(), isAllowAutoTopicCreation); + })) + .thenApply(optTopic -> { if (optTopic.isPresent()) { return optTopic.get(); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 78cdce29eac43..6b9740ddf348d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC; import com.google.common.base.MoreObjects; import com.google.common.collect.Lists; @@ -41,6 +42,7 @@ import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.ToLongFunction; +import javax.annotation.Nonnull; import lombok.Getter; import org.apache.bookkeeper.mledger.util.StatsBuckets; import org.apache.commons.collections4.CollectionUtils; @@ -58,6 +60,7 @@ import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; @@ -1114,6 +1117,12 @@ public PublishRateLimiter getBrokerPublishRateLimiter() { return brokerService.getBrokerPublishRateLimiter(); } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and we can use + * #{@link AbstractTopic#updateResourceGroupLimiter(Policies)} to instead of it. + */ + @Deprecated public void updateResourceGroupLimiter(Optional optPolicies) { Policies policies; try { @@ -1127,17 +1136,20 @@ public void updateResourceGroupLimiter(Optional optPolicies) { log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage()); policies = new Policies(); } + updateResourceGroupLimiter(policies); + } + public void updateResourceGroupLimiter(@Nonnull Policies namespacePolicies) { + requireNonNull(namespacePolicies); // attach the resource-group level rate limiters, if set - String rgName = policies.resource_group_name; + String rgName = namespacePolicies.resource_group_name; if (rgName != null) { final ResourceGroup resourceGroup = - brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName); + brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName); if (resourceGroup != null) { this.resourceGroupRateLimitingEnabled = true; this.resourceGroupPublishLimiter = resourceGroup.getResourceGroupPublishLimiter(); - this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), - () -> this.enableCnxAutoRead()); + this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), this::enableCnxAutoRead); log.info("Using resource group {} rate limiter for topic {}", rgName, topic); return; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index ed055f8a19585..2952fbc0861eb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import static org.apache.commons.collections4.CollectionUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; @@ -56,6 +57,7 @@ import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -72,6 +74,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.function.Predicate; +import javax.annotation.Nonnull; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; @@ -945,7 +948,15 @@ public CompletableFuture> getTopicIfExists(final String topic) { } public CompletableFuture getOrCreateTopic(final String topic) { - return getTopic(topic, isAllowAutoTopicCreation(topic)).thenApply(Optional::get); + final TopicName topicName; + try { + topicName = TopicName.get(topic); + } catch (Throwable ex) { + return FutureUtil.failedFuture(ex); + } + return isAllowAutoTopicCreationAsync(topicName) + .thenCompose(isAllowAutoTopicCreation -> getTopic(topic, isAllowAutoTopicCreation) + .thenApply(Optional::get)); } public CompletableFuture> getTopic(final String topic, boolean createIfMissing) { @@ -2882,7 +2893,8 @@ public CompletableFuture fetchPartitionedTopicMetadata if (metadata.partitions == 0 && !topicExists && !topicName.isPartitioned() - && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName, policies) + && pulsar.getBrokerService() + .isAllowAutoTopicCreation(topicName, policies) && pulsar.getBrokerService() .isDefaultTopicTypePartitioned(topicName, policies)) { @@ -3099,11 +3111,24 @@ public Optional getListenPortTls() { } } + + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} + * You can use #{@link BrokerService#isAllowAutoTopicCreationAsync(TopicName)} + */ + @Deprecated public boolean isAllowAutoTopicCreation(final String topic) { TopicName topicName = TopicName.get(topic); return isAllowAutoTopicCreation(topicName); } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} + * You can use #{@link BrokerService#isAllowAutoTopicCreationAsync(TopicName)} + */ + @Deprecated public boolean isAllowAutoTopicCreation(final TopicName topicName) { Optional policies = pulsar.getPulsarResources().getNamespaceResources() @@ -3111,6 +3136,11 @@ public boolean isAllowAutoTopicCreation(final TopicName topicName) { return isAllowAutoTopicCreation(topicName, policies); } + public CompletableFuture isAllowAutoTopicCreationAsync(final TopicName topicName) { + return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()) + .thenApply(policies -> isAllowAutoTopicCreation(topicName, policies)); + } + public boolean isAllowAutoTopicCreation(final TopicName topicName, final Optional policies) { if (policies.isPresent() && policies.get().deleted) { log.info("Preventing AutoTopicCreation on a namespace that is being deleted {}", @@ -3157,11 +3187,23 @@ private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName t return null; } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it. + */ + @Deprecated public boolean isAllowAutoSubscriptionCreation(final String topic) { TopicName topicName = TopicName.get(topic); return isAllowAutoSubscriptionCreation(topicName); } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it. + */ + @Deprecated public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) { AutoSubscriptionCreationOverride autoSubscriptionCreationOverride = getAutoSubscriptionCreationOverride(topicName); @@ -3172,6 +3214,12 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) { } } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it. + */ + @Deprecated private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) { Optional policies = pulsar.getPulsarResources().getNamespaceResources().getPoliciesIfCached(topicName.getNamespaceObject()); @@ -3183,6 +3231,19 @@ private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(fin return null; } + public @Nonnull CompletionStage isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) { + requireNonNull(tpName); + // namespace level policies + return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(tpName.getNamespaceObject()) + .thenApply(policies -> { + if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) { + return policies.get().autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation(); + } + // broker level policies + return pulsar.getConfiguration().isAllowAutoSubscriptionCreation(); + }); + } + public boolean isSystemTopic(String topic) { return isSystemTopic(TopicName.get(topic)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index df595adcf4167..ab24fb81fa5b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -53,6 +53,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -1089,48 +1090,53 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { return null; } - boolean createTopicIfDoesNotExist = forceTopicCreation - && service.isAllowAutoTopicCreation(topicName.toString()); - - service.getTopic(topicName.toString(), createTopicIfDoesNotExist) + service.isAllowAutoTopicCreationAsync(topicName) + .thenCompose(isAllowAutoTopicCreation -> { + final boolean createTopicIfDoesNotExist = forceTopicCreation && isAllowAutoTopicCreation; + return service.getTopic(topicName.toString(), createTopicIfDoesNotExist); + }) .thenCompose(optTopic -> { if (!optTopic.isPresent()) { return FutureUtil .failedFuture(new TopicNotFoundException( "Topic " + topicName + " does not exist")); } - - Topic topic = optTopic.get(); - - boolean rejectSubscriptionIfDoesNotExist = isDurable - && !service.isAllowAutoSubscriptionCreation(topicName.toString()) - && !topic.getSubscriptions().containsKey(subscriptionName) - && topic.isPersistent(); - - if (rejectSubscriptionIfDoesNotExist) { - return FutureUtil - .failedFuture( - new SubscriptionNotFoundException( - "Subscription does not exist")); - } - - SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this) - .subscriptionName(subscriptionName) - .consumerId(consumerId).subType(subType).priorityLevel(priorityLevel) - .consumerName(consumerName).isDurable(isDurable) - .startMessageId(startMessageId).metadata(metadata).readCompacted(readCompacted) - .initialPosition(initialPosition) - .startMessageRollbackDurationSec(startMessageRollbackDurationSec) - .replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta) - .subscriptionProperties(subscriptionProperties) - .consumerEpoch(consumerEpoch) - .build(); - if (schema != null) { - return topic.addSchemaIfIdleOrCheckCompatible(schema) - .thenCompose(v -> topic.subscribe(option)); - } else { - return topic.subscribe(option); - } + final Topic topic = optTopic.get(); + return service.isAllowAutoSubscriptionCreationAsync(topicName) + .thenCompose(isAllowAutoSubscriptionCreation -> { + boolean rejectSubscriptionIfDoesNotExist = isDurable + && !isAllowAutoSubscriptionCreation + && !topic.getSubscriptions().containsKey(subscriptionName) + && topic.isPersistent(); + + if (rejectSubscriptionIfDoesNotExist) { + return FutureUtil + .failedFuture( + new SubscriptionNotFoundException( + "Subscription does not exist")); + } + + SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this) + .subscriptionName(subscriptionName) + .consumerId(consumerId).subType(subType) + .priorityLevel(priorityLevel) + .consumerName(consumerName).isDurable(isDurable) + .startMessageId(startMessageId).metadata(metadata) + .readCompacted(readCompacted) + .initialPosition(initialPosition) + .startMessageRollbackDurationSec(startMessageRollbackDurationSec) + .replicatedSubscriptionStateArg(isReplicated) + .keySharedMeta(keySharedMeta) + .subscriptionProperties(subscriptionProperties) + .consumerEpoch(consumerEpoch) + .build(); + if (schema != null) { + return topic.addSchemaIfIdleOrCheckCompatible(schema) + .thenCompose(v -> topic.subscribe(option)); + } else { + return topic.subscribe(option); + } + }); }) .thenAccept(consumer -> { if (consumerFuture.complete(consumer)) { @@ -1346,33 +1352,39 @@ protected void handleProducer(final CommandProducer cmdProducer) { schemaVersionFuture.thenAccept(schemaVersion -> { topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> { - CompletableFuture createInitSubFuture; + CompletionStage createInitSubFuture; if (!Strings.isNullOrEmpty(initialSubscriptionName) && topic.isPersistent() && !topic.getSubscriptions().containsKey(initialSubscriptionName)) { - if (!this.getBrokerService().isAllowAutoSubscriptionCreation(topicName)) { - String msg = - "Could not create the initial subscription due to the auto subscription " - + "creation is not allowed."; - if (producerFuture.completeExceptionally( - new BrokerServiceException.NotAllowedException(msg))) { - log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", - remoteAddress, msg, initialSubscriptionName, topicName); - commandSender.sendErrorResponse(requestId, - ServerError.NotAllowedError, msg); - } - producers.remove(producerId, producerFuture); - return; - } - createInitSubFuture = - topic.createSubscription(initialSubscriptionName, InitialPosition.Earliest, - false, null); + createInitSubFuture = service.isAllowAutoSubscriptionCreationAsync(topicName) + .thenCompose(isAllowAutoSubscriptionCreation -> { + if (!isAllowAutoSubscriptionCreation) { + return FutureUtil.failedFuture( + new BrokerServiceException.NotAllowedException( + "Could not create the initial subscription due to" + + " the auto subscription creation is not allowed.")); + } + return topic.createSubscription(initialSubscriptionName, + InitialPosition.Earliest, false, null); + }); } else { createInitSubFuture = CompletableFuture.completedFuture(null); } createInitSubFuture.whenComplete((sub, ex) -> { if (ex != null) { + final Throwable rc = FutureUtil.unwrapCompletionException(ex); + if (rc instanceof BrokerServiceException.NotAllowedException) { + log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", + remoteAddress, rc.getMessage(), initialSubscriptionName, topicName); + if (producerFuture.completeExceptionally(rc)) { + commandSender.sendErrorResponse(requestId, + ServerError.NotAllowedError, rc.getMessage()); + } + producers.remove(producerId, producerFuture); + return; + } + String msg = "Failed to create the initial subscription: " + ex.getCause().getMessage(); log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 958f198dccdf9..2b028f5fc91cb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -158,17 +158,19 @@ public CompletableFuture initialize() { return brokerService.pulsar().getPulsarResources().getNamespaceResources() .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()) .thenAccept(optPolicies -> { + final Policies policies; if (!optPolicies.isPresent()) { log.warn("[{}] Policies not present and isEncryptionRequired will be set to false", topic); isEncryptionRequired = false; + policies = new Policies(); } else { - Policies policies = optPolicies.get(); + policies = optPolicies.get(); updateTopicPolicyByNamespacePolicy(policies); isEncryptionRequired = policies.encryption_required; isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema; } updatePublishDispatcher(); - updateResourceGroupLimiter(optPolicies); + updateResourceGroupLimiter(policies); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index d71dc4edade0f..6a237df37da03 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -206,6 +206,12 @@ public static CompletableFuture> getPoliciesAsync(BrokerServi return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace); } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. we can use #{@link DispatchRateLimiter#getPoliciesAsync(BrokerService, String)} to instead of it. + */ + @Deprecated public static Optional getPolicies(BrokerService brokerService, String topicName) { final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject(); return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index f0301d24d6689..82f15342e3291 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service.persistent; import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter.isSubscribeRateEnabled; import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic; @@ -49,6 +50,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.stream.Collectors; +import javax.annotation.Nonnull; import lombok.Getter; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -315,7 +317,7 @@ public CompletableFuture initialize() { if (!optPolicies.isPresent()) { isEncryptionRequired = false; updatePublishDispatcher(); - updateResourceGroupLimiter(optPolicies); + updateResourceGroupLimiter(new Policies()); initializeDispatchRateLimiterIfNeeded(); updateSubscribeRateLimiter(); return; @@ -331,7 +333,7 @@ public CompletableFuture initialize() { updatePublishDispatcher(); - updateResourceGroupLimiter(optPolicies); + updateResourceGroupLimiter(policies); this.isEncryptionRequired = policies.encryption_required; @@ -2442,7 +2444,8 @@ public void updateDispatchRateLimiter() { } @Override - public CompletableFuture onPoliciesUpdate(Policies data) { + public CompletableFuture onPoliciesUpdate(@Nonnull Policies data) { + requireNonNull(data); if (log.isDebugEnabled()) { log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required); @@ -2464,7 +2467,7 @@ public CompletableFuture onPoliciesUpdate(Policies data) { updatePublishDispatcher(); - this.updateResourceGroupLimiter(Optional.of(data)); + updateResourceGroupLimiter(data); List> producerCheckFutures = new ArrayList<>(producers.size()); producers.values().forEach(producer -> producerCheckFutures.add( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java index 89af6f6be882f..f602659400f00 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java @@ -28,6 +28,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.util.RateLimiter; @@ -159,14 +160,21 @@ public void onSubscribeRateUpdate(SubscribeRate subscribeRate) { } /** - * Gets configured subscribe-rate from namespace policies. Returns null if subscribe-rate is not configured - * - * @return + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. */ + @Deprecated public SubscribeRate getPoliciesSubscribeRate() { return getPoliciesSubscribeRate(brokerService, topicName); } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. + */ + @Deprecated public static SubscribeRate getPoliciesSubscribeRate(BrokerService brokerService, final String topicName) { final String cluster = brokerService.pulsar().getConfiguration().getClusterName(); final Optional policies = DispatchRateLimiter.getPolicies(brokerService, topicName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java index bb20559c061be..e9c89bda544c1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java @@ -146,6 +146,9 @@ public void testLookupTopicNotExist() throws Exception { doReturn(uri).when(uriInfo).getRequestUri(); doReturn(true).when(config).isAuthorizationEnabled(); + BrokerService brokerService = pulsar.getBrokerService(); + doReturn(CompletableFuture.completedFuture(false)) + .when(brokerService).isAllowAutoTopicCreationAsync(any()); NamespaceService namespaceService = pulsar.getNamespaceService(); CompletableFuture future = new CompletableFuture<>(); future.complete(false); From a69ee8444f14eb87ef99189360635f8757da2aff Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Mon, 31 Jul 2023 09:45:54 +0800 Subject: [PATCH 2/3] Add missing part --- .../apache/pulsar/broker/lookup/TopicLookupBase.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java index 3c29ef1597a3f..f50b99611f6ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java @@ -68,12 +68,11 @@ protected CompletableFuture internalLookupTopicAsync(TopicName topic // Currently, it's hard to check the non-persistent-non-partitioned topic, because it only exists // in the broker, it doesn't have metadata. If the topic is non-persistent and non-partitioned, // we'll return the true flag. - CompletableFuture existFuture = pulsar().getBrokerService() - .isAllowAutoTopicCreation(topicName) - || (!topicName.isPersistent() && !topicName.isPartitioned()) - ? CompletableFuture.completedFuture(true) - : pulsar().getNamespaceService().checkTopicExists(topicName); - return existFuture; + return pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName) + .thenCompose(isAllowAutoTopicCreation -> + isAllowAutoTopicCreation || (!topicName.isPersistent() && !topicName.isPartitioned()) + ? CompletableFuture.completedFuture(true) : + pulsar().getNamespaceService().checkTopicExists(topicName)); }) .thenCompose(exist -> { if (!exist) { From bf046d80f70000a415db573b914d05c3378504cc Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Mon, 31 Jul 2023 22:35:44 +0800 Subject: [PATCH 3/3] Fix test --- .../pulsar/broker/lookup/http/HttpTopicLookupv2Test.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java index e9c89bda544c1..2fd6a6ab781f3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java @@ -222,6 +222,10 @@ public void testValidateReplicationSettingsOnNamespace() throws Exception { UriInfo uriInfo = mock(UriInfo.class); uriField.set(destLookup, uriInfo); doReturn(false).when(config).isAuthorizationEnabled(); + + BrokerService brokerService = pulsar.getBrokerService(); + doReturn(CompletableFuture.completedFuture(false)) + .when(brokerService).isAllowAutoTopicCreationAsync(any()); AsyncResponse asyncResponse = mock(AsyncResponse.class); ArgumentCaptor arg = ArgumentCaptor.forClass(RestException.class); // // Test policy not found