From cc8fb53e0b97f32106d9cf7b15525b9f63d71e31 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Sun, 23 Jul 2023 02:15:58 +0800 Subject: [PATCH 01/11] [fix][broker] Fix inconsens namespace policis by `getPoliciesIfCached` --- .../broker/resources/NamespaceResources.java | 1 + .../pulsar/broker/service/AbstractTopic.java | 17 ++- .../pulsar/broker/service/BrokerService.java | 55 ++++++--- .../pulsar/broker/service/ServerCnx.java | 110 ++++++++++-------- .../nonpersistent/NonPersistentTopic.java | 6 +- .../persistent/DispatchRateLimiter.java | 1 + .../service/persistent/PersistentTopic.java | 12 +- .../persistent/SubscribeRateLimiter.java | 3 + .../common/util/CompletableFutures.java | 42 +++++++ 9 files changed, 167 insertions(+), 80 deletions(-) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutures.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index e5dd13c32eb23..9740cc0c4d916 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -117,6 +117,7 @@ public Optional getPolicies(NamespaceName ns) throws MetadataStoreExce return get(joinPath(BASE_POLICIES_PATH, ns.toString())); } + @Deprecated public Optional getPoliciesIfCached(NamespaceName ns) { return getCache().getIfCached(joinPath(BASE_POLICIES_PATH, ns.toString())); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 90693091c89b8..4b51a7b519a80 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.*; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC; import com.google.common.base.MoreObjects; import java.util.ArrayList; @@ -82,6 +83,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + public abstract class AbstractTopic implements Topic, TopicPolicyListener { protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60; @@ -1128,6 +1132,7 @@ public PublishRateLimiter getBrokerPublishRateLimiter() { return brokerService.getBrokerPublishRateLimiter(); } + @Deprecated public void updateResourceGroupLimiter(Optional optPolicies) { Policies policies; try { @@ -1141,17 +1146,20 @@ public void updateResourceGroupLimiter(Optional optPolicies) { log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage()); policies = new Policies(); } + updateResourceGroupLimiter(policies); + } + public void updateResourceGroupLimiter(@Nonnull Policies namespacePolicies) { + requireNonNull(namespacePolicies); // attach the resource-group level rate limiters, if set - String rgName = policies.resource_group_name; + String rgName = namespacePolicies.resource_group_name; if (rgName != null) { final ResourceGroup resourceGroup = - brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName); + brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName); if (resourceGroup != null) { this.resourceGroupRateLimitingEnabled = true; this.resourceGroupPublishLimiter = resourceGroup.getResourceGroupPublishLimiter(); - this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), - () -> this.enableCnxAutoRead()); + this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), this::enableCnxAutoRead); log.info("Using resource group {} rate limiter for topic {}", rgName, topic); return; } @@ -1166,6 +1174,7 @@ public void updateResourceGroupLimiter(Optional optPolicies) { } } + public void updateEntryFilters() { if (isSystemTopic()) { entryFilters = Pair.of(null, Collections.emptyList()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8dc94ca874094..d470b7b8debfa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -19,10 +19,13 @@ package org.apache.pulsar.broker.service; import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.*; import static org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY; import static org.apache.commons.collections4.CollectionUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName; +import static org.apache.pulsar.common.util.CompletableFutures.*; + import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Queues; import io.netty.bootstrap.ServerBootstrap; @@ -51,16 +54,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; @@ -158,10 +152,7 @@ import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.stats.Metrics; -import org.apache.pulsar.common.util.FieldParser; -import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown; -import org.apache.pulsar.common.util.RateLimiter; +import org.apache.pulsar.common.util.*; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; import org.apache.pulsar.common.util.netty.ChannelFutures; @@ -176,6 +167,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + @Getter(AccessLevel.PUBLIC) @Setter(AccessLevel.PROTECTED) public class BrokerService implements Closeable { @@ -1934,7 +1927,7 @@ private void addTopicToStatsMaps(TopicName topicName, Topic topic) { } public void refreshTopicToStatsMaps(NamespaceBundle oldBundle) { - Objects.requireNonNull(oldBundle); + requireNonNull(oldBundle); try { // retrieve all topics under existing old bundle List topics = getAllTopicsFromNamespaceBundle(oldBundle.getNamespaceObject().toString(), @@ -3267,10 +3260,9 @@ public CompletableFuture isAllowAutoTopicCreationAsync(final String top } public CompletableFuture isAllowAutoTopicCreationAsync(final TopicName topicName) { - Optional policies = - pulsar.getPulsarResources().getNamespaceResources() - .getPoliciesIfCached(topicName.getNamespaceObject()); - return isAllowAutoTopicCreationAsync(topicName, policies); + return pulsar.getPulsarResources().getNamespaceResources() + .getPoliciesAsync(topicName.getNamespaceObject()) + .thenCompose(policies -> isAllowAutoTopicCreationAsync(topicName, policies)); } private CompletableFuture isAllowAutoTopicCreationAsync(final TopicName topicName, @@ -3340,11 +3332,13 @@ private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName t return null; } + @Deprecated public boolean isAllowAutoSubscriptionCreation(final String topic) { TopicName topicName = TopicName.get(topic); return isAllowAutoSubscriptionCreation(topicName); } + @Deprecated public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) { AutoSubscriptionCreationOverride autoSubscriptionCreationOverride = getAutoSubscriptionCreationOverride(topicName); @@ -3355,6 +3349,7 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) { } } + @Deprecated private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) { Optional topicPolicies = getTopicPolicies(topicName); if (topicPolicies.isPresent() && topicPolicies.get().getAutoSubscriptionCreationOverride() != null) { @@ -3371,6 +3366,28 @@ private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(fin return null; } + public @Nonnull CompletionStage isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) { + return compose(() -> { + requireNonNull(tpName); + // topic level policies + Optional topicPolicies = getTopicPolicies(tpName); + if (topicPolicies.isPresent() && topicPolicies.get().getAutoSubscriptionCreationOverride() != null) { + return CompletableFuture.completedFuture(topicPolicies.get().getAutoSubscriptionCreationOverride() + .isAllowAutoSubscriptionCreation()); + } + // namespace level policies + return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(tpName.getNamespaceObject()) + .thenApply(policies -> { + if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) { + return policies.get().autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation(); + } + // broker level policies + return pulsar.getConfiguration().isAllowAutoSubscriptionCreation(); + }); + }); + + } + public boolean isSystemTopic(String topic) { return isSystemTopic(TopicName.get(topic)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 8dfeb54fd6a32..f3b355d63325f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -55,6 +55,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -1210,39 +1211,43 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { .failedFuture(new TopicNotFoundException( "Topic " + topicName + " does not exist")); } + final Topic topic = optTopic.get(); + return service.isAllowAutoSubscriptionCreationAsync(topicName) + .thenCompose(isAllowedAutoSubscriptionCreation -> { + boolean rejectSubscriptionIfDoesNotExist = isDurable + && !isAllowedAutoSubscriptionCreation + && !topic.getSubscriptions().containsKey(subscriptionName) + && topic.isPersistent(); + + if (rejectSubscriptionIfDoesNotExist) { + return FutureUtil + .failedFuture( + new SubscriptionNotFoundException( + "Subscription does not exist")); + } - Topic topic = optTopic.get(); - - boolean rejectSubscriptionIfDoesNotExist = isDurable - && !service.isAllowAutoSubscriptionCreation(topicName.toString()) - && !topic.getSubscriptions().containsKey(subscriptionName) - && topic.isPersistent(); - - if (rejectSubscriptionIfDoesNotExist) { - return FutureUtil - .failedFuture( - new SubscriptionNotFoundException( - "Subscription does not exist")); - } - - SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this) - .subscriptionName(subscriptionName) - .consumerId(consumerId).subType(subType).priorityLevel(priorityLevel) - .consumerName(consumerName).isDurable(isDurable) - .startMessageId(startMessageId).metadata(metadata).readCompacted(readCompacted) - .initialPosition(initialPosition) - .startMessageRollbackDurationSec(startMessageRollbackDurationSec) - .replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta) - .subscriptionProperties(subscriptionProperties) - .consumerEpoch(consumerEpoch) - .schemaType(schema == null ? null : schema.getType()) - .build(); - if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) { - return topic.addSchemaIfIdleOrCheckCompatible(schema) - .thenCompose(v -> topic.subscribe(option)); - } else { - return topic.subscribe(option); - } + SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this) + .subscriptionName(subscriptionName) + .consumerId(consumerId).subType(subType) + .priorityLevel(priorityLevel) + .consumerName(consumerName).isDurable(isDurable) + .startMessageId(startMessageId).metadata(metadata) + .readCompacted(readCompacted) + .initialPosition(initialPosition) + .startMessageRollbackDurationSec(startMessageRollbackDurationSec) + .replicatedSubscriptionStateArg(isReplicated) + .keySharedMeta(keySharedMeta) + .subscriptionProperties(subscriptionProperties) + .consumerEpoch(consumerEpoch) + .schemaType(schema == null ? null : schema.getType()) + .build(); + if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) { + return topic.addSchemaIfIdleOrCheckCompatible(schema) + .thenCompose(v -> topic.subscribe(option)); + } else { + return topic.subscribe(option); + } + }); }) .thenAccept(consumer -> { if (consumerFuture.complete(consumer)) { @@ -1461,33 +1466,38 @@ protected void handleProducer(final CommandProducer cmdProducer) { schemaVersionFuture.thenAccept(schemaVersion -> { topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> { - CompletableFuture createInitSubFuture; + CompletionStage createInitSubFuture; if (!Strings.isNullOrEmpty(initialSubscriptionName) && topic.isPersistent() && !topic.getSubscriptions().containsKey(initialSubscriptionName)) { - if (!this.getBrokerService().isAllowAutoSubscriptionCreation(topicName)) { - String msg = - "Could not create the initial subscription due to the auto subscription " - + "creation is not allowed."; - if (producerFuture.completeExceptionally( - new BrokerServiceException.NotAllowedException(msg))) { - log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", - remoteAddress, msg, initialSubscriptionName, topicName); - commandSender.sendErrorResponse(requestId, - ServerError.NotAllowedError, msg); - } - producers.remove(producerId, producerFuture); - return; - } - createInitSubFuture = - topic.createSubscription(initialSubscriptionName, InitialPosition.Earliest, - false, null); + createInitSubFuture = service.isAllowAutoSubscriptionCreationAsync(topicName) + .thenCompose(isAllowAutoSubscriptionCreation -> { + if (!isAllowAutoSubscriptionCreation) { + return CompletableFuture.failedFuture( + new BrokerServiceException.NotAllowedException( + "Could not create the initial subscription due to" + + " the auto subscription creation is not allowed.")); + } + return topic.createSubscription(initialSubscriptionName, + InitialPosition.Earliest, false, null); + }); } else { createInitSubFuture = CompletableFuture.completedFuture(null); } createInitSubFuture.whenComplete((sub, ex) -> { if (ex != null) { + final Throwable rc = FutureUtil.unwrapCompletionException(ex); + if (rc instanceof BrokerServiceException.NotAllowedException) { + log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", + remoteAddress, rc.getMessage(), initialSubscriptionName, topicName); + if (producerFuture.completeExceptionally(rc)) { + commandSender.sendErrorResponse(requestId, + ServerError.NotAllowedError, rc.getMessage()); + } + producers.remove(producerId, producerFuture); + return; + } String msg = "Failed to create the initial subscription: " + ex.getCause().getMessage(); log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 9fe0a735c90d9..c764283cb4459 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -167,17 +167,19 @@ public CompletableFuture initialize() { return brokerService.pulsar().getPulsarResources().getNamespaceResources() .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()) .thenCompose(optPolicies -> { + final Policies policies; if (!optPolicies.isPresent()) { log.warn("[{}] Policies not present and isEncryptionRequired will be set to false", topic); isEncryptionRequired = false; + policies = new Policies(); } else { - Policies policies = optPolicies.get(); + policies = optPolicies.get(); updateTopicPolicyByNamespacePolicy(policies); isEncryptionRequired = policies.encryption_required; isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema; } updatePublishDispatcher(); - updateResourceGroupLimiter(optPolicies); + updateResourceGroupLimiter(policies); return updateClusterMigrated(); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index 0650d5bc1cf38..80bcdf67d4759 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -204,6 +204,7 @@ public static CompletableFuture> getPoliciesAsync(BrokerServi return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace); } + @Deprecated public static Optional getPolicies(BrokerService brokerService, String topicName) { final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject(); return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index e907caa8c3000..be7a0dd86dd6c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service.persistent; +import static java.util.Objects.*; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter.isSubscribeRateEnabled; import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic; @@ -340,7 +341,7 @@ public CompletableFuture initialize() { if (!optPolicies.isPresent()) { isEncryptionRequired = false; updatePublishDispatcher(); - updateResourceGroupLimiter(optPolicies); + updateResourceGroupLimiter(new Policies()); initializeDispatchRateLimiterIfNeeded(); updateSubscribeRateLimiter(); return; @@ -356,7 +357,7 @@ public CompletableFuture initialize() { updatePublishDispatcher(); - updateResourceGroupLimiter(optPolicies); + updateResourceGroupLimiter(policies); this.isEncryptionRequired = policies.encryption_required; @@ -487,7 +488,7 @@ public CompletableFuture unloadSubscription(@Nonnull String subName) { private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, boolean replicated, Map subscriptionProperties) { - Objects.requireNonNull(topicCompactionService); + requireNonNull(topicCompactionService); if (isCompactionSubscription(subscriptionName) && topicCompactionService instanceof PulsarTopicCompactionService pulsarTopicCompactionService) { CompactedTopicImpl compactedTopic = pulsarTopicCompactionService.getCompactedTopic(); @@ -2804,7 +2805,8 @@ public void updateDispatchRateLimiter() { } @Override - public CompletableFuture onPoliciesUpdate(Policies data) { + public CompletableFuture onPoliciesUpdate(@Nonnull Policies data) { + requireNonNull(data); if (log.isDebugEnabled()) { log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required); @@ -2826,7 +2828,7 @@ public CompletableFuture onPoliciesUpdate(Policies data) { updatePublishDispatcher(); - this.updateResourceGroupLimiter(Optional.of(data)); + updateResourceGroupLimiter(data); List> producerCheckFutures = new ArrayList<>(producers.size()); producers.values().forEach(producer -> producerCheckFutures.add( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java index a052f1f6abf33..792744b37e87a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java @@ -163,16 +163,19 @@ public void onSubscribeRateUpdate(SubscribeRate subscribeRate) { * * @return */ + @Deprecated public SubscribeRate getPoliciesSubscribeRate() { return getPoliciesSubscribeRate(brokerService, topicName); } + @Deprecated public static SubscribeRate getPoliciesSubscribeRate(BrokerService brokerService, final String topicName) { final String cluster = brokerService.pulsar().getConfiguration().getClusterName(); final Optional policies = DispatchRateLimiter.getPolicies(brokerService, topicName); return getPoliciesSubscribeRate(cluster, policies, topicName); } + @Deprecated public static SubscribeRate getPoliciesSubscribeRate(final String cluster, final Optional policies, String topicName) { // return policy-subscribe rate only if it's enabled in policies diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutures.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutures.java new file mode 100644 index 0000000000000..a5e92997ee5fe --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutures.java @@ -0,0 +1,42 @@ +package org.apache.pulsar.common.util; + +import lombok.experimental.UtilityClass; + +import javax.annotation.Nonnull; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; + +@UtilityClass +public final class CompletableFutures { + public static @Nonnull CompletionStage apply(@Nonnull Supplier fn) { + try { + final CompletableFuture future = new CompletableFuture<>(); + if (fn == null) { + future.completeExceptionally(new NullPointerException("Parameter can not be null.")); + return future; + } + future.complete(fn.get()); + return future; + } catch (Throwable ex) { + final CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(ex); + return future; + } + } + + public static @Nonnull CompletionStage compose(@Nonnull Supplier> fn) { + try { + if (fn == null) { + final CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new NullPointerException("Parameter can not be null.")); + return future; + } + return fn.get(); + } catch (Throwable ex) { + final CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(ex); + return future; + } + } +} From eb1df1feadc783c516e3dd67512911eb17f788f3 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Sun, 23 Jul 2023 02:27:50 +0800 Subject: [PATCH 02/11] Fix checkstyle --- .../pulsar/broker/service/AbstractTopic.java | 3 +-- .../pulsar/broker/service/BrokerService.java | 21 ++++++++++++------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 4b51a7b519a80..0dd12bd56a4d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -19,7 +19,7 @@ package org.apache.pulsar.broker.service; import static com.google.common.base.Preconditions.checkArgument; -import static java.util.Objects.*; +import static java.util.Objects.requireNonNull; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC; import com.google.common.base.MoreObjects; import java.util.ArrayList; @@ -82,7 +82,6 @@ import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import javax.annotation.Nonnull; import javax.annotation.Nullable; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index d470b7b8debfa..940377e980e29 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -19,13 +19,12 @@ package org.apache.pulsar.broker.service; import static com.google.common.base.Preconditions.checkArgument; -import static java.util.Objects.*; +import static java.util.Objects.requireNonNull; import static org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY; import static org.apache.commons.collections4.CollectionUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName; -import static org.apache.pulsar.common.util.CompletableFutures.*; - +import static org.apache.pulsar.common.util.CompletableFutures.compose; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Queues; import io.netty.bootstrap.ServerBootstrap; @@ -51,10 +50,19 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; @@ -62,6 +70,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.function.Predicate; +import javax.annotation.Nonnull; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; @@ -167,8 +176,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; - @Getter(AccessLevel.PUBLIC) @Setter(AccessLevel.PROTECTED) public class BrokerService implements Closeable { From bcd64f2d21a5255ac13572e64de4d32ff3a897f5 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Sun, 23 Jul 2023 02:28:45 +0800 Subject: [PATCH 03/11] Fix checkstyle --- .../pulsar/broker/service/persistent/PersistentTopic.java | 2 +- .../org/apache/pulsar/common/util/CompletableFutures.java | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index be7a0dd86dd6c..039f576b04420 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.broker.service.persistent; -import static java.util.Objects.*; +import static java.util.Objects.requireNonNull; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter.isSubscribeRateEnabled; import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutures.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutures.java index a5e92997ee5fe..bb092ede500de 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutures.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutures.java @@ -1,11 +1,10 @@ package org.apache.pulsar.common.util; -import lombok.experimental.UtilityClass; - -import javax.annotation.Nonnull; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Supplier; +import javax.annotation.Nonnull; +import lombok.experimental.UtilityClass; @UtilityClass public final class CompletableFutures { From 8c41949eaabf5e7e6c6dbbbe113461b6fa823119 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Tue, 25 Jul 2023 10:19:46 +0800 Subject: [PATCH 04/11] Fix license --- .../pulsar/common/util/CompletableFutures.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutures.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutures.java index bb092ede500de..095ad419503bc 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutures.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutures.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.pulsar.common.util; import java.util.concurrent.CompletableFuture; From f894375f409211632ee1a5235aaa841af4f3c8c1 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Tue, 25 Jul 2023 10:33:45 +0800 Subject: [PATCH 05/11] Add comments --- .../broker/resources/NamespaceResources.java | 7 +++++++ .../pulsar/broker/service/AbstractTopic.java | 6 ++++++ .../pulsar/broker/service/BrokerService.java | 15 +++++++++++++++ .../service/persistent/DispatchRateLimiter.java | 5 +++++ 4 files changed, 33 insertions(+) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 9740cc0c4d916..1c0b31f0b6bf7 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -117,6 +117,13 @@ public Optional getPolicies(NamespaceName ns) throws MetadataStoreExce return get(joinPath(BASE_POLICIES_PATH, ns.toString())); } + /** + * Get the namespace policy from the metadata cache, if the cache doesn't have this value, + * don't load the new value. + * + * @deprecated Since this method may introduce inconsistent namespace policies. we should use + * #{@link NamespaceResources#getPoliciesAsync} + */ @Deprecated public Optional getPoliciesIfCached(NamespaceName ns) { return getCache().getIfCached(joinPath(BASE_POLICIES_PATH, ns.toString())); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 0dd12bd56a4d1..3eeb3d2320578 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -62,6 +62,7 @@ import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; @@ -1131,6 +1132,11 @@ public PublishRateLimiter getBrokerPublishRateLimiter() { return brokerService.getBrokerPublishRateLimiter(); } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and we can use + * #{@link AbstractTopic#updateResourceGroupLimiter(Policies)} to instead of it. + */ @Deprecated public void updateResourceGroupLimiter(Optional optPolicies) { Policies policies; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 940377e980e29..fe20184977a70 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -3339,12 +3339,22 @@ private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName t return null; } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it. + */ @Deprecated public boolean isAllowAutoSubscriptionCreation(final String topic) { TopicName topicName = TopicName.get(topic); return isAllowAutoSubscriptionCreation(topicName); } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it. + */ @Deprecated public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) { AutoSubscriptionCreationOverride autoSubscriptionCreationOverride = @@ -3356,6 +3366,11 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) { } } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it. + */ @Deprecated private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) { Optional topicPolicies = getTopicPolicies(topicName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index 80bcdf67d4759..233972452e97f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -204,6 +204,11 @@ public static CompletableFuture> getPoliciesAsync(BrokerServi return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace); } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. we can use #{@link DispatchRateLimiter#getPoliciesAsync(BrokerService, String)} to instead of it. + */ @Deprecated public static Optional getPolicies(BrokerService brokerService, String topicName) { final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject(); From 1a86ee66a86d5487cdd345248834d817b8e1f467 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Tue, 25 Jul 2023 10:35:23 +0800 Subject: [PATCH 06/11] Fix comments --- .../org/apache/pulsar/broker/resources/NamespaceResources.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 1c0b31f0b6bf7..f70d3f31d6e2f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -118,8 +118,7 @@ public Optional getPolicies(NamespaceName ns) throws MetadataStoreExce } /** - * Get the namespace policy from the metadata cache, if the cache doesn't have this value, - * don't load the new value. + * Get the namespace policy from the metadata cache. This method will not trigger the load of metadata cache. * * @deprecated Since this method may introduce inconsistent namespace policies. we should use * #{@link NamespaceResources#getPoliciesAsync} From 12e11cc3d6c4bbd3ead8606857ea08718dbab262 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Tue, 25 Jul 2023 10:35:55 +0800 Subject: [PATCH 07/11] Fix checkstyle --- .../java/org/apache/pulsar/broker/service/AbstractTopic.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 3eeb3d2320578..0b1241f35ac1a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -41,6 +41,7 @@ import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.ToLongFunction; +import javax.annotation.Nonnull; import lombok.Getter; import org.apache.bookkeeper.mledger.util.StatsBuckets; import org.apache.commons.collections4.CollectionUtils; @@ -83,8 +84,6 @@ import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; public abstract class AbstractTopic implements Topic, TopicPolicyListener { From 6bf510a2091ed6bda2a57ee69dbae42aa3ef8954 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Tue, 25 Jul 2023 10:36:19 +0800 Subject: [PATCH 08/11] Fix checkstyle --- .../java/org/apache/pulsar/broker/service/BrokerService.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index fe20184977a70..da13736ab68b9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -161,7 +161,10 @@ import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.stats.Metrics; -import org.apache.pulsar.common.util.*; +import org.apache.pulsar.common.util.FieldParser; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown; +import org.apache.pulsar.common.util.RateLimiter; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; import org.apache.pulsar.common.util.netty.ChannelFutures; From 907df11d00bbc66308067f3be194f4a33b2a5cda Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Tue, 25 Jul 2023 10:38:13 +0800 Subject: [PATCH 09/11] Apply comments --- .../service/persistent/SubscribeRateLimiter.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java index 792744b37e87a..fa980461afd7b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java @@ -28,6 +28,8 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.util.RateLimiter; @@ -159,15 +161,20 @@ public void onSubscribeRateUpdate(SubscribeRate subscribeRate) { } /** - * Gets configured subscribe-rate from namespace policies. Returns null if subscribe-rate is not configured - * - * @return + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. */ @Deprecated public SubscribeRate getPoliciesSubscribeRate() { return getPoliciesSubscribeRate(brokerService, topicName); } + /** + * @deprecated Avoid using the deprecated method + * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking + * call. + */ @Deprecated public static SubscribeRate getPoliciesSubscribeRate(BrokerService brokerService, final String topicName) { final String cluster = brokerService.pulsar().getConfiguration().getClusterName(); @@ -175,7 +182,6 @@ public static SubscribeRate getPoliciesSubscribeRate(BrokerService brokerService return getPoliciesSubscribeRate(cluster, policies, topicName); } - @Deprecated public static SubscribeRate getPoliciesSubscribeRate(final String cluster, final Optional policies, String topicName) { // return policy-subscribe rate only if it's enabled in policies From e453f8b2e7380d9405b2f515c862e81b61098e26 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Tue, 25 Jul 2023 11:12:56 +0800 Subject: [PATCH 10/11] Fix checkstyle --- .../pulsar/broker/service/persistent/SubscribeRateLimiter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java index fa980461afd7b..58f099a9dab80 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java @@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.util.RateLimiter; From 27f77ae468cd9526bd846c6a376c2bae86444f6d Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Tue, 25 Jul 2023 18:01:16 +0800 Subject: [PATCH 11/11] Apply comment --- .../pulsar/broker/service/AbstractTopic.java | 1 - .../pulsar/broker/service/BrokerService.java | 36 +++++------ .../common/util/CompletableFutures.java | 59 ------------------- 3 files changed, 16 insertions(+), 80 deletions(-) delete mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutures.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 0b1241f35ac1a..ec1593b076b44 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -1178,7 +1178,6 @@ public void updateResourceGroupLimiter(@Nonnull Policies namespacePolicies) { } } - public void updateEntryFilters() { if (isSystemTopic()) { entryFilters = Pair.of(null, Collections.emptyList()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index da13736ab68b9..6c48e4d9ae89f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -24,7 +24,6 @@ import static org.apache.commons.collections4.CollectionUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName; -import static org.apache.pulsar.common.util.CompletableFutures.compose; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Queues; import io.netty.bootstrap.ServerBootstrap; @@ -3392,25 +3391,22 @@ private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(fin } public @Nonnull CompletionStage isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) { - return compose(() -> { - requireNonNull(tpName); - // topic level policies - Optional topicPolicies = getTopicPolicies(tpName); - if (topicPolicies.isPresent() && topicPolicies.get().getAutoSubscriptionCreationOverride() != null) { - return CompletableFuture.completedFuture(topicPolicies.get().getAutoSubscriptionCreationOverride() - .isAllowAutoSubscriptionCreation()); - } - // namespace level policies - return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(tpName.getNamespaceObject()) - .thenApply(policies -> { - if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) { - return policies.get().autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation(); - } - // broker level policies - return pulsar.getConfiguration().isAllowAutoSubscriptionCreation(); - }); - }); - + requireNonNull(tpName); + // topic level policies + final var topicPolicies = getTopicPolicies(tpName); + if (topicPolicies.isPresent() && topicPolicies.get().getAutoSubscriptionCreationOverride() != null) { + return CompletableFuture.completedFuture(topicPolicies.get().getAutoSubscriptionCreationOverride() + .isAllowAutoSubscriptionCreation()); + } + // namespace level policies + return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(tpName.getNamespaceObject()) + .thenApply(policies -> { + if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) { + return policies.get().autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation(); + } + // broker level policies + return pulsar.getConfiguration().isAllowAutoSubscriptionCreation(); + }); } public boolean isSystemTopic(String topic) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutures.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutures.java deleted file mode 100644 index 095ad419503bc..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompletableFutures.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.function.Supplier; -import javax.annotation.Nonnull; -import lombok.experimental.UtilityClass; - -@UtilityClass -public final class CompletableFutures { - public static @Nonnull CompletionStage apply(@Nonnull Supplier fn) { - try { - final CompletableFuture future = new CompletableFuture<>(); - if (fn == null) { - future.completeExceptionally(new NullPointerException("Parameter can not be null.")); - return future; - } - future.complete(fn.get()); - return future; - } catch (Throwable ex) { - final CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally(ex); - return future; - } - } - - public static @Nonnull CompletionStage compose(@Nonnull Supplier> fn) { - try { - if (fn == null) { - final CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally(new NullPointerException("Parameter can not be null.")); - return future; - } - return fn.get(); - } catch (Throwable ex) { - final CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally(ex); - return future; - } - } -}