From d74ce77b7d5494047c19ed2639a7862413fc35fc Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 18 Aug 2022 17:07:33 +0800 Subject: [PATCH 1/2] [fix][flaky-test]AdminApi2Test.testDeleteNamespace --- .../pulsar/broker/admin/AdminApi2Test.java | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index cf1a3d128761b..3566998823f88 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.admin; import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.pulsar.common.naming.SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME; +import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -45,6 +47,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import javax.ws.rs.NotAcceptableException; import javax.ws.rs.core.Response.Status; @@ -1422,6 +1425,9 @@ public void testDeleteNamespace() throws Exception { admin.topics().createPartitionedTopic(topic, 10); assertFalse(admin.topics().getList(namespace).isEmpty()); + // Wait for change event topic and compaction create finish. + awaitChangeEventTopicAndCompactionCreateFinish(namespace, String.format("persistent://%s", topic)); + try { admin.namespaces().deleteNamespace(namespace, false); fail("should have failed due to namespace not empty"); @@ -1445,7 +1451,49 @@ public void testDeleteNamespace() throws Exception { final String bundleDataPath = "/loadbalance/bundle-data/" + namespace; assertFalse(pulsar.getLocalMetadataStore().exists(bundleDataPath).join()); + } + private void awaitChangeEventTopicAndCompactionCreateFinish(String ns, String topic) throws Exception { + if (!pulsar.getConfiguration().isSystemTopicEnabled()){ + return; + } + // Trigger change event topic create. + Consumer consumer = pulsarClient.newConsumer().subscriptionName("del-ns-sub").topic(topic).subscribe(); + consumer.close(); + // Wait for change event topic and compaction create finish. + String allowAutoTopicCreationType = pulsar.getConfiguration().getAllowAutoTopicCreationType(); + int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions(); + ArrayList expectChangeEventTopics = new ArrayList<>(); + if ("non-partitioned".equals(allowAutoTopicCreationType)){ + String t = String.format("persistent://%s/%s", ns, NAMESPACE_EVENTS_LOCAL_NAME); + expectChangeEventTopics.add(t); + } else { + for (int i = 0; i < defaultNumPartitions; i++){ + String t = String.format("persistent://%s/%s-partition-%s", ns, NAMESPACE_EVENTS_LOCAL_NAME, i); + expectChangeEventTopics.add(t); + } + } + Awaitility.await().until(() -> { + boolean finished = true; + for (String changeEventTopicName : expectChangeEventTopics){ + CompletableFuture> completableFuture = pulsar.getBrokerService().getTopic(changeEventTopicName, false); + if (completableFuture == null){ + finished = false; + } + Optional optionalTopic = completableFuture.get(); + if (!optionalTopic.isPresent()){ + finished = false; + } + PersistentTopic changeEventTopic = (PersistentTopic) optionalTopic.get(); + if (!changeEventTopic.isCompactionEnabled()){ + continue; + } + if (!changeEventTopic.getSubscriptions().containsKey(COMPACTION_SUBSCRIPTION)){ + finished = false; + } + } + return finished; + }); } @Test From cb816802c6d28ded446c03afb9cd56f20d443a85 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 19 Aug 2022 10:10:31 +0800 Subject: [PATCH 2/2] use topic policy change instead of new consumer to trigger system topic create --- .../java/org/apache/pulsar/broker/admin/AdminApi2Test.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 3566998823f88..a28e810bf8566 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -105,6 +105,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicStats; @@ -1458,8 +1459,8 @@ private void awaitChangeEventTopicAndCompactionCreateFinish(String ns, String to return; } // Trigger change event topic create. - Consumer consumer = pulsarClient.newConsumer().subscriptionName("del-ns-sub").topic(topic).subscribe(); - consumer.close(); + SubscribeRate subscribeRate = new SubscribeRate(-1, 60); + admin.topicPolicies().setSubscribeRate(topic, subscribeRate); // Wait for change event topic and compaction create finish. String allowAutoTopicCreationType = pulsar.getConfiguration().getAllowAutoTopicCreationType(); int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();