From 09a0b8e1b721c7484447b21ae92e05c2b9cd3df8 Mon Sep 17 00:00:00 2001 From: Symious Date: Tue, 16 Aug 2022 10:40:22 +0800 Subject: [PATCH] HDDS-7106. [DiskBalancer] Client-SCM interface (#3663) --- .../hadoop/hdds/scm/client/ScmClient.java | 41 ++++ .../StorageContainerLocationProtocol.java | 30 +++ .../storage/DiskBalancerConfiguration.java | 165 ++++++++++++++++ .../apache/hadoop/hdds/conf/ConfigTag.java | 3 +- ...ocationProtocolClientSideTranslatorPB.java | 56 ++++++ .../src/main/proto/ScmAdminProtocol.proto | 21 +++ .../src/main/proto/hdds.proto | 13 ++ .../hdds/scm/node/DiskBalancerManager.java | 176 ++++++++++++++++++ .../hdds/scm/node/DiskBalancerStatus.java | 50 +++++ .../scm/node/NodeDecommissionManager.java | 123 +----------- .../hadoop/hdds/scm/node/NodeUtils.java | 150 +++++++++++++++ ...ocationProtocolServerSideTranslatorPB.java | 30 +++ .../scm/server/SCMClientProtocolServer.java | 51 +++++ .../scm/server/StorageContainerManager.java | 8 + .../scm/node/TestDiskBalancerManager.java | 99 ++++++++++ .../scm/node/TestNodeDecommissionManager.java | 13 +- .../scm/cli/ContainerOperationClient.java | 36 ++++ .../ozone/scm/node/TestDiskBalancer.java | 96 ++++++++++ 18 files changed, 1036 insertions(+), 125 deletions(-) create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java create mode 100644 hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java index daeb5c5ddfca..76c07d1c1b32 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java @@ -398,4 +398,45 @@ StatusAndMessages finalizeScmUpgrade(String upgradeClientID) StatusAndMessages queryUpgradeFinalizationProgress( String upgradeClientID, boolean force, boolean readonly) throws IOException; + + /** + * Get DiskBalancer status. + * @param count top datanodes that need balancing + * @return List of DatanodeDiskBalancerInfo. + * @throws IOException + */ + List getDiskBalancerReport( + int count) throws IOException; + + /** + * Get DiskBalancer status. + * @param hosts If hosts is not null, return status of hosts; If hosts is + * null, return status of all datanodes in balancing. + * @return List of DatanodeDiskBalancerInfo. + * @throws IOException + */ + List getDiskBalancerStatus( + Optional> hosts) throws IOException; + + /** + * Start DiskBalancer. + */ + void startDiskBalancer( + Optional threshold, + Optional bandwidth, + Optional> hosts) throws IOException; + + /** + * Stop DiskBalancer. + */ + void stopDiskBalancer(Optional> hosts) throws IOException; + + + /** + * Update DiskBalancer Configuration. 
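+ * (Semantics below are inferred from the DiskBalancerConfiguration
+ * class added in this patch; the server-side handling is still a TODO.)
+ * @param threshold volume density threshold, a percentage in [0, 100)
+ * @param bandwidth max disk bandwidth in MB per second
+ * @param hosts datanodes to update; if absent, applies to the
+ * datanodes currently balancing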
+ */ + void updateDiskBalancerConfiguration( + Optional threshold, + Optional bandwidth, + Optional> hosts) throws IOException; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index 51f2bb3f64db..966461345599 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -426,4 +426,34 @@ StatusAndMessages queryUpgradeFinalizationProgress( Token getContainerToken(ContainerID containerID) throws IOException; long getContainerCount() throws IOException; + + List getDiskBalancerReport( + int count, int clientVersion) throws IOException; + + /** + * Get DiskBalancer status. + */ + List getDiskBalancerStatus( + Optional> hosts, int clientVersion) throws IOException; + + /** + * Start DiskBalancer. + */ + void startDiskBalancer( + Optional threshold, + Optional bandwidth, + Optional> hosts) throws IOException; + + /** + * Stop DiskBalancer. + */ + void stopDiskBalancer(Optional> hosts) throws IOException; + + /** + * Update DiskBalancer Configuration. + */ + void updateDiskBalancerConfiguration( + Optional threshold, + Optional bandwidth, + Optional> hosts) throws IOException; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java new file mode 100644 index 000000000000..704b383679b0 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.storage; + +import org.apache.hadoop.hdds.conf.Config; +import org.apache.hadoop.hdds.conf.ConfigGroup; +import org.apache.hadoop.hdds.conf.ConfigTag; +import org.apache.hadoop.hdds.conf.ConfigType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class contains configuration values for the DiskBalancer. + */ +@ConfigGroup(prefix = "hdds.datanode.disk.balancer") +public final class DiskBalancerConfiguration { + private static final Logger LOG = + LoggerFactory.getLogger(DiskBalancerConfiguration.class); + + @Config(key = "volume.density.threshold", type = ConfigType.AUTO, + defaultValue = "10", tags = {ConfigTag.DISKBALANCER}, + description = "Threshold is a percentage in the range of 0 to 100. A " + + "datanode is considered balanced if for each volume, the " + + "utilization of the volume(used space to capacity ratio) differs" + + " from the utilization of the datanode(used space to capacity ratio" + + " of the entire datanode) no more than the threshold.") + private double threshold = 10d; + + @Config(key = "max.disk.throughputInMBPerSec", type = ConfigType.AUTO, + defaultValue = "10", tags = {ConfigTag.DISKBALANCER}, + description = "The max balance speed.") + private double diskBandwidth = 10; + + @Config(key = "parallel.thread", type = ConfigType.AUTO, + defaultValue = "5", tags = {ConfigTag.DISKBALANCER}, + description = "The max parallel balance thread count.") + private int parallelThread = 5; + + /** + * Gets the threshold value for DiskBalancer. + * + * @return percentage value in the range 0 to 100 + */ + public double getThreshold() { + return threshold; + } + + public double getThresholdAsRatio() { + return threshold / 100; + } + + /** + * Sets the threshold value for Disk Balancer. + * + * @param threshold a percentage value in the range 0 to 100 + */ + public void setThreshold(double threshold) { + if (threshold < 0d || threshold >= 100d) { + throw new IllegalArgumentException( + "Threshold must be a percentage(double) in the range 0 to 100."); + } + this.threshold = threshold; + } + + /** + * Gets the disk bandwidth value for Disk Balancer. + * + * @return max disk bandwidth per second + */ + public double getDiskBandwidth() { + return diskBandwidth; + } + + /** + * Sets the disk bandwidth value for Disk Balancer. + * + * @param diskBandwidth the bandwidth to control balance speed + */ + public void setDiskBandwidth(double diskBandwidth) { + if (diskBandwidth <= 0d) { + throw new IllegalArgumentException( + "diskBandwidth must be a value larger than 0."); + } + this.diskBandwidth = diskBandwidth; + } + + /** + * Gets the parallel thread for Disk Balancer. + * + * @return parallel thread + */ + public int getParallelThread() { + return parallelThread; + } + + /** + * Sets the parallel thread for Disk Balancer. 
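+ * Must be positive; it bounds how many balance threads a datanode
+ * runs concurrently (defaults to 5).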
+ * + * @param parallelThread the parallel thread count + */ + public void setParallelThread(int parallelThread) { + if (parallelThread <= 0) { + throw new IllegalArgumentException( + "parallelThread must be a value larger than 0."); + } + this.parallelThread = parallelThread; + } + @Override + public String toString() { + return String.format("Disk Balancer Configuration values:%n" + + "%-50s %s%n" + + "%-50s %s%n" + + "%-50s %s%n" + + "%-50s %s%n", + "Key", "Value", + "Threshold", threshold, "Max disk bandwidth", diskBandwidth, + "Parallel Thread", parallelThread); + } + + public HddsProtos.DiskBalancerConfigurationProto.Builder toProtobufBuilder() { + HddsProtos.DiskBalancerConfigurationProto.Builder builder = + HddsProtos.DiskBalancerConfigurationProto.newBuilder(); + + builder.setThreshold(threshold) + .setDiskBandwidth(diskBandwidth) + .setParallelThread(parallelThread); + return builder; + } + + static DiskBalancerConfiguration fromProtobuf( + @NotNull HddsProtos.DiskBalancerConfigurationProto proto, + @NotNull OzoneConfiguration ozoneConfiguration) { + DiskBalancerConfiguration config = + ozoneConfiguration.getObject(DiskBalancerConfiguration.class); + if (proto.hasThreshold()) { + config.setThreshold(proto.getThreshold()); + } + if (proto.hasDiskBandwidth()) { + config.setDiskBandwidth(proto.getDiskBandwidth()); + } + if (proto.hasParallelThread()) { + config.setParallelThread(proto.getParallelThread()); + } + return config; + } +} diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java index 8cf584d75f61..9be8bdc6793e 100644 --- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java +++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java @@ -46,5 +46,6 @@ public enum ConfigTag { DELETION, HA, BALANCER, - UPGRADE + UPGRADE, + DISKBALANCER } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index b8e377def241..e820bce807c0 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -42,6 +42,9 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeAdminErrorResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoType; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto; import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesResponseProto; @@ -1018,6 +1021,59 @@ public long getContainerCount() throws IOException { return response.getContainerCount(); } + @Override + public List getDiskBalancerReport( + int count, int clientVersion) throws IOException { + DatanodeDiskBalancerInfoRequestProto request = + DatanodeDiskBalancerInfoRequestProto.newBuilder() + .setInfoType(DatanodeDiskBalancerInfoType.report) + .setCount(count) + .build(); + + DatanodeDiskBalancerInfoResponseProto response = + submitRequest(Type.DatanodeDiskBalancerInfo, + builder -> builder.setDatanodeDiskBalancerInfoRequest(request)) + .getDatanodeDiskBalancerInfoResponse(); + + return response.getInfoList(); + } + + @Override + public List getDiskBalancerStatus( + Optional> hosts, int clientVersion) throws IOException { + DatanodeDiskBalancerInfoRequestProto.Builder requestBuilder = + DatanodeDiskBalancerInfoRequestProto.newBuilder() + .setInfoType(DatanodeDiskBalancerInfoType.status); + hosts.ifPresent(requestBuilder::addAllHosts); + DatanodeDiskBalancerInfoRequestProto request = requestBuilder.build(); + + DatanodeDiskBalancerInfoResponseProto response = + submitRequest(Type.DatanodeDiskBalancerInfo, + builder -> builder.setDatanodeDiskBalancerInfoRequest(request)) + .getDatanodeDiskBalancerInfoResponse(); + + return response.getInfoList(); + } + + @Override + public void startDiskBalancer(Optional threshold, + Optional bandwidth, Optional> hosts) + throws IOException { + + } + + @Override + public void stopDiskBalancer(Optional> hosts) + throws IOException { + + } + + @Override + public void updateDiskBalancerConfiguration(Optional threshold, + Optional bandwidth, Optional> hosts) + throws IOException { + } + @Override public Object getUnderlyingProxyObject() { return rpcProxy; diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto index ccb5e2155e44..7054ee391060 100644 --- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto +++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto @@ -78,6 +78,7 @@ message ScmContainerLocationRequest { optional GetContainerReplicasRequestProto getContainerReplicasRequest = 39; optional ReplicationManagerReportRequestProto replicationManagerReportRequest = 40; optional ResetDeletedBlockRetryCountRequestProto resetDeletedBlockRetryCountRequest = 41; + optional DatanodeDiskBalancerInfoRequestProto DatanodeDiskBalancerInfoRequest = 42; } message ScmContainerLocationResponse { @@ -127,6 +128,7 @@ message ScmContainerLocationResponse { optional GetContainerReplicasResponseProto getContainerReplicasResponse = 39; optional ReplicationManagerReportResponseProto getReplicationManagerReportResponse = 40; optional ResetDeletedBlockRetryCountResponseProto resetDeletedBlockRetryCountResponse = 41; + optional DatanodeDiskBalancerInfoResponseProto DatanodeDiskBalancerInfoResponse = 42; enum Status { OK = 1; @@ -174,6 +176,7 @@ enum Type { GetContainerReplicas = 34; GetReplicationManagerReport = 35; ResetDeletedBlockRetryCount = 36; + DatanodeDiskBalancerInfo= 37; } /** @@ -325,6 +328,24 @@ message DatanodeUsageInfoResponseProto { repeated DatanodeUsageInfoProto info = 1; } +enum DatanodeDiskBalancerInfoType{ + report = 1; + status = 2; +} + +/* + Datanode disk balancer status request message. 
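+ infoType selects the payload: 'report' returns the top 'count'
+ datanodes ordered by descending volume density sum, while 'status'
+ returns the balancer state of the named 'hosts', or of every datanode
+ currently balancing when 'hosts' is empty.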
+*/ +message DatanodeDiskBalancerInfoRequestProto { + required DatanodeDiskBalancerInfoType infoType = 1; + optional uint32 count = 2; + repeated string hosts = 3; +} + +message DatanodeDiskBalancerInfoResponseProto { + repeated DatanodeDiskBalancerInfoProto info = 1; +} + /* Decommission a list of hosts */ diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto index 555431b199ec..a641f88bbefc 100644 --- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto +++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto @@ -451,3 +451,16 @@ message ContainerBalancerConfigurationProto { required bool shouldRun = 18; optional int32 nextIterationIndex = 19; } + +message DiskBalancerConfigurationProto { + optional double threshold = 1; + optional double diskBandwidth = 2; + optional int32 parallelThread = 3; +} + +message DatanodeDiskBalancerInfoProto { + required DatanodeDetailsProto node = 1; + required double currentVolumeDensitySum = 2; + optional bool diskBalancerRunning = 3; + optional DiskBalancerConfigurationProto diskBalancerConf = 4; +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java new file mode 100644 index 000000000000..5ca66e9eefa4 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.node; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; + +/** + * Maintains information about the DiskBalancer on SCM side. + */ +public class DiskBalancerManager { + public static final Logger LOG = + LoggerFactory.getLogger(DiskBalancerManager.class); + + private final EventPublisher scmNodeEventPublisher; + private final SCMContext scmContext; + private final NodeManager nodeManager; + private Map statusMap; + private boolean useHostnames; + + /** + * Constructs DiskBalancer Manager. + */ + public DiskBalancerManager(OzoneConfiguration conf, + EventPublisher eventPublisher, + SCMContext scmContext, + NodeManager nodeManager) { + this.scmNodeEventPublisher = eventPublisher; + this.scmContext = scmContext; + this.nodeManager = nodeManager; + this.useHostnames = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, + DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT); + this.statusMap = new ConcurrentHashMap<>(); + } + + public List getDiskBalancerReport( + int count, int clientVersion) throws IOException { + + List reportList = + new ArrayList<>(); + + for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE, + HddsProtos.NodeState.HEALTHY)) { + double volumeDensitySum = + getVolumeDataDensitySumForDatanodeDetails(datanodeDetails); + reportList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder() + .setCurrentVolumeDensitySum(volumeDensitySum) + .setNode(datanodeDetails.toProto(clientVersion)) + .build()); + } + + reportList.sort((t1, t2) -> Double.compare(t2.getCurrentVolumeDensitySum(), + t1.getCurrentVolumeDensitySum())); + return reportList.stream().limit(count).collect(Collectors.toList()); + } + + /** + * If hosts is not null, return status of hosts; + * If hosts is null, return status of all datanodes in balancing. 
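+ * An absent Optional and an empty host list are treated alike: only
+ * datanodes whose balancer is running are returned.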
+ */ + public List getDiskBalancerStatus( + Optional> hosts, int clientVersion) throws IOException { + List statusList = + new ArrayList<>(); + List filterDns = null; + if (hosts.isPresent() && !hosts.get().isEmpty()) { + filterDns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(), + useHostnames); + } + + for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE, + HddsProtos.NodeState.HEALTHY)) { + if (shouldReturnDatanode(filterDns, datanodeDetails)) { + double volumeDensitySum = + getVolumeDataDensitySumForDatanodeDetails(datanodeDetails); + statusList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder() + .setCurrentVolumeDensitySum(volumeDensitySum) + .setDiskBalancerRunning(isRunning(datanodeDetails)) + .setDiskBalancerConf(statusMap.getOrDefault(datanodeDetails, + DiskBalancerStatus.DUMMY_STATUS) + .getDiskBalancerConfiguration().toProtobufBuilder()) + .setNode(datanodeDetails.toProto(clientVersion)) + .build()); + } + } + return statusList; + } + + private boolean shouldReturnDatanode(List hosts, + DatanodeDetails datanodeDetails) { + if (hosts == null || hosts.isEmpty()) { + return isRunning(datanodeDetails); + } else { + return hosts.contains(datanodeDetails); + } + } + + /** + * Get volume density for a specific DatanodeDetails node. + * + * @param datanodeDetails DatanodeDetails + * @return DiskBalancer report. + */ + private double getVolumeDataDensitySumForDatanodeDetails( + DatanodeDetails datanodeDetails) { + Preconditions.checkArgument(datanodeDetails instanceof DatanodeInfo); + + DatanodeInfo datanodeInfo = (DatanodeInfo) datanodeDetails; + + double totalCapacity = 0d, totalUsed = 0d; + for (StorageContainerDatanodeProtocolProtos.StorageReportProto reportProto : + datanodeInfo.getStorageReports()) { + totalCapacity += reportProto.getCapacity(); + totalUsed += reportProto.getScmUsed(); + } + + Preconditions.checkArgument(totalCapacity != 0); + double idealUsage = totalUsed / totalCapacity; + + double volumeDensitySum = datanodeInfo.getStorageReports().stream() + .map(report -> + Math.abs((double)report.getScmUsed() / report.getCapacity() + - idealUsage)) + .mapToDouble(Double::valueOf).sum(); + + return volumeDensitySum; + } + + private boolean isRunning(DatanodeDetails datanodeDetails) { + return statusMap + .getOrDefault(datanodeDetails, DiskBalancerStatus.DUMMY_STATUS) + .isRunning(); + } + + @VisibleForTesting + public void addRunningDatanode(DatanodeDetails datanodeDetails) { + statusMap.put(datanodeDetails, new DiskBalancerStatus(true, + new DiskBalancerConfiguration())); + } +} \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java new file mode 100644 index 000000000000..ed22e80e3c1a --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.node; + +import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Maintains DiskBalancerConfiguration and isRunning. + */ +public class DiskBalancerStatus { + public static final Logger LOG = + LoggerFactory.getLogger(DiskBalancerStatus.class); + + private boolean isRunning; + private DiskBalancerConfiguration diskBalancerConfiguration; + + public static final DiskBalancerStatus DUMMY_STATUS = + new DiskBalancerStatus(false, new DiskBalancerConfiguration()); + + public DiskBalancerStatus(boolean isRunning, DiskBalancerConfiguration conf) { + this.isRunning = isRunning; + this.diskBalancerConfiguration = conf; + } + + public boolean isRunning() { + return isRunning; + } + + public DiskBalancerConfiguration getDiskBalancerConfiguration() { + return diskBalancerConfiguration; + } +} \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java index 1ea04cdfc3ce..ef7605a55408 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java @@ -32,12 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetAddress; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -63,115 +58,6 @@ public class NodeDecommissionManager { private static final Logger LOG = LoggerFactory.getLogger(NodeDecommissionManager.class); - static class HostDefinition { - private String rawHostname; - private String hostname; - private int port; - - HostDefinition(String hostname) throws InvalidHostStringException { - this.rawHostname = hostname; - parseHostname(); - } - - public String getRawHostname() { - return rawHostname; - } - - public String getHostname() { - return hostname; - } - - public int getPort() { - return port; - } - - private void parseHostname() throws InvalidHostStringException { - try { - // A URI *must* have a scheme, so just create a fake one - URI uri = new URI("empty://" + rawHostname.trim()); - this.hostname = uri.getHost(); - this.port = uri.getPort(); - - if (this.hostname == null) { - throw new InvalidHostStringException("The string " + rawHostname + - " does not contain a value hostname or hostname:port definition"); - } - } catch (URISyntaxException e) { - throw new InvalidHostStringException( - "Unable to parse the hoststring " + rawHostname, e); - } - } - } - - private List mapHostnamesToDatanodes(List hosts) - throws InvalidHostStringException { - List results = new LinkedList<>(); - for (String hostString : hosts) { - HostDefinition host = new HostDefinition(hostString); - InetAddress addr; - try { - addr = InetAddress.getByName(host.getHostname()); - } catch 
(UnknownHostException e) { - throw new InvalidHostStringException("Unable to resolve host " - + host.getRawHostname(), e); - } - String dnsName; - if (useHostnames) { - dnsName = addr.getHostName(); - } else { - dnsName = addr.getHostAddress(); - } - List found = nodeManager.getNodesByAddress(dnsName); - if (found.size() == 0) { - throw new InvalidHostStringException("Host " + host.getRawHostname() - + " (" + dnsName + ") is not running any datanodes registered" - + " with SCM." - + " Please check the host name."); - } else if (found.size() == 1) { - if (host.getPort() != -1 && - !validateDNPortMatch(host.getPort(), found.get(0))) { - throw new InvalidHostStringException("Host " + host.getRawHostname() - + " is running a datanode registered with SCM," - + " but the port number doesn't match." - + " Please check the port number."); - } - results.add(found.get(0)); - } else if (found.size() > 1) { - DatanodeDetails match = null; - for (DatanodeDetails dn : found) { - if (validateDNPortMatch(host.getPort(), dn)) { - match = dn; - break; - } - } - if (match == null) { - throw new InvalidHostStringException("Host " + host.getRawHostname() - + " is running multiple datanodes registered with SCM," - + " but no port numbers match." - + " Please check the port number."); - } - results.add(match); - } - } - return results; - } - - /** - * Check if the passed port is used by the given DatanodeDetails object. If - * it is, return true, otherwise return false. - * @param port Port number to check if it is used by the datanode - * @param dn Datanode to check if it is using the given port - * @return True if port is used by the datanode. False otherwise. - */ - private boolean validateDNPortMatch(int port, DatanodeDetails dn) { - for (DatanodeDetails.Port p : dn.getPorts()) { - if (p.getValue() == port) { - return true; - } - } - return false; - } - public NodeDecommissionManager(OzoneConfiguration config, NodeManager nm, ContainerManager containerManager, SCMContext scmContext, EventPublisher eventQueue, ReplicationManager rm) { @@ -220,7 +106,8 @@ public DatanodeAdminMonitor getMonitor() { public synchronized List decommissionNodes( List nodes) throws InvalidHostStringException { - List dns = mapHostnamesToDatanodes(nodes); + List dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, + nodes, useHostnames); List errors = new ArrayList<>(); for (DatanodeDetails dn : dns) { try { @@ -285,7 +172,8 @@ public synchronized void startDecommission(DatanodeDetails dn) public synchronized List recommissionNodes( List nodes) throws InvalidHostStringException { - List dns = mapHostnamesToDatanodes(nodes); + List dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, + nodes, useHostnames); List errors = new ArrayList<>(); for (DatanodeDetails dn : dns) { try { @@ -322,7 +210,8 @@ public synchronized void recommission(DatanodeDetails dn) public synchronized List startMaintenanceNodes( List nodes, int endInHours) throws InvalidHostStringException { - List dns = mapHostnamesToDatanodes(nodes); + List dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, + nodes, useHostnames); List errors = new ArrayList<>(); for (DatanodeDetails dn : dns) { try { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java new file mode 100644 index 000000000000..6df74d374672 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java @@ -0,0 +1,150 @@ +/** + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.node; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.LinkedList; +import java.util.List; + +/** + * Util class for Node operations. + */ +public final class NodeUtils { + + private static final Logger LOG = LoggerFactory.getLogger(NodeUtils.class); + + private NodeUtils() { + } + + public static List mapHostnamesToDatanodes( + NodeManager nodeManager, List hosts, boolean useHostnames) + throws InvalidHostStringException { + List results = new LinkedList<>(); + for (String hostString : hosts) { + HostDefinition host = new HostDefinition(hostString); + InetAddress addr; + try { + addr = InetAddress.getByName(host.getHostname()); + } catch (UnknownHostException e) { + throw new InvalidHostStringException("Unable to resolve host " + + host.getRawHostname(), e); + } + String dnsName; + if (useHostnames) { + dnsName = addr.getHostName(); + } else { + dnsName = addr.getHostAddress(); + } + List found = nodeManager.getNodesByAddress(dnsName); + if (found.size() == 0) { + throw new InvalidHostStringException("Host " + host.getRawHostname() + + " (" + dnsName + ") is not running any datanodes registered" + + " with SCM." + + " Please check the host name."); + } else if (found.size() == 1) { + if (host.getPort() != -1 && + !validateDNPortMatch(host.getPort(), found.get(0))) { + throw new InvalidHostStringException("Host " + host.getRawHostname() + + " is running a datanode registered with SCM," + + " but the port number doesn't match." + + " Please check the port number."); + } + results.add(found.get(0)); + } else if (found.size() > 1) { + DatanodeDetails match = null; + for (DatanodeDetails dn : found) { + if (validateDNPortMatch(host.getPort(), dn)) { + match = dn; + break; + } + } + if (match == null) { + throw new InvalidHostStringException("Host " + host.getRawHostname() + + " is running multiple datanodes registered with SCM," + + " but no port numbers match." + + " Please check the port number."); + } + results.add(match); + } + } + return results; + } + + /** + * Check if the passed port is used by the given DatanodeDetails object. If + * it is, return true, otherwise return false. + * @param port Port number to check if it is used by the datanode + * @param dn Datanode to check if it is using the given port + * @return True if port is used by the datanode. False otherwise. 
+ */ + private static boolean validateDNPortMatch(int port, DatanodeDetails dn) { + for (DatanodeDetails.Port p : dn.getPorts()) { + if (p.getValue() == port) { + return true; + } + } + return false; + } + static class HostDefinition { + private String rawHostname; + private String hostname; + private int port; + + HostDefinition(String hostname) throws InvalidHostStringException { + this.rawHostname = hostname; + parseHostname(); + } + + public String getRawHostname() { + return rawHostname; + } + + public String getHostname() { + return hostname; + } + + public int getPort() { + return port; + } + + private void parseHostname() throws InvalidHostStringException { + try { + // A URI *must* have a scheme, so just create a fake one + URI uri = new URI("empty://" + rawHostname.trim()); + this.hostname = uri.getHost(); + this.port = uri.getPort(); + + if (this.hostname == null) { + throw new InvalidHostStringException("The string " + rawHostname + + " does not contain a value hostname or hostname:port definition"); + } + } catch (URISyntaxException e) { + throw new InvalidHostStringException( + "Unable to parse the hoststring " + rawHostname, e); + } + } + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java index e59c984174f5..f00a57e36636 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeAdminErrorResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto; @@ -656,6 +657,14 @@ public ScmContainerLocationResponse processRequest( getResetDeletedBlockRetryCount( request.getResetDeletedBlockRetryCountRequest())) .build(); + case DatanodeDiskBalancerInfo: + return ScmContainerLocationResponse.newBuilder() + .setCmdType(request.getCmdType()) + .setStatus(Status.OK) + .setDatanodeDiskBalancerInfoResponse(getDatanodeDiskBalancerInfo( + request.getDatanodeDiskBalancerInfoRequest(), + request.getVersion())) + .build(); default: throw new IllegalArgumentException( "Unknown command type: " + request.getCmdType()); @@ -1157,4 +1166,25 @@ public GetContainerCountResponseProto getContainerCount( request.getTransactionIdList())) .build(); } + public DatanodeDiskBalancerInfoResponseProto getDatanodeDiskBalancerInfo( + StorageContainerLocationProtocolProtos. 
+ DatanodeDiskBalancerInfoRequestProto request, int clientVersion) + throws IOException { + List infoProtoList; + switch (request.getInfoType()) { + case report: + infoProtoList = impl.getDiskBalancerReport(request.getCount(), + clientVersion); + break; + case status: + infoProtoList = impl.getDiskBalancerStatus( + Optional.of(request.getHostsList()), clientVersion); + break; + default: + infoProtoList = null; + } + return DatanodeDiskBalancerInfoResponseProto.newBuilder() + .addAllInfo(infoProtoList) + .build(); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 5d6b357d652e..1afd264875fe 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -1123,6 +1123,57 @@ public long getContainerCount() throws IOException { return scm.getContainerManager().getContainers().size(); } + @Override + public List getDiskBalancerReport( + int count, int clientVersion) throws IOException { + // check admin authorisation + try { + getScm().checkAdminAccess(getRemoteUser()); + } catch (IOException e) { + LOG.error("Authorization failed", e); + throw e; + } + + return scm.getDiskBalancerManager().getDiskBalancerReport(count, + clientVersion); + } + + @Override + public List getDiskBalancerStatus( + Optional> hosts, int clientVersion) throws IOException { + // check admin authorisation + try { + getScm().checkAdminAccess(getRemoteUser()); + } catch (IOException e) { + LOG.error("Authorization failed", e); + throw e; + } + + return scm.getDiskBalancerManager().getDiskBalancerStatus(hosts, + clientVersion); + } + + @Override + public void startDiskBalancer(Optional threshold, + Optional bandwidth, Optional> hosts) + throws IOException { + // TODO: Send message to datanodes + } + + @Override + public void stopDiskBalancer(Optional> hosts) + throws IOException { + // TODO: Send message to datanodes + } + + + @Override + public void updateDiskBalancerConfiguration(Optional threshold, + Optional bandwidth, Optional> hosts) + throws IOException { + // TODO: Send message to datanodes + } + /** * Queries a list of Node that match a set of statuses. 
* diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 68aafdaaa295..40d08fa7ec8b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator; import org.apache.hadoop.hdds.scm.ScmInfo; +import org.apache.hadoop.hdds.scm.node.DiskBalancerManager; import org.apache.hadoop.hdds.scm.node.NodeAddressUpdateHandler; import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager; import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManagerImpl; @@ -231,6 +232,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl private WritableContainerFactory writableContainerFactory; private FinalizationManager finalizationManager; private HDDSLayoutVersionManager scmLayoutVersionManager; + private DiskBalancerManager diskBalancerManager; private SCMMetadataStore scmMetadataStore; private CertificateStore certificateStore; @@ -780,6 +782,8 @@ private void initializeSystemManagers(OzoneConfiguration conf, .setSCMDBTransactionBuffer(scmHAManager.getDBTransactionBuffer()) .setRatisServer(scmHAManager.getRatisServer()) .build(); + diskBalancerManager = new DiskBalancerManager(conf, eventQueue, scmContext, + scmNodeManager); } /** @@ -1921,6 +1925,10 @@ public SCMServiceManager getSCMServiceManager() { return serviceManager; } + public DiskBalancerManager getDiskBalancerManager() { + return diskBalancerManager; + } + /** * Force SCM out of safe mode. */ diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java new file mode 100644 index 000000000000..541c9764b743 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.node; + +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.MockNodeManager; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.ozone.ClientVersion; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; + +/** + * Unit tests for the DiskBalancer manager. + */ + +public class TestDiskBalancerManager { + + private DiskBalancerManager diskBalancerManager; + private NodeManager nodeManager; + private OzoneConfiguration conf; + private String storageDir; + + @BeforeEach + public void setup() throws Exception { + conf = new OzoneConfiguration(); + storageDir = GenericTestUtils.getTempPath( + TestDiskBalancerManager.class.getSimpleName() + UUID.randomUUID()); + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir); + nodeManager = new MockNodeManager(true, 3); + diskBalancerManager = new DiskBalancerManager(conf, new EventQueue(), + SCMContext.emptyContext(), nodeManager); + } + + @Test + public void testDatanodeDiskBalancerReport() throws IOException { + List reportProtoList = + diskBalancerManager.getDiskBalancerReport(2, + ClientVersion.CURRENT_VERSION); + + Assertions.assertEquals(2, reportProtoList.size()); + Assertions.assertTrue( + reportProtoList.get(0).getCurrentVolumeDensitySum() + >= reportProtoList.get(1).getCurrentVolumeDensitySum()); + } + + @Test + public void testDatanodeDiskBalancerStatus() throws IOException { + diskBalancerManager.addRunningDatanode(nodeManager.getAllNodes().get(0)); + + // Simulate users asking all status of 3 datanodes + List dns = nodeManager.getAllNodes().stream().map( + DatanodeDetails::getIpAddress).collect( + Collectors.toList()); + + List statusProtoList = + diskBalancerManager.getDiskBalancerStatus(Optional.of(dns), + ClientVersion.CURRENT_VERSION); + + Assertions.assertEquals(3, statusProtoList.size()); + + // Simulate users asking status of 1 datanodes + dns = nodeManager.getAllNodes().stream().map( + DatanodeDetails::getIpAddress).limit(1).collect( + Collectors.toList()); + + statusProtoList = + diskBalancerManager.getDiskBalancerStatus(Optional.of(dns), + ClientVersion.CURRENT_VERSION); + + Assertions.assertEquals(1, statusProtoList.size()); + } +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java index 6851252a34b9..acc569392b31 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java @@ -25,6 +25,7 @@ import 
org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.DatanodeAdminError; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.node.NodeUtils.HostDefinition; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.server.events.EventQueue; @@ -67,26 +68,24 @@ public void setup() throws Exception { @Test public void testHostStringsParseCorrectly() throws InvalidHostStringException { - NodeDecommissionManager.HostDefinition def = - new NodeDecommissionManager.HostDefinition("foobar"); + HostDefinition def = new HostDefinition("foobar"); assertEquals("foobar", def.getHostname()); assertEquals(-1, def.getPort()); - def = new NodeDecommissionManager.HostDefinition(" foobar "); + def = new HostDefinition(" foobar "); assertEquals("foobar", def.getHostname()); assertEquals(-1, def.getPort()); - def = new NodeDecommissionManager.HostDefinition("foobar:1234"); + def = new HostDefinition("foobar:1234"); assertEquals("foobar", def.getHostname()); assertEquals(1234, def.getPort()); - def = new NodeDecommissionManager.HostDefinition( - "foobar.mycompany.com:1234"); + def = new HostDefinition("foobar.mycompany.com:1234"); assertEquals("foobar.mycompany.com", def.getHostname()); assertEquals(1234, def.getPort()); try { - new NodeDecommissionManager.HostDefinition("foobar:abcd"); + new HostDefinition("foobar:abcd"); fail("InvalidHostStringException should have been thrown"); } catch (InvalidHostStringException e) { } diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java index e753d05c4499..0a78807886fb 100644 --- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java @@ -661,4 +661,40 @@ public StatusAndMessages queryUpgradeFinalizationProgress( return storageContainerLocationClient.queryUpgradeFinalizationProgress( upgradeClientID, force, readonly); } + + @Override + public List getDiskBalancerReport( + int count) throws IOException { + return storageContainerLocationClient.getDiskBalancerReport(count, + ClientVersion.CURRENT_VERSION); + } + + @Override + public void startDiskBalancer(Optional threshold, + Optional bandwidth, Optional> hosts) + throws IOException { + storageContainerLocationClient.startDiskBalancer(threshold, bandwidth, + hosts); + } + + @Override + public void stopDiskBalancer(Optional> hosts) + throws IOException { + storageContainerLocationClient.stopDiskBalancer(hosts); + } + + @Override + public List getDiskBalancerStatus( + Optional> hosts) throws IOException { + return storageContainerLocationClient.getDiskBalancerStatus(hosts, + ClientVersion.CURRENT_VERSION); + } + + @Override + public void updateDiskBalancerConfiguration(Optional threshold, + Optional bandwidth, Optional> hosts) + throws IOException { + storageContainerLocationClient.updateDiskBalancerConfiguration(threshold, + bandwidth, hosts); + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java new file mode 100644 index 000000000000..267209cf0343 --- /dev/null +++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.scm.node; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.HddsTestUtils; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient; +import org.apache.hadoop.hdds.scm.client.ScmClient; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity; +import org.apache.hadoop.hdds.scm.node.DatanodeInfo; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * This class tests disk balancer operations. + */ +public class TestDiskBalancer { + + /** + * Set a timeout for each test. + */ + @Rule + public Timeout timeout = Timeout.seconds(300); + + private static ScmClient storageClient; + private static MiniOzoneCluster cluster; + private static OzoneConfiguration ozoneConf; + + @BeforeClass + public static void setup() throws Exception { + ozoneConf = new OzoneConfiguration(); + ozoneConf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, + SCMContainerPlacementCapacity.class, PlacementPolicy.class); + cluster = MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(3).build(); + storageClient = new ContainerOperationClient(ozoneConf); + cluster.waitForClusterToBeReady(); + + for (DatanodeDetails dn: cluster.getStorageContainerManager() + .getScmNodeManager().getAllNodes()) { + ((DatanodeInfo) dn).updateStorageReports( + HddsTestUtils.getRandomNodeReport(3, 1).getStorageReportList()); + } + } + + @AfterClass + public static void cleanup() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testDatanodeDiskBalancerReport() throws IOException { + List reportProtoList = + storageClient.getDiskBalancerReport(2); + + assertEquals(2, reportProtoList.size()); + Assert.assertTrue( + reportProtoList.get(0).getCurrentVolumeDensitySum() + >= reportProtoList.get(1).getCurrentVolumeDensitySum()); + } + + @Test + public void testDatanodeDiskBalancerStatus() throws IOException { + // TODO: Test status command with datanodes in balancing + } +}