From 07dc5a54dfac914194fb9ae734e1a63c647a9bef Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 12 Oct 2021 11:34:55 -0700 Subject: [PATCH 1/6] Avoid potentially blocking calls to metadata on critical threads --- .../PulsarAuthorizationProvider.java | 66 +++++++++---------- .../pulsar/broker/service/AbstractTopic.java | 49 +++++--------- .../pulsar/broker/service/BrokerService.java | 35 ++++------ .../persistent/DispatchRateLimiter.java | 9 +-- 4 files changed, 61 insertions(+), 98 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 005707f58b068..7ec3a64ff63ad 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import com.google.common.base.Function; import java.io.IOException; import java.util.Collections; import java.util.Map; @@ -46,6 +47,7 @@ import org.apache.pulsar.common.util.RestException; import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -322,48 +324,42 @@ public CompletableFuture revokeSubscriptionPermissionAsync(NamespaceName n return updateSubscriptionPermissionAsync(namespace, subscriptionName, Collections.singleton(role), true); } - private CompletableFuture updateSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, Set roles, - boolean remove) { - CompletableFuture result = new CompletableFuture<>(); - + private CompletableFuture updateSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, + Set roles, + boolean remove) { try { validatePoliciesReadOnlyAccess(); } catch (Exception e) { - result.completeExceptionally(e); + return FutureUtil.failedFuture(e); } - try { - Policies policies = pulsarResources.getNamespaceResources().getPolicies(namespace) - .orElseThrow(() -> new NotFoundException(namespace + " not found")); - if (remove) { - if (policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName) != null) { - policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName).removeAll(roles); - }else { - log.info("[{}] Couldn't find role {} while revoking for sub = {}", namespace, subscriptionName, roles); - result.completeExceptionally(new IllegalArgumentException("couldn't find subscription")); - return result; - } - } else { - policies.auth_policies.getSubscriptionAuthentication().put(subscriptionName, roles); - } - pulsarResources.getNamespaceResources().setPolicies(namespace, (data)->policies); + CompletableFuture future = + pulsarResources.getNamespaceResources().setPoliciesAsync(namespace, policies -> { + if (remove) { + if (policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName) != null) { + policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName) + .removeAll(roles); + } else { + log.info("[{}] Couldn't find role {} while revoking for sub = {}", namespace, + subscriptionName, roles); + throw new IllegalArgumentException("couldn't find subscription"); + } + } else { + policies.auth_policies.getSubscriptionAuthentication().put(subscriptionName, roles); + } + return policies; + }).thenRun(() -> { + log.info("[{}] Successfully granted access for role {} for sub = {}", namespace, subscriptionName, + roles); + }); - log.info("[{}] Successfully granted access for role {} for sub = {}", namespace, subscriptionName, roles); - result.complete(null); - } catch (NotFoundException e) { - log.warn("[{}] Failed to set permissions for namespace {}: does not exist", subscriptionName, namespace); - result.completeExceptionally(new IllegalArgumentException("Namespace does not exist" + namespace)); - } catch (BadVersionException e) { - log.warn("[{}] Failed to set permissions for {} on namespace {}: concurrent modification", subscriptionName, roles, namespace); - result.completeExceptionally(new IllegalStateException( - "Concurrent modification on metadata path: " + namespace + ", " + e.getMessage())); - } catch (Exception e) { - log.error("[{}] Failed to get permissions for role {} on namespace {}", subscriptionName, roles, namespace, e); - result.completeExceptionally( - new IllegalStateException("Failed to get permissions for namespace " + namespace)); - } + future.exceptionally(ex -> { + log.error("[{}] Failed to get permissions for role {} on namespace {}", subscriptionName, roles, namespace, + ex); + return null; + }); - return result; + return future; } private CompletableFuture checkAuthorization(TopicName topicName, String role, AuthAction action) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 1e5710b090dff..bde3e89fd3434 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -146,31 +146,18 @@ public AbstractTopic(String topic, BrokerService brokerService) { this.topicMaxMessageSizeCheckIntervalMs = TimeUnit.SECONDS.toMillis(brokerService.pulsar().getConfiguration() .getMaxMessageSizeCheckIntervalInSeconds()); this.lastActive = System.nanoTime(); - Policies policies = null; - try { - policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies( - TopicName.get(topic).getNamespaceObject()) - .orElseGet(() -> new Policies()); - } catch (Exception e) { - log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage()); - } this.preciseTopicPublishRateLimitingEnable = brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable(); - updatePublishDispatcher(policies); + updatePublishDispatcher(); } protected boolean isProducersExceeded() { Integer maxProducers = getTopicPolicies().map(TopicPolicies::getMaxProducerPerTopic).orElse(null); if (maxProducers == null) { - Policies policies; - try { - policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies( - TopicName.get(topic).getNamespaceObject()) - .orElseGet(() -> new Policies()); - } catch (Exception e) { - policies = new Policies(); - } + Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources() + .getPoliciesIfCached(TopicName.get(topic).getNamespaceObject()) + .orElseGet(() -> new Policies()); maxProducers = policies.max_producers_per_topic; } maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar() @@ -209,18 +196,12 @@ protected boolean isConsumersExceededOnTopic() { Integer maxConsumers = getTopicPolicies().map(TopicPolicies::getMaxConsumerPerTopic).orElse(null); if (maxConsumers == null) { Policies policies; - try { - // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks - policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies( - TopicName.get(topic).getNamespaceObject()) - .orElseGet(() -> new Policies()); + // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks + policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached( + TopicName.get(topic).getNamespaceObject()) + .orElseGet(() -> new Policies()); - if (policies == null) { - policies = new Policies(); - } - } catch (Exception e) { - log.warn("[{}] Failed to get namespace policies that include max number of consumers: {}", topic, - e.getMessage()); + if (policies == null) { policies = new Policies(); } maxConsumers = policies.max_consumers_per_topic; @@ -630,7 +611,7 @@ protected void checkTopicFenced() throws BrokerServiceException { } } - protected void internalAddProducer(Producer producer) throws BrokerServiceException { + protected void internalAddProducer(Producer producer) throws BrokerServiceException{ if (isProducersExceeded()) { log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic); throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"); @@ -789,10 +770,10 @@ public PublishRateLimiter getBrokerPublishRateLimiter() { } public void updateMaxPublishRate(Policies policies) { - updatePublishDispatcher(policies); + updatePublishDispatcher(); } - private void updatePublishDispatcher(Policies policies) { + private void updatePublishDispatcher() { //if topic-level policy exists, try to use topic-level publish rate policy Optional topicPublishRate = getTopicPolicies().map(TopicPolicies::getPublishRate); if (topicPublishRate.isPresent()) { @@ -802,9 +783,13 @@ private void updatePublishDispatcher(Policies policies) { return; } + Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached( + TopicName.get(topic).getNamespaceObject()) + .orElseGet(() -> new Policies()); + //topic-level policy is not set, try to use namespace-level rate policy final String clusterName = brokerService.pulsar().getConfiguration().getClusterName(); - final PublishRate publishRate = policies != null && policies.publishMaxMessageRate != null + final PublishRate publishRate = policies.publishMaxMessageRate != null ? policies.publishMaxMessageRate.get(clusterName) : null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index bd3c1e9b9bc19..71c64c4c55278 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2535,18 +2535,12 @@ public int getDefaultNumPartitions(final TopicName topicName) { } private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) { - try { - Optional policies = - pulsar.getPulsarResources().getNamespaceResources().getPolicies(topicName.getNamespaceObject()); - // If namespace policies have the field set, it will override the broker-level setting - if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) { - return policies.get().autoTopicCreationOverride; - } - } catch (Throwable t) { - // Ignoring since if we don't have policies, we fallback on the default - log.warn("Got exception when reading autoTopicCreateOverride policy for {}: {};", - topicName, t.getMessage(), t); - return null; + Optional policies = + pulsar.getPulsarResources().getNamespaceResources() + .getPoliciesIfCached(topicName.getNamespaceObject()); + // If namespace policies have the field set, it will override the broker-level setting + if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) { + return policies.get().autoTopicCreationOverride; } log.debug("No autoTopicCreateOverride policy found for {}", topicName); return null; @@ -2568,18 +2562,11 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) { } private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) { - try { - Optional policies = - pulsar.getPulsarResources().getNamespaceResources().getPolicies(topicName.getNamespaceObject()); - // If namespace policies have the field set, it will override the broker-level setting - if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) { - return policies.get().autoSubscriptionCreationOverride; - } - } catch (Throwable t) { - // Ignoring since if we don't have policies, we fallback on the default - log.warn("Got exception when reading autoSubscriptionCreateOverride policy for {}: {};", - topicName, t.getMessage(), t); - return null; + Optional policies = + pulsar.getPulsarResources().getNamespaceResources().getPoliciesIfCached(topicName.getNamespaceObject()); + // If namespace policies have the field set, it will override the broker-level setting + if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) { + return policies.get().autoSubscriptionCreationOverride; } log.debug("No autoSubscriptionCreateOverride policy found for {}", topicName); return null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index 696995d51ccb3..14f9d8dd7f925 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -180,7 +180,7 @@ public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional return true; } - policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName); + policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName); return isDispatchRateNeeded(serviceConfig, policies, topicName, type); } @@ -320,12 +320,7 @@ public DispatchRate getPoliciesDispatchRate(BrokerService brokerService) { public static Optional getPolicies(BrokerService brokerService, String topicName) { final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject(); - try { - return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(namespace); - } catch (Exception e) { - log.warn("Failed to get message-rate for {} ", topicName, e); - return Optional.empty(); - } + return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace); } /** From 868763d652a8acda603ff3430d99d3826f409648 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 12 Oct 2021 14:24:05 -0700 Subject: [PATCH 2/6] Fixed log arguments order --- .../broker/authorization/PulsarAuthorizationProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 7ec3a64ff63ad..84b107a0974bf 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -341,7 +341,7 @@ private CompletableFuture updateSubscriptionPermissionAsync(NamespaceName .removeAll(roles); } else { log.info("[{}] Couldn't find role {} while revoking for sub = {}", namespace, - subscriptionName, roles); + roles, subscriptionName); throw new IllegalArgumentException("couldn't find subscription"); } } else { From b591d44d39d9ac8cc5493fa959941441dfedb87f Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 12 Oct 2021 15:36:32 -0700 Subject: [PATCH 3/6] Addressed comments --- .../authorization/PulsarAuthorizationProvider.java | 7 ++++--- .../org/apache/pulsar/broker/service/AbstractTopic.java | 9 +++------ .../broker/service/persistent/DispatchRateLimiter.java | 2 +- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 84b107a0974bf..e355b122fc013 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -336,9 +336,10 @@ private CompletableFuture updateSubscriptionPermissionAsync(NamespaceName CompletableFuture future = pulsarResources.getNamespaceResources().setPoliciesAsync(namespace, policies -> { if (remove) { - if (policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName) != null) { - policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName) - .removeAll(roles); + Set subscriptionAuth = + policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName); + if (subscriptionAuth != null) { + subscriptionAuth.removeAll(roles); } else { log.info("[{}] Couldn't find role {} while revoking for sub = {}", namespace, roles, subscriptionName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index bde3e89fd3434..9f4c3e6cf1d3f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -195,15 +195,12 @@ public int getNumberOfSameAddressProducers(final String clientAddress) { protected boolean isConsumersExceededOnTopic() { Integer maxConsumers = getTopicPolicies().map(TopicPolicies::getMaxConsumerPerTopic).orElse(null); if (maxConsumers == null) { - Policies policies; + // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks - policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached( + Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached( TopicName.get(topic).getNamespaceObject()) .orElseGet(() -> new Policies()); - if (policies == null) { - policies = new Policies(); - } maxConsumers = policies.max_consumers_per_topic; } final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers @@ -611,7 +608,7 @@ protected void checkTopicFenced() throws BrokerServiceException { } } - protected void internalAddProducer(Producer producer) throws BrokerServiceException{ + protected void internalAddProducer(Producer producer) throws BrokerServiceException { if (isProducersExceeded()) { log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic); throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index 14f9d8dd7f925..47bb6388d6b63 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -180,7 +180,7 @@ public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional return true; } - policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName); + policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName); return isDispatchRateNeeded(serviceConfig, policies, topicName, type); } From 353b3ab29e7b9dbc600d4345f8167dace68af832 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 12 Oct 2021 17:52:46 -0700 Subject: [PATCH 4/6] Fixed mock in PersistentSubscriptionTest --- .../service/persistent/PersistentSubscriptionTest.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java index 8be9b81a1c28c..76f485ebb42a3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java @@ -54,6 +54,7 @@ import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -120,7 +121,13 @@ public void setup() throws Exception { svcConfig.setBrokerShutdownTimeoutMs(0L); svcConfig.setTransactionCoordinatorEnabled(true); pulsarMock = spy(new PulsarService(svcConfig)); - doReturn(mock(PulsarResources.class)).when(pulsarMock).getPulsarResources(); + PulsarResources pulsarResources = mock(PulsarResources.class); + doReturn(pulsarResources).when(pulsarMock).getPulsarResources(); + NamespaceResources namespaceResources = mock(NamespaceResources.class); + doReturn(namespaceResources).when(pulsarResources).getNamespaceResources(); + + doReturn(Optional.of(new Policies())).when(namespaceResources).getPoliciesIfCached(any()); + doReturn(new InMemTransactionBufferProvider()).when(pulsarMock).getTransactionBufferProvider(); doReturn(new TransactionPendingAckStoreProvider() { @Override From 6838bae86723948d914e53d3af0ffa80255e943e Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 12 Oct 2021 20:48:26 -0700 Subject: [PATCH 5/6] Fixed issue in mocked tests --- .../org/apache/pulsar/broker/service/AbstractTopic.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 9f4c3e6cf1d3f..cbd19650c1f0a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -780,9 +780,15 @@ private void updatePublishDispatcher() { return; } - Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached( + Policies policies; + try { + policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached( TopicName.get(topic).getNamespaceObject()) .orElseGet(() -> new Policies()); + } catch (Exception e) { + log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage()); + policies = new Policies(); + } //topic-level policy is not set, try to use namespace-level rate policy final String clusterName = brokerService.pulsar().getConfiguration().getClusterName(); From 2e5ad414dd3f42dc4aac8c28217c5a8290f08dd3 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 13 Oct 2021 23:41:48 -0700 Subject: [PATCH 6/6] Fixed test that was force policies modification under the hood --- .../pulsar/broker/service/AbstractTopic.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index cbd19650c1f0a..951d6025eb51b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -148,7 +148,7 @@ public AbstractTopic(String topic, BrokerService brokerService) { this.lastActive = System.nanoTime(); this.preciseTopicPublishRateLimitingEnable = brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable(); - updatePublishDispatcher(); + updatePublishDispatcher(Optional.empty()); } protected boolean isProducersExceeded() { @@ -767,10 +767,10 @@ public PublishRateLimiter getBrokerPublishRateLimiter() { } public void updateMaxPublishRate(Policies policies) { - updatePublishDispatcher(); + updatePublishDispatcher(Optional.of(policies)); } - private void updatePublishDispatcher() { + private void updatePublishDispatcher(Optional optPolicies) { //if topic-level policy exists, try to use topic-level publish rate policy Optional topicPublishRate = getTopicPolicies().map(TopicPolicies::getPublishRate); if (topicPublishRate.isPresent()) { @@ -782,9 +782,13 @@ private void updatePublishDispatcher() { Policies policies; try { - policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached( - TopicName.get(topic).getNamespaceObject()) - .orElseGet(() -> new Policies()); + if (optPolicies.isPresent()) { + policies = optPolicies.get(); + } else { + policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached( + TopicName.get(topic).getNamespaceObject()) + .orElseGet(() -> new Policies()); + } } catch (Exception e) { log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage()); policies = new Policies();