Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -554,4 +554,10 @@ public static void filterBrokersWithLargeTopicCount(Set<String> brokerCandidateC
brokerCandidateCache.addAll(filteredBrokerCandidates);
}
}

public static NamespaceBundle getNamespaceBundle(PulsarService pulsar, String bundle) {
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
return pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
Expand Down Expand Up @@ -666,11 +667,24 @@ public synchronized void doLoadShedding() {
if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
return;
}
NamespaceBundle bundleToUnload = LoadManagerShared.getNamespaceBundle(pulsar, bundle);
Optional<String> destBroker = this.selectBroker(bundleToUnload);
if (!destBroker.isPresent()) {
log.info("[{}] No broker available to unload bundle {} from broker {}",
strategy.getClass().getSimpleName(), bundle, broker);
return;
}
if (destBroker.get().equals(broker)) {
log.warn("[{}] The destination broker {} is the same as the current owner broker for Bundle {}",
strategy.getClass().getSimpleName(), destBroker.get(), bundle);
return;
}

log.info("[{}] Unloading bundle: {} from broker {}",
strategy.getClass().getSimpleName(), bundle, broker);
log.info("[{}] Unloading bundle: {} from broker {} to dest broker {}",
strategy.getClass().getSimpleName(), bundle, broker, destBroker.get());
try {
pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange);
pulsar.getAdminClient().namespaces()
.unloadNamespaceBundle(namespaceName, bundleRange);
loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis());
} catch (PulsarServerException | PulsarAdminException e) {
log.warn("Error when trying to perform load shedding on {} for broker {}", bundle, broker, e);
Expand Down Expand Up @@ -837,99 +851,119 @@ public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUni
// If the given bundle is already in preallocated, return the selected broker.
return Optional.of(preallocatedBundleToBroker.get(bundle));
}
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
key -> getBundleDataOrDefault(bundle));
brokerCandidateCache.clear();
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);

// filter brokers which owns topic higher than threshold
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData,
conf.getLoadBalancerBrokerMaxTopics());

// distribute namespaces to domain and brokers according to anti-affinity-group
LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, serviceUnit.toString(),
brokerCandidateCache,
brokerToNamespaceToBundleRange, brokerToFailureDomainMap);

// distribute bundles evenly to candidate-brokers if enable
if (conf.isLoadBalancerDistributeBundlesEvenlyEnabled()) {
LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(),
brokerCandidateCache,
brokerToNamespaceToBundleRange);
if (log.isDebugEnabled()) {
log.debug("enable distribute bundles evenly to candidate-brokers, broker candidate count={}",
brokerCandidateCache.size());
}
Optional<String> broker = selectBroker(serviceUnit);
if (!broker.isPresent()) {
// If no broker is selected, return empty.
return broker;
}
log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);
// Add new bundle to preallocated.
preallocateBundle(bundle, broker.get());
return broker;
}
} finally {
selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
}
}

// Use the filter pipeline to finalize broker candidates.
try {
for (BrokerFilter filter : filterPipeline) {
filter.filter(brokerCandidateCache, data, loadData, conf);
}
} catch (BrokerFilterException x) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
}
private void preallocateBundle(String bundle, String broker) {
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
key -> getBundleDataOrDefault(bundle));
loadData.getBrokerData().get(broker).getPreallocatedBundleData().put(bundle, data);
preallocatedBundleToBroker.put(bundle, broker);

final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
.computeIfAbsent(broker,
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
}
}

if (brokerCandidateCache.isEmpty()) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
}
@VisibleForTesting
Optional<String> selectBroker(final ServiceUnitId serviceUnit) {
synchronized (brokerCandidateCache) {
final String bundle = serviceUnit.toString();
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
key -> getBundleDataOrDefault(bundle));
brokerCandidateCache.clear();
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);

// filter brokers which owns topic higher than threshold
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData,
conf.getLoadBalancerBrokerMaxTopics());

// Choose a broker among the potentially smaller filtered list, when possible
Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
// distribute namespaces to domain and brokers according to anti-affinity-group
LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, bundle,
brokerCandidateCache,
brokerToNamespaceToBundleRange, brokerToFailureDomainMap);

// distribute bundles evenly to candidate-brokers if enable
if (conf.isLoadBalancerDistributeBundlesEvenlyEnabled()) {
LoadManagerShared.removeMostServicingBrokersForNamespace(bundle,
brokerCandidateCache,
brokerToNamespaceToBundleRange);
if (log.isDebugEnabled()) {
log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache);
log.debug("enable distribute bundles evenly to candidate-brokers, broker candidate count={}",
brokerCandidateCache.size());
}
}

if (!broker.isPresent()) {
// No brokers available
return broker;
}
log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);

final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
if (maxUsage > overloadThreshold) {
// All brokers that were in the filtered list were overloaded, so check if there is a better broker
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
Optional<String> brokerTmp =
placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
if (brokerTmp.isPresent()) {
broker = brokerTmp;
}
// Use the filter pipeline to finalize broker candidates.
try {
for (BrokerFilter filter : filterPipeline) {
filter.filter(brokerCandidateCache, data, loadData, conf);
}
} catch (BrokerFilterException x) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
}

// Add new bundle to preallocated.
loadData.getBrokerData().get(broker.get()).getPreallocatedBundleData().put(bundle, data);
preallocatedBundleToBroker.put(bundle, broker.get());

final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
.computeIfAbsent(broker.get(),
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
}
if (brokerCandidateCache.isEmpty()) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
}

// Choose a broker among the potentially smaller filtered list, when possible
Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
if (log.isDebugEnabled()) {
log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache);
}

if (!broker.isPresent()) {
// No brokers available
return broker;
}
} finally {
selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);

final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
if (maxUsage > overloadThreshold) {
// All brokers that were in the filtered list were overloaded, so check if there is a better broker
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
Optional<String> brokerTmp =
placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
if (brokerTmp.isPresent()) {
broker = brokerTmp;
}
}
return broker;
}
}

Expand Down
Loading