Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ protected boolean isProducersExceeded() {
Policies policies;
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));
.get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
.orElseGet(Policies::new);
} catch (Exception e) {
policies = new Policies();
}

maxProducers = policies.max_producers_per_topic;
}
maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar()
Expand Down Expand Up @@ -193,10 +193,12 @@ protected boolean isConsumersExceededOnTopic() {
// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));
if (policies == null) {
policies = new Policies();
}
} catch (Exception e) {
policies = new Policies();
}

maxConsumers = policies.max_consumers_per_topic;
}
final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers
Expand Down Expand Up @@ -773,10 +775,8 @@ private void updatePublishDispatcher(Optional<Policies> optPolicies) {
policies = optPolicies.get();
} else {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));
if (policies == null) {
policies = new Policies();
}
.get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
.orElseGet(Policies::new);
}
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2568,11 +2568,12 @@ public int getDefaultNumPartitions(final TopicName topicName) {

private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) {
try {
Policies policies = pulsar.getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, topicName.getNamespace()));
Optional<Policies> policies = pulsar.getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, topicName.getNamespace()));

// If namespace policies have the field set, it will override the broker-level setting
if (policies != null && policies.autoTopicCreationOverride != null) {
return policies.autoTopicCreationOverride;
if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
return policies.get().autoTopicCreationOverride;
}
} catch (Throwable t) {
// Ignoring since if we don't have policies, we fallback on the default
Expand Down Expand Up @@ -2601,11 +2602,11 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) {

private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) {
try {
Policies policies = pulsar.getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, topicName.getNamespace()));
Optional<Policies> policies = pulsar.getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, topicName.getNamespace()));
// If namespace policies have the field set, it will override the broker-level setting
if (policies != null && policies.autoSubscriptionCreationOverride != null) {
return policies.autoSubscriptionCreationOverride;
if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) {
return policies.get().autoSubscriptionCreationOverride;
}
} catch (Throwable t) {
// Ignoring since if we don't have policies, we fallback on the default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,16 +324,18 @@ public DispatchRate getPoliciesDispatchRate(BrokerService brokerService) {
public static Optional<Policies> getPolicies(BrokerService brokerService, String topicName) {
final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject();
final String path = path(POLICIES, namespace.toString());
Policies policies = null;
Optional<Policies >policies = Optional.empty();
try {
ConfigurationCacheService configurationCacheService = brokerService.pulsar().getConfigurationCache();
if (configurationCacheService != null) {
policies = configurationCacheService.policiesCache().getDataIfPresent(path);
policies = configurationCacheService.policiesCache().getAsync(path)
.get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(),
TimeUnit.SECONDS);
}
} catch (Exception e) {
log.warn("Failed to get message-rate for {} ", topicName, e);
}
return Optional.ofNullable(policies);
return policies;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.admin.AdminResource.POLICIES_READONLY_FLAG_PATH;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1216,7 +1216,8 @@ public void testRemoveClusterFromNamespace() throws Exception {
Replicator replicator = pulsar1.getBrokerService().getTopicReference(topicName)
.get().getReplicators().get(cluster4);

Awaitility.await().untilAsserted(() -> Assert.assertTrue(replicator.isConnected()));
Awaitility.waitAtMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> Assert.assertTrue(replicator.isConnected()));

admin1.clusters().deleteCluster(cluster4);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,24 +646,4 @@ public ContainerExecResult enableDeduplication(String nsName, boolean enabled) t
"namespaces", "set-deduplication", "public/" + nsName,
enabled ? "--enable" : "--disable");
}

<<<<<<< HEAD
=======
public void dumpFunctionLogs(String name) {
for (WorkerContainer container : getAlWorkers()) {
log.info("Trying to get function {} logs from container {}", name, container.getContainerName());
try {
String logFile = "/pulsar/logs/functions/public/default/" + name + "/" + name + "-0.log";
String logs = container.<String>copyFileFromContainer(logFile, (inputStream) -> {
return IOUtils.toString(inputStream, "utf-8");
});
log.info("Function {} logs {}", name, logs);
} catch (com.github.dockerjava.api.exception.NotFoundException notFound) {
log.info("Cannot download {} logs from {} not found exception {}", name, container.getContainerName(), notFound.toString());
} catch (Throwable err) {
log.info("Cannot download {} logs from {}", name, container.getContainerName(), err);
}
}
}
>>>>>>> a69611c287d (fix logger number not correct in tests (#12168))
}