From 6df36ded80dc9d3282cd00668535e39305154c2f Mon Sep 17 00:00:00 2001 From: zhaoli Date: Sun, 8 Oct 2023 17:33:28 +0800 Subject: [PATCH] =?UTF-8?q?[Feature]=E9=9B=86=E7=BE=A4=E5=9D=87=E8=A1=A1?= =?UTF-8?q?=E6=94=AF=E6=8C=81LogDir=E7=BA=A7=E5=88=AB=E7=9A=84Disk?= =?UTF-8?q?=E5=9D=87=E8=A1=A1(#1164)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../executor/common/BalanceTask.java | 10 + .../executor/common/OptimizerResult.java | 2 + .../km/rebalance/algorithm/model/Broker.java | 37 +++- .../algorithm/model/ClusterModel.java | 53 +++-- .../km/rebalance/algorithm/model/LogDir.java | 137 ++++++++++++ .../km/rebalance/algorithm/model/Replica.java | 18 +- .../rebalance/algorithm/model/Supplier.java | 34 ++- .../algorithm/optimizer/AnalyzerUtils.java | 26 +++ .../algorithm/optimizer/BalancingAction.java | 26 +++ .../optimizer/goals/AbstractGoal.java | 2 +- .../optimizer/goals/AbstractLogDirGoal.java | 136 ++++++++++++ .../goals/ResourceDistributionGoal.java | 201 +++++++++++------- .../algorithm/utils/MetadataUtils.java | 92 +++++++- 13 files changed, 661 insertions(+), 113 deletions(-) create mode 100644 km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/LogDir.java create mode 100644 km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/goals/AbstractLogDirGoal.java diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/executor/common/BalanceTask.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/executor/common/BalanceTask.java index f79b8f3ab..3b6265f52 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/executor/common/BalanceTask.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/executor/common/BalanceTask.java @@ -7,6 +7,7 @@ public class BalanceTask { private int partition; //副本分配列表 private List replicas; + private List logDirs; public String getTopic() { return topic; @@ -32,12 +33,21 @@ public void setReplicas(List replicas) { this.replicas = replicas; } + public List getLogDirs() { + return logDirs; + } + + public void setLogDirs(List logDirs) { + this.logDirs = logDirs; + } + @Override public String toString() { return "BalanceTask{" + "topic='" + topic + '\'' + ", partition=" + partition + ", replicas=" + replicas + + ", logDirs=" + logDirs + '}'; } } diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/executor/common/OptimizerResult.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/executor/common/OptimizerResult.java index 18abdcd9a..3921167ea 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/executor/common/OptimizerResult.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/executor/common/OptimizerResult.java @@ -120,6 +120,8 @@ public List resultTask() { task.setPartition(proposal.tp().partition()); List replicas = proposal.newReplicas().stream().map(ReplicaPlacementInfo::brokerId).collect(Collectors.toList()); task.setReplicas(replicas); + List logDirs = proposal.newReplicas().stream().map(ReplicaPlacementInfo::logdir).collect(Collectors.toList()); + task.setLogDirs(logDirs); balanceTasks.add(task); }); return balanceTasks; diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/Broker.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/Broker.java index a2b3f2694..e6e393725 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/Broker.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/Broker.java @@ -3,6 +3,7 @@ import org.apache.kafka.common.TopicPartition; import java.util.*; +import java.util.concurrent.ThreadLocalRandom; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -11,19 +12,16 @@ * @date 2022/4/29 */ public class Broker implements Comparable { - public static final Broker NONE = new Broker(new Rack("-1"), -1, "localhost", true, new Capacity()); private final Rack rack; private final int id; private final String host; private final boolean isOffline; - private final Set replicas; private final Set leaderReplicas; private final Map> topicReplicas; - + private final Map logDirs; private final Load load; - private final Capacity capacity; public Broker(Rack rack, int id, String host, boolean isOffline, Capacity capacity) { @@ -34,10 +32,15 @@ public Broker(Rack rack, int id, String host, boolean isOffline, Capacity capaci this.replicas = new HashSet<>(); this.leaderReplicas = new HashSet<>(); this.topicReplicas = new HashMap<>(); + this.logDirs = new HashMap<>(); this.load = new Load(); this.capacity = capacity; } + public void addLogDir(LogDir logDir) { + this.logDirs.put(logDir.name(), logDir); + } + public Rack rack() { return rack; } @@ -58,6 +61,10 @@ public Set replicas() { return Collections.unmodifiableSet(this.replicas); } + public Map logDirs() { + return Collections.unmodifiableMap(this.logDirs); + } + public SortedSet sortedReplicasFor(Resource resource, boolean reverse) { return sortedReplicasFor(null, resource, reverse); } @@ -84,27 +91,34 @@ public Set leaderReplicas() { } public Load load() { - return load; + return this.load; } public Capacity capacity() { return capacity; } + public double utilizationFor(Resource resource) { return this.load.loadFor(resource) / this.capacity.capacityFor(resource); } + public double expectedUtilizationAfterAdd(Resource resource, Load loadToChange) { return (this.load.loadFor(resource) + ((loadToChange == null) ? 0 : loadToChange.loadFor(resource))) / this.capacity.capacityFor(resource); } + public double expectedUtilizationAfterRemove(Resource resource, Load loadToChange) { return (this.load.loadFor(resource) - ((loadToChange == null) ? 0 : loadToChange.loadFor(resource))) / this.capacity.capacityFor(resource); } + public LogDir logDir(String name) { + return this.logDirs.get(name); + } + public Replica replica(TopicPartition topicPartition) { Map replicas = this.topicReplicas.get(topicPartition.topic()); if (replicas == null) { @@ -113,7 +127,7 @@ public Replica replica(TopicPartition topicPartition) { return replicas.get(topicPartition.partition()); } - void addReplica(Replica replica) { + void addReplica(String logDir, Replica replica) { // Add replica to list of all replicas in the broker. if (this.replicas.contains(replica)) { throw new IllegalStateException(String.format("Broker %d already has replica %s", this.id, @@ -131,9 +145,12 @@ void addReplica(Replica replica) { // Add replica load to the broker load. this.load.addLoad(replica.load()); + + // Add replica to list of replicas in the logDir + this.logDirs.get(logDir).addReplica(replica); } - Replica removeReplica(TopicPartition topicPartition) { + Replica removeReplica(String sourceLogDir, TopicPartition topicPartition) { Replica replica = replica(topicPartition); if (replica != null) { this.replicas.remove(replica); @@ -145,6 +162,7 @@ Replica removeReplica(TopicPartition topicPartition) { this.leaderReplicas.remove(replica); } this.load.subtractLoad(replica.load()); + this.logDirs.get(sourceLogDir).removeReplica(topicPartition); } return replica; } @@ -193,6 +211,7 @@ public String toString() { ", replicas=" + replicas + ", leaderReplicas=" + leaderReplicas + ", topicReplicas=" + topicReplicas + + ", logDirs=" + logDirs + ", load=" + load + ", capacity=" + capacity + '}'; @@ -219,4 +238,8 @@ public Collection replicasOfTopicInBroker(String topic) { public Set currentOfflineReplicas() { return replicas.stream().filter(Replica::isCurrentOffline).collect(Collectors.toSet()); } + + public LogDir randomLogDir() { + return logDirs.values().stream().collect(Collectors.toList()).get(ThreadLocalRandom.current().nextInt(logDirs.size())); + } } diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/ClusterModel.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/ClusterModel.java index 57ef98a1a..f62791660 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/ClusterModel.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/ClusterModel.java @@ -67,6 +67,25 @@ public SortedSet sortedBrokersFor(Predicate filter, Reso return sortedBrokers; } + public SortedSet sortedLogDirsFor(Predicate filter, Resource resource, boolean reverse) { + Comparator comparator = + Comparator.comparingDouble(b -> b.utilizationFor(resource)); + if (reverse) + comparator = comparator.reversed(); + + SortedSet sortedLogDirs = new TreeSet<>(comparator); + this.brokersById.values().stream().forEach(broker -> { + if (filter == null) { + sortedLogDirs.addAll(broker.logDirs().values()); + } else { + sortedLogDirs.addAll(broker.logDirs().values().stream() + .filter(filter).collect(Collectors.toList())); + } + }); + + return sortedLogDirs; + } + public Load load() { Load load = new Load(); for (Broker broker : this.brokersById.values()) { @@ -101,30 +120,33 @@ public Broker broker(int brokerId) { return this.brokersById.get(brokerId); } - public Broker addBroker(String rackId, int brokerId, String host, boolean isOffline, Capacity capacity) { + public Broker addBroker(String rackId, int brokerId, String host, boolean isOffline, Capacity capacity, Map logDirCapacities) { Rack rack = rack(rackId); if (rack == null) throw new IllegalArgumentException("Rack: " + rackId + "is not exists."); Broker broker = new Broker(rack, brokerId, host, isOffline, capacity); + for(Map.Entry entry : logDirCapacities.entrySet()) { + broker.addLogDir(new LogDir(entry.getKey(), broker, entry.getValue())); + } rack.addBroker(broker); this.brokersById.put(brokerId, broker); return broker; } - public Replica addReplica(int brokerId, TopicPartition topicPartition, boolean isLeader, Load load) { - return addReplica(brokerId, topicPartition, isLeader, false, load); + public Replica addReplica(int brokerId, String logDir, TopicPartition topicPartition, boolean isLeader, Load load) { + return addReplica(brokerId, logDir, topicPartition, isLeader, false, load); } - public Replica addReplica(int brokerId, TopicPartition topicPartition, boolean isLeader, boolean isOffline, Load load) { + public Replica addReplica(int brokerId, String logDir, TopicPartition topicPartition, boolean isLeader, boolean isOffline, Load load) { Broker broker = broker(brokerId); if (broker == null) { throw new IllegalArgumentException("Broker: " + brokerId + "is not exists."); } - Replica replica = new Replica(broker, topicPartition, isLeader, isOffline); + Replica replica = new Replica(broker, logDir, topicPartition, isLeader, isOffline); replica.setLoad(load); // add to broker - broker.addReplica(replica); + broker.addReplica(logDir, replica); Map partitions = this.partitionsByTopic .computeIfAbsent(topicPartition.topic(), k -> new HashMap<>()); @@ -139,9 +161,9 @@ public Replica addReplica(int brokerId, TopicPartition topicPartition, boolean i return replica; } - public Replica removeReplica(int brokerId, TopicPartition topicPartition) { + public Replica removeReplica(int brokerId, String sourceLogDir, TopicPartition topicPartition) { Broker broker = broker(brokerId); - return broker.removeReplica(topicPartition); + return broker.removeReplica(sourceLogDir, topicPartition); } public void relocateLeadership(String goal, String actionType, TopicPartition topicPartition, int sourceBrokerId, int destinationBrokerId) { @@ -171,19 +193,20 @@ public void relocateLeadership(TopicPartition topicPartition, int sourceBrokerId partition.relocateLeadership(destinationReplica); } - public void relocateReplica(String goal, String actionType, TopicPartition topicPartition, int sourceBrokerId, int destinationBrokerId) { - relocateReplica(topicPartition, sourceBrokerId, destinationBrokerId); + public void relocateReplica(String goal, String actionType, TopicPartition topicPartition, int sourceBrokerId, String sourceLogDir, int destinationBrokerId, String destinationLogDir) { + relocateReplica(topicPartition, sourceBrokerId, sourceLogDir, destinationBrokerId, destinationLogDir); addBalanceActionHistory(goal, actionType, topicPartition, sourceBrokerId, destinationBrokerId); } - public void relocateReplica(TopicPartition topicPartition, int sourceBrokerId, int destinationBrokerId) { - Replica replica = removeReplica(sourceBrokerId, topicPartition); + public void relocateReplica(TopicPartition topicPartition, int sourceBrokerId, String sourceLogDir, int destinationBrokerId, String destinationLogDir) { + Replica replica = removeReplica(sourceBrokerId, sourceLogDir, topicPartition); if (replica == null) { throw new IllegalArgumentException("Replica is not in the cluster."); } Broker destinationBroker = broker(destinationBrokerId); replica.setBroker(destinationBroker); - destinationBroker.addReplica(replica); + replica.setLogDir(destinationLogDir); + destinationBroker.addReplica(destinationLogDir, replica); } private void addBalanceActionHistory(String goal, String actionType, TopicPartition topicPartition, int sourceBrokerId, int destinationBrokerId) { @@ -208,7 +231,7 @@ public Map> getReplicaDistribution() for (Map tp : partitionsByTopic.values()) { tp.values().forEach(i -> { i.replicas().forEach(j -> replicaDistribution.computeIfAbsent(j.topicPartition(), k -> new ArrayList<>()) - .add(new ReplicaPlacementInfo(j.broker().id(), ""))); + .add(new ReplicaPlacementInfo(j.broker().id(), j.logDir()))); }); } return replicaDistribution; @@ -221,7 +244,7 @@ public Replica partition(TopicPartition tp) { public Map getLeaderDistribution() { Map leaderDistribution = new HashMap<>(); for (Broker broker : brokersById.values()) { - broker.leaderReplicas().forEach(i -> leaderDistribution.put(i.topicPartition(), new ReplicaPlacementInfo(broker.id(), ""))); + broker.leaderReplicas().forEach(i -> leaderDistribution.put(i.topicPartition(), new ReplicaPlacementInfo(broker.id(), i.logDir()))); } return leaderDistribution; } diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/LogDir.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/LogDir.java new file mode 100644 index 000000000..ef6b2e3a6 --- /dev/null +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/LogDir.java @@ -0,0 +1,137 @@ +package com.xiaojukeji.know.streaming.km.rebalance.algorithm.model; + +import org.apache.kafka.common.TopicPartition; + +import java.util.*; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +public class LogDir implements Comparable{ + + private final String name; + private final Broker broker; + private final Set replicas; + private final Map topicPartitionReplicas; + private final Load load; + private final Capacity capacity; + + + public LogDir(String name, Broker broker, Capacity capacity) { + this.name = name; + this.broker = broker; + this.replicas = new HashSet<>(); + this.load = new Load(); + this.capacity = capacity; + this.topicPartitionReplicas = new HashMap<>(); + } + + public void addReplica(Replica replica) { + if (this.replicas.contains(replica)) { + throw new IllegalStateException(String.format("Broker %d logDir %s already has replica %s", this.broker.id(), this.name, + replica.topicPartition())); + } + this.replicas.add(replica); + this.topicPartitionReplicas.put(replica.topicPartition(), replica); + this.load.addLoad(replica.load()); + } + + Replica removeReplica(TopicPartition topicPartition) { + Replica replica = this.topicPartitionReplicas.get(topicPartition); + if(replica == null) { + return replica; + } + this.replicas.remove(replica); + this.topicPartitionReplicas.remove(topicPartition); + this.load.subtractLoad(replica.load()); + return replica; + } + + public Set replicas() { + return Collections.unmodifiableSet(this.replicas); + } + + public String name() { + return this.name; + } + + public Broker broker() { + return this.broker; + } + + public Load load() { + return this.load; + } + + public Capacity capacity() { + return this.capacity; + } + + public double utilizationFor(Resource resource) { + return this.load.loadFor(resource) / this.capacity.capacityFor(resource); + } + + + public double expectedUtilizationAfterAdd(Resource resource, Load loadToChange) { + return (this.load.loadFor(resource) + ((loadToChange == null) ? 0 : loadToChange.loadFor(resource))) + / this.capacity.capacityFor(resource); + } + + + public double expectedUtilizationAfterRemove(Resource resource, Load loadToChange) { + return (this.load.loadFor(resource) - ((loadToChange == null) ? 0 : loadToChange.loadFor(resource))) + / this.capacity.capacityFor(resource); + } + + + public SortedSet sortedReplicasFor(Predicate filter, Resource resource, boolean reverse) { + Comparator comparator = + Comparator.comparingDouble(r -> r.load().loadFor(resource)) + .thenComparingInt(Replica::hashCode); + if (reverse) + comparator = comparator.reversed(); + SortedSet sortedReplicas = new TreeSet<>(comparator); + if (filter == null) { + sortedReplicas.addAll(this.replicas); + } else { + sortedReplicas.addAll(this.replicas.stream() + .filter(filter).collect(Collectors.toList())); + } + + return sortedReplicas; + } + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + LogDir logDir = (LogDir) o; + return broker.equals(logDir.broker) && name.equals(logDir.name); + } + + @Override + public int hashCode() { + return Objects.hash(name, broker.id()); + } + + @Override + public int compareTo(LogDir o) { + int rst = Integer.compare(broker.id(), o.broker.id()); + if(rst != 0) { + return rst; + } else { + return name.compareTo(o.name()); + } + } + + @Override + public String toString() { + return "LogDir{" + + "name=" + name + + ", broker='" + broker + '\'' + + ", replicas=" + replicas + + ", load=" + load + + ", capacity=" + capacity + + '}'; + } +} diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/Replica.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/Replica.java index 37b1156ff..0bb63912c 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/Replica.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/Replica.java @@ -13,22 +13,24 @@ public class Replica { private final Replica original; private final TopicPartition topicPartition; private Broker broker; + private String logDir; private boolean isLeader; private boolean isOffline; - public Replica(Broker broker, TopicPartition topicPartition, boolean isLeader, boolean isOffline) { - this(broker, topicPartition, isLeader, isOffline, false); + public Replica(Broker broker, String logDir, TopicPartition topicPartition, boolean isLeader, boolean isOffline) { + this(broker, logDir, topicPartition, isLeader, isOffline, false); } - private Replica(Broker broker, TopicPartition topicPartition, boolean isLeader, boolean isOffline, boolean isOriginal) { + private Replica(Broker broker, String logDir, TopicPartition topicPartition, boolean isLeader, boolean isOffline, boolean isOriginal) { if (isOriginal) { this.original = null; } else { - this.original = new Replica(broker, topicPartition, isLeader, isOffline, true); + this.original = new Replica(broker, logDir, topicPartition, isLeader, isOffline, true); } this.load = new Load(); this.topicPartition = topicPartition; this.broker = broker; + this.logDir = logDir; this.isLeader = isLeader; this.isOffline = isOffline; } @@ -50,6 +52,14 @@ public void setBroker(Broker broker) { this.broker = broker; } + public String logDir() { + return logDir; + } + + public void setLogDir(String logDir) { + this.logDir = logDir; + } + public boolean isLeader() { return isLeader; } diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/Supplier.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/Supplier.java index 70db965c0..60857824e 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/Supplier.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/model/Supplier.java @@ -4,6 +4,7 @@ import com.xiaojukeji.know.streaming.km.rebalance.algorithm.metric.Metrics; import com.xiaojukeji.know.streaming.km.rebalance.algorithm.metric.elasticsearch.ElasticsearchMetricStore; import com.xiaojukeji.know.streaming.km.rebalance.algorithm.utils.MetadataUtils; +import org.apache.kafka.clients.admin.LogDirDescription; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; @@ -41,9 +42,10 @@ public static ClusterModel load(Properties kafkaProperties, Map> logDirDescriptions = MetadataUtils.describeLogDirs(kafkaProperties, cluster.nodes()); // nodes for (Node node: cluster.nodes()) { - addBroker(node, false, model, capacitiesById); + addBroker(node, false, model, capacitiesById, logDirDescriptions.get(node.id()).keySet()); } // replicas @@ -84,10 +86,20 @@ public static ClusterModel load(Properties kafkaProperties, Map logDirDescriptionMap = logDirDescriptions.get(n.id()); + Optional logDir = logDirDescriptionMap.entrySet() + .stream() + .filter(entry -> entry.getValue().replicaInfos().containsKey(topicPartition)) + .map(Map.Entry::getKey) + .findFirst(); + if(!logDir.isPresent()) { + throw new IllegalArgumentException("Cannot get logDir of topic partiton:" + topicPartition); + } + model.addReplica(n.id(), logDir.get(), topicPartition, isLeader, isOffline, isLeader ? leaderLoad : followerLoad); } } }); @@ -98,7 +110,7 @@ private static String rack(Node node) { return (node.rack() == null || "".equals(node.rack())) ? node.host() : node.rack(); } - private static void addBroker(Node node, boolean isOffline, ClusterModel model, Map capacitiesById) { + private static void addBroker(Node node, boolean isOffline, ClusterModel model, Map capacitiesById, Set logDirs) { // rack Rack rack = model.addRack(rack(node)); // broker @@ -106,7 +118,19 @@ private static void addBroker(Node node, boolean isOffline, ClusterModel model, if (capacity == null) throw new IllegalArgumentException("Cannot get capacity of node: " + node); - model.addBroker(rack.id(), node.id(), node.host(), isOffline, capacity); + Map subCapacities = new HashMap<>(); + int logDirSize = logDirs.size(); + for(String name : logDirs) { + Capacity subCapacity = new Capacity(); + // TODO 默认每个logDir大小相同,均分node的资源,需要改为从配置加载 + subCapacity.setCapacity(Resource.CPU, capacity.capacityFor(Resource.CPU) / logDirSize); + subCapacity.setCapacity(Resource.DISK, capacity.capacityFor(Resource.DISK) / logDirSize); + subCapacity.setCapacity(Resource.NW_IN, capacity.capacityFor(Resource.NW_IN) / logDirSize); + subCapacity.setCapacity(Resource.NW_OUT, capacity.capacityFor(Resource.NW_OUT) / logDirSize); + subCapacities.put(name, subCapacity); + } + + model.addBroker(rack.id(), node.id(), node.host(), isOffline, capacity, subCapacities); } } diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/AnalyzerUtils.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/AnalyzerUtils.java index f327bfb99..c8c220b01 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/AnalyzerUtils.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/AnalyzerUtils.java @@ -7,6 +7,8 @@ import com.xiaojukeji.know.streaming.km.rebalance.algorithm.optimizer.goals.Goal; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.*; import java.util.stream.Collectors; @@ -14,6 +16,7 @@ import static com.xiaojukeji.know.streaming.km.rebalance.algorithm.optimizer.ActionAcceptance.ACCEPT; public class AnalyzerUtils { + private static final Logger logger = LoggerFactory.getLogger(AnalyzerUtils.class); public static Set getSplitTopics(String value) { if (StringUtils.isBlank(value)) { @@ -48,6 +51,10 @@ public static Set getDiff(Map getDiff(Map beforeReplicaPlacementInfos, List afterReplicaPlacementInfos) { + for(ReplicaPlacementInfo beforePlacement : beforeReplicaPlacementInfos) { + for(ReplicaPlacementInfo afterPlacement : afterReplicaPlacementInfos) { + if(beforePlacement.brokerId() == afterPlacement.brokerId() + && !"".equals(beforePlacement.logdir()) + && !beforePlacement.logdir().equals(afterPlacement.logdir())) { + return false; + } + } + } + return true; + } + public static ActionAcceptance isProposalAcceptableForOptimizedGoals(Set optimizedGoals, BalancingAction proposal, ClusterModel clusterModel) { diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/BalancingAction.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/BalancingAction.java index feafda152..f3b034dcc 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/BalancingAction.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/BalancingAction.java @@ -5,7 +5,9 @@ public class BalancingAction { private final TopicPartition _tp; private final Integer _sourceBrokerId; + private final String _sourceLogDir; private final Integer _destinationBrokerId; + private final String _destinationLogDir; private final ActionType _actionType; public BalancingAction(TopicPartition tp, @@ -14,7 +16,23 @@ public BalancingAction(TopicPartition tp, ActionType actionType) { _tp = tp; _sourceBrokerId = sourceBrokerId; + _sourceLogDir = "any"; _destinationBrokerId = destinationBrokerId; + _destinationLogDir = "any"; + _actionType = actionType; + } + + public BalancingAction(TopicPartition tp, + Integer sourceBrokerId, + String sourceLogDir, + Integer destinationBrokerId, + String destinationLogDir, + ActionType actionType) { + _tp = tp; + _sourceBrokerId = sourceBrokerId; + _sourceLogDir = sourceLogDir; + _destinationBrokerId = destinationBrokerId; + _destinationLogDir = destinationLogDir; _actionType = actionType; } @@ -22,10 +40,18 @@ public Integer sourceBrokerId() { return _sourceBrokerId; } + public String sourceLogDir() { + return _sourceLogDir; + } + public Integer destinationBrokerId() { return _destinationBrokerId; } + public String destinationLogDir() { + return _destinationLogDir; + } + public ActionType balancingAction() { return _actionType; } diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/goals/AbstractGoal.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/goals/AbstractGoal.java index 0916a228c..3a581d36a 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/goals/AbstractGoal.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/goals/AbstractGoal.java @@ -62,7 +62,7 @@ protected Broker maybeApplyBalancingAction(ClusterModel clusterModel, if (action == ActionType.LEADERSHIP_MOVEMENT) { clusterModel.relocateLeadership(name(), action.toString(), replica.topicPartition(), replica.broker().id(), broker.id()); } else if (action == ActionType.REPLICA_MOVEMENT) { - clusterModel.relocateReplica(name(), action.toString(), replica.topicPartition(), replica.broker().id(), broker.id()); + clusterModel.relocateReplica(name(), action.toString(), replica.topicPartition(), replica.broker().id(), replica.logDir(), broker.id(), broker.randomLogDir().name()); } return broker; } diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/goals/AbstractLogDirGoal.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/goals/AbstractLogDirGoal.java new file mode 100644 index 000000000..5cf1446bb --- /dev/null +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/goals/AbstractLogDirGoal.java @@ -0,0 +1,136 @@ +package com.xiaojukeji.know.streaming.km.rebalance.algorithm.optimizer.goals; + +import com.xiaojukeji.know.streaming.km.rebalance.algorithm.model.Broker; +import com.xiaojukeji.know.streaming.km.rebalance.algorithm.model.ClusterModel; +import com.xiaojukeji.know.streaming.km.rebalance.algorithm.model.LogDir; +import com.xiaojukeji.know.streaming.km.rebalance.algorithm.model.Replica; +import com.xiaojukeji.know.streaming.km.rebalance.algorithm.optimizer.*; + +import java.util.*; +import java.util.stream.Collectors; + +public abstract class AbstractLogDirGoal implements Goal { + + /** + * 均衡算法逻辑处理 + */ + protected abstract void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set optimizedGoals, OptimizationOptions optimizationOptions); + + /** + * 集群列表中的所有Broker循环执行均衡算法 + */ + @Override + public void optimize(ClusterModel clusterModel, Set optimizedGoals, OptimizationOptions optimizationOptions) { + initGoalState(clusterModel, optimizationOptions); + SortedSet brokenBrokers = clusterModel.brokers().stream() + .filter(b -> optimizationOptions.balanceBrokers().isEmpty() + || optimizationOptions.balanceBrokers().contains(b.id())) + .collect(Collectors.toCollection(TreeSet::new)); + + // SortedSet brokenBrokers = clusterModel.brokers(); + + for (Broker broker : brokenBrokers) { + rebalanceForBroker(broker, clusterModel, optimizedGoals, optimizationOptions); + } + } + + protected abstract void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions); + + /** + * 根据已经计算完的均衡副本、候选目标LogDir、执行类型来 + * 执行不同的集群模型数据更改操作 + */ + protected LogDir maybeApplyBalancingAction(ClusterModel clusterModel, + Replica replica, + Collection candidateLogDirs, + ActionType action, + Set optimizedGoals, + OptimizationOptions optimizationOptions) { + //如果该topicPartiton的其他副本已经参与过迁移,则跳过,避免出现tp-replica1:[broker1/data1 -> broker2/data3], tp-replica2:[broker4/data2 -> broker1/data2]的情况(同一个tp的不同副本先从broker1迁走,随后另一个副本又迁回broker1),触发kafka bug:https://issues.apache.org/jira/browse/KAFKA-9087 + if(action == ActionType.REPLICA_MOVEMENT && clusterModel.balanceActionHistory().containsKey(replica.topicPartition())) { + return null; + } + + List eligibleLogDirs = eligibleLogDirs(replica, candidateLogDirs, action, optimizationOptions); + for (LogDir logDir : eligibleLogDirs) { + + BalancingAction proposal = new BalancingAction(replica.topicPartition(), replica.broker().id(), replica.logDir(), logDir.broker().id(), logDir.name(), action); + //均衡的副本如果存在当前的Broker上则尝试下一个logDir + if (!legitMove(replica, logDir.broker(), action)) { + continue; + } + //均衡条件不满足则尝试下一个logDir + if (!selfSatisfied(clusterModel, proposal)) { + continue; + } + //判断当前均衡操作是否与其他目标冲突,如果冲突则禁止均衡操作 + ActionAcceptance acceptance = AnalyzerUtils.isProposalAcceptableForOptimizedGoals(optimizedGoals, proposal, clusterModel); + if (acceptance == ActionAcceptance.ACCEPT) { + if (action == ActionType.LEADERSHIP_MOVEMENT) { + clusterModel.relocateLeadership(name(), action.toString(), replica.topicPartition(), replica.broker().id(), logDir.broker().id()); + } else if (action == ActionType.REPLICA_MOVEMENT) { + clusterModel.relocateReplica(name(), action.toString(), replica.topicPartition(), replica.broker().id(), replica.logDir(), logDir.broker().id(), logDir.name()); + } + return logDir; + } + } + return null; + } + + /** + * 副本操作合法性判断: + * 1.副本迁移,目的broker不包含移动副本 + * 2.Leader切换,目的broker需要包含切换副本 + */ + private static boolean legitMove(Replica replica, + Broker destinationBroker, ActionType actionType) { + switch (actionType) { + case REPLICA_MOVEMENT: + return destinationBroker.replica(replica.topicPartition()) == null; + case LEADERSHIP_MOVEMENT: + return replica.isLeader() && destinationBroker.replica(replica.topicPartition()) != null; + default: + return false; + } + } + + protected abstract boolean selfSatisfied(ClusterModel clusterModel, BalancingAction action); + + /** + * 候选Broker列表筛选过滤 + */ + public static List eligibleLogDirs(Replica replica, + Collection candidates, + ActionType action, + OptimizationOptions optimizationOptions) { + List eligibleBrokers = new ArrayList<>(candidates); + filterOutBrokersExcludedForLeadership(eligibleBrokers, optimizationOptions, replica, action); + filterOutBrokersExcludedForReplicaMove(eligibleBrokers, optimizationOptions, action); + return eligibleBrokers; + } + + /** + * Leader切换,从候选的Broker列表中排除掉excludedBroker + */ + public static void filterOutBrokersExcludedForLeadership(List eligibleLogDirs, + OptimizationOptions optimizationOptions, + Replica replica, + ActionType action) { + Set excludedBrokers = optimizationOptions.offlineBrokers(); + if (!excludedBrokers.isEmpty() && (action == ActionType.LEADERSHIP_MOVEMENT || replica.isLeader())) { + eligibleLogDirs.removeIf(logDir -> excludedBrokers.contains(logDir.broker().id())); + } + } + + /** + * 副本迁移,从候选的Broker列表中排除掉excludedBroker + */ + public static void filterOutBrokersExcludedForReplicaMove(List eligibleLogDirs, + OptimizationOptions optimizationOptions, + ActionType action) { + Set excludedBrokers = optimizationOptions.offlineBrokers(); + if (!excludedBrokers.isEmpty() && action == ActionType.REPLICA_MOVEMENT) { + eligibleLogDirs.removeIf(logDir -> excludedBrokers.contains(logDir.broker().id())); + } + } +} diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/goals/ResourceDistributionGoal.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/goals/ResourceDistributionGoal.java index 8109859a8..849acf56b 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/goals/ResourceDistributionGoal.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/optimizer/goals/ResourceDistributionGoal.java @@ -8,16 +8,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; -import java.util.Iterator; -import java.util.Set; -import java.util.SortedSet; +import java.util.*; /** * @author leewei * @date 2022/5/20 */ -public abstract class ResourceDistributionGoal extends AbstractGoal { +public abstract class ResourceDistributionGoal extends AbstractLogDirGoal { private static final Logger logger = LoggerFactory.getLogger(ResourceDistributionGoal.class); private double balanceUpperThreshold; private double balanceLowerThreshold; @@ -35,66 +32,70 @@ protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set optimizedGoals, OptimizationOptions optimizationOptions) { - double utilization = broker.utilizationFor(resource()); + for(LogDir logDir : broker.logDirs().values()) { + double utilization = logDir.utilizationFor(resource()); - boolean requireLessLoad = utilization > this.balanceUpperThreshold; - boolean requireMoreLoad = utilization < this.balanceLowerThreshold; - if (!requireMoreLoad && !requireLessLoad) { - return; - } - - // First try leadership movement - if (resource() == Resource.NW_OUT || resource() == Resource.CPU) { - if (requireLessLoad && rebalanceByMovingLoadOut(broker, clusterModel, optimizedGoals, - ActionType.LEADERSHIP_MOVEMENT, optimizationOptions)) { - logger.debug("Successfully balanced {} for broker {} by moving out leaders.", resource(), broker.id()); - requireLessLoad = false; + boolean requireLessLoad = utilization > this.balanceUpperThreshold; + boolean requireMoreLoad = utilization < this.balanceLowerThreshold; + if (!requireMoreLoad && !requireLessLoad) { + return; } - if (requireMoreLoad && rebalanceByMovingLoadIn(broker, clusterModel, optimizedGoals, - ActionType.LEADERSHIP_MOVEMENT, optimizationOptions)) { - logger.debug("Successfully balanced {} for broker {} by moving in leaders.", resource(), broker.id()); - requireMoreLoad = false; + + // First try leadership movement + if (resource() == Resource.NW_OUT || resource() == Resource.CPU) { + if (requireLessLoad && rebalanceByMovingLoadOut(logDir, clusterModel, optimizedGoals, + ActionType.LEADERSHIP_MOVEMENT, optimizationOptions)) { + logger.debug("Successfully balanced {} for broker {} logDir {} by moving out leaders.", resource(), broker.id(), logDir.name()); + requireLessLoad = false; + } + if (requireMoreLoad && rebalanceByMovingLoadIn(logDir, clusterModel, optimizedGoals, + ActionType.LEADERSHIP_MOVEMENT, optimizationOptions)) { + logger.debug("Successfully balanced {} for broker {} logDir {} by moving in leaders.", resource(), broker.id(), logDir.name()); + requireMoreLoad = false; + } } - } - boolean balanced = true; - if (requireLessLoad) { - if (!rebalanceByMovingLoadOut(broker, clusterModel, optimizedGoals, - ActionType.REPLICA_MOVEMENT, optimizationOptions)) { - balanced = rebalanceBySwappingLoadOut(broker, clusterModel, optimizedGoals, optimizationOptions); + boolean balanced = true; + if (requireLessLoad) { + if (!rebalanceByMovingLoadOut(logDir, clusterModel, optimizedGoals, + ActionType.REPLICA_MOVEMENT, optimizationOptions)) { + balanced = rebalanceBySwappingLoadOut(logDir, clusterModel, optimizedGoals, optimizationOptions); + } + } else if (requireMoreLoad) { + if (!rebalanceByMovingLoadIn(logDir, clusterModel, optimizedGoals, + ActionType.REPLICA_MOVEMENT, optimizationOptions)) { + balanced = rebalanceBySwappingLoadIn(logDir, clusterModel, optimizedGoals, optimizationOptions); + } } - } else if (requireMoreLoad) { - if (!rebalanceByMovingLoadIn(broker, clusterModel, optimizedGoals, - ActionType.REPLICA_MOVEMENT, optimizationOptions)) { - balanced = rebalanceBySwappingLoadIn(broker, clusterModel, optimizedGoals, optimizationOptions); + if (balanced) { + logger.debug("Successfully balanced {} for broker {} logDir {} by moving leaders and replicas.", resource(), broker.id(), logDir.name()); + } else { + logger.debug("Balance {} for broker {} logDir {} failed.", resource(), broker.id(), logDir.name()); } } - if (balanced) { - logger.debug("Successfully balanced {} for broker {} by moving leaders and replicas.", resource(), broker.id()); - } } - private boolean rebalanceByMovingLoadOut(Broker broker, + private boolean rebalanceByMovingLoadOut(LogDir logDir, ClusterModel clusterModel, Set optimizedGoals, ActionType actionType, OptimizationOptions optimizationOptions) { - SortedSet candidateBrokers = sortedCandidateBrokersUnderThreshold(clusterModel, this.balanceUpperThreshold, optimizationOptions, broker, false); - SortedSet replicasToMove = sortedCandidateReplicas(broker, actionType, optimizationOptions, true); + SortedSet candidateLogDirs = sortedCandidateLogDirsUnderThreshold(clusterModel, this.balanceUpperThreshold, optimizationOptions, logDir.broker(), false); + SortedSet replicasToMove = sortedCandidateReplicas(logDir, actionType, optimizationOptions, true); for (Replica replica : replicasToMove) { - Broker acceptedBroker = maybeApplyBalancingAction(clusterModel, replica, candidateBrokers, actionType, optimizedGoals, optimizationOptions); + LogDir acceptedLogDir = maybeApplyBalancingAction(clusterModel, replica, candidateLogDirs, actionType, optimizedGoals, optimizationOptions); - if (acceptedBroker != null) { - if (broker.utilizationFor(resource()) < this.balanceUpperThreshold) { + if (acceptedLogDir != null) { + if (logDir.utilizationFor(resource()) < this.balanceUpperThreshold) { return true; } - // Remove and reinsert the broker so the order is correct. + // Remove and reinsert the logDir so the order is correct. // candidateBrokers.remove(acceptedBroker); - candidateBrokers.removeIf(b -> b.id() == acceptedBroker.id()); - if (acceptedBroker.utilizationFor(resource()) < this.balanceUpperThreshold) { - candidateBrokers.add(acceptedBroker); + candidateLogDirs.removeIf(b -> b.broker().id() == acceptedLogDir.broker().id() && b.name().equals(acceptedLogDir.name())); + if (acceptedLogDir.utilizationFor(resource()) < this.balanceUpperThreshold) { + candidateLogDirs.add(acceptedLogDir); } } } @@ -102,37 +103,37 @@ private boolean rebalanceByMovingLoadOut(Broker broker, return false; } - private boolean rebalanceByMovingLoadIn(Broker broker, + private boolean rebalanceByMovingLoadIn(LogDir logDir, ClusterModel clusterModel, Set optimizedGoals, ActionType actionType, OptimizationOptions optimizationOptions) { - SortedSet candidateBrokers = sortedCandidateBrokersOverThreshold(clusterModel, this.balanceLowerThreshold, optimizationOptions, broker, true); - Iterator candidateBrokersIt = candidateBrokers.iterator(); - Broker nextCandidateBroker = null; + SortedSet candidateLogDirs = sortedCandidateLogDirsOverThreshold(clusterModel, this.balanceLowerThreshold, optimizationOptions, logDir.broker(), true); + Iterator candidateLogDirsIt = candidateLogDirs.iterator(); + LogDir nextCandidateLogDir = null; while (true) { - Broker candidateBroker; - if (nextCandidateBroker != null) { - candidateBroker = nextCandidateBroker; - nextCandidateBroker = null; - } else if (candidateBrokersIt.hasNext()) { - candidateBroker = candidateBrokersIt.next(); + LogDir candidateLogDir; + if (nextCandidateLogDir != null) { + candidateLogDir = nextCandidateLogDir; + nextCandidateLogDir = null; + } else if (candidateLogDirsIt.hasNext()) { + candidateLogDir = candidateLogDirsIt.next(); } else { break; } - SortedSet replicasToMove = sortedCandidateReplicas(candidateBroker, actionType, optimizationOptions, true); + SortedSet replicasToMove = sortedCandidateReplicas(candidateLogDir, actionType, optimizationOptions, true); for (Replica replica : replicasToMove) { - Broker acceptedBroker = maybeApplyBalancingAction(clusterModel, replica, Collections.singletonList(broker), actionType, optimizedGoals, optimizationOptions); - if (acceptedBroker != null) { - if (broker.utilizationFor(resource()) > this.balanceLowerThreshold) { + LogDir acceptedLogDir = maybeApplyBalancingAction(clusterModel, replica, Collections.singletonList(logDir), actionType, optimizedGoals, optimizationOptions); + if (acceptedLogDir != null) { + if (logDir.utilizationFor(resource()) > this.balanceLowerThreshold) { return true; } - if (candidateBrokersIt.hasNext() || nextCandidateBroker != null) { - if (nextCandidateBroker == null) { - nextCandidateBroker = candidateBrokersIt.next(); + if (candidateLogDirsIt.hasNext() || nextCandidateLogDir != null) { + if (nextCandidateLogDir == null) { + nextCandidateLogDir = candidateLogDirsIt.next(); } - if (candidateBroker.utilizationFor(resource()) < nextCandidateBroker.utilizationFor(resource())) { + if (candidateLogDir.utilizationFor(resource()) < nextCandidateLogDir.utilizationFor(resource())) { break; } } @@ -143,25 +144,25 @@ private boolean rebalanceByMovingLoadIn(Broker broker, return false; } - private boolean rebalanceBySwappingLoadOut(Broker broker, - ClusterModel clusterModel, - Set optimizedGoals, - OptimizationOptions optimizationOptions) { - return false; - } - - private boolean rebalanceBySwappingLoadIn(Broker broker, + private boolean rebalanceBySwappingLoadOut(LogDir logDir, ClusterModel clusterModel, Set optimizedGoals, OptimizationOptions optimizationOptions) { return false; } + private boolean rebalanceBySwappingLoadIn(LogDir logDir, + ClusterModel clusterModel, + Set optimizedGoals, + OptimizationOptions optimizationOptions) { + return false; + } + private SortedSet sortedCandidateBrokersUnderThreshold(ClusterModel clusterModel, - double utilizationThreshold, - OptimizationOptions optimizationOptions, - Broker excludedBroker, - boolean reverse) { + double utilizationThreshold, + OptimizationOptions optimizationOptions, + Broker excludedBroker, + boolean reverse) { return clusterModel.sortedBrokersFor( b -> b.utilizationFor(resource()) < utilizationThreshold && !excludedBroker.equals(b) @@ -170,11 +171,26 @@ private SortedSet sortedCandidateBrokersUnderThreshold(ClusterModel clus , resource(), reverse); } - private SortedSet sortedCandidateBrokersOverThreshold(ClusterModel clusterModel, + private SortedSet sortedCandidateLogDirsUnderThreshold(ClusterModel clusterModel, double utilizationThreshold, OptimizationOptions optimizationOptions, Broker excludedBroker, boolean reverse) { + + return clusterModel.sortedLogDirsFor( + b -> b.utilizationFor(resource()) < utilizationThreshold + && !excludedBroker.equals(b.broker()) + // filter brokers + && (optimizationOptions.balanceBrokers().isEmpty() || optimizationOptions.balanceBrokers().contains(b.broker().id())) + , resource(), reverse); + } + + + private SortedSet sortedCandidateBrokersOverThreshold(ClusterModel clusterModel, + double utilizationThreshold, + OptimizationOptions optimizationOptions, + Broker excludedBroker, + boolean reverse) { return clusterModel.sortedBrokersFor( b -> b.utilizationFor(resource()) > utilizationThreshold && !excludedBroker.equals(b) @@ -183,6 +199,20 @@ private SortedSet sortedCandidateBrokersOverThreshold(ClusterModel clust , resource(), reverse); } + private SortedSet sortedCandidateLogDirsOverThreshold(ClusterModel clusterModel, + double utilizationThreshold, + OptimizationOptions optimizationOptions, + Broker excludedBroker, + boolean reverse) { + + return clusterModel.sortedLogDirsFor( + b -> b.utilizationFor(resource()) > utilizationThreshold + && !excludedBroker.equals(b.broker()) + // filter brokers + && (optimizationOptions.balanceBrokers().isEmpty() || optimizationOptions.balanceBrokers().contains(b.broker().id())) + , resource(), reverse); + } + private SortedSet sortedCandidateReplicas(Broker broker, ActionType actionType, OptimizationOptions optimizationOptions, @@ -196,12 +226,27 @@ private SortedSet sortedCandidateReplicas(Broker broker, , resource(), reverse); } + private SortedSet sortedCandidateReplicas(LogDir logDir, + ActionType actionType, + OptimizationOptions optimizationOptions, + boolean reverse) { + return logDir.sortedReplicasFor( + // exclude topic + r -> !optimizationOptions.excludedTopics().contains(r.topicPartition().topic()) + && r.load().loadFor(resource()) > 0.0 + // LEADERSHIP_MOVEMENT or NW_OUT is require leader replica + && (actionType != ActionType.LEADERSHIP_MOVEMENT && resource() != Resource.NW_OUT || r.isLeader()) + , resource(), reverse); + } + protected abstract Resource resource(); @Override protected boolean selfSatisfied(ClusterModel clusterModel, BalancingAction action) { Broker destinationBroker = clusterModel.broker(action.destinationBrokerId()); + LogDir destinationLogDir = destinationBroker.logDir(action.destinationLogDir()); Broker sourceBroker = clusterModel.broker(action.sourceBrokerId()); + LogDir sourceLogDir = sourceBroker.logDir(action.sourceLogDir()); Replica sourceReplica = sourceBroker.replica(action.topicPartition()); Load loadToChange; @@ -214,8 +259,8 @@ protected boolean selfSatisfied(ClusterModel clusterModel, BalancingAction actio } else { loadToChange = sourceReplica.load(); } - double sourceUtilization = sourceBroker.expectedUtilizationAfterRemove(resource(), loadToChange); - double destinationUtilization = destinationBroker.expectedUtilizationAfterAdd(resource(), loadToChange); + double sourceUtilization = sourceLogDir.expectedUtilizationAfterRemove(resource(), loadToChange); + double destinationUtilization = destinationLogDir.expectedUtilizationAfterAdd(resource(), loadToChange); return sourceUtilization >= this.balanceLowerThreshold && destinationUtilization <= this.balanceUpperThreshold; } diff --git a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/utils/MetadataUtils.java b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/utils/MetadataUtils.java index 3df57c6c5..dda4e94ce 100644 --- a/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/utils/MetadataUtils.java +++ b/km-enterprise/km-rebalance/src/main/java/com/xiaojukeji/know/streaming/km/rebalance/algorithm/utils/MetadataUtils.java @@ -1,13 +1,21 @@ package com.xiaojukeji.know.streaming.km.rebalance.algorithm.utils; import org.apache.kafka.clients.*; +import org.apache.kafka.clients.admin.LogDirDescription; +import org.apache.kafka.clients.admin.ReplicaInfo; import org.apache.kafka.clients.consumer.internals.NoAvailableBrokersException; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.DescribeLogDirsRequestData; +import org.apache.kafka.common.message.DescribeLogDirsResponseData; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.Selector; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.DescribeLogDirsRequest; +import org.apache.kafka.common.requests.DescribeLogDirsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.utils.LogContext; @@ -17,9 +25,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Properties; +import java.util.*; /** * @author leewei @@ -89,4 +95,84 @@ public static Cluster metadata(Properties props) { networkClient.close(); } } + + public static Map> describeLogDirs(Properties props, List nodes) { + props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.BytesSerializer"); + props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.BytesSerializer"); + ProducerConfig config = new ProducerConfig(props); + + Time time = Time.SYSTEM; + LogContext logContext = new LogContext("Metadata client"); + + ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext); + Selector selector = new Selector( + NetworkReceive.UNLIMITED, + config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), + new org.apache.kafka.common.metrics.Metrics(), + time, + "metadata-client", + Collections.singletonMap("client", "metadata-client"), + false, + channelBuilder, + logContext + ); + + NetworkClient networkClient = new NetworkClient( + selector, + new ManualMetadataUpdater(), + "metadata-client", + 1, + config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), + config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG), + config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), + config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), + config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), + config.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG), + config.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG), + ClientDnsLookup.DEFAULT, + time, + true, + new ApiVersions(), + logContext + ); + + Map> allDescriptions = new HashMap<>(); + try { + for (int i = 0; i < nodes.size(); i++) { + Node sourceNode = nodes.get(i); + try { + if (NetworkClientUtils.awaitReady(networkClient, sourceNode, time, 10 * 1000)) { + ClientRequest describeLogDirsRequest = networkClient.newClientRequest(String.valueOf(sourceNode.id()), new DescribeLogDirsRequest.Builder(new DescribeLogDirsRequestData().setTopics(null)), time.milliseconds(), true); + ClientResponse describeLogDirsResponse = NetworkClientUtils.sendAndReceive(networkClient, describeLogDirsRequest, time); + DescribeLogDirsResponse logDirsResponse = (DescribeLogDirsResponse)describeLogDirsResponse.responseBody(); + Map descriptions = logDirDescriptions(logDirsResponse); + allDescriptions.put(sourceNode.id(), descriptions); + } + } catch (IOException e) { + logger.warn("Connection to " + sourceNode + " error", e); + throw new NoAvailableBrokersException(); + } + } + } finally { + networkClient.close(); + } + return allDescriptions; + } + + + private static Map logDirDescriptions(DescribeLogDirsResponse response) { + Map result = new HashMap<>(response.data().results().size()); + for (DescribeLogDirsResponseData.DescribeLogDirsResult logDirResult : response.data().results()) { + Map replicaInfoMap = new HashMap<>(); + for (DescribeLogDirsResponseData.DescribeLogDirsTopic t : logDirResult.topics()) { + for (DescribeLogDirsResponseData.DescribeLogDirsPartition p : t.partitions()) { + replicaInfoMap.put( + new TopicPartition(t.name(), p.partitionIndex()), + new ReplicaInfo(p.partitionSize(), p.offsetLag(), p.isFutureKey())); + } + } + result.put(logDirResult.logDir(), new LogDirDescription(Errors.forCode(logDirResult.errorCode()).exception(), replicaInfoMap)); + } + return result; + } }