From abd916cf72e68737d4bea90794664719a279cf1c Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 12 Oct 2021 11:34:55 -0700 Subject: [PATCH 1/8] Avoid potentially blocking calls to metadata on critical threads (#12339) --- .../PulsarAuthorizationProvider.java | 4 +- .../pulsar/broker/lookup/TopicLookupBase.java | 31 ++--------- .../pulsar/broker/service/AbstractTopic.java | 52 +++++++------------ .../broker/service/BacklogQuotaManager.java | 12 ++--- .../pulsar/broker/service/BrokerService.java | 12 +++-- .../pulsar/broker/service/ServerCnx.java | 3 +- .../persistent/DispatchRateLimiter.java | 11 ++-- .../service/persistent/PersistentTopic.java | 23 +++++--- 8 files changed, 63 insertions(+), 85 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 934d058a1eb30..12c289f567da6 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -338,13 +338,13 @@ public CompletableFuture revokeSubscriptionPermissionAsync(NamespaceName n } private CompletableFuture updateSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, Set roles, - boolean remove) { + boolean remove) { CompletableFuture result = new CompletableFuture<>(); - try { validatePoliciesReadOnlyAccess(); } catch (Exception e) { result.completeExceptionally(e); + return result; } ZooKeeper globalZk = configCache.getZooKeeper(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java index 19ddce755c1f3..67b3268607e59 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java @@ -146,30 +146,6 @@ protected String internalGetNamespaceBundle(TopicName topicName) { } } - /** - * - * Lookup broker-service address for a given namespace-bundle which contains given topic. - * - * a. Returns broker-address if namespace-bundle is already owned by any broker - * b. If current-broker receives lookup-request and if it's not a leader then current broker redirects request - * to leader by returning leader-service address. - * c. If current-broker is leader then it finds out least-loaded broker to own namespace bundle and redirects request - * by returning least-loaded broker. - * d. If current-broker receives request to own the namespace-bundle then it owns a bundle and returns success(connect) - * response to client. - * - * @param pulsarService - * @param topicName - * @param authoritative - * @param clientAppId - * @param requestId - * @return - */ - public static CompletableFuture lookupTopicAsync(PulsarService pulsarService, TopicName topicName, - boolean authoritative, String clientAppId, AuthenticationDataSource authenticationData, long requestId) { - return lookupTopicAsync(pulsarService, topicName, authoritative, clientAppId, authenticationData, requestId, null); - } - /** * * Lookup broker-service address for a given namespace-bundle which contains given topic. @@ -193,7 +169,8 @@ public static CompletableFuture lookupTopicAsync(PulsarService pulsarSe public static CompletableFuture lookupTopicAsync(PulsarService pulsarService, TopicName topicName, boolean authoritative, String clientAppId, AuthenticationDataSource authenticationData, long requestId, - final String advertisedListenerName) { + final String advertisedListenerName, + boolean isAlreadyAuthorized) { final CompletableFuture validationFuture = new CompletableFuture<>(); final CompletableFuture lookupfuture = new CompletableFuture<>(); @@ -213,7 +190,9 @@ public static CompletableFuture lookupTopicAsync(PulsarService pulsarSe } else { // (2) authorize client try { - checkAuthorization(pulsarService, topicName, clientAppId, authenticationData); + if (!isAlreadyAuthorized) { + checkAuthorization(pulsarService, topicName, clientAppId, authenticationData); + } } catch (RestException authException) { log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName.toString()); validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 23b22212077d6..04cdc747c04fe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -114,17 +114,11 @@ public AbstractTopic(String topic, BrokerService brokerService) { this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds()); this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMode()); this.lastActive = System.nanoTime(); - Policies policies = null; - try { - policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())) - .orElseGet(() -> new Policies()); - } catch (Exception e) { - log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage()); - } + Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() + .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); this.preciseTopicPublishRateLimitingEnable = brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable(); - updatePublishDispatcher(policies); + updatePublishDispatcher(); } protected boolean isProducersExceeded() { @@ -135,12 +129,10 @@ protected boolean isProducersExceeded() { } if (maxProducers == null) { - Policies policies; - try { - policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())) - .orElseGet(() -> new Policies()); - } catch (Exception e) { + Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() + .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); + + if (policies == null) { policies = new Policies(); } maxProducers = policies.max_producers_per_topic; @@ -160,21 +152,14 @@ protected boolean isConsumersExceededOnTopic() { maxConsumers = topicPolicies.getMaxConsumerPerTopic(); } if (maxConsumers == null) { - Policies policies; - try { - // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks - policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); + // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks + } - if (policies == null) { - policies = new Policies(); - } - } catch (Exception e) { - log.warn("[{}] Failed to get namespace policies that include max number of consumers: {}", topic, - e.getMessage()); - policies = new Policies(); - } - maxConsumers = policies.max_consumers_per_topic; + Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() + .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); + + if (policies == null) { + policies = new Policies(); } final int maxConsumersPerTopic = maxConsumers > 0 ? maxConsumers : brokerService.pulsar().getConfiguration().getMaxConsumersPerTopic(); @@ -411,7 +396,7 @@ protected void checkTopicFenced() throws BrokerServiceException { } } - protected void internalAddProducer(Producer producer) throws BrokerServiceException { + protected void internalAddProducer(Producer producer) throws BrokerServiceException{ if (isProducersExceeded()) { log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic); throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"); @@ -484,10 +469,10 @@ public PublishRateLimiter getBrokerPublishRateLimiter() { } public void updateMaxPublishRate(Policies policies) { - updatePublishDispatcher(policies); + updatePublishDispatcher(); } - private void updatePublishDispatcher(Policies policies) { + private void updatePublishDispatcher() { //if topic-level policy exists, try to use topic-level publish rate policy TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic)); if (topicPolicies != null && topicPolicies.isPublishRateSet()) { @@ -496,6 +481,9 @@ private void updatePublishDispatcher(Policies policies) { return; } + Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() + .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); + //topic-level policy is not set, try to use namespace-level rate policy final String clusterName = brokerService.pulsar().getConfiguration().getClusterName(); final PublishRate publishRate = policies != null && policies.publishMaxMessageRate != null diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index 020efa144b1f6..5b2e5054f3919 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -65,13 +65,11 @@ public BacklogQuota getDefaultQuota() { } public BacklogQuota getBacklogQuota(String namespace, String policyPath) { - try { - return zkCache.get(policyPath) - .map(p -> p.backlog_quota_map.getOrDefault(BacklogQuotaType.destination_storage, defaultQuota)) - .orElse(defaultQuota); - } catch (Exception e) { - log.warn("Failed to read policies data, will apply the default backlog quota: namespace={}", namespace, e); - return this.defaultQuota; + Policies policies = zkCache.getDataIfPresent(policyPath); + if (policies != null) { + return policies.backlog_quota_map.getOrDefault(BacklogQuotaType.destination_storage, defaultQuota); + } else { + return defaultQuota; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index b7eca90c5d50d..d9c015ea9e405 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2414,8 +2414,10 @@ public int getDefaultNumPartitions(final TopicName topicName) { private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) { try { - Optional policies = pulsar.getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, topicName.getNamespace())); + Optional policies = + Optional.ofNullable(pulsar.getConfigurationCache().policiesCache().getDataIfPresent( + AdminResource.path(POLICIES, topicName.getNamespace().toString())) + ); // If namespace policies have the field set, it will override the broker-level setting if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) { return policies.get().autoTopicCreationOverride; @@ -2445,8 +2447,10 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) { private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) { try { - Optional policies = pulsar.getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, topicName.getNamespace())); + Optional policies = + Optional.ofNullable(pulsar.getConfigurationCache().policiesCache().getDataIfPresent( + AdminResource.path(POLICIES, topicName.getNamespace().toString())) + ); // If namespace policies have the field set, it will override the broker-level setting if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) { return policies.get().autoSubscriptionCreationOverride; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index fa67aa4934e57..22e2d28180ed0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -378,7 +378,8 @@ protected void handleLookup(CommandLookupTopic lookup) { if (isAuthorized) { lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative, getPrincipal(), getAuthenticationData(), - requestId, advertisedListenerName).handle((lookupResponse, ex) -> { + requestId, advertisedListenerName, + true /* isAlreadyAuthorized */).handle((lookupResponse, ex) -> { if (ex == null) { ctx.writeAndFlush(lookupResponse); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index b2447a26ef1ec..1ddb0a6332e00 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -18,9 +18,6 @@ */ package org.apache.pulsar.broker.service.persistent; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.pulsar.broker.web.PulsarWebResource.path; - import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -28,6 +25,7 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -171,7 +169,7 @@ public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional return true; } - policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName); + policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName); return isDispatchRateNeeded(serviceConfig, policies, topicName, type); } @@ -302,13 +300,12 @@ public DispatchRate getPoliciesDispatchRate(BrokerService brokerService) { public static Optional getPolicies(BrokerService brokerService, String topicName) { final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject(); - final String path = path(POLICIES, namespace.toString()); Optional policies = Optional.empty(); try { ConfigurationCacheService configurationCacheService = brokerService.pulsar().getConfigurationCache(); if (configurationCacheService != null) { - policies = configurationCacheService.policiesCache().getAsync(path) - .get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS); + return Optional.ofNullable(configurationCacheService.policiesCache() + .getDataIfPresent(AdminResource.path(POLICIES, namespace.toString()))); } } catch (Exception e) { log.warn("Failed to get message-rate for {} ", topicName, e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c40d4cc096178..d76fb2817da94 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -518,8 +518,11 @@ public void startReplProducers() { // read repl-cluster from policies to avoid restart of replicator which are in process of disconnect and close try { Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())) - .orElseThrow(() -> new KeeperException.NoNodeException()); + .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); + if (policies == null) { + throw new KeeperException.NoNodeException(); + } + if (policies.replication_clusters != null) { Set configuredClusters = Sets.newTreeSet(policies.replication_clusters); replicators.forEach((region, replicator) -> { @@ -2010,8 +2013,14 @@ public void checkInactiveSubscriptions() { TopicName name = TopicName.get(topic); try { Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, name.getNamespace())) - .orElseThrow(() -> new KeeperException.NoNodeException()); + .getDataIfPresent(AdminResource.path(POLICIES, name.getNamespace())); + if (policies == null) { + if (log.isDebugEnabled()) { + log.debug("[{}] Error getting policies", topic); + } + return; + } + final int defaultExpirationTime = brokerService.pulsar().getConfiguration() .getSubscriptionExpirationTimeMinutes(); final long expirationTimeMillis = TimeUnit.MINUTES @@ -2457,8 +2466,10 @@ private int getMessageTTL() throws Exception { TopicName name = TopicName.get(topic); TopicPolicies topicPolicies = getTopicPolicies(name); Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, name.getNamespace())) - .orElseThrow(KeeperException.NoNodeException::new); + .getDataIfPresent(AdminResource.path(POLICIES, name.getNamespace())); + if (policies == null) { + throw new KeeperException.NoNodeException(); + } if (topicPolicies != null && topicPolicies.isMessageTTLSet()) { return topicPolicies.getMessageTTLInSeconds(); } From c518f912c7d8bd35c89c42d79bb75e1287fbe0a0 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 12 Oct 2021 18:39:05 -0700 Subject: [PATCH 2/8] Fixed NPE --- .../pulsar/broker/service/AbstractTopic.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 04cdc747c04fe..0b750396375e7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -131,11 +131,9 @@ protected boolean isProducersExceeded() { if (maxProducers == null) { Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); - - if (policies == null) { - policies = new Policies(); + if (policies != null) { + maxProducers = policies.max_producers_per_topic; } - maxProducers = policies.max_producers_per_topic; } maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar() .getConfiguration().getMaxProducersPerTopic(); @@ -153,14 +151,11 @@ protected boolean isConsumersExceededOnTopic() { } if (maxConsumers == null) { // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks + Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() + .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); + maxConsumers = policies != null ? policies.max_consumers_per_topic : 0; } - Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); - - if (policies == null) { - policies = new Policies(); - } final int maxConsumersPerTopic = maxConsumers > 0 ? maxConsumers : brokerService.pulsar().getConfiguration().getMaxConsumersPerTopic(); if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= getNumberOfConsumers()) { From 0f74a94a9f4125d10d9d8d6fb28cb8a65468e7e4 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 12 Oct 2021 18:41:13 -0700 Subject: [PATCH 3/8] Addressed comments --- .../java/org/apache/pulsar/broker/service/AbstractTopic.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 0b750396375e7..dd6f622d4811a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -114,8 +114,6 @@ public AbstractTopic(String topic, BrokerService brokerService) { this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds()); this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMode()); this.lastActive = System.nanoTime(); - Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); this.preciseTopicPublishRateLimitingEnable = brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable(); updatePublishDispatcher(); @@ -391,7 +389,7 @@ protected void checkTopicFenced() throws BrokerServiceException { } } - protected void internalAddProducer(Producer producer) throws BrokerServiceException{ + protected void internalAddProducer(Producer producer) throws BrokerServiceException { if (isProducersExceeded()) { log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic); throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"); From f8ea208dca735d0817f04abfc9bcf1292ea624f0 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 12 Oct 2021 20:43:10 -0700 Subject: [PATCH 4/8] Fixed issue with mocked tests --- .../pulsar/broker/service/AbstractTopic.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index dd6f622d4811a..339e89d3fccfc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -114,9 +114,17 @@ public AbstractTopic(String topic, BrokerService brokerService) { this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds()); this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMode()); this.lastActive = System.nanoTime(); + Policies policies; + try { + policies = brokerService.pulsar().getConfigurationCache().policiesCache() + .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); + } catch (Exception e) { + log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage()); + policies = new Policies(); + } this.preciseTopicPublishRateLimitingEnable = brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable(); - updatePublishDispatcher(); + updatePublishDispatcher(policies); } protected boolean isProducersExceeded() { @@ -462,10 +470,10 @@ public PublishRateLimiter getBrokerPublishRateLimiter() { } public void updateMaxPublishRate(Policies policies) { - updatePublishDispatcher(); + updatePublishDispatcher(policies); } - private void updatePublishDispatcher() { + private void updatePublishDispatcher(Policies policies) { //if topic-level policy exists, try to use topic-level publish rate policy TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic)); if (topicPolicies != null && topicPolicies.isPublishRateSet()) { @@ -474,9 +482,6 @@ private void updatePublishDispatcher() { return; } - Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); - //topic-level policy is not set, try to use namespace-level rate policy final String clusterName = brokerService.pulsar().getConfiguration().getClusterName(); final PublishRate publishRate = policies != null && policies.publishMaxMessageRate != null From 957b4cd7219f87c56c42bee633b6d2022a109244 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 12 Oct 2021 21:56:43 -0700 Subject: [PATCH 5/8] Fixed behavior in BacklogQuotaManager to be like before --- .../apache/pulsar/broker/service/BacklogQuotaManager.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index 5b2e5054f3919..63e3e98df6a64 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -65,7 +65,13 @@ public BacklogQuota getDefaultQuota() { } public BacklogQuota getBacklogQuota(String namespace, String policyPath) { - Policies policies = zkCache.getDataIfPresent(policyPath); + Policies policies = null; + try { + policies = zkCache.getDataIfPresent(policyPath); + } catch (Exception e) { + log.warn("Failed to check policies for path {}: {}", policyPath, e); + } + if (policies != null) { return policies.backlog_quota_map.getOrDefault(BacklogQuotaType.destination_storage, defaultQuota); } else { From 4142c391f0b8fe506ae29024bb1317c85ec24694 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 12 Oct 2021 22:43:38 -0700 Subject: [PATCH 6/8] Fixed AuthorizationProducerConsumerTest --- .../pulsar/client/api/AuthorizationProducerConsumerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java index d43f657ace35e..2e071af77e6f5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java @@ -542,13 +542,13 @@ public Boolean allowNamespaceOperation( @Override public CompletableFuture allowTopicOperationAsync( TopicName topic, String role, TopicOperation operation, AuthenticationDataSource authData) { - return CompletableFuture.completedFuture(true); + return CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role)); } @Override public Boolean allowTopicOperation( TopicName topicName, String role, TopicOperation operation, AuthenticationDataSource authData) { - return true; + return clientAuthProviderSupportedRoles.contains(role); } } From fb6019406df8636b3db5dea84855023d0c3226bf Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 13 Oct 2021 00:03:51 -0700 Subject: [PATCH 7/8] Fixed PersistentTopicTest --- .../org/apache/pulsar/broker/service/PersistentTopicTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 41c15c795720d..698f7fd723bcb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1748,6 +1748,10 @@ public void testCheckInactiveSubscriptions() throws Exception { .get(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace()))) .thenReturn(Optional.of(new Policies())); + when(pulsar.getConfigurationCache().policiesCache() + .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace()))) + .thenReturn(new Policies()); + ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); doReturn(5).when(svcConfig).getSubscriptionExpirationTimeMinutes(); doReturn(svcConfig).when(pulsar).getConfiguration(); From baf8c9f7f37ba6fdaf8821c6b93ea9ff18aefbff Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 13 Oct 2021 06:25:33 -0700 Subject: [PATCH 8/8] Fixed PersistentTopicTest --- .../apache/pulsar/broker/service/PersistentTopicTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 698f7fd723bcb..2537c99902207 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -517,6 +517,9 @@ public void testMaxProducersForNamespace() throws Exception { when(pulsar.getConfigurationCache().policiesCache() .get(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace()))) .thenReturn(Optional.of(policies)); + when(pulsar.getConfigurationCache().policiesCache() + .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace()))) + .thenReturn(policies); testMaxProducers(); } @@ -1439,6 +1442,9 @@ public void testAtomicReplicationRemoval() throws Exception { when(pulsar.getConfigurationCache().policiesCache() .get(AdminResource.path(POLICIES, TopicName.get(globalTopicName).getNamespace()))) .thenReturn(Optional.of(new Policies())); + when(pulsar.getConfigurationCache().policiesCache() + .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(globalTopicName).getNamespace()))) + .thenReturn(new Policies()); // try to start replicator again topic.startReplProducers(); // verify: replicator.startProducer is not invoked