From 8a2fda5fa35727ee7a4787f58e4a57062c90c5f4 Mon Sep 17 00:00:00 2001 From: chenhang Date: Mon, 18 Oct 2021 10:49:39 +0800 Subject: [PATCH 1/5] fix cherry-pick issue --- .../integration/topologies/PulsarCluster.java | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index 611d8bdf3cf05..5702370193b2d 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -646,24 +646,4 @@ public ContainerExecResult enableDeduplication(String nsName, boolean enabled) t "namespaces", "set-deduplication", "public/" + nsName, enabled ? "--enable" : "--disable"); } - -<<<<<<< HEAD -======= - public void dumpFunctionLogs(String name) { - for (WorkerContainer container : getAlWorkers()) { - log.info("Trying to get function {} logs from container {}", name, container.getContainerName()); - try { - String logFile = "/pulsar/logs/functions/public/default/" + name + "/" + name + "-0.log"; - String logs = container.copyFileFromContainer(logFile, (inputStream) -> { - return IOUtils.toString(inputStream, "utf-8"); - }); - log.info("Function {} logs {}", name, logs); - } catch (com.github.dockerjava.api.exception.NotFoundException notFound) { - log.info("Cannot download {} logs from {} not found exception {}", name, container.getContainerName(), notFound.toString()); - } catch (Throwable err) { - log.info("Cannot download {} logs from {}", name, container.getContainerName(), err); - } - } - } ->>>>>>> a69611c287d (fix logger number not correct in tests (#12168)) } From 8d1df26b2db12fb1eaa8fc9e7d0be167f7aaa9d1 Mon Sep 17 00:00:00 2001 From: chenhang Date: Mon, 18 Oct 2021 12:59:29 +0800 Subject: [PATCH 2/5] fix code style --- .../java/org/apache/pulsar/broker/web/PulsarWebResource.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index b8645c8567de6..f0ee3a12396c2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.commons.lang3.StringUtils.isBlank; -import static org.apache.pulsar.broker.admin.AdminResource.POLICIES_READONLY_FLAG_PATH; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS; import com.fasterxml.jackson.databind.ObjectMapper; From c48ce4eddd191d887b22501785c0f2449c908340 Mon Sep 17 00:00:00 2001 From: chenhang Date: Mon, 18 Oct 2021 14:52:06 +0800 Subject: [PATCH 3/5] fix null ptr exception --- .../java/org/apache/pulsar/broker/service/AbstractTopic.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 62f8fe1f830f9..59da4f254864f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -147,11 +147,11 @@ protected boolean isProducersExceeded() { try { policies = brokerService.pulsar().getConfigurationCache().policiesCache() .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); + maxProducers = policies.max_producers_per_topic; } catch (Exception e) { policies = new Policies(); } - maxProducers = policies.max_producers_per_topic; } maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar() .getConfiguration().getMaxProducersPerTopic(); @@ -193,11 +193,11 @@ protected boolean isConsumersExceededOnTopic() { // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks policies = brokerService.pulsar().getConfigurationCache().policiesCache() .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); + maxConsumers = policies.max_consumers_per_topic; } catch (Exception e) { policies = new Policies(); } - maxConsumers = policies.max_consumers_per_topic; } final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers : brokerService.pulsar().getConfiguration().getMaxConsumersPerTopic(); From 2671ec7c0f97588e5bd3da39e81267db180429bd Mon Sep 17 00:00:00 2001 From: chenhang Date: Wed, 20 Oct 2021 15:30:18 +0800 Subject: [PATCH 4/5] fix cache inconsistency --- .../org/apache/pulsar/broker/service/BrokerService.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 98b2f8f0037d6..76a3bf71b1550 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2568,11 +2568,12 @@ public int getDefaultNumPartitions(final TopicName topicName) { private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) { try { - Policies policies = pulsar.getConfigurationCache().policiesCache() - .getDataIfPresent(AdminResource.path(POLICIES, topicName.getNamespace())); + Optional policies = pulsar.getConfigurationCache().policiesCache() + .get(AdminResource.path(POLICIES, topicName.getNamespace())); + // If namespace policies have the field set, it will override the broker-level setting - if (policies != null && policies.autoTopicCreationOverride != null) { - return policies.autoTopicCreationOverride; + if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) { + return policies.get().autoTopicCreationOverride; } } catch (Throwable t) { // Ignoring since if we don't have policies, we fallback on the default From daaa882b1f7b88b3ee875334c77e1efdfa65d077 Mon Sep 17 00:00:00 2001 From: chenhang Date: Wed, 20 Oct 2021 22:20:19 +0800 Subject: [PATCH 5/5] fix cache inconsistency --- .../pulsar/broker/service/AbstractTopic.java | 18 +++++++++--------- .../pulsar/broker/service/BrokerService.java | 8 ++++---- .../persistent/DispatchRateLimiter.java | 8 +++++--- .../pulsar/broker/service/ReplicatorTest.java | 3 ++- 4 files changed, 20 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 59da4f254864f..ff08abaad8ecc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -146,12 +146,12 @@ protected boolean isProducersExceeded() { Policies policies; try { policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); - maxProducers = policies.max_producers_per_topic; + .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())) + .orElseGet(Policies::new); } catch (Exception e) { policies = new Policies(); } - + maxProducers = policies.max_producers_per_topic; } maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar() .getConfiguration().getMaxProducersPerTopic(); @@ -193,11 +193,13 @@ protected boolean isConsumersExceededOnTopic() { // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks policies = brokerService.pulsar().getConfigurationCache().policiesCache() .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); - maxConsumers = policies.max_consumers_per_topic; + if (policies == null) { + policies = new Policies(); + } } catch (Exception e) { policies = new Policies(); } - + maxConsumers = policies.max_consumers_per_topic; } final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers : brokerService.pulsar().getConfiguration().getMaxConsumersPerTopic(); @@ -773,10 +775,8 @@ private void updatePublishDispatcher(Optional optPolicies) { policies = optPolicies.get(); } else { policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); - if (policies == null) { - policies = new Policies(); - } + .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())) + .orElseGet(Policies::new); } } catch (Exception e) { log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 76a3bf71b1550..a0239aa0021ac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2602,11 +2602,11 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) { private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) { try { - Policies policies = pulsar.getConfigurationCache().policiesCache() - .getDataIfPresent(AdminResource.path(POLICIES, topicName.getNamespace())); + Optional policies = pulsar.getConfigurationCache().policiesCache() + .get(AdminResource.path(POLICIES, topicName.getNamespace())); // If namespace policies have the field set, it will override the broker-level setting - if (policies != null && policies.autoSubscriptionCreationOverride != null) { - return policies.autoSubscriptionCreationOverride; + if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) { + return policies.get().autoSubscriptionCreationOverride; } } catch (Throwable t) { // Ignoring since if we don't have policies, we fallback on the default diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index 91eb0748d2599..229ae171713a9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -324,16 +324,18 @@ public DispatchRate getPoliciesDispatchRate(BrokerService brokerService) { public static Optional getPolicies(BrokerService brokerService, String topicName) { final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject(); final String path = path(POLICIES, namespace.toString()); - Policies policies = null; + Optionalpolicies = Optional.empty(); try { ConfigurationCacheService configurationCacheService = brokerService.pulsar().getConfigurationCache(); if (configurationCacheService != null) { - policies = configurationCacheService.policiesCache().getDataIfPresent(path); + policies = configurationCacheService.policiesCache().getAsync(path) + .get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), + TimeUnit.SECONDS); } } catch (Exception e) { log.warn("Failed to get message-rate for {} ", topicName, e); } - return Optional.ofNullable(policies); + return policies; } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 071160b09eb2e..29d86d6b2710a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -1216,7 +1216,8 @@ public void testRemoveClusterFromNamespace() throws Exception { Replicator replicator = pulsar1.getBrokerService().getTopicReference(topicName) .get().getReplicators().get(cluster4); - Awaitility.await().untilAsserted(() -> Assert.assertTrue(replicator.isConnected())); + Awaitility.waitAtMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> Assert.assertTrue(replicator.isConnected())); admin1.clusters().deleteCluster(cluster4);