diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 62f8fe1f830f9..ff08abaad8ecc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -146,11 +146,11 @@ protected boolean isProducersExceeded() { Policies policies; try { policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); + .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())) + .orElseGet(Policies::new); } catch (Exception e) { policies = new Policies(); } - maxProducers = policies.max_producers_per_topic; } maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar() @@ -193,10 +193,12 @@ protected boolean isConsumersExceededOnTopic() { // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks policies = brokerService.pulsar().getConfigurationCache().policiesCache() .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); + if (policies == null) { + policies = new Policies(); + } } catch (Exception e) { policies = new Policies(); } - maxConsumers = policies.max_consumers_per_topic; } final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers @@ -773,10 +775,8 @@ private void updatePublishDispatcher(Optional optPolicies) { policies = optPolicies.get(); } else { policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); - if (policies == null) { - policies = new Policies(); - } + .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())) + .orElseGet(Policies::new); } } catch (Exception e) { log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 98b2f8f0037d6..a0239aa0021ac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2568,11 +2568,12 @@ public int getDefaultNumPartitions(final TopicName topicName) { private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) { try { - Policies policies = pulsar.getConfigurationCache().policiesCache() - .getDataIfPresent(AdminResource.path(POLICIES, topicName.getNamespace())); + Optional policies = pulsar.getConfigurationCache().policiesCache() + .get(AdminResource.path(POLICIES, topicName.getNamespace())); + // If namespace policies have the field set, it will override the broker-level setting - if (policies != null && policies.autoTopicCreationOverride != null) { - return policies.autoTopicCreationOverride; + if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) { + return policies.get().autoTopicCreationOverride; } } catch (Throwable t) { // Ignoring since if we don't have policies, we fallback on the default @@ -2601,11 +2602,11 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) { private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) { try { - Policies policies = pulsar.getConfigurationCache().policiesCache() - .getDataIfPresent(AdminResource.path(POLICIES, topicName.getNamespace())); + Optional policies = pulsar.getConfigurationCache().policiesCache() + .get(AdminResource.path(POLICIES, topicName.getNamespace())); // If namespace policies have the field set, it will override the broker-level setting - if (policies != null && policies.autoSubscriptionCreationOverride != null) { - return policies.autoSubscriptionCreationOverride; + if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) { + return policies.get().autoSubscriptionCreationOverride; } } catch (Throwable t) { // Ignoring since if we don't have policies, we fallback on the default diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index 91eb0748d2599..229ae171713a9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -324,16 +324,18 @@ public DispatchRate getPoliciesDispatchRate(BrokerService brokerService) { public static Optional getPolicies(BrokerService brokerService, String topicName) { final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject(); final String path = path(POLICIES, namespace.toString()); - Policies policies = null; + Optionalpolicies = Optional.empty(); try { ConfigurationCacheService configurationCacheService = brokerService.pulsar().getConfigurationCache(); if (configurationCacheService != null) { - policies = configurationCacheService.policiesCache().getDataIfPresent(path); + policies = configurationCacheService.policiesCache().getAsync(path) + .get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), + TimeUnit.SECONDS); } } catch (Exception e) { log.warn("Failed to get message-rate for {} ", topicName, e); } - return Optional.ofNullable(policies); + return policies; } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index b8645c8567de6..f0ee3a12396c2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.commons.lang3.StringUtils.isBlank; -import static org.apache.pulsar.broker.admin.AdminResource.POLICIES_READONLY_FLAG_PATH; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 071160b09eb2e..29d86d6b2710a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -1216,7 +1216,8 @@ public void testRemoveClusterFromNamespace() throws Exception { Replicator replicator = pulsar1.getBrokerService().getTopicReference(topicName) .get().getReplicators().get(cluster4); - Awaitility.await().untilAsserted(() -> Assert.assertTrue(replicator.isConnected())); + Awaitility.waitAtMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> Assert.assertTrue(replicator.isConnected())); admin1.clusters().deleteCluster(cluster4); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index 611d8bdf3cf05..5702370193b2d 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -646,24 +646,4 @@ public ContainerExecResult enableDeduplication(String nsName, boolean enabled) t "namespaces", "set-deduplication", "public/" + nsName, enabled ? "--enable" : "--disable"); } - -<<<<<<< HEAD -======= - public void dumpFunctionLogs(String name) { - for (WorkerContainer container : getAlWorkers()) { - log.info("Trying to get function {} logs from container {}", name, container.getContainerName()); - try { - String logFile = "/pulsar/logs/functions/public/default/" + name + "/" + name + "-0.log"; - String logs = container.copyFileFromContainer(logFile, (inputStream) -> { - return IOUtils.toString(inputStream, "utf-8"); - }); - log.info("Function {} logs {}", name, logs); - } catch (com.github.dockerjava.api.exception.NotFoundException notFound) { - log.info("Cannot download {} logs from {} not found exception {}", name, container.getContainerName(), notFound.toString()); - } catch (Throwable err) { - log.info("Cannot download {} logs from {}", name, container.getContainerName(), err); - } - } - } ->>>>>>> a69611c287d (fix logger number not correct in tests (#12168)) }