diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index af6e2d1418059..abcaf940815f1 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2959,6 +2959,13 @@ public double getLoadBalancerBandwidthOutResourceWeight() { ) private boolean loadBalancerSheddingBundlesWithPoliciesEnabled = false; + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "The namespaces to be excluded from load shedding" + ) + private Set loadBalancerSheddingExcludedNamespaces = new HashSet<>(); + @FieldContext( category = CATEGORY_LOAD_BALANCER, doc = "Time to wait before fixing any stuck in-flight service unit states. " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index ef29d7d9a74f3..af95df60174db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -84,6 +84,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy; import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategyFactory; import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight; +import org.apache.pulsar.broker.loadbalance.extensions.strategy.RoundRobinBrokerSelectionStrategy; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies; import org.apache.pulsar.broker.namespace.LookupOptions; @@ -161,6 +162,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS @Getter private final BrokerSelectionStrategy brokerSelectionStrategy; + private final BrokerSelectionStrategy sheddingExcludedNamespaceSelectionStrategy; + @Getter private final List brokerFilterPipeline; @@ -254,6 +257,7 @@ public ExtensibleLoadManagerImpl() { this.brokerFilterPipeline.add(new BrokerMaxTopicCountFilter()); this.brokerFilterPipeline.add(new BrokerVersionFilter()); this.brokerSelectionStrategy = createBrokerSelectionStrategy(); + this.sheddingExcludedNamespaceSelectionStrategy = new RoundRobinBrokerSelectionStrategy(); } public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) { @@ -636,11 +640,33 @@ public CompletableFuture> selectAsync(ServiceUnitId bundle, return Optional.empty(); } Set candidateBrokers = availableBrokerCandidates.keySet(); - return getBrokerSelectionStrategy().select(candidateBrokers, bundle, context); + return getBrokerSelectionStrategy(bundle).select(candidateBrokers, bundle, context); }); }); } + /** + * For shedding excluded namespaces, use RoundRobinBrokerSelector to assign the ownership, + * it can make the assignment more average because these will not automatically rebalance to + * another broker unless manually unloaded it. + * + * @param bundle the bundle to assign + * @return the broker selection strategy + */ + private BrokerSelectionStrategy getBrokerSelectionStrategy(ServiceUnitId bundle) { + + Set sheddingExcludedNamespaces = conf.getLoadBalancerSheddingExcludedNamespaces(); + + var namespace = NamespaceBundle.getBundleNamespace(bundle.toString()); + if (sheddingExcludedNamespaces.contains(namespace)) { + if (debug(conf, log)) { + log.info("Use round robin broker selector for {}", bundle); + } + return sheddingExcludedNamespaceSelectionStrategy; + } + return brokerSelectionStrategy; + } + @Override public CompletableFuture checkOwnershipAsync(Optional topic, ServiceUnitId bundleUnit) { return getOwnershipAsync(topic, bundleUnit) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java index 9c6e963417813..481e907d04439 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.ToString; @@ -68,8 +69,10 @@ public TopKBundles(PulsarService pulsar) { public void update(Map bundleStats, int topk) { arr.clear(); try { + var conf = pulsar.getConfiguration(); var isLoadBalancerSheddingBundlesWithPoliciesEnabled = - pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled(); + conf.isLoadBalancerSheddingBundlesWithPoliciesEnabled(); + Set sheddingExcludedNamespaces = conf.getLoadBalancerSheddingExcludedNamespaces(); for (var etr : bundleStats.entrySet()) { String bundle = etr.getKey(); var stat = etr.getValue(); @@ -79,12 +82,16 @@ public void update(Map bundleStats, int topk) { continue; } // TODO: do not filter system topic while shedding - if (NamespaceService.isSystemServiceNamespace(NamespaceBundle.getBundleNamespace(bundle))) { + String namespace = NamespaceBundle.getBundleNamespace(bundle); + if (NamespaceService.isSystemServiceNamespace(namespace)) { continue; } if (!isLoadBalancerSheddingBundlesWithPoliciesEnabled && hasPolicies(bundle)) { continue; } + if (sheddingExcludedNamespaces.contains(namespace)) { + continue; + } arr.add(etr); } var topKBundlesLoadData = loadData.getTopBundlesLoadData(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java index b5255f2713a6a..18555bc18fac9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java @@ -493,6 +493,7 @@ public Set findBundlesForUnloading(LoadManagerContext context, } int remainingTopBundles = maxBrokerTopBundlesLoadData.size(); + Set sheddingExcludedNamespaces = conf.getLoadBalancerSheddingExcludedNamespaces(); for (var e : maxBrokerTopBundlesLoadData) { String bundle = e.bundleName(); if (channel != null && !channel.isOwner(bundle, maxBroker)) { @@ -502,6 +503,14 @@ public Set findBundlesForUnloading(LoadManagerContext context, } continue; } + final String namespaceName = NamespaceBundle.getBundleNamespace(bundle); + if (sheddingExcludedNamespaces.contains(namespaceName)) { + if (debugMode) { + log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG + + " Bundle namespace has been found in sheddingExcludedNamespaces", bundle)); + } + continue; + } if (recentlyUnloadedBundles.containsKey(bundle)) { if (debugMode) { log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/RoundRobinBrokerSelectionStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/RoundRobinBrokerSelectionStrategy.java new file mode 100644 index 0000000000000..2f356ec1f5ec1 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/RoundRobinBrokerSelectionStrategy.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.strategy; + +import java.util.Optional; +import java.util.Set; +import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; +import org.apache.pulsar.broker.loadbalance.impl.RoundRobinBrokerSelector; +import org.apache.pulsar.common.naming.ServiceUnitId; + +/** + * Simple Round Robin Broker Selection Strategy. + */ +public class RoundRobinBrokerSelectionStrategy implements BrokerSelectionStrategy { + private final RoundRobinBrokerSelector selector = new RoundRobinBrokerSelector(); + + @Override + public Optional select(Set brokers, ServiceUnitId bundle, LoadManagerContext context) { + return selector.selectBroker(brokers, null, null, context.brokerConfiguration()); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index f86b608d93722..75c60e2687942 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -151,6 +151,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager { // Strategy used to determine where new topics should be placed. private ModularLoadManagerStrategy placementStrategy; + private ModularLoadManagerStrategy sheddingExcludedNamespaceSelectionStrategy; + // Policies used to determine which brokers are available for particular namespaces. private SimpleResourceAllocationPolicies policies; @@ -251,6 +253,7 @@ public void initialize(final PulsarService pulsar) { defaultStats.msgRateOut = DEFAULT_MESSAGE_RATE; placementStrategy = ModularLoadManagerStrategy.create(conf); + sheddingExcludedNamespaceSelectionStrategy = new RoundRobinBrokerSelector(); policies = new SimpleResourceAllocationPolicies(pulsar); filterPipeline.add(new BrokerLoadManagerClassFilter()); filterPipeline.add(new BrokerVersionFilter()); @@ -630,6 +633,7 @@ public synchronized void doLoadShedding() { final Map recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles(); recentlyUnloadedBundles.keySet().removeIf(e -> recentlyUnloadedBundles.get(e) < timeout); + Set sheddingExcludedNamespaces = conf.getLoadBalancerSheddingExcludedNamespaces(); final Multimap bundlesToUnload = loadSheddingStrategy.findBundlesForUnloading(loadData, conf); bundlesToUnload.asMap().forEach((broker, bundles) -> { @@ -637,6 +641,13 @@ public synchronized void doLoadShedding() { bundles.forEach(bundle -> { final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle); final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle); + if (sheddingExcludedNamespaces.contains(namespaceName)) { + if (log.isDebugEnabled()) { + log.debug("[{}] Skipping load shedding for namespace {}", + loadSheddingStrategy.getClass().getSimpleName(), namespaceName); + } + return; + } if (!shouldNamespacePoliciesUnload(namespaceName, bundleRange, broker)) { return; } @@ -914,8 +925,22 @@ Optional selectBroker(final ServiceUnitId serviceUnit) { brokerTopicLoadingPredicate); } - // Choose a broker among the potentially smaller filtered list, when possible - Optional broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf); + Optional broker; + // For shedding excluded namespaces, use RoundRobinBrokerSelector to assign the ownership, + // it can make the assignment more average because these will not automatically rebalance to + // another broker unless manually unloaded it. + Set sheddingExcludedNamespaces = conf.getLoadBalancerSheddingExcludedNamespaces(); + String namespaceNameFromBundleName = LoadManagerShared.getNamespaceNameFromBundleName(bundle); + if (sheddingExcludedNamespaces.contains(namespaceNameFromBundleName)) { + if (log.isDebugEnabled()) { + log.debug("Use round robin broker selector for {}", bundle); + } + broker = sheddingExcludedNamespaceSelectionStrategy + .selectBroker(brokerCandidateCache, data, loadData, conf); + } else { + // Choose a broker among the potentially smaller filtered list, when possible + broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf); + } if (log.isDebugEnabled()) { log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache); } @@ -1122,7 +1147,15 @@ public void writeBrokerDataOnZooKeeper(boolean force) { */ private int selectTopKBundle() { bundleArr.clear(); - bundleArr.addAll(loadData.getBundleData().entrySet()); + Set sheddingExcludedNamespaces = conf.getLoadBalancerSheddingExcludedNamespaces(); + for (Map.Entry entry : loadData.getBundleData().entrySet()) { + String bundle = entry.getKey(); + String namespace = NamespaceBundle.getBundleNamespace(bundle); + if (sheddingExcludedNamespaces.contains(namespace)) { + continue; + } + bundleArr.add(entry); + } int maxNumberOfBundlesInBundleLoadReport = pulsar.getConfiguration() .getLoadBalancerMaxNumberOfBundlesInBundleLoadReport(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 0417b6fc14412..060e41bbea667 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -205,6 +205,41 @@ public void testAssign() throws Exception { assertEquals(webServiceUrl.get().toString(), brokerLookupData.get().getWebServiceUrl()); } + // Test that the load manager will use round-robin assignment + // if the namespace is in loadBalancerSheddingExcludedNamespaces. + @Test + public void testSelectBrokerForSheddingExcludedNamespaces() throws Exception { + pulsar1.getConfiguration().setLoadBalancerSheddingExcludedNamespaces(Set.of(defaultTestNamespace)); + try { + Pair topicAndBundle = + getBundleIsNotOwnByChangeEventTopic("test-topic" + UUID.randomUUID()); + NamespaceBundle bundle1 = topicAndBundle.getRight(); + Optional brokerLookupData1 = primaryLoadManager.assign(Optional.empty(), bundle1, + LookupOptions.builder().build()).get(); + assertTrue(brokerLookupData1.isPresent()); + log.info("Assign the bundle1 {} to {}", bundle1, brokerLookupData1); + + String webServiceUrl1 = brokerLookupData1.get().getWebServiceUrl(); + + Pair topicAndBundle2 = + getBundleIsNotOwnByChangeEventTopic("test-topic-" + UUID.randomUUID()); + + while (topicAndBundle2.getRight().toString().equals(topicAndBundle.getRight().toString()) + || primaryLoadManager.checkOwnershipAsync(Optional.empty(), topicAndBundle2.getRight()).get()) { + topicAndBundle2 = getBundleIsNotOwnByChangeEventTopic("test-topic-" + UUID.randomUUID()); + } + NamespaceBundle bundle2 = topicAndBundle2.getRight(); + Optional brokerLookupData2 = primaryLoadManager.assign(Optional.empty(), bundle2, + LookupOptions.builder().build()).get(); + assertTrue(brokerLookupData2.isPresent()); + log.info("Assign the bundle2 {} to {}", bundle2, brokerLookupData2); + String webServiceUrl2 = brokerLookupData2.get().getWebServiceUrl(); + assertNotEquals(webServiceUrl1, webServiceUrl2); + } finally { + pulsar1.getConfiguration().setLoadBalancerSheddingExcludedNamespaces(Set.of()); + } + } + @Test public void testLookupOptions() throws Exception { Pair topicAndBundle = diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java index 3445ab393be7a..2fa1db3bacc36 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Optional; import java.util.Random; +import java.util.Set; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; @@ -136,6 +137,27 @@ public void testSystemNamespace() { assertEquals(top0.bundleName(), bundle1); } + @Test + public void testSheddingExcludedNamespaces() { + Map bundleStats = new HashMap<>(); + var topKBundles = new TopKBundles(pulsar); + pulsar.getConfiguration().setLoadBalancerSheddingExcludedNamespaces(Set.of("my-tenant/my-namespace2")); + NamespaceBundleStats stats1 = new NamespaceBundleStats(); + stats1.msgRateIn = 500; + bundleStats.put("my-tenant/my-namespace2/0x00000000_0x0FFFFFFF", stats1); + + NamespaceBundleStats stats2 = new NamespaceBundleStats(); + stats2.msgRateIn = 10000; + stats2.msgThroughputOut = 10; + bundleStats.put(bundle1, stats2); + + topKBundles.update(bundleStats, 2); + + assertEquals(topKBundles.getLoadData().getTopBundlesLoadData().size(), 1); + var top0 = topKBundles.getLoadData().getTopBundlesLoadData().get(0); + assertEquals(top0.bundleName(), bundle1); + } + @Test public void testZeroMsgThroughputBundleStats() { Map bundleStats = new HashMap<>(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java index 5e20b196c9a5a..8d0509805472d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java @@ -615,6 +615,25 @@ public void testRecentlyUnloadedBundles() { assertEquals(counter.getLoadStd(), setupLoadStd); } + @Test + public void testSheddingExcludedNamespaces() { + UnloadCounter counter = new UnloadCounter(); + TransferShedder transferShedder = new TransferShedder(counter); + var ctx = setupContext(); + ctx.brokerConfiguration().setLoadBalancerSheddingExcludedNamespaces( + Set.of("my-tenant/my-namespaceE", "my-tenant/my-namespaceD")); + + var res = transferShedder.findBundlesForUnloading(ctx, new HashMap<>(), Map.of()); + var expected = new HashSet(); + expected.add(new UnloadDecision(new Unload("broker3:8080", + "my-tenant/my-namespaceC/0x00000000_0x0FFFFFFF", + Optional.of("broker1:8080")), + Success, Overloaded)); + assertEquals(res, expected); + assertEquals(counter.getLoadAvg(), setupLoadAvg); + assertEquals(counter.getLoadStd(), setupLoadStd); + } + @Test public void testGetAvailableBrokersFailed() { UnloadCounter counter = new UnloadCounter();