Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -338,13 +338,13 @@ public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName n
}

private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, Set<String> roles,
boolean remove) {
boolean remove) {
CompletableFuture<Void> result = new CompletableFuture<>();

try {
validatePoliciesReadOnlyAccess();
} catch (Exception e) {
result.completeExceptionally(e);
return result;
}

ZooKeeper globalZk = configCache.getZooKeeper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,30 +146,6 @@ protected String internalGetNamespaceBundle(TopicName topicName) {
}
}

/**
*
* Lookup broker-service address for a given namespace-bundle which contains given topic.
*
* a. Returns broker-address if namespace-bundle is already owned by any broker
* b. If current-broker receives lookup-request and if it's not a leader then current broker redirects request
* to leader by returning leader-service address.
* c. If current-broker is leader then it finds out least-loaded broker to own namespace bundle and redirects request
* by returning least-loaded broker.
* d. If current-broker receives request to own the namespace-bundle then it owns a bundle and returns success(connect)
* response to client.
*
* @param pulsarService
* @param topicName
* @param authoritative
* @param clientAppId
* @param requestId
* @return
*/
public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName,
boolean authoritative, String clientAppId, AuthenticationDataSource authenticationData, long requestId) {
return lookupTopicAsync(pulsarService, topicName, authoritative, clientAppId, authenticationData, requestId, null);
}

/**
*
* Lookup broker-service address for a given namespace-bundle which contains given topic.
Expand All @@ -193,7 +169,8 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName,
boolean authoritative, String clientAppId,
AuthenticationDataSource authenticationData, long requestId,
final String advertisedListenerName) {
final String advertisedListenerName,
boolean isAlreadyAuthorized) {

final CompletableFuture<ByteBuf> validationFuture = new CompletableFuture<>();
final CompletableFuture<ByteBuf> lookupfuture = new CompletableFuture<>();
Expand All @@ -213,7 +190,9 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
} else {
// (2) authorize client
try {
checkAuthorization(pulsarService, topicName, clientAppId, authenticationData);
if (!isAlreadyAuthorized) {
checkAuthorization(pulsarService, topicName, clientAppId, authenticationData);
}
} catch (RestException authException) {
log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName.toString());
validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,13 @@ public AbstractTopic(String topic, BrokerService brokerService) {
this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds());
this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMode());
this.lastActive = System.nanoTime();
Policies policies = null;
Policies policies;
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
.orElseGet(() -> new Policies());
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
policies = new Policies();
}
this.preciseTopicPublishRateLimitingEnable =
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
Expand All @@ -135,15 +135,11 @@ protected boolean isProducersExceeded() {
}

if (maxProducers == null) {
Policies policies;
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
.orElseGet(() -> new Policies());
} catch (Exception e) {
policies = new Policies();
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
Comment thread
merlimat marked this conversation as resolved.
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));
if (policies != null) {
maxProducers = policies.max_producers_per_topic;
}
maxProducers = policies.max_producers_per_topic;
}
maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar()
.getConfiguration().getMaxProducersPerTopic();
Expand All @@ -160,22 +156,12 @@ protected boolean isConsumersExceededOnTopic() {
maxConsumers = topicPolicies.getMaxConsumerPerTopic();
}
if (maxConsumers == null) {
Policies policies;
try {
// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));

if (policies == null) {
policies = new Policies();
}
} catch (Exception e) {
log.warn("[{}] Failed to get namespace policies that include max number of consumers: {}", topic,
e.getMessage());
policies = new Policies();
}
maxConsumers = policies.max_consumers_per_topic;
// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));
maxConsumers = policies != null ? policies.max_consumers_per_topic : 0;
}

final int maxConsumersPerTopic = maxConsumers > 0 ? maxConsumers
: brokerService.pulsar().getConfiguration().getMaxConsumersPerTopic();
if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= getNumberOfConsumers()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,17 @@ public BacklogQuota getDefaultQuota() {
}

public BacklogQuota getBacklogQuota(String namespace, String policyPath) {
Policies policies = null;
try {
return zkCache.get(policyPath)
.map(p -> p.backlog_quota_map.getOrDefault(BacklogQuotaType.destination_storage, defaultQuota))
.orElse(defaultQuota);
policies = zkCache.getDataIfPresent(policyPath);
} catch (Exception e) {
log.warn("Failed to read policies data, will apply the default backlog quota: namespace={}", namespace, e);
return this.defaultQuota;
log.warn("Failed to check policies for path {}: {}", policyPath, e);
}

if (policies != null) {
return policies.backlog_quota_map.getOrDefault(BacklogQuotaType.destination_storage, defaultQuota);
} else {
return defaultQuota;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2414,8 +2414,10 @@ public int getDefaultNumPartitions(final TopicName topicName) {

private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) {
try {
Optional<Policies> policies = pulsar.getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, topicName.getNamespace()));
Optional<Policies> policies =
Optional.ofNullable(pulsar.getConfigurationCache().policiesCache().getDataIfPresent(
AdminResource.path(POLICIES, topicName.getNamespace().toString()))
);
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
return policies.get().autoTopicCreationOverride;
Expand Down Expand Up @@ -2445,8 +2447,10 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) {

private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) {
try {
Optional<Policies> policies = pulsar.getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, topicName.getNamespace()));
Optional<Policies> policies =
Optional.ofNullable(pulsar.getConfigurationCache().policiesCache().getDataIfPresent(
AdminResource.path(POLICIES, topicName.getNamespace().toString()))
);
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) {
return policies.get().autoSubscriptionCreationOverride;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,8 @@ protected void handleLookup(CommandLookupTopic lookup) {
if (isAuthorized) {
lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative,
getPrincipal(), getAuthenticationData(),
requestId, advertisedListenerName).handle((lookupResponse, ex) -> {
requestId, advertisedListenerName,
true /* isAlreadyAuthorized */).handle((lookupResponse, ex) -> {
if (ex == null) {
ctx.writeAndFlush(lookupResponse);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.broker.web.PulsarWebResource.path;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
Expand Down Expand Up @@ -171,7 +169,7 @@ public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional
return true;
}

policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName);
policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName);
return isDispatchRateNeeded(serviceConfig, policies, topicName, type);
}

Expand Down Expand Up @@ -302,13 +300,12 @@ public DispatchRate getPoliciesDispatchRate(BrokerService brokerService) {

public static Optional<Policies> getPolicies(BrokerService brokerService, String topicName) {
final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject();
final String path = path(POLICIES, namespace.toString());
Optional<Policies> policies = Optional.empty();
try {
ConfigurationCacheService configurationCacheService = brokerService.pulsar().getConfigurationCache();
if (configurationCacheService != null) {
policies = configurationCacheService.policiesCache().getAsync(path)
.get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
return Optional.ofNullable(configurationCacheService.policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, namespace.toString())));
}
} catch (Exception e) {
log.warn("Failed to get message-rate for {} ", topicName, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,11 @@ public void startReplProducers() {
// read repl-cluster from policies to avoid restart of replicator which are in process of disconnect and close
try {
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));
if (policies == null) {
throw new KeeperException.NoNodeException();
}

if (policies.replication_clusters != null) {
Set<String> configuredClusters = Sets.newTreeSet(policies.replication_clusters);
replicators.forEach((region, replicator) -> {
Expand Down Expand Up @@ -2010,8 +2013,14 @@ public void checkInactiveSubscriptions() {
TopicName name = TopicName.get(topic);
try {
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, name.getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
.getDataIfPresent(AdminResource.path(POLICIES, name.getNamespace()));
if (policies == null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Error getting policies", topic);
}
return;
}

final int defaultExpirationTime = brokerService.pulsar().getConfiguration()
.getSubscriptionExpirationTimeMinutes();
final long expirationTimeMillis = TimeUnit.MINUTES
Expand Down Expand Up @@ -2457,8 +2466,10 @@ private int getMessageTTL() throws Exception {
TopicName name = TopicName.get(topic);
TopicPolicies topicPolicies = getTopicPolicies(name);
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, name.getNamespace()))
.orElseThrow(KeeperException.NoNodeException::new);
.getDataIfPresent(AdminResource.path(POLICIES, name.getNamespace()));
if (policies == null) {
throw new KeeperException.NoNodeException();
}
if (topicPolicies != null && topicPolicies.isMessageTTLSet()) {
return topicPolicies.getMessageTTLInSeconds();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,9 @@ public void testMaxProducersForNamespace() throws Exception {
when(pulsar.getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace())))
.thenReturn(Optional.of(policies));
when(pulsar.getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace())))
.thenReturn(policies);
testMaxProducers();
}

Expand Down Expand Up @@ -1439,6 +1442,9 @@ public void testAtomicReplicationRemoval() throws Exception {
when(pulsar.getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, TopicName.get(globalTopicName).getNamespace())))
.thenReturn(Optional.of(new Policies()));
when(pulsar.getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(globalTopicName).getNamespace())))
.thenReturn(new Policies());
// try to start replicator again
topic.startReplProducers();
// verify: replicator.startProducer is not invoked
Expand Down Expand Up @@ -1748,6 +1754,10 @@ public void testCheckInactiveSubscriptions() throws Exception {
.get(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace())))
.thenReturn(Optional.of(new Policies()));

when(pulsar.getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace())))
.thenReturn(new Policies());

ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
doReturn(5).when(svcConfig).getSubscriptionExpirationTimeMinutes();
doReturn(svcConfig).when(pulsar).getConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,13 +542,13 @@ public Boolean allowNamespaceOperation(
@Override
public CompletableFuture<Boolean> allowTopicOperationAsync(
TopicName topic, String role, TopicOperation operation, AuthenticationDataSource authData) {
return CompletableFuture.completedFuture(true);
return CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
}

@Override
public Boolean allowTopicOperation(
TopicName topicName, String role, TopicOperation operation, AuthenticationDataSource authData) {
return true;
return clientAuthProviderSupportedRoles.contains(role);
}
}

Expand Down