diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 934d058a1eb30..12c289f567da6 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -338,13 +338,13 @@ public CompletableFuture revokeSubscriptionPermissionAsync(NamespaceName n } private CompletableFuture updateSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, Set roles, - boolean remove) { + boolean remove) { CompletableFuture result = new CompletableFuture<>(); - try { validatePoliciesReadOnlyAccess(); } catch (Exception e) { result.completeExceptionally(e); + return result; } ZooKeeper globalZk = configCache.getZooKeeper(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java index 19ddce755c1f3..67b3268607e59 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java @@ -146,30 +146,6 @@ protected String internalGetNamespaceBundle(TopicName topicName) { } } - /** - * - * Lookup broker-service address for a given namespace-bundle which contains given topic. - * - * a. Returns broker-address if namespace-bundle is already owned by any broker - * b. If current-broker receives lookup-request and if it's not a leader then current broker redirects request - * to leader by returning leader-service address. - * c. If current-broker is leader then it finds out least-loaded broker to own namespace bundle and redirects request - * by returning least-loaded broker. - * d. If current-broker receives request to own the namespace-bundle then it owns a bundle and returns success(connect) - * response to client. - * - * @param pulsarService - * @param topicName - * @param authoritative - * @param clientAppId - * @param requestId - * @return - */ - public static CompletableFuture lookupTopicAsync(PulsarService pulsarService, TopicName topicName, - boolean authoritative, String clientAppId, AuthenticationDataSource authenticationData, long requestId) { - return lookupTopicAsync(pulsarService, topicName, authoritative, clientAppId, authenticationData, requestId, null); - } - /** * * Lookup broker-service address for a given namespace-bundle which contains given topic. @@ -193,7 +169,8 @@ public static CompletableFuture lookupTopicAsync(PulsarService pulsarSe public static CompletableFuture lookupTopicAsync(PulsarService pulsarService, TopicName topicName, boolean authoritative, String clientAppId, AuthenticationDataSource authenticationData, long requestId, - final String advertisedListenerName) { + final String advertisedListenerName, + boolean isAlreadyAuthorized) { final CompletableFuture validationFuture = new CompletableFuture<>(); final CompletableFuture lookupfuture = new CompletableFuture<>(); @@ -213,7 +190,9 @@ public static CompletableFuture lookupTopicAsync(PulsarService pulsarSe } else { // (2) authorize client try { - checkAuthorization(pulsarService, topicName, clientAppId, authenticationData); + if (!isAlreadyAuthorized) { + checkAuthorization(pulsarService, topicName, clientAppId, authenticationData); + } } catch (RestException authException) { log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName.toString()); validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 23b22212077d6..339e89d3fccfc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -114,13 +114,13 @@ public AbstractTopic(String topic, BrokerService brokerService) { this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds()); this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMode()); this.lastActive = System.nanoTime(); - Policies policies = null; + Policies policies; try { policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())) - .orElseGet(() -> new Policies()); + .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); } catch (Exception e) { log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage()); + policies = new Policies(); } this.preciseTopicPublishRateLimitingEnable = brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable(); @@ -135,15 +135,11 @@ protected boolean isProducersExceeded() { } if (maxProducers == null) { - Policies policies; - try { - policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())) - .orElseGet(() -> new Policies()); - } catch (Exception e) { - policies = new Policies(); + Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() + .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); + if (policies != null) { + maxProducers = policies.max_producers_per_topic; } - maxProducers = policies.max_producers_per_topic; } maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar() .getConfiguration().getMaxProducersPerTopic(); @@ -160,22 +156,12 @@ protected boolean isConsumersExceededOnTopic() { maxConsumers = topicPolicies.getMaxConsumerPerTopic(); } if (maxConsumers == null) { - Policies policies; - try { - // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks - policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); - - if (policies == null) { - policies = new Policies(); - } - } catch (Exception e) { - log.warn("[{}] Failed to get namespace policies that include max number of consumers: {}", topic, - e.getMessage()); - policies = new Policies(); - } - maxConsumers = policies.max_consumers_per_topic; + // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks + Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() + .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); + maxConsumers = policies != null ? policies.max_consumers_per_topic : 0; } + final int maxConsumersPerTopic = maxConsumers > 0 ? maxConsumers : brokerService.pulsar().getConfiguration().getMaxConsumersPerTopic(); if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= getNumberOfConsumers()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index 020efa144b1f6..63e3e98df6a64 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -65,13 +65,17 @@ public BacklogQuota getDefaultQuota() { } public BacklogQuota getBacklogQuota(String namespace, String policyPath) { + Policies policies = null; try { - return zkCache.get(policyPath) - .map(p -> p.backlog_quota_map.getOrDefault(BacklogQuotaType.destination_storage, defaultQuota)) - .orElse(defaultQuota); + policies = zkCache.getDataIfPresent(policyPath); } catch (Exception e) { - log.warn("Failed to read policies data, will apply the default backlog quota: namespace={}", namespace, e); - return this.defaultQuota; + log.warn("Failed to check policies for path {}: {}", policyPath, e); + } + + if (policies != null) { + return policies.backlog_quota_map.getOrDefault(BacklogQuotaType.destination_storage, defaultQuota); + } else { + return defaultQuota; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index b7eca90c5d50d..d9c015ea9e405 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2414,8 +2414,10 @@ public int getDefaultNumPartitions(final TopicName topicName) { private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) { try { - Optional policies = pulsar.getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, topicName.getNamespace())); + Optional policies = + Optional.ofNullable(pulsar.getConfigurationCache().policiesCache().getDataIfPresent( + AdminResource.path(POLICIES, topicName.getNamespace().toString())) + ); // If namespace policies have the field set, it will override the broker-level setting if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) { return policies.get().autoTopicCreationOverride; @@ -2445,8 +2447,10 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) { private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) { try { - Optional policies = pulsar.getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, topicName.getNamespace())); + Optional policies = + Optional.ofNullable(pulsar.getConfigurationCache().policiesCache().getDataIfPresent( + AdminResource.path(POLICIES, topicName.getNamespace().toString())) + ); // If namespace policies have the field set, it will override the broker-level setting if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) { return policies.get().autoSubscriptionCreationOverride; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index fa67aa4934e57..22e2d28180ed0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -378,7 +378,8 @@ protected void handleLookup(CommandLookupTopic lookup) { if (isAuthorized) { lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative, getPrincipal(), getAuthenticationData(), - requestId, advertisedListenerName).handle((lookupResponse, ex) -> { + requestId, advertisedListenerName, + true /* isAlreadyAuthorized */).handle((lookupResponse, ex) -> { if (ex == null) { ctx.writeAndFlush(lookupResponse); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index b2447a26ef1ec..1ddb0a6332e00 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -18,9 +18,6 @@ */ package org.apache.pulsar.broker.service.persistent; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.pulsar.broker.web.PulsarWebResource.path; - import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -28,6 +25,7 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -171,7 +169,7 @@ public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional return true; } - policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName); + policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName); return isDispatchRateNeeded(serviceConfig, policies, topicName, type); } @@ -302,13 +300,12 @@ public DispatchRate getPoliciesDispatchRate(BrokerService brokerService) { public static Optional getPolicies(BrokerService brokerService, String topicName) { final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject(); - final String path = path(POLICIES, namespace.toString()); Optional policies = Optional.empty(); try { ConfigurationCacheService configurationCacheService = brokerService.pulsar().getConfigurationCache(); if (configurationCacheService != null) { - policies = configurationCacheService.policiesCache().getAsync(path) - .get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS); + return Optional.ofNullable(configurationCacheService.policiesCache() + .getDataIfPresent(AdminResource.path(POLICIES, namespace.toString()))); } } catch (Exception e) { log.warn("Failed to get message-rate for {} ", topicName, e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c40d4cc096178..d76fb2817da94 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -518,8 +518,11 @@ public void startReplProducers() { // read repl-cluster from policies to avoid restart of replicator which are in process of disconnect and close try { Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())) - .orElseThrow(() -> new KeeperException.NoNodeException()); + .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); + if (policies == null) { + throw new KeeperException.NoNodeException(); + } + if (policies.replication_clusters != null) { Set configuredClusters = Sets.newTreeSet(policies.replication_clusters); replicators.forEach((region, replicator) -> { @@ -2010,8 +2013,14 @@ public void checkInactiveSubscriptions() { TopicName name = TopicName.get(topic); try { Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, name.getNamespace())) - .orElseThrow(() -> new KeeperException.NoNodeException()); + .getDataIfPresent(AdminResource.path(POLICIES, name.getNamespace())); + if (policies == null) { + if (log.isDebugEnabled()) { + log.debug("[{}] Error getting policies", topic); + } + return; + } + final int defaultExpirationTime = brokerService.pulsar().getConfiguration() .getSubscriptionExpirationTimeMinutes(); final long expirationTimeMillis = TimeUnit.MINUTES @@ -2457,8 +2466,10 @@ private int getMessageTTL() throws Exception { TopicName name = TopicName.get(topic); TopicPolicies topicPolicies = getTopicPolicies(name); Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, name.getNamespace())) - .orElseThrow(KeeperException.NoNodeException::new); + .getDataIfPresent(AdminResource.path(POLICIES, name.getNamespace())); + if (policies == null) { + throw new KeeperException.NoNodeException(); + } if (topicPolicies != null && topicPolicies.isMessageTTLSet()) { return topicPolicies.getMessageTTLInSeconds(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 41c15c795720d..2537c99902207 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -517,6 +517,9 @@ public void testMaxProducersForNamespace() throws Exception { when(pulsar.getConfigurationCache().policiesCache() .get(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace()))) .thenReturn(Optional.of(policies)); + when(pulsar.getConfigurationCache().policiesCache() + .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace()))) + .thenReturn(policies); testMaxProducers(); } @@ -1439,6 +1442,9 @@ public void testAtomicReplicationRemoval() throws Exception { when(pulsar.getConfigurationCache().policiesCache() .get(AdminResource.path(POLICIES, TopicName.get(globalTopicName).getNamespace()))) .thenReturn(Optional.of(new Policies())); + when(pulsar.getConfigurationCache().policiesCache() + .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(globalTopicName).getNamespace()))) + .thenReturn(new Policies()); // try to start replicator again topic.startReplProducers(); // verify: replicator.startProducer is not invoked @@ -1748,6 +1754,10 @@ public void testCheckInactiveSubscriptions() throws Exception { .get(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace()))) .thenReturn(Optional.of(new Policies())); + when(pulsar.getConfigurationCache().policiesCache() + .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace()))) + .thenReturn(new Policies()); + ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); doReturn(5).when(svcConfig).getSubscriptionExpirationTimeMinutes(); doReturn(svcConfig).when(pulsar).getConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java index d43f657ace35e..2e071af77e6f5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java @@ -542,13 +542,13 @@ public Boolean allowNamespaceOperation( @Override public CompletableFuture allowTopicOperationAsync( TopicName topic, String role, TopicOperation operation, AuthenticationDataSource authData) { - return CompletableFuture.completedFuture(true); + return CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role)); } @Override public Boolean allowTopicOperation( TopicName topicName, String role, TopicOperation operation, AuthenticationDataSource authData) { - return true; + return clientAuthProviderSupportedRoles.contains(role); } }