diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java index 646ff8a5a025..a234682f7412 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancer.java @@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -52,7 +51,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; /** * Container balancer is a service in SCM to move containers between over- and @@ -86,15 +84,15 @@ public class ContainerBalancer { private long clusterRemaining; private double clusterAvgUtilisation; private double upperLimit; + private double lowerLimit; private volatile boolean balancerRunning; private volatile Thread currentBalancingThread; private Lock lock; private ContainerBalancerSelectionCriteria selectionCriteria; private Map sourceToTargetMap; - private Map sizeLeavingNode; - private Map sizeEnteringNode; private Set selectedContainers; private FindTargetStrategy findTargetStrategy; + private FindSourceStrategy findSourceStrategy; private Map> moveSelectionToFutureMap; @@ -131,8 +129,9 @@ public ContainerBalancer( this.unBalancedNodes = new ArrayList<>(); this.lock = new ReentrantLock(); - findTargetStrategy = - new FindTargetGreedy(containerManager, placementPolicy); + findTargetStrategy = new FindTargetGreedy( + containerManager, placementPolicy, nodeManager); + findSourceStrategy = new FindSourceGreedy(nodeManager); } /** @@ -251,7 +250,6 @@ private boolean initializeIteration() { 
this.selectedContainers.clear(); this.overUtilizedNodes.clear(); this.underUtilizedNodes.clear(); - this.withinThresholdUtilizedNodes.clear(); this.unBalancedNodes.clear(); this.countDatanodesInvolvedPerIteration = 0; this.sizeMovedPerIteration = 0; @@ -262,20 +260,19 @@ private boolean initializeIteration() { clusterAvgUtilisation); } - // under utilized nodes have utilization(that is, used / capacity) less - // than lower limit - double lowerLimit = clusterAvgUtilisation - threshold; - // over utilized nodes have utilization(that is, used / capacity) greater // than upper limit this.upperLimit = clusterAvgUtilisation + threshold; + // under utilized nodes have utilization(that is, used / capacity) less + // than lower limit + this.lowerLimit = clusterAvgUtilisation - threshold; if (LOG.isDebugEnabled()) { LOG.debug("Lower limit for utilization is {} and Upper limit for " + "utilization is {}", lowerLimit, upperLimit); } - long overUtilizedBytes = 0L, underUtilizedBytes = 0L; + long totalOverUtilizedBytes = 0L, totalUnderUtilizedBytes = 0L; // find over and under utilized nodes for (DatanodeUsageInfo datanodeUsageInfo : datanodeUsageInfos) { if (!isBalancerRunning()) { @@ -291,7 +288,7 @@ private boolean initializeIteration() { datanodeUsageInfo.getScmNodeStat().getRemaining().get(), utilization); } - if (utilization > upperLimit) { + if (Double.compare(utilization, upperLimit) > 0) { overUtilizedNodes.add(datanodeUsageInfo); metrics.incrementDatanodesNumToBalance(1); @@ -300,27 +297,30 @@ private boolean initializeIteration() { ratioToPercent(utilization))); // amount of bytes greater than upper limit in this node - overUtilizedBytes += ratioToBytes( + Long overUtilizedBytes = ratioToBytes( datanodeUsageInfo.getScmNodeStat().getCapacity().get(), utilization) - ratioToBytes( datanodeUsageInfo.getScmNodeStat().getCapacity().get(), upperLimit); - } else if (utilization < lowerLimit) { + totalOverUtilizedBytes += overUtilizedBytes; + } else if 
(Double.compare(utilization, lowerLimit) < 0) { underUtilizedNodes.add(datanodeUsageInfo); metrics.incrementDatanodesNumToBalance(1); // amount of bytes lesser than lower limit in this node - underUtilizedBytes += ratioToBytes( + Long underUtilizedBytes = ratioToBytes( datanodeUsageInfo.getScmNodeStat().getCapacity().get(), lowerLimit) - ratioToBytes( datanodeUsageInfo.getScmNodeStat().getCapacity().get(), utilization); + totalUnderUtilizedBytes += underUtilizedBytes; } else { withinThresholdUtilizedNodes.add(datanodeUsageInfo); } } metrics.setDataSizeToBalanceGB( - Math.max(overUtilizedBytes, underUtilizedBytes) / OzoneConsts.GB); + Math.max(totalOverUtilizedBytes, totalUnderUtilizedBytes) / + OzoneConsts.GB); Collections.reverse(underUtilizedNodes); unBalancedNodes = new ArrayList<>( @@ -338,102 +338,62 @@ private boolean initializeIteration() { overUtilizedNodes.size(), underUtilizedNodes.size()); selectionCriteria = new ContainerBalancerSelectionCriteria(config, - nodeManager, replicationManager, containerManager); + nodeManager, replicationManager, containerManager, findSourceStrategy); sourceToTargetMap = new HashMap<>(overUtilizedNodes.size() + withinThresholdUtilizedNodes.size()); - - // initialize maps to track how much size is leaving and entering datanodes - sizeLeavingNode = new HashMap<>(overUtilizedNodes.size() + - withinThresholdUtilizedNodes.size()); - overUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode - .put(datanodeUsageInfo.getDatanodeDetails(), 0L)); - withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeLeavingNode - .put(datanodeUsageInfo.getDatanodeDetails(), 0L)); - - sizeEnteringNode = new HashMap<>(underUtilizedNodes.size() + - withinThresholdUtilizedNodes.size()); - underUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode - .put(datanodeUsageInfo.getDatanodeDetails(), 0L)); - withinThresholdUtilizedNodes.forEach(datanodeUsageInfo -> sizeEnteringNode - .put(datanodeUsageInfo.getDatanodeDetails(), 0L)); - return 
true; } private IterationResult doIteration() { // note that potential and selected targets are updated in the following // loop - List potentialTargets = getPotentialTargets(); + //TODO(jacksonyao): take withinThresholdUtilizedNodes as candidate for both + // source and target + findSourceStrategy.reInitialize(getPotentialSources(), config, lowerLimit); + List potentialTargets = getPotentialTargets(); + findTargetStrategy.reInitialize(potentialTargets, config, upperLimit); + Set selectedTargets = new HashSet<>(potentialTargets.size()); moveSelectionToFutureMap = new HashMap<>(unBalancedNodes.size()); boolean isMoveGenerated = false; - try { // match each overUtilized node with a target - for (DatanodeUsageInfo datanodeUsageInfo : overUtilizedNodes) { + while (true) { + DatanodeDetails source = + findSourceStrategy.getNextCandidateSourceDataNode(); + if (source == null) { + break; + } if (!isBalancerRunning()) { return IterationResult.ITERATION_INTERRUPTED; } - DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails(); + IterationResult result = checkConditionsForBalancing(); if (result != null) { return result; } - ContainerMoveSelection moveSelection = - matchSourceWithTarget(source, potentialTargets); + ContainerMoveSelection moveSelection = matchSourceWithTarget(source); if (moveSelection != null) { isMoveGenerated = true; LOG.info("ContainerBalancer is trying to move container {} from " + "source datanode {} to target datanode {}", - moveSelection.getContainerID().toString(), source.getUuidString(), + moveSelection.getContainerID().toString(), + source.getUuidString(), moveSelection.getTargetNode().getUuidString()); if (moveContainer(source, moveSelection)) { // consider move successful for now, and update selection criteria - potentialTargets = updateTargetsAndSelectionCriteria( - potentialTargets, selectedTargets, moveSelection, source); + updateTargetsAndSelectionCriteria( + selectedTargets, moveSelection, source); } + } else { + // can not find any 
target for this source + findSourceStrategy.removeCandidateSourceDataNode(source); } } - // if not all underUtilized nodes have been selected, try to match - // withinThresholdUtilized nodes with underUtilized nodes - if (selectedTargets.size() < underUtilizedNodes.size()) { - potentialTargets.removeAll(selectedTargets); - Collections.reverse(withinThresholdUtilizedNodes); - - for (DatanodeUsageInfo datanodeUsageInfo : - withinThresholdUtilizedNodes) { - if (!balancerRunning) { - return IterationResult.ITERATION_INTERRUPTED; - } - DatanodeDetails source = datanodeUsageInfo.getDatanodeDetails(); - IterationResult result = checkConditionsForBalancing(); - if (result != null) { - return result; - } - - ContainerMoveSelection moveSelection = - matchSourceWithTarget(source, potentialTargets); - if (moveSelection != null) { - isMoveGenerated = true; - LOG.info("ContainerBalancer is trying to move container {} from " + - "source datanode {} to target datanode {}", - moveSelection.getContainerID().toString(), - source.getUuidString(), - moveSelection.getTargetNode().getUuidString()); - - if (moveContainer(source, moveSelection)) { - // consider move successful for now, and update selection criteria - potentialTargets = - updateTargetsAndSelectionCriteria(potentialTargets, - selectedTargets, moveSelection, source); - } - } - } - } if (!isMoveGenerated) { //no move option is generated, so the cluster can not be //balanced any more, just stop iteration and exit @@ -502,12 +462,9 @@ private void checkIterationMoveResults(Set selectedTargets) { * Match a source datanode with a target datanode and identify the container * to move. 
* - * @param potentialTargets Collection of potential targets to move - * container to * @return ContainerMoveSelection containing the selected target and container */ - private ContainerMoveSelection matchSourceWithTarget( - DatanodeDetails source, Collection potentialTargets) { + private ContainerMoveSelection matchSourceWithTarget(DatanodeDetails source) { NavigableSet candidateContainers = selectionCriteria.getCandidateContainers(source); @@ -518,14 +475,14 @@ private ContainerMoveSelection matchSourceWithTarget( } return null; } + if (LOG.isDebugEnabled()) { LOG.debug("ContainerBalancer is finding suitable target for source " + "datanode {}", source.getUuidString()); } ContainerMoveSelection moveSelection = findTargetStrategy.findTargetForContainerMove( - source, potentialTargets, candidateContainers, - this::canSizeEnterTarget); + source, candidateContainers); if (moveSelection == null) { if (LOG.isDebugEnabled()) { @@ -622,15 +579,13 @@ private boolean moveContainer(DatanodeDetails source, /** * Update targets and selection criteria after a move. 
* - * @param potentialTargets potential target datanodes * @param selectedTargets selected target datanodes * @param moveSelection the target datanode and container that has been * just selected * @param source the source datanode * @return List of updated potential targets */ - private List updateTargetsAndSelectionCriteria( - Collection potentialTargets, + private void updateTargetsAndSelectionCriteria( Set selectedTargets, ContainerMoveSelection moveSelection, DatanodeDetails source) { // count source if it has not been involved in move earlier @@ -647,10 +602,6 @@ private List updateTargetsAndSelectionCriteria( selectedTargets.add(moveSelection.getTargetNode()); selectedContainers.add(moveSelection.getContainerID()); selectionCriteria.setSelectedContainers(selectedContainers); - - return potentialTargets.stream() - .filter(node -> sizeEnteringNode.get(node) < - config.getMaxSizeEnteringTarget()).collect(Collectors.toList()); } /** @@ -689,46 +640,31 @@ private long ratioToBytes(Long nodeCapacity, double utilizationRatio) { return (clusterCapacity - clusterRemaining) / (double) clusterCapacity; } + + + /** - * Checks if specified size can enter specified target datanode - * according to {@link ContainerBalancerConfiguration} - * "size.entering.target.max". + * Get potential targets for container move. Potential targets are under + * utilized and within threshold utilized nodes. * - * @param target target datanode in which size is entering - * @param size size in bytes - * @return true if size can enter target, else false + * @return A list of potential target DatanodeUsageInfo. */ - boolean canSizeEnterTarget(DatanodeDetails target, long size) { - if (sizeEnteringNode.containsKey(target)) { - long sizeEnteringAfterMove = sizeEnteringNode.get(target) + size; - //size can be moved into target datanode only when the following - //two condition are met. 
- //1 sizeEnteringAfterMove does not succeed the configured - // MaxSizeEnteringTarget - //2 current usage of target datanode plus sizeEnteringAfterMove - // is smaller than or equal to upperLimit - return sizeEnteringAfterMove <= config.getMaxSizeEnteringTarget() && - nodeManager.getUsageInfo(target) - .calculateUtilization(sizeEnteringAfterMove) <= upperLimit; - } - return false; + private List getPotentialTargets() { + //TODO(jacksonyao): take withinThresholdUtilizedNodes as candidate for both + // source and target + return underUtilizedNodes; } /** - * Get potential targets for container move. Potential targets are under + * Get potential sources for container move. Potential sources are over * utilized and within threshold utilized nodes. * - * @return A list of potential target DatanodeDetails. + * @return A list of potential source DatanodeUsageInfo. */ - private List getPotentialTargets() { - List potentialTargets = new ArrayList<>( - underUtilizedNodes.size() + withinThresholdUtilizedNodes.size()); - - underUtilizedNodes - .forEach(node -> potentialTargets.add(node.getDatanodeDetails())); - withinThresholdUtilizedNodes - .forEach(node -> potentialTargets.add(node.getDatanodeDetails())); - return potentialTargets; + private List getPotentialSources() { + //TODO(jacksonyao): take withinThresholdUtilizedNodes as candidate for both + // source and target + return overUtilizedNodes; } /** @@ -756,10 +692,10 @@ private void incSizeSelectedForMoving(DatanodeDetails source, sizeMovedPerIteration += size; // update sizeLeavingNode map with the recent moveSelection - sizeLeavingNode.put(source, sizeLeavingNode.get(source) + size); + findSourceStrategy.increaseSizeLeaving(source, size); // update sizeEnteringNode map with the recent moveSelection - sizeEnteringNode.put(target, sizeEnteringNode.get(target) + size); + findTargetStrategy.increaseSizeEntering(target, size); } /** diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java index 4e7b88999824..a77c1b7698ed 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerConfiguration.java @@ -307,9 +307,16 @@ public String toString() { "%-50s %s%n" + "%-50s %s%n" + "%-50s %s%n" + - "%-50s %dB%n", "Key", "Value", "Threshold", + "%-50s %dGB%n"+ + "%-50s %dGB%n"+ + "%-50s %dGB%n", "Key", "Value", "Threshold", threshold, "Max Datanodes to Involve per Iteration(ratio)", maxDatanodesRatioToInvolvePerIteration, - "Max Size to Move per Iteration", maxSizeToMovePerIteration); + "Max Size to Move per Iteration", + maxSizeToMovePerIteration / OzoneConsts.GB, + "Max Size Entering Target per Iteration", + maxSizeEnteringTarget / OzoneConsts.GB, + "Max Size Leaving Source per Iteration", + maxSizeLeavingSource / OzoneConsts.GB); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java index 11d157184e53..b5f5acd8fa5b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/ContainerBalancerSelectionCriteria.java @@ -48,18 +48,21 @@ public class ContainerBalancerSelectionCriteria { private ContainerManager containerManager; private Set selectedContainers; private Set excludeContainers; + private FindSourceStrategy findSourceStrategy; public 
ContainerBalancerSelectionCriteria( ContainerBalancerConfiguration balancerConfiguration, NodeManager nodeManager, ReplicationManager replicationManager, - ContainerManager containerManager) { + ContainerManager containerManager, + FindSourceStrategy findSourceStrategy) { this.balancerConfiguration = balancerConfiguration; this.nodeManager = nodeManager; this.replicationManager = replicationManager; this.containerManager = containerManager; selectedContainers = new HashSet<>(); excludeContainers = balancerConfiguration.getExcludeContainers(); + this.findSourceStrategy = findSourceStrategy; } /** @@ -115,6 +118,23 @@ public NavigableSet getCandidateContainers( } }); + //if the utilization of the source data node becomes lower than lowerLimit + //after the container is moved out , then the container can not be + // a candidate one, and we should remove it from the candidateContainers. + containerIDSet.removeIf(c -> { + ContainerInfo cInfo; + try { + cInfo = containerManager.getContainer(c); + } catch (ContainerNotFoundException e) { + LOG.warn("Could not find container {} when " + + "be matched with a move target", c); + //remove this not found container + return true; + } + return !findSourceStrategy.canSizeLeaveSource( + node, cInfo.getUsedBytes()); + }); + containerIDSet.removeIf(this::isContainerReplicatingOrDeleting); return containerIDSet; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceGreedy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceGreedy.java new file mode 100644 index 000000000000..591461d88750 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceGreedy.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.container.balancer; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.UUID; + +/** + * The selection criteria for selecting source datanodes , the containers of + * which will be moved out. + */ +public class FindSourceGreedy implements FindSourceStrategy{ + private static final Logger LOG = + LoggerFactory.getLogger(FindSourceGreedy.class); + private Map sizeLeavingNode; + private PriorityQueue potentialSources; + private NodeManager nodeManager; + private ContainerBalancerConfiguration config; + private Double lowerLimit; + + FindSourceGreedy(NodeManager nodeManager) { + sizeLeavingNode = new HashMap<>(); + potentialSources = new PriorityQueue<>((a, b) -> { + double currentUsageOfA = a.calculateUtilization( + -sizeLeavingNode.get(a.getDatanodeDetails())); + double currentUsageOfB = b.calculateUtilization( + -sizeLeavingNode.get(b.getDatanodeDetails())); + //in descending order + int ret = Double.compare(currentUsageOfB, currentUsageOfA); + if (ret != 0) { + return ret; + } + UUID uuidA = a.getDatanodeDetails().getUuid(); + UUID uuidB = b.getDatanodeDetails().getUuid(); + return uuidA.compareTo(uuidB); + }); + this.nodeManager = nodeManager; + } + + private void setLowerLimit(Double lowerLimit) { + this.lowerLimit = lowerLimit; + } + + private void setPotentialSources( + List potentialSourceDataNodes) 
{ + potentialSources.clear(); + sizeLeavingNode.clear(); + potentialSourceDataNodes.forEach( + c -> sizeLeavingNode.put(c.getDatanodeDetails(), 0L)); + potentialSources.addAll(potentialSourceDataNodes); + } + + private void setConfiguration(ContainerBalancerConfiguration conf) { + this.config = conf; + } + + /** + * increase the Leaving size of a candidate source data node. + */ + @Override + public void increaseSizeLeaving(DatanodeDetails dui, long size) { + Long currentSize = sizeLeavingNode.get(dui); + if(currentSize != null) { + sizeLeavingNode.put(dui, currentSize + size); + //reorder according to the latest sizeLeavingNode + potentialSources.add(nodeManager.getUsageInfo(dui)); + return; + } + LOG.warn("Cannot find datanode {} in candidate source datanodes", + dui.getUuid()); + } + + /** + * get the next candidate source data node according to + * the strategy. + * + * @return the next candidate source data node. + */ + @Override + public DatanodeDetails getNextCandidateSourceDataNode() { + if (potentialSources.isEmpty()) { + LOG.info("no more candidate source data node"); + return null; + } + return potentialSources.poll().getDatanodeDetails(); + } + + /** + * remove the specified data node from candidate source + * data nodes. + */ + @Override + public void removeCandidateSourceDataNode(DatanodeDetails dui){ + potentialSources.removeIf(a -> a.getDatanodeDetails().equals(dui)); + } + + /** + * Checks if specified size can leave a specified source datanode + * according to {@link ContainerBalancerConfiguration} + * "size.leaving.source.max". 
+ * + * @param source source datanode from which size is leaving + * @param size size in bytes + * @return true if size can leave, else false + */ + @Override + public boolean canSizeLeaveSource(DatanodeDetails source, long size) { + if (sizeLeavingNode.containsKey(source)) { + long sizeLeavingAfterMove = sizeLeavingNode.get(source) + size; + //size can be moved out of source datanode only when the following + //two conditions are met. + //1 sizeLeavingAfterMove does not exceed the configured + // MaxSizeLeavingSource + //2 after subtracting sizeLeavingAfterMove, the usage is bigger + // than or equal to lowerLimit + return sizeLeavingAfterMove <= config.getMaxSizeLeavingSource() && + Double.compare(nodeManager.getUsageInfo(source) + .calculateUtilization(-sizeLeavingAfterMove), lowerLimit) >= 0; + } + return false; + } + + /** + * reInitialize FindSourceStrategy. + */ + @Override + public void reInitialize(List potentialDataNodes, + ContainerBalancerConfiguration conf, + Double lowLimit) { + setConfiguration(conf); + setLowerLimit(lowLimit); + setPotentialSources(potentialDataNodes); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceStrategy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceStrategy.java new file mode 100644 index 000000000000..826ecac6d069 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceStrategy.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container.balancer; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; + +import java.util.List; + +/** + * This interface can be used to implement strategies to get a + * source datanode. + */ +public interface FindSourceStrategy { + + /** + * get the next candidate source data node according to + * the strategy. + * + * @return the next candidate source data node. + */ + DatanodeDetails getNextCandidateSourceDataNode(); + + /** + * remove the specified data node from candidate source + * data nodes. + */ + void removeCandidateSourceDataNode(DatanodeDetails dui); + + /** + * increase the Leaving size of a candidate source data node. + */ + void increaseSizeLeaving(DatanodeDetails dui, long size); + + /** + * Checks if specified size can leave a specified source datanode + * according to {@link ContainerBalancerConfiguration} + * "size.leaving.source.max". + * + * @param source source datanode from which size is leaving + * @param size size in bytes + * @return true if size can leave, else false + */ + boolean canSizeLeaveSource(DatanodeDetails source, long size); + + /** + * reInitialize FindSourceStrategy. 
+ */ + void reInitialize(List potentialDataNodes, + ContainerBalancerConfiguration config, Double lowerLimit); +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetGreedy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetGreedy.java index 9d3deb435e9e..550894314655 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetGreedy.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetGreedy.java @@ -26,13 +26,17 @@ import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; +import org.apache.hadoop.hdds.scm.node.NodeManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; -import java.util.function.BiFunction; +import java.util.TreeSet; +import java.util.UUID; import java.util.stream.Collectors; /** @@ -44,37 +48,72 @@ public class FindTargetGreedy implements FindTargetStrategy { private ContainerManager containerManager; private PlacementPolicy placementPolicy; + private Map sizeEnteringNode; + private NodeManager nodeManager; + private ContainerBalancerConfiguration config; + private Double upperLimit; + private TreeSet potentialTargets; public FindTargetGreedy( ContainerManager containerManager, - PlacementPolicy placementPolicy) { + PlacementPolicy placementPolicy, + NodeManager nodeManager) { + sizeEnteringNode = new HashMap<>(); this.containerManager = containerManager; this.placementPolicy = placementPolicy; + this.nodeManager = nodeManager; + + potentialTargets = new TreeSet<>((a, b) -> { + double currentUsageOfA = 
a.calculateUtilization( + sizeEnteringNode.get(a.getDatanodeDetails())); + double currentUsageOfB = b.calculateUtilization( + sizeEnteringNode.get(b.getDatanodeDetails())); + int ret = Double.compare(currentUsageOfA, currentUsageOfB); + if (ret != 0) { + return ret; + } + UUID uuidA = a.getDatanodeDetails().getUuid(); + UUID uuidB = b.getDatanodeDetails().getUuid(); + return uuidA.compareTo(uuidB); + }); + } + + private void setUpperLimit(Double upperLimit){ + this.upperLimit = upperLimit; + } + + private void setPotentialTargets( + List potentialTargetDataNodes) { + sizeEnteringNode.clear(); + potentialTargetDataNodes.forEach( + p -> sizeEnteringNode.put(p.getDatanodeDetails(), 0L)); + potentialTargets.clear(); + potentialTargets.addAll(potentialTargetDataNodes); + } + + private void setConfiguration(ContainerBalancerConfiguration conf) { + this.config = conf; } /** * Find a {@link ContainerMoveSelection} consisting of a target and * container to move for a source datanode. Favours more under-utilized nodes. 
* @param source Datanode to find a target for - * @param potentialTargets Collection of potential target datanodes * @param candidateContainers Set of candidate containers satisfying * selection criteria * {@link ContainerBalancerSelectionCriteria} - * @param canSizeEnterTarget A functional interface whose apply * (DatanodeDetails, Long) method returns true if the size specified in the * second argument can enter the specified DatanodeDetails node * @return Found target and container */ @Override public ContainerMoveSelection findTargetForContainerMove( - DatanodeDetails source, Collection potentialTargets, - Set candidateContainers, - BiFunction canSizeEnterTarget) { - for (DatanodeDetails target : potentialTargets) { + DatanodeDetails source, Set candidateContainers) { + for (DatanodeUsageInfo targetInfo : potentialTargets) { + DatanodeDetails target = targetInfo.getDatanodeDetails(); for (ContainerID container : candidateContainers) { Set replicas; ContainerInfo containerInfo; - try { replicas = containerManager.getContainerReplicas(container); containerInfo = containerManager.getContainer(container); @@ -88,7 +127,7 @@ public ContainerMoveSelection findTargetForContainerMove( replica -> replica.getDatanodeDetails().equals(target)) && containerMoveSatisfiesPlacementPolicy(container, replicas, source, target) && - canSizeEnterTarget.apply(target, containerInfo.getUsedBytes())) { + canSizeEnterTarget(target, containerInfo.getUsedBytes())) { return new ContainerMoveSelection(target, container); } } @@ -107,8 +146,7 @@ public ContainerMoveSelection findTargetForContainerMove( * @param target Target datanode for container move * @return true if placement policy is satisfied, otherwise false */ - @Override - public boolean containerMoveSatisfiesPlacementPolicy( + private boolean containerMoveSatisfiesPlacementPolicy( ContainerID containerID, Set replicas, DatanodeDetails source, DatanodeDetails target) { ContainerInfo containerInfo; @@ -132,4 +170,61 @@ public boolean 
containerMoveSatisfiesPlacementPolicy( return placementStatus.isPolicySatisfied(); } + + /** + * Checks if specified size can enter specified target datanode + * according to {@link ContainerBalancerConfiguration} + * "size.entering.target.max". + * + * @param target target datanode in which size is entering + * @param size size in bytes + * @return true if size can enter target, else false + */ + private boolean canSizeEnterTarget(DatanodeDetails target, long size) { + if (sizeEnteringNode.containsKey(target)) { + long sizeEnteringAfterMove = sizeEnteringNode.get(target) + size; + //size can be moved into target datanode only when the following + //two conditions are met. + //1 sizeEnteringAfterMove does not exceed the configured + // MaxSizeEnteringTarget + //2 current usage of target datanode plus sizeEnteringAfterMove + // is smaller than or equal to upperLimit + return sizeEnteringAfterMove <= config.getMaxSizeEnteringTarget() && + Double.compare(nodeManager.getUsageInfo(target) + .calculateUtilization(sizeEnteringAfterMove), upperLimit) <= 0; + } + return false; + } + + /** + * increase the Entering size of a candidate target data node. + */ + @Override + public void increaseSizeEntering(DatanodeDetails target, long size) { + if(sizeEnteringNode.containsKey(target)) { + long totalEnteringSize = sizeEnteringNode.get(target) + size; + sizeEnteringNode.put(target, totalEnteringSize); + potentialTargets.removeIf( + c -> c.getDatanodeDetails().equals(target)); + if(totalEnteringSize < config.getMaxSizeEnteringTarget()) { + //reorder + potentialTargets.add(nodeManager.getUsageInfo(target)); + } + return; + } + LOG.warn("Cannot find {} in the candidates target nodes", + target.getUuid()); + } + + /** + * reInitialize FindTargetStrategy with the given new parameters. 
+ */ + @Override + public void reInitialize(List potentialDataNodes, + ContainerBalancerConfiguration conf, + Double upLimit) { + setConfiguration(conf); + setUpperLimit(upLimit); + setPotentialTargets(potentialDataNodes); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetStrategy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetStrategy.java index 444f365cf9ee..de87860fd31f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetStrategy.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindTargetStrategy.java @@ -20,11 +20,10 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; -import java.util.Collection; +import java.util.List; import java.util.Set; -import java.util.function.BiFunction; /** * This interface can be used to implement strategies to find a target for a @@ -39,33 +38,26 @@ public interface FindTargetStrategy { * enter a potential target. 
* * @param source Datanode to find a target for - * @param potentialTargets Collection of potential target datanodes * @param candidateContainers Set of candidate containers satisfying * selection criteria * {@link ContainerBalancerSelectionCriteria} - * @param canSizeEnterTarget A functional interface whose apply * (DatanodeDetails, Long) method returns true if the size specified in the * second argument can enter the specified DatanodeDetails node * @return {@link ContainerMoveSelection} containing the target node and * selected container */ ContainerMoveSelection findTargetForContainerMove( - DatanodeDetails source, Collection potentialTargets, - Set candidateContainers, - BiFunction canSizeEnterTarget); + DatanodeDetails source, Set candidateContainers); /** - * Checks whether moving the specified container from the specified source - * to target datanode will satisfy the placement policy. - * - * @param containerID Container to be moved from source to target - * @param replicas Set of replicas of the given container - * @param source Source datanode for container move - * @param target Target datanode for container move - * @return true if placement policy is satisfied + * increase the Entering size of a candidate target data node. */ - boolean containerMoveSatisfiesPlacementPolicy(ContainerID containerID, - Set replicas, - DatanodeDetails source, - DatanodeDetails target); + void increaseSizeEntering(DatanodeDetails target, long size); + + /** + * reInitialize FindTargetStrategy. + */ + void reInitialize(List potentialDataNodes, + ContainerBalancerConfiguration config, Double upperLimit); + }