From 7cc0f13e49c8745cfa4a24749476f6548635ebca Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Tue, 25 Jul 2023 22:04:29 +0800 Subject: [PATCH 1/7] [branch-2.10][fix][broker] Fix inconsensus namespace policies by getPoliciesIfCached --- .../broker/resources/NamespaceResources.java | 7 ++ .../pulsar/broker/service/AbstractTopic.java | 21 +++- .../pulsar/broker/service/BrokerService.java | 46 ++++++-- .../pulsar/broker/service/ServerCnx.java | 111 ++++++++++-------- .../nonpersistent/NonPersistentTopic.java | 6 +- .../persistent/DispatchRateLimiter.java | 6 + .../service/persistent/PersistentTopic.java | 12 +- .../persistent/SubscribeRateLimiter.java | 14 ++- 8 files changed, 149 insertions(+), 74 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 74d4bcf6c472e..90e3971c4cf5b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -122,6 +122,13 @@ public Optional getPolicies(NamespaceName ns) throws MetadataStoreExce return get(joinPath(BASE_POLICIES_PATH, ns.toString())); } + /** + * Get the namespace policy from the metadata cache. This method will not trigger the load of metadata cache. + * + * @deprecated Since this method may introduce inconsistent namespace policies. we should use + * #{@link NamespaceResources#getPoliciesAsync} + */ + @Deprecated public Optional getPoliciesIfCached(NamespaceName ns) { return getCache().getIfCached(joinPath(BASE_POLICIES_PATH, ns.toString())); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 736fcf1f5e1e5..d8002748a5250 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC; import com.google.common.base.MoreObjects; import com.google.common.collect.Lists; @@ -57,6 +58,7 @@ import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; @@ -75,6 +77,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + public abstract class AbstractTopic implements Topic, TopicPolicyListener { protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60; @@ -1065,6 +1069,12 @@ public PublishRateLimiter getBrokerPublishRateLimiter() { return brokerService.getBrokerPublishRateLimiter(); } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and we can use + * #{@link AbstractTopic#updateResourceGroupLimiter(Policies)} to instead of it. + */ + @Deprecated public void updateResourceGroupLimiter(Optional optPolicies) { Policies policies; try { @@ -1078,17 +1088,20 @@ public void updateResourceGroupLimiter(Optional optPolicies) { log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage()); policies = new Policies(); } + updateResourceGroupLimiter(policies); + } + public void updateResourceGroupLimiter(@Nonnull Policies namespacePolicies) { + requireNonNull(namespacePolicies); // attach the resource-group level rate limiters, if set - String rgName = policies.resource_group_name; + String rgName = namespacePolicies.resource_group_name; if (rgName != null) { final ResourceGroup resourceGroup = - brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName); + brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName); if (resourceGroup != null) { this.resourceGroupRateLimitingEnabled = true; this.resourceGroupPublishLimiter = resourceGroup.getResourceGroupPublishLimiter(); - this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), - () -> this.enableCnxAutoRead()); + this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), this::enableCnxAutoRead); log.info("Using resource group {} rate limiter for topic {}", rgName, topic); return; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index cac70f6896e5f..d40c97effab89 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import static org.apache.commons.collections.CollectionUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; @@ -54,17 +55,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; @@ -175,6 +166,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + @Getter(AccessLevel.PUBLIC) @Setter(AccessLevel.PROTECTED) public class BrokerService implements Closeable { @@ -3005,11 +2998,23 @@ private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName t return null; } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it. + */ + @Deprecated public boolean isAllowAutoSubscriptionCreation(final String topic) { TopicName topicName = TopicName.get(topic); return isAllowAutoSubscriptionCreation(topicName); } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it. + */ + @Deprecated public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) { AutoSubscriptionCreationOverride autoSubscriptionCreationOverride = getAutoSubscriptionCreationOverride(topicName); @@ -3020,6 +3025,12 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) { } } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it. + */ + @Deprecated private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) { Optional policies = pulsar.getPulsarResources().getNamespaceResources().getPoliciesIfCached(topicName.getNamespaceObject()); @@ -3031,6 +3042,19 @@ private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(fin return null; } + public @Nonnull CompletionStage isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) { + requireNonNull(tpName); + // namespace level policies + return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(tpName.getNamespaceObject()) + .thenApply(policies -> { + if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) { + return policies.get().autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation(); + } + // broker level policies + return pulsar.getConfiguration().isAllowAutoSubscriptionCreation(); + }); + } + public boolean isSystemTopic(String topic) { return isSystemTopic(TopicName.get(topic)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index c9d8a2ff700b4..cdba1be768069 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -53,6 +53,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -1044,38 +1045,42 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { .failedFuture(new TopicNotFoundException( "Topic " + topicName + " does not exist")); } - - Topic topic = optTopic.get(); - - boolean rejectSubscriptionIfDoesNotExist = isDurable - && !service.isAllowAutoSubscriptionCreation(topicName.toString()) - && !topic.getSubscriptions().containsKey(subscriptionName) - && topic.isPersistent(); - - if (rejectSubscriptionIfDoesNotExist) { - return FutureUtil - .failedFuture( - new SubscriptionNotFoundException( - "Subscription does not exist")); - } - - SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this) - .subscriptionName(subscriptionName) - .consumerId(consumerId).subType(subType).priorityLevel(priorityLevel) - .consumerName(consumerName).isDurable(isDurable) - .startMessageId(startMessageId).metadata(metadata).readCompacted(readCompacted) - .initialPosition(initialPosition) - .startMessageRollbackDurationSec(startMessageRollbackDurationSec) - .replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta) - .subscriptionProperties(subscriptionProperties) - .consumerEpoch(consumerEpoch) - .build(); - if (schema != null) { - return topic.addSchemaIfIdleOrCheckCompatible(schema) - .thenCompose(v -> topic.subscribe(option)); - } else { - return topic.subscribe(option); - } + final Topic topic = optTopic.get(); + return service.isAllowAutoSubscriptionCreationAsync(topicName) + .thenCompose(isAllowAutoSubscriptionCreation -> { + boolean rejectSubscriptionIfDoesNotExist = isDurable + && !isAllowAutoSubscriptionCreation + && !topic.getSubscriptions().containsKey(subscriptionName) + && topic.isPersistent(); + + if (rejectSubscriptionIfDoesNotExist) { + return FutureUtil + .failedFuture( + new SubscriptionNotFoundException( + "Subscription does not exist")); + } + + SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this) + .subscriptionName(subscriptionName) + .consumerId(consumerId).subType(subType) + .priorityLevel(priorityLevel) + .consumerName(consumerName).isDurable(isDurable) + .startMessageId(startMessageId).metadata(metadata) + .readCompacted(readCompacted) + .initialPosition(initialPosition) + .startMessageRollbackDurationSec(startMessageRollbackDurationSec) + .replicatedSubscriptionStateArg(isReplicated) + .keySharedMeta(keySharedMeta) + .subscriptionProperties(subscriptionProperties) + .consumerEpoch(consumerEpoch) + .build(); + if (schema != null) { + return topic.addSchemaIfIdleOrCheckCompatible(schema) + .thenCompose(v -> topic.subscribe(option)); + } else { + return topic.subscribe(option); + } + }); }) .thenAccept(consumer -> { if (consumerFuture.complete(consumer)) { @@ -1286,33 +1291,39 @@ protected void handleProducer(final CommandProducer cmdProducer) { schemaVersionFuture.thenAccept(schemaVersion -> { topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> { - CompletableFuture createInitSubFuture; + CompletionStage createInitSubFuture; if (!Strings.isNullOrEmpty(initialSubscriptionName) && topic.isPersistent() && !topic.getSubscriptions().containsKey(initialSubscriptionName)) { - if (!this.getBrokerService().isAllowAutoSubscriptionCreation(topicName)) { - String msg = - "Could not create the initial subscription due to the auto subscription " - + "creation is not allowed."; - if (producerFuture.completeExceptionally( - new BrokerServiceException.NotAllowedException(msg))) { - log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", - remoteAddress, msg, initialSubscriptionName, topicName); - commandSender.sendErrorResponse(requestId, - ServerError.NotAllowedError, msg); - } - producers.remove(producerId, producerFuture); - return; - } - createInitSubFuture = - topic.createSubscription(initialSubscriptionName, InitialPosition.Earliest, - false, null); + createInitSubFuture = service.isAllowAutoSubscriptionCreationAsync(topicName) + .thenCompose(isAllowAutoSubscriptionCreation -> { + if (!isAllowAutoSubscriptionCreation) { + return FutureUtil.failedFuture( + new BrokerServiceException.NotAllowedException( + "Could not create the initial subscription due to" + + " the auto subscription creation is not allowed.")); + } + return topic.createSubscription(initialSubscriptionName, + InitialPosition.Earliest, false, null); + }); } else { createInitSubFuture = CompletableFuture.completedFuture(null); } createInitSubFuture.whenComplete((sub, ex) -> { if (ex != null) { + final Throwable rc = FutureUtil.unwrapCompletionException(ex); + if (rc instanceof BrokerServiceException.NotAllowedException) { + log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", + remoteAddress, rc.getMessage(), initialSubscriptionName, topicName); + if (producerFuture.completeExceptionally(rc)) { + commandSender.sendErrorResponse(requestId, + ServerError.NotAllowedError, rc.getMessage()); + } + producers.remove(producerId, producerFuture); + return; + } + String msg = "Failed to create the initial subscription: " + ex.getCause().getMessage(); log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 54f738101be27..a021e7ce94def 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -157,18 +157,20 @@ public CompletableFuture initialize() { return brokerService.pulsar().getPulsarResources().getNamespaceResources() .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()) .thenAccept(optPolicies -> { + final Policies policies; if (!optPolicies.isPresent()) { log.warn("[{}] Policies not present and isEncryptionRequired will be set to false", topic); isEncryptionRequired = false; + policies = new Policies(); } else { - Policies policies = optPolicies.get(); + policies = optPolicies.get(); updateTopicPolicyByNamespacePolicy(policies); isEncryptionRequired = policies.encryption_required; isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema; schemaValidationEnforced = policies.schema_validation_enforced; } updatePublishDispatcher(); - updateResourceGroupLimiter(optPolicies); + updateResourceGroupLimiter(policies); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index cd7f5e9ea64c4..568ae0915cb3d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -364,6 +364,12 @@ public static CompletableFuture> getPoliciesAsync(BrokerServi return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace); } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. we can use #{@link DispatchRateLimiter#getPoliciesAsync(BrokerService, String)} to instead of it. + */ + @Deprecated public static Optional getPolicies(BrokerService brokerService, String topicName) { final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject(); return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 5054b039af79b..6e5398e4b5b6c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service.persistent; import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter.isSubscribeRateEnabled; import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames; @@ -164,6 +165,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback { // Managed ledger associated with the topic @@ -316,7 +319,7 @@ public CompletableFuture initialize() { if (!optPolicies.isPresent()) { isEncryptionRequired = false; updatePublishDispatcher(); - updateResourceGroupLimiter(optPolicies); + updateResourceGroupLimiter(new Policies()); initializeDispatchRateLimiterIfNeeded(); updateSubscribeRateLimiter(); return; @@ -332,7 +335,7 @@ public CompletableFuture initialize() { updatePublishDispatcher(); - updateResourceGroupLimiter(optPolicies); + updateResourceGroupLimiter(policies); this.isEncryptionRequired = policies.encryption_required; @@ -2419,7 +2422,8 @@ public void updateDispatchRateLimiter() { } @Override - public CompletableFuture onPoliciesUpdate(Policies data) { + public CompletableFuture onPoliciesUpdate(@Nonnull Policies data) { + requireNonNull(data); if (log.isDebugEnabled()) { log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required); @@ -2443,7 +2447,7 @@ public CompletableFuture onPoliciesUpdate(Policies data) { updatePublishDispatcher(); - this.updateResourceGroupLimiter(Optional.of(data)); + updateResourceGroupLimiter(data); List> producerCheckFutures = new ArrayList<>(producers.size()); producers.values().forEach(producer -> producerCheckFutures.add( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java index 89af6f6be882f..f602659400f00 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java @@ -28,6 +28,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.util.RateLimiter; @@ -159,14 +160,21 @@ public void onSubscribeRateUpdate(SubscribeRate subscribeRate) { } /** - * Gets configured subscribe-rate from namespace policies. Returns null if subscribe-rate is not configured - * - * @return + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. */ + @Deprecated public SubscribeRate getPoliciesSubscribeRate() { return getPoliciesSubscribeRate(brokerService, topicName); } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. + */ + @Deprecated public static SubscribeRate getPoliciesSubscribeRate(BrokerService brokerService, final String topicName) { final String cluster = brokerService.pulsar().getConfiguration().getClusterName(); final Optional policies = DispatchRateLimiter.getPolicies(brokerService, topicName); From 83ea617e72372ffed05d2bb1bc5a9f1471e004d0 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Tue, 25 Jul 2023 22:19:25 +0800 Subject: [PATCH 2/7] Fix checkstyle --- .../pulsar/broker/service/AbstractTopic.java | 3 +-- .../pulsar/broker/service/BrokerService.java | 16 +++++++++++++--- .../service/persistent/PersistentTopic.java | 3 +-- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index d8002748a5250..19abb22b01706 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -41,6 +41,7 @@ import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.ToLongFunction; +import javax.annotation.Nonnull; import lombok.Getter; import org.apache.bookkeeper.mledger.util.StatsBuckets; import org.apache.commons.collections4.CollectionUtils; @@ -77,8 +78,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; - public abstract class AbstractTopic implements Topic, TopicPolicyListener { protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index d40c97effab89..148541caedcc2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -55,7 +55,18 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; @@ -63,6 +74,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.function.Predicate; +import javax.annotation.Nonnull; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; @@ -166,8 +178,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; - @Getter(AccessLevel.PUBLIC) @Setter(AccessLevel.PROTECTED) public class BrokerService implements Closeable { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 6e5398e4b5b6c..ec174a41d63d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.stream.Collectors; +import javax.annotation.Nonnull; import lombok.Getter; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -165,8 +166,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; - public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback { // Managed ledger associated with the topic From 32a9d5c55abbd178b868598d69a1a9457203de3d Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Tue, 25 Jul 2023 23:13:57 +0800 Subject: [PATCH 3/7] Fix missing place --- .../admin/impl/PersistentTopicsBase.java | 21 +++++++++-------- .../pulsar/broker/lookup/TopicLookupBase.java | 8 ++++--- .../pulsar/broker/service/BrokerService.java | 23 +++++++++++++++---- .../pulsar/broker/service/ServerCnx.java | 9 ++++---- 4 files changed, 40 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index da37d1b7e545c..f4e5caacded63 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2208,9 +2208,10 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, subscriptionName, targetMessageId, authoritative, replicated, properties); } else { - boolean allowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName); - getPartitionedTopicMetadataAsync(topicName, - authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> { + pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName) + .thenCompose(allowAutoTopicCreation -> + getPartitionedTopicMetadataAsync(topicName, authoritative, allowAutoTopicCreation)) + .thenAccept(partitionMetadata -> { final int numPartitions = partitionMetadata.partitions; if (numPartitions > 0) { final CompletableFuture future = new CompletableFuture<>(); @@ -2306,13 +2307,13 @@ private void internalCreateSubscriptionForNonPartitionedTopic( MessageIdImpl targetMessageId, boolean authoritative, boolean replicated, Map properties) { - boolean isAllowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName); - - validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(__ -> { - validateTopicOperation(topicName, TopicOperation.SUBSCRIBE, subscriptionName); - return pulsar().getBrokerService().getTopic(topicName.toString(), isAllowAutoTopicCreation); - }).thenApply(optTopic -> { + pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName) + .thenCompose(isAllowAutoTopicCreation -> validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(__ -> { + validateTopicOperation(topicName, TopicOperation.SUBSCRIBE, subscriptionName); + return pulsar().getBrokerService().getTopic(topicName.toString(), isAllowAutoTopicCreation); + })) + .thenApply(optTopic -> { if (optTopic.isPresent()) { return optTopic.get(); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java index ae1d2a5bab02b..7baebc7d1b6d4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java @@ -83,9 +83,11 @@ protected void internalLookupTopicAsync(TopicName topicName, boolean authoritati // Currently, it's hard to check the non-persistent-non-partitioned topic, because it only exists in the broker, // it doesn't have metadata. If the topic is non-persistent and non-partitioned, we'll return the true flag. - CompletableFuture existFuture = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName) - || (!topicName.isPersistent() && !topicName.isPartitioned()) - ? CompletableFuture.completedFuture(true) : pulsar().getNamespaceService().checkTopicExists(topicName); + CompletableFuture existFuture = pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName) + .thenCompose(isAllowAutoTopicCreation -> + isAllowAutoTopicCreation || (!topicName.isPersistent() && !topicName.isPartitioned()) + ? CompletableFuture.completedFuture(true) : + pulsar().getNamespaceService().checkTopicExists(topicName)); existFuture.thenAccept(exist -> { if (!exist) { completeLookupResponseExceptionally(asyncResponse, new RestException(Response.Status.NOT_FOUND, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 148541caedcc2..9f282d9b3e374 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -934,7 +934,15 @@ public CompletableFuture> getTopicIfExists(final String topic) { } public CompletableFuture getOrCreateTopic(final String topic) { - return getTopic(topic, isAllowAutoTopicCreation(topic)).thenApply(Optional::get); + final TopicName topicName; + try { + topicName = TopicName.get(topic); + } catch (Throwable ex) { + return FutureUtil.failedFuture(ex); + } + return isAllowAutoTopicCreationAsync(topicName) + .thenCompose(isAllowAutoTopicCreation -> getTopic(topic, isAllowAutoTopicCreation) + .thenApply(Optional::get)); } public CompletableFuture> getTopic(final String topic, boolean createIfMissing) { @@ -2724,7 +2732,7 @@ public CompletableFuture fetchPartitionedTopicMetadata if (metadata.partitions == 0 && !topicExists && !topicName.isPartitioned() - && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName, policies) + && pulsar.getBrokerService().isAllowAutoTopicCreationAsync(topicName, policies) && pulsar.getBrokerService() .isDefaultTopicTypePartitioned(topicName, policies)) { @@ -2950,19 +2958,26 @@ private void foreachCnx(Consumer consumer) { cnxSet.forEach(consumer); } + @Deprecated public boolean isAllowAutoTopicCreation(final String topic) { TopicName topicName = TopicName.get(topic); return isAllowAutoTopicCreation(topicName); } + @Deprecated public boolean isAllowAutoTopicCreation(final TopicName topicName) { Optional policies = pulsar.getPulsarResources().getNamespaceResources() .getPoliciesIfCached(topicName.getNamespaceObject()); - return isAllowAutoTopicCreation(topicName, policies); + return isAllowAutoTopicCreationAsync(topicName, policies); + } + + public CompletableFuture isAllowAutoTopicCreationAsync(final TopicName topicName) { + return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()) + .thenApply(policies -> isAllowAutoTopicCreationAsync(topicName, policies)); } - public boolean isAllowAutoTopicCreation(final TopicName topicName, final Optional policies) { + public boolean isAllowAutoTopicCreationAsync(final TopicName topicName, final Optional policies) { if (policies.isPresent() && policies.get().deleted) { log.info("Preventing AutoTopicCreation on a namespace that is being deleted {}", topicName.getNamespaceObject()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index cdba1be768069..ef7cfdf14d87d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1035,10 +1035,11 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { return null; } - boolean createTopicIfDoesNotExist = forceTopicCreation - && service.isAllowAutoTopicCreation(topicName.toString()); - - service.getTopic(topicName.toString(), createTopicIfDoesNotExist) + service.isAllowAutoTopicCreationAsync(topicName) + .thenCompose(isAllowAutoTopicCreation -> { + final boolean createTopicIfDoesNotExist = forceTopicCreation && isAllowAutoTopicCreation; + return service.getTopic(topicName.toString(), createTopicIfDoesNotExist); + }) .thenCompose(optTopic -> { if (!optTopic.isPresent()) { return FutureUtil From 332417215fe0e978d89b5f05d8f1aa598f339112 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Tue, 25 Jul 2023 23:18:15 +0800 Subject: [PATCH 4/7] Add comment --- .../apache/pulsar/broker/service/BrokerService.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 9f282d9b3e374..8533f53ca7d96 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2958,12 +2958,22 @@ private void foreachCnx(Consumer consumer) { cnxSet.forEach(consumer); } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} + * You can use #{@link BrokerService#isAllowAutoTopicCreationAsync(TopicName)} + */ @Deprecated public boolean isAllowAutoTopicCreation(final String topic) { TopicName topicName = TopicName.get(topic); return isAllowAutoTopicCreation(topicName); } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} + * You can use #{@link BrokerService#isAllowAutoTopicCreationAsync(TopicName)} + */ @Deprecated public boolean isAllowAutoTopicCreation(final TopicName topicName) { Optional policies = From 263ccc0b1d0536f4eaa85c0ba6b80575dc48ccbb Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Tue, 25 Jul 2023 23:25:30 +0800 Subject: [PATCH 5/7] Fix checkstyle --- .../java/org/apache/pulsar/broker/service/BrokerService.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8533f53ca7d96..f78b3dc2b5e76 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2732,7 +2732,8 @@ public CompletableFuture fetchPartitionedTopicMetadata if (metadata.partitions == 0 && !topicExists && !topicName.isPartitioned() - && pulsar.getBrokerService().isAllowAutoTopicCreationAsync(topicName, policies) + && pulsar.getBrokerService() + .isAllowAutoTopicCreationAsync(topicName, policies) && pulsar.getBrokerService() .isDefaultTopicTypePartitioned(topicName, policies)) { From e34f8b8375f1d02d7923cf84f545a26d0546fbbf Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Tue, 25 Jul 2023 23:47:26 +0800 Subject: [PATCH 6/7] Fix method name --- .../org/apache/pulsar/broker/service/BrokerService.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index f78b3dc2b5e76..c2d7bdf5ffb55 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2733,7 +2733,7 @@ public CompletableFuture fetchPartitionedTopicMetadata && !topicExists && !topicName.isPartitioned() && pulsar.getBrokerService() - .isAllowAutoTopicCreationAsync(topicName, policies) + .isAllowAutoTopicCreation(topicName, policies) && pulsar.getBrokerService() .isDefaultTopicTypePartitioned(topicName, policies)) { @@ -2980,15 +2980,15 @@ public boolean isAllowAutoTopicCreation(final TopicName topicName) { Optional policies = pulsar.getPulsarResources().getNamespaceResources() .getPoliciesIfCached(topicName.getNamespaceObject()); - return isAllowAutoTopicCreationAsync(topicName, policies); + return isAllowAutoTopicCreation(topicName, policies); } public CompletableFuture isAllowAutoTopicCreationAsync(final TopicName topicName) { return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject()) - .thenApply(policies -> isAllowAutoTopicCreationAsync(topicName, policies)); + .thenApply(policies -> isAllowAutoTopicCreation(topicName, policies)); } - public boolean isAllowAutoTopicCreationAsync(final TopicName topicName, final Optional policies) { + public boolean isAllowAutoTopicCreation(final TopicName topicName, final Optional policies) { if (policies.isPresent() && policies.get().deleted) { log.info("Preventing AutoTopicCreation on a namespace that is being deleted {}", topicName.getNamespaceObject()); From 8140c3a060390fc27684deac82e05db4fb740e70 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Wed, 26 Jul 2023 01:19:11 +0800 Subject: [PATCH 7/7] Fix test --- .../pulsar/broker/lookup/http/HttpTopicLookupv2Test.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java index 5ae6076274826..5906c6f4fe216 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java @@ -142,6 +142,9 @@ public void testLookupTopicNotExist() throws Exception { doReturn(uri).when(uriInfo).getRequestUri(); doReturn(true).when(config).isAuthorizationEnabled(); + BrokerService brokerService = pulsar.getBrokerService(); + doReturn(CompletableFuture.completedFuture(false)) + .when(brokerService).isAllowAutoTopicCreationAsync(any()); NamespaceService namespaceService = pulsar.getNamespaceService(); CompletableFuture future = new CompletableFuture<>(); future.complete(false);