From ccbd7f7d9a9788b5828575c5a2b7763a35a7d5c9 Mon Sep 17 00:00:00 2001 From: Gargi Jaiswal Date: Tue, 19 Aug 2025 11:49:41 +0530 Subject: [PATCH 1/2] Removed dead codes, changed optional parameters and other refactoring --- .../hadoop/hdds/scm/client/ScmClient.java | 26 +++---- .../StorageContainerLocationProtocol.java | 26 +++---- .../storage/DiskBalancerConfiguration.java | 24 ++++--- .../common/interfaces/Container.java | 7 +- .../diskbalancer/DiskBalancerInfo.java | 5 +- .../diskbalancer/DiskBalancerService.java | 6 +- .../container/keyvalue/KeyValueContainer.java | 35 +--------- .../container/keyvalue/KeyValueHandler.java | 2 +- ...ocationProtocolClientSideTranslatorPB.java | 68 +++++++++++++------ .../hdds/scm/node/DiskBalancerManager.java | 55 ++++++++------- ...ocationProtocolServerSideTranslatorPB.java | 26 +++---- .../scm/server/SCMClientProtocolServer.java | 16 ++--- .../scm/node/TestDiskBalancerManager.java | 7 +- .../scm/cli/ContainerOperationClient.java | 16 ++--- .../datanode/DiskBalancerCommonOptions.java | 6 +- .../datanode/DiskBalancerStartSubcommand.java | 9 ++- .../DiskBalancerStatusSubcommand.java | 5 +- .../DiskBalancerUpdateSubcommand.java | 9 ++- .../ozone/scm/node/TestDiskBalancer.java | 37 +++++----- ...ancerDuringDecommissionAndMaintenance.java | 55 +++++++-------- 20 files changed, 214 insertions(+), 226 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java index 29aef0205225..19c1fc30ae41 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java @@ -496,24 +496,24 @@ List getDiskBalancerReport( * @throws IOException */ List getDiskBalancerStatus( - Optional> hosts, - Optional runningStatus) + List hosts, + HddsProtos.DiskBalancerRunningStatus runningStatus) throws IOException; /** * Start DiskBalancer. */ List startDiskBalancer( - Optional threshold, - Optional bandwidthInMB, - Optional parallelThread, - Optional stopAfterDiskEven, - Optional> hosts) throws IOException; + Double threshold, + Long bandwidthInMB, + Integer parallelThread, + Boolean stopAfterDiskEven, + List hosts) throws IOException; /** * Stop DiskBalancer. */ - List stopDiskBalancer(Optional> hosts) + List stopDiskBalancer(List hosts) throws IOException; @@ -521,9 +521,9 @@ List stopDiskBalancer(Optional> hosts) * Update DiskBalancer Configuration. */ List updateDiskBalancerConfiguration( - Optional threshold, - Optional bandwidth, - Optional parallelThread, - Optional stopAfterDiskEven, - Optional> hosts) throws IOException; + Double threshold, + Long bandwidth, + Integer parallelThread, + Boolean stopAfterDiskEven, + List hosts) throws IOException; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index 9996d90cb87a..d9fb0ec583d3 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -499,35 +499,35 @@ List getDiskBalancerReport( * Get DiskBalancer status. */ List getDiskBalancerStatus( - Optional> hosts, - Optional runningStatus, + List hosts, + HddsProtos.DiskBalancerRunningStatus runningStatus, int clientVersion) throws IOException; /** * Start DiskBalancer. */ List startDiskBalancer( - Optional threshold, - Optional bandwidthInMB, - Optional parallelThread, - Optional stopAfterDiskEven, - Optional> hosts) throws IOException; + Double threshold, + Long bandwidthInMB, + Integer parallelThread, + Boolean stopAfterDiskEven, + List hosts) throws IOException; /** * Stop DiskBalancer. */ - List stopDiskBalancer(Optional> hosts) + List stopDiskBalancer(List hosts) throws IOException; /** * Update DiskBalancer Configuration. */ List updateDiskBalancerConfiguration( - Optional threshold, - Optional bandwidthInMB, - Optional parallelThread, - Optional stopAfterDiskEven, - Optional> hosts) throws IOException; + Double threshold, + Long bandwidthInMB, + Integer parallelThread, + Boolean stopAfterDiskEven, + List hosts) throws IOException; /** * Trigger a reconcile command to datanodes for the current container ID. diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java index 6984bb94a95d..3580b54c8ef9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java @@ -21,7 +21,6 @@ import jakarta.annotation.Nonnull; import java.time.Duration; -import java.util.Optional; import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigTag; @@ -117,13 +116,22 @@ public final class DiskBalancerConfiguration { description = "If true, the DiskBalancer will automatically stop once disks are balanced.") private boolean stopAfterDiskEven = true; - public DiskBalancerConfiguration(Optional threshold, - Optional bandwidthInMB, - Optional parallelThread, Optional stopAfterDiskEven) { - threshold.ifPresent(aDouble -> this.threshold = aDouble); - bandwidthInMB.ifPresent(aLong -> this.diskBandwidthInMB = aLong); - parallelThread.ifPresent(integer -> this.parallelThread = integer); - stopAfterDiskEven.ifPresent(bool -> this.stopAfterDiskEven = bool); + public DiskBalancerConfiguration(Double threshold, + Long bandwidthInMB, + Integer parallelThread, + Boolean stopAfterDiskEven) { + if (threshold != null) { + this.threshold = threshold; + } + if (bandwidthInMB != null) { + this.diskBandwidthInMB = bandwidthInMB; + } + if (parallelThread != null) { + this.parallelThread = parallelThread; + } + if (stopAfterDiskEven != null) { + this.stopAfterDiskEven = stopAfterDiskEven; + } } public DiskBalancerConfiguration() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java index 9c110bded021..d7f71e6e3b74 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java @@ -144,11 +144,6 @@ void updateDataScanTimestamp(Instant timestamp) void importContainerData(InputStream stream, ContainerPacker packer) throws IOException; - /** - * Import the container from a container path. - */ - void importContainerData(Path containerPath) throws IOException; - /** * Export all the data of the container to one output archive with the help * of the packer. @@ -204,7 +199,7 @@ DataScanResult scanData(DataTransferThrottler throttler, Canceler canceler) /** * Copy all the data of the container to the destination path. */ - void copyContainerData(Path destPath) throws IOException; + void copyContainerDirectory(Path destPath) throws IOException; /** Acquire read lock. */ void readLock(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java index eaad116dcf35..579d3db23f2c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java @@ -18,7 +18,6 @@ package org.apache.hadoop.ozone.container.diskbalancer; import java.util.Objects; -import java.util.Optional; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration; @@ -101,8 +100,8 @@ public void updateFromConf(DiskBalancerConfiguration diskBalancerConf) { } public StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto toDiskBalancerReportProto() { - DiskBalancerConfiguration conf = new DiskBalancerConfiguration(Optional.of(threshold), - Optional.of(bandwidthInMB), Optional.of(parallelThread), Optional.of(stopAfterDiskEven)); + DiskBalancerConfiguration conf = new DiskBalancerConfiguration(threshold, + bandwidthInMB, parallelThread, stopAfterDiskEven); HddsProtos.DiskBalancerConfigurationProto confProto = conf.toProtobufBuilder().build(); StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto.Builder builder = diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java index b01217b74bce..b91405a42f20 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java @@ -20,7 +20,6 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.base.Strings; import java.io.File; import java.io.IOException; @@ -29,6 +28,7 @@ import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -159,7 +159,7 @@ public DiskBalancerService(OzoneContainer ozoneContainer, this.conf = conf; String diskBalancerInfoPath = getDiskBalancerInfoPath(); - Preconditions.checkNotNull(diskBalancerInfoPath); + Objects.requireNonNull(diskBalancerInfoPath); diskBalancerInfoFile = new File(diskBalancerInfoPath); inProgressContainers = ConcurrentHashMap.newKeySet(); @@ -176,7 +176,7 @@ public DiskBalancerService(OzoneContainer ozoneContainer, .getContainerChoosingPolicyClass().newInstance(); } catch (Exception e) { LOG.error("Got exception when initializing DiskBalancerService", e); - throw new RuntimeException(e); + throw new IOException(e); } metrics = DiskBalancerServiceMetrics.create(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index b3ba31da8e83..b8214882f12e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -23,7 +23,6 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_DESCRIPTOR_MISSING; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_FILES_CREATE_ERROR; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_OPEN; @@ -660,38 +659,6 @@ public void importContainerData(KeyValueContainerData originalContainerData) update(originalContainerData.getMetadata(), true); } - @Override - public void importContainerData(Path containerPath) throws IOException { - writeLock(); - try { - if (!getContainerFile().exists()) { - String errorMessage = String.format( - "Can't load container (cid=%d) data from a specific location" - + " as the container descriptor (%s) is missing", - getContainerData().getContainerID(), - getContainerFile().getAbsolutePath()); - throw new StorageContainerException(errorMessage, - CONTAINER_DESCRIPTOR_MISSING); - } - KeyValueContainerData originalContainerData = - (KeyValueContainerData) ContainerDataYaml - .readContainerFile(getContainerFile()); - - importContainerData(originalContainerData); - } catch (Exception ex) { - if (ex instanceof StorageContainerException && - ((StorageContainerException) ex).getResult() == - CONTAINER_DESCRIPTOR_MISSING) { - throw ex; - } - //delete all the temporary data in case of any exception. - cleanupFailedImport(); - throw ex; - } finally { - writeUnlock(); - } - } - private void cleanupFailedImport() { try { if (containerData.hasSchema(OzoneConsts.SCHEMA_V3)) { @@ -935,7 +902,7 @@ public DataScanResult scanData(DataTransferThrottler throttler, Canceler cancele } @Override - public void copyContainerData(Path destination) throws IOException { + public void copyContainerDirectory(Path destination) throws IOException { readLock(); try { // Closed/ Quasi closed containers are considered for replication by diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 9a2f5e485036..775c4b426f68 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -1590,7 +1590,7 @@ public void closeContainer(Container container) public void copyContainer(final Container container, Path destinationPath) throws IOException { final KeyValueContainer kvc = (KeyValueContainer) container; - kvc.copyContainerData(destinationPath); + kvc.copyContainerDirectory(destinationPath); } private KeyValueContainer createNewContainer( diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 4c123c233e02..a58fff467eac 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -1213,14 +1213,18 @@ public List getDiskBalancerReport( @Override public List getDiskBalancerStatus( - Optional> hosts, - Optional status, + List hosts, + HddsProtos.DiskBalancerRunningStatus status, int clientVersion) throws IOException { DatanodeDiskBalancerInfoRequestProto.Builder requestBuilder = DatanodeDiskBalancerInfoRequestProto.newBuilder() .setInfoType(DatanodeDiskBalancerInfoType.status); - hosts.ifPresent(requestBuilder::addAllHosts); - status.ifPresent(requestBuilder::setStatus); + if (hosts != null && !hosts.isEmpty()) { + requestBuilder.addAllHosts(hosts); + } + if (status != null) { + requestBuilder.setStatus(status); + } DatanodeDiskBalancerInfoRequestProto request = requestBuilder.build(); DatanodeDiskBalancerInfoResponseProto response = @@ -1232,22 +1236,32 @@ public List getDiskBalancerStatus( } @Override - public List startDiskBalancer(Optional threshold, - Optional bandwidthInMB, Optional parallelThread, - Optional stopAfterDiskEven, Optional> hosts) + public List startDiskBalancer(Double threshold, + Long bandwidthInMB, Integer parallelThread, + Boolean stopAfterDiskEven, List hosts) throws IOException { HddsProtos.DiskBalancerConfigurationProto.Builder confBuilder = HddsProtos.DiskBalancerConfigurationProto.newBuilder(); - threshold.ifPresent(confBuilder::setThreshold); - bandwidthInMB.ifPresent(confBuilder::setDiskBandwidthInMB); - parallelThread.ifPresent(confBuilder::setParallelThread); - stopAfterDiskEven.ifPresent(confBuilder::setStopAfterDiskEven); + if (threshold != null) { + confBuilder.setThreshold(threshold); + } + if (bandwidthInMB != null) { + confBuilder.setDiskBandwidthInMB(bandwidthInMB); + } + if (parallelThread != null) { + confBuilder.setParallelThread(parallelThread); + } + if (stopAfterDiskEven != null) { + confBuilder.setStopAfterDiskEven(stopAfterDiskEven); + } DatanodeDiskBalancerOpRequestProto.Builder requestBuilder = DatanodeDiskBalancerOpRequestProto.newBuilder() .setOpType(HddsProtos.DiskBalancerOpType.START) .setConf(confBuilder); - hosts.ifPresent(requestBuilder::addAllHosts); + if (hosts != null && !hosts.isEmpty()) { + requestBuilder.addAllHosts(hosts); + } DatanodeDiskBalancerOpResponseProto response = submitRequest(Type.DatanodeDiskBalancerOp, @@ -1263,12 +1277,14 @@ public List startDiskBalancer(Optional threshold, } @Override - public List stopDiskBalancer(Optional> hosts) + public List stopDiskBalancer(List hosts) throws IOException { DatanodeDiskBalancerOpRequestProto.Builder requestBuilder = DatanodeDiskBalancerOpRequestProto.newBuilder() .setOpType(HddsProtos.DiskBalancerOpType.STOP); - hosts.ifPresent(requestBuilder::addAllHosts); + if (hosts != null && !hosts.isEmpty()) { + requestBuilder.addAllHosts(hosts); + } DatanodeDiskBalancerOpResponseProto response = submitRequest(Type.DatanodeDiskBalancerOp, @@ -1285,21 +1301,31 @@ public List stopDiskBalancer(Optional> hosts) @Override public List updateDiskBalancerConfiguration( - Optional threshold, Optional bandwidthInMB, - Optional parallelThread, Optional stopAfterDiskEven, Optional> hosts) + Double threshold, Long bandwidthInMB, + Integer parallelThread, Boolean stopAfterDiskEven, List hosts) throws IOException { HddsProtos.DiskBalancerConfigurationProto.Builder confBuilder = HddsProtos.DiskBalancerConfigurationProto.newBuilder(); - threshold.ifPresent(confBuilder::setThreshold); - bandwidthInMB.ifPresent(confBuilder::setDiskBandwidthInMB); - parallelThread.ifPresent(confBuilder::setParallelThread); - stopAfterDiskEven.ifPresent(confBuilder::setStopAfterDiskEven); + if (threshold != null) { + confBuilder.setThreshold(threshold); + } + if (bandwidthInMB != null) { + confBuilder.setDiskBandwidthInMB(bandwidthInMB); + } + if (parallelThread != null) { + confBuilder.setParallelThread(parallelThread); + } + if (stopAfterDiskEven != null) { + confBuilder.setStopAfterDiskEven(stopAfterDiskEven); + } DatanodeDiskBalancerOpRequestProto.Builder requestBuilder = DatanodeDiskBalancerOpRequestProto.newBuilder() .setOpType(HddsProtos.DiskBalancerOpType.UPDATE) .setConf(confBuilder); - hosts.ifPresent(requestBuilder::addAllHosts); + if (hosts != null && !hosts.isEmpty()) { + requestBuilder.addAllHosts(hosts); + } DatanodeDiskBalancerOpResponseProto response = submitRequest(Type.DatanodeDiskBalancerOp, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java index bd4cb7919655..5e66d081a253 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java @@ -27,7 +27,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -103,12 +102,12 @@ public List getDiskBalancerReport( * If hosts is null, return status of all datanodes in balancing. */ public List getDiskBalancerStatus( - Optional> hosts, - Optional status, + List hosts, + HddsProtos.DiskBalancerRunningStatus status, int clientVersion) throws IOException { List filterDns = null; - if (hosts.isPresent() && !hosts.get().isEmpty()) { - filterDns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(), + if (hosts != null && !hosts.isEmpty()) { + filterDns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts, useHostnames).stream() .filter(dn -> { try { @@ -127,7 +126,7 @@ public List getDiskBalancerStatus( } // Filter Running Status by default - HddsProtos.DiskBalancerRunningStatus filterStatus = status.orElse(null); + HddsProtos.DiskBalancerRunningStatus filterStatus = status; if (filterDns != null) { return filterDns.stream() @@ -154,12 +153,12 @@ public List getDiskBalancerStatus( * @throws IOException */ public List startDiskBalancer( - Optional threshold, Optional bandwidthInMB, - Optional parallelThread, Optional stopAfterDiskEven, - Optional> hosts) throws IOException { + Double threshold, Long bandwidthInMB, + Integer parallelThread, Boolean stopAfterDiskEven, + List hosts) throws IOException { List dns; - if (hosts.isPresent()) { - dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(), + if (hosts != null && !hosts.isEmpty()) { + dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts, useHostnames); } else { dns = nodeManager.getNodes(NodeStatus.inServiceHealthy()); @@ -193,11 +192,11 @@ public List startDiskBalancer( * If hosts is not specified, send commands to all datanodes. * @param hosts Datanodes that command will apply on * */ - public List stopDiskBalancer(Optional> hosts) + public List stopDiskBalancer(List hosts) throws IOException { List dns; - if (hosts.isPresent()) { - dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(), + if (hosts != null && !hosts.isEmpty()) { + dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts, useHostnames); } else { dns = nodeManager.getNodes(NodeStatus.inServiceHealthy()); @@ -227,12 +226,12 @@ public List stopDiskBalancer(Optional> hosts) * @throws IOException */ public List updateDiskBalancerConfiguration( - Optional threshold, Optional bandwidthInMB, - Optional parallelThread, Optional stopAfterDiskEven, Optional> hosts) + Double threshold, Long bandwidthInMB, + Integer parallelThread, Boolean stopAfterDiskEven, List hosts) throws IOException { List dns; - if (hosts.isPresent()) { - dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(), + if (hosts != null && !hosts.isEmpty()) { + dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts, useHostnames); } else { dns = nodeManager.getNodes(NodeStatus.inServiceHealthy()); @@ -360,15 +359,23 @@ public void markStatusUnknown(DatanodeDetails dn) { } private DiskBalancerConfiguration attachDiskBalancerConf( - DatanodeDetails dn, Optional threshold, - Optional bandwidthInMB, Optional parallelThread, Optional stopAfterDiskEven) { + DatanodeDetails dn, Double threshold, + Long bandwidthInMB, Integer parallelThread, Boolean stopAfterDiskEven) { DiskBalancerConfiguration baseConf = statusMap.containsKey(dn) ? statusMap.get(dn).getDiskBalancerConfiguration() : new DiskBalancerConfiguration(); - threshold.ifPresent(baseConf::setThreshold); - bandwidthInMB.ifPresent(baseConf::setDiskBandwidthInMB); - parallelThread.ifPresent(baseConf::setParallelThread); - stopAfterDiskEven.ifPresent(baseConf::setStopAfterDiskEven); + if (threshold != null) { + baseConf.setThreshold(threshold); + } + if (bandwidthInMB != null) { + baseConf.setDiskBandwidthInMB(bandwidthInMB); + } + if (parallelThread != null) { + baseConf.setParallelThread(parallelThread); + } + if (stopAfterDiskEven != null) { + baseConf.setStopAfterDiskEven(stopAfterDiskEven); + } return baseConf; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java index cfa64a3253fc..c7f036367757 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -1384,10 +1384,10 @@ public DatanodeDiskBalancerInfoResponseProto getDatanodeDiskBalancerInfo( break; case status: infoProtoList = impl.getDiskBalancerStatus( - Optional.of(request.getHostsList()), + request.getHostsList().isEmpty() ? null : request.getHostsList(), // If an optional proto enum field is not set, it will return the first // enum value. So, we need to check if the field is set. - request.hasStatus() ? Optional.of(request.getStatus()) : Optional.empty(), + request.hasStatus() ? request.getStatus() : null, clientVersion); break; default: @@ -1416,24 +1416,24 @@ public DatanodeDiskBalancerOpResponseProto getDatanodeDiskBalancerOp( switch (request.getOpType()) { case START: errors = impl.startDiskBalancer( - conf.hasThreshold() ? Optional.of(conf.getThreshold()) : Optional.empty(), - conf.hasDiskBandwidthInMB() ? Optional.of(conf.getDiskBandwidthInMB()) : Optional.empty(), - conf.hasParallelThread() ? Optional.of(conf.getParallelThread()) : Optional.empty(), - conf.hasStopAfterDiskEven() ? Optional.of(conf.getStopAfterDiskEven()) : Optional.empty(), - request.getHostsList().isEmpty() ? Optional.empty() : Optional.of(request.getHostsList())); + conf.hasThreshold() ? conf.getThreshold() : null, + conf.hasDiskBandwidthInMB() ? conf.getDiskBandwidthInMB() : null, + conf.hasParallelThread() ? conf.getParallelThread() : null, + conf.hasStopAfterDiskEven() ? conf.getStopAfterDiskEven() : null, + request.getHostsList().isEmpty() ? null : request.getHostsList()); break; case UPDATE: errors = impl.updateDiskBalancerConfiguration( - conf.hasThreshold() ? Optional.of(conf.getThreshold()) : Optional.empty(), - conf.hasDiskBandwidthInMB() ? Optional.of(conf.getDiskBandwidthInMB()) : Optional.empty(), - conf.hasParallelThread() ? Optional.of(conf.getParallelThread()) : Optional.empty(), - conf.hasStopAfterDiskEven() ? Optional.of(conf.getStopAfterDiskEven()) : Optional.empty(), - request.getHostsList().isEmpty() ? Optional.empty() : Optional.of(request.getHostsList())); + conf.hasThreshold() ? conf.getThreshold() : null, + conf.hasDiskBandwidthInMB() ? conf.getDiskBandwidthInMB() : null, + conf.hasParallelThread() ? conf.getParallelThread() : null, + conf.hasStopAfterDiskEven() ? conf.getStopAfterDiskEven() : null, + request.getHostsList().isEmpty() ? null : request.getHostsList()); break; case STOP: errors = impl.stopDiskBalancer( - request.getHostsList().isEmpty() ? Optional.empty() : Optional.of(request.getHostsList())); + request.getHostsList().isEmpty() ? null : request.getHostsList()); break; default: errors = new ArrayList<>(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 24eaa586b31e..f578a295698a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -1530,8 +1530,8 @@ public List getDiskBalancerReport( @Override public List getDiskBalancerStatus( - Optional> hosts, - Optional status, + List hosts, + HddsProtos.DiskBalancerRunningStatus status, int clientVersion) throws IOException { checkDiskBalancerEnabled(); return scm.getDiskBalancerManager().getDiskBalancerStatus(hosts, status, @@ -1539,9 +1539,9 @@ public List getDiskBalancerStatus( } @Override - public List startDiskBalancer(Optional threshold, - Optional bandwidthInMB, Optional parallelThread, - Optional stopAfterDiskEven, Optional> hosts) + public List startDiskBalancer(Double threshold, + Long bandwidthInMB, Integer parallelThread, + Boolean stopAfterDiskEven, List hosts) throws IOException { checkDiskBalancerEnabled(); @@ -1557,7 +1557,7 @@ public List startDiskBalancer(Optional threshold, } @Override - public List stopDiskBalancer(Optional> hosts) + public List stopDiskBalancer(List hosts) throws IOException { checkDiskBalancerEnabled(); @@ -1572,8 +1572,8 @@ public List stopDiskBalancer(Optional> hosts) @Override public List updateDiskBalancerConfiguration( - Optional threshold, Optional bandwidthInMB, - Optional parallelThread, Optional stopAfterDiskEven, Optional> hosts) + Double threshold, Long bandwidthInMB, + Integer parallelThread, Boolean stopAfterDiskEven, List hosts) throws IOException { checkDiskBalancerEnabled(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java index e3aaf0d87b0e..b696ab6bd632 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java @@ -20,7 +20,6 @@ import java.io.File; import java.io.IOException; import java.util.List; -import java.util.Optional; import java.util.Random; import java.util.stream.Collectors; import org.apache.hadoop.hdds.HddsConfigKeys; @@ -88,8 +87,7 @@ public void testDatanodeDiskBalancerStatus() throws IOException { Collectors.toList()); List statusProtoList = - diskBalancerManager.getDiskBalancerStatus(Optional.of(dns), - Optional.empty(), + diskBalancerManager.getDiskBalancerStatus(dns, null, ClientVersion.CURRENT_VERSION); Assertions.assertEquals(3, statusProtoList.size()); @@ -100,8 +98,7 @@ public void testDatanodeDiskBalancerStatus() throws IOException { Collectors.toList()); statusProtoList = - diskBalancerManager.getDiskBalancerStatus(Optional.of(dns), - Optional.empty(), + diskBalancerManager.getDiskBalancerStatus(dns, null, ClientVersion.CURRENT_VERSION); Assertions.assertEquals(1, statusProtoList.size()); diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java index 55044e3dfec8..d5dbff4dad20 100644 --- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java @@ -611,23 +611,23 @@ public List getDiskBalancerReport( } @Override - public List startDiskBalancer(Optional threshold, - Optional bandwidthInMB, Optional parallelThread, Optional stopAfterDiskEven, - Optional> hosts) throws IOException { + public List startDiskBalancer(Double threshold, + Long bandwidthInMB, Integer parallelThread, Boolean stopAfterDiskEven, + List hosts) throws IOException { return storageContainerLocationClient.startDiskBalancer(threshold, bandwidthInMB, parallelThread, stopAfterDiskEven, hosts); } @Override - public List stopDiskBalancer(Optional> hosts) + public List stopDiskBalancer(List hosts) throws IOException { return storageContainerLocationClient.stopDiskBalancer(hosts); } @Override public List getDiskBalancerStatus( - Optional> hosts, - Optional runningStatus) + List hosts, + HddsProtos.DiskBalancerRunningStatus runningStatus) throws IOException { return storageContainerLocationClient.getDiskBalancerStatus(hosts, runningStatus, ClientVersion.CURRENT_VERSION); @@ -635,8 +635,8 @@ public List getDiskBalancerStatus( @Override public List updateDiskBalancerConfiguration( - Optional threshold, Optional bandwidth, - Optional parallelThread, Optional stopAfterDiskEven, Optional> hosts) + Double threshold, Long bandwidth, + Integer parallelThread, Boolean stopAfterDiskEven, List hosts) throws IOException { return storageContainerLocationClient.updateDiskBalancerConfiguration( threshold, bandwidth, parallelThread, stopAfterDiskEven, hosts); diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerCommonOptions.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerCommonOptions.java index d9cbf22f4327..2abac2249674 100644 --- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerCommonOptions.java +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerCommonOptions.java @@ -19,7 +19,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Optional; import picocli.CommandLine; /** @@ -60,9 +59,8 @@ public String getHostString() { return isAllHosts() ? "All datanodes" : String.join("\n", getDatanodes()); } - public Optional> getSpecifiedDatanodes() { - return getDatanodes().isEmpty() ? - Optional.empty() : Optional.of(getDatanodes()); + public List getSpecifiedDatanodes() { + return getDatanodes().isEmpty() ? null : getDatanodes(); } public boolean isAllHosts() { diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java index c37f56019862..49d1fe010507 100644 --- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java @@ -20,7 +20,6 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.List; -import java.util.Optional; import org.apache.hadoop.hdds.cli.HddsVersionProvider; import org.apache.hadoop.hdds.scm.DatanodeAdminError; import org.apache.hadoop.hdds.scm.cli.ScmSubcommand; @@ -43,19 +42,19 @@ public class DiskBalancerStartSubcommand extends ScmSubcommand { description = "Percentage deviation from average utilization of " + "the disks after which a datanode will be rebalanced (for " + "example, '10' for 10%%).") - private Optional threshold; + private Double threshold; @Option(names = {"-b", "--bandwidthInMB"}, description = "Maximum bandwidth for DiskBalancer per second.") - private Optional bandwidthInMB; + private Long bandwidthInMB; @Option(names = {"-p", "--parallelThread"}, description = "Max parallelThread for DiskBalancer.") - private Optional parallelThread; + private Integer parallelThread; @Option(names = {"-s", "--stop-after-disk-even"}, description = "Stop DiskBalancer automatically after disk utilization is even.") - private Optional stopAfterDiskEven; + private Boolean stopAfterDiskEven; @CommandLine.Mixin private DiskBalancerCommonOptions commonOptions = diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStatusSubcommand.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStatusSubcommand.java index d8d0113eb13f..34a7abbadf60 100644 --- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStatusSubcommand.java +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStatusSubcommand.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import org.apache.hadoop.hdds.cli.HddsVersionProvider; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.cli.ScmSubcommand; @@ -51,8 +50,8 @@ public class DiskBalancerStatusSubcommand extends ScmSubcommand { public void execute(ScmClient scmClient) throws IOException { List resultProto = scmClient.getDiskBalancerStatus( - hosts.isEmpty() ? Optional.empty() : Optional.of(hosts), - state == null ? Optional.empty() : Optional.of(state)); + hosts.isEmpty() ? null : hosts, + state); System.out.println(generateStatus(resultProto)); } diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java index 6f1b7ea0d1de..b6d2dcc45108 100644 --- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java @@ -20,7 +20,6 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.List; -import java.util.Optional; import org.apache.hadoop.hdds.cli.HddsVersionProvider; import org.apache.hadoop.hdds.scm.DatanodeAdminError; import org.apache.hadoop.hdds.scm.cli.ScmSubcommand; @@ -43,19 +42,19 @@ public class DiskBalancerUpdateSubcommand extends ScmSubcommand { description = "Percentage deviation from average utilization of " + "the disks after which a datanode will be rebalanced (for " + "example, '10' for 10%%).") - private Optional threshold; + private Double threshold; @Option(names = {"-b", "--bandwidthInMB"}, description = "Maximum bandwidth for DiskBalancer per second.") - private Optional bandwidthInMB; + private Long bandwidthInMB; @Option(names = {"-p", "--parallelThread"}, description = "Max parallelThread for DiskBalancer.") - private Optional parallelThread; + private Integer parallelThread; @Option(names = {"-s", "--stop-after-disk-even"}, description = "Stop DiskBalancer automatically after disk utilization is even.") - private Optional stopAfterDiskEven; + private Boolean stopAfterDiskEven; @CommandLine.Mixin private DiskBalancerCommonOptions commonOptions = diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java index cde2a7ce2ac1..5d39d260ead6 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java @@ -28,7 +28,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.hadoop.hdds.HddsConfigKeys; @@ -113,11 +112,11 @@ public void testDiskBalancerStopAfterEven() throws IOException, // Start DiskBalancer on all datanodes diskBalancerManager.startDiskBalancer( - Optional.of(10.0), // threshold - Optional.of(10L), // bandwidth in MB - Optional.of(5), // parallel threads - Optional.of(true), // stopAfterDiskEven - Optional.empty()); // apply to all datanodes + 10.0, // threshold + 10L, // bandwidth in MB + 5, // parallel threads + true, // stopAfterDiskEven + null); // apply to all datanodes // verify logs for all DNs has started String logs = logCapturer.getOutput(); @@ -144,16 +143,16 @@ public void testDatanodeDiskBalancerStatus() throws IOException, InterruptedExce DatanodeDetails toDecommission = dns.get(0).getDatanodeDetails(); diskBalancerManager.startDiskBalancer( - Optional.of(10.0), // threshold - Optional.of(10L), // bandwidth in MB - Optional.of(5), // parallel threads - Optional.of(true), // stopAfterDiskEven - Optional.empty()); + 10.0, // threshold + 10L, // bandwidth in MB + 5, // parallel threads + true, // stopAfterDiskEven + null); //all DNs IN_SERVICE, so disk balancer status for all should be present List statusProtoList = - diskBalancerManager.getDiskBalancerStatus(Optional.empty(), - Optional.empty(), + diskBalancerManager.getDiskBalancerStatus(null, + null, ClientVersion.CURRENT_VERSION); assertEquals(3, statusProtoList.size()); @@ -165,15 +164,15 @@ public void testDatanodeDiskBalancerStatus() throws IOException, InterruptedExce waitForDnToReachOpState(nm, toDecommission, DECOMMISSIONING); //one DN is in DECOMMISSIONING state, so disk balancer status for it should not be present - statusProtoList = diskBalancerManager.getDiskBalancerStatus(Optional.empty(), - Optional.empty(), + statusProtoList = diskBalancerManager.getDiskBalancerStatus(null, + null, ClientVersion.CURRENT_VERSION); assertEquals(2, statusProtoList.size()); // Check status for the decommissioned DN should not be present statusProtoList = diskBalancerManager.getDiskBalancerStatus( - Optional.of(Collections.singletonList(getDNHostAndPort(toDecommission))), - Optional.empty(), + Collections.singletonList(getDNHostAndPort(toDecommission)), + null, ClientVersion.CURRENT_VERSION); assertEquals(0, statusProtoList.size()); @@ -183,8 +182,8 @@ public void testDatanodeDiskBalancerStatus() throws IOException, InterruptedExce // Check status for the recommissioned DN should now be present statusProtoList = diskBalancerManager.getDiskBalancerStatus( - Optional.of(Collections.singletonList(getDNHostAndPort(toDecommission))), - Optional.empty(), + Collections.singletonList(getDNHostAndPort(toDecommission)), + null, ClientVersion.CURRENT_VERSION); assertEquals(1, statusProtoList.size()); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancerDuringDecommissionAndMaintenance.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancerDuringDecommissionAndMaintenance.java index b33b119ce805..81490a9b3cef 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancerDuringDecommissionAndMaintenance.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancerDuringDecommissionAndMaintenance.java @@ -30,7 +30,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.hadoop.hdds.HddsConfigKeys; @@ -103,7 +102,7 @@ public static void cleanup() throws Exception { @AfterEach public void stopDiskBalancer() throws IOException, InterruptedException, TimeoutException { // Stop disk balancer after each test - diskBalancerManager.stopDiskBalancer(Optional.empty()); + diskBalancerManager.stopDiskBalancer(null); // Verify that all DNs have stopped DiskBalancerService for (HddsDatanodeService dn : cluster.getHddsDatanodes()) { GenericTestUtils.waitFor(() -> { @@ -124,11 +123,11 @@ public void testDiskBalancerWithDecommissionAndMaintenanceNodes() // Start disk balancer on all DNs diskBalancerManager.startDiskBalancer( - Optional.of(10.0), - Optional.of(10L), - Optional.of(5), - Optional.of(false), - Optional.empty()); + 10.0, + 10L, + 5, + false, + null); NodeManager nm = cluster.getStorageContainerManager().getScmNodeManager(); @@ -149,8 +148,8 @@ public void testDiskBalancerWithDecommissionAndMaintenanceNodes() //get diskBalancer status List statusProtoList = - diskBalancerManager.getDiskBalancerStatus(Optional.empty(), - Optional.empty(), + diskBalancerManager.getDiskBalancerStatus(null, + null, ClientVersion.CURRENT_VERSION); // Verify that decommissioning and maintenance DN is not @@ -193,8 +192,8 @@ public void testDiskBalancerWithDecommissionAndMaintenanceNodes() // Verify that recommissioned DN is included in DiskBalancer report and status reportProtoList = diskBalancerManager.getDiskBalancerReport(5, ClientVersion.CURRENT_VERSION); - statusProtoList = diskBalancerManager.getDiskBalancerStatus(Optional.empty(), - Optional.empty(), + statusProtoList = diskBalancerManager.getDiskBalancerStatus(null, + null, ClientVersion.CURRENT_VERSION); boolean isRecommissionedDnInReport = reportProtoList.stream() @@ -228,18 +227,17 @@ public void testStopDiskBalancerOnDecommissioningNode() throws Exception { // Start disk balancer on this specific DN diskBalancerManager.startDiskBalancer( - Optional.of(10.0), - Optional.of(10L), - Optional.of(1), - Optional.of(false), - Optional.of(dnAddressList)); + 10.0, + 10L, + 1, + false, + dnAddressList); // Verify diskBalancer is running GenericTestUtils.waitFor(() -> { try { HddsProtos.DatanodeDiskBalancerInfoProto status = - diskBalancerManager.getDiskBalancerStatus(Optional.of(dnAddressList), - Optional.empty(), + diskBalancerManager.getDiskBalancerStatus(dnAddressList, null, ClientVersion.CURRENT_VERSION).stream().findFirst().orElse(null); return status != null && status.getRunningStatus() == HddsProtos.DiskBalancerRunningStatus.RUNNING; } catch (IOException e) { @@ -258,7 +256,7 @@ public void testStopDiskBalancerOnDecommissioningNode() throws Exception { 100, 5000); // Attempt to stop disk balancer on the decommissioning DN - diskBalancerManager.stopDiskBalancer(Optional.of(dnAddressList)); + diskBalancerManager.stopDiskBalancer(dnAddressList); // Verify disk balancer is now explicitly stopped (operationalState becomes STOPPED) final String expectedLogForStop = @@ -272,8 +270,7 @@ public void testStopDiskBalancerOnDecommissioningNode() throws Exception { // Verify it does not automatically restart (since it was explicitly stopped) HddsProtos.DatanodeDiskBalancerInfoProto statusAfterRecommission = - diskBalancerManager.getDiskBalancerStatus(Optional.of(dnAddressList), - Optional.empty(), + diskBalancerManager.getDiskBalancerStatus(dnAddressList, null, ClientVersion.CURRENT_VERSION).stream().findFirst().orElse(null); assertEquals(HddsProtos.DiskBalancerRunningStatus.STOPPED, statusAfterRecommission.getRunningStatus()); } @@ -292,8 +289,7 @@ public void testStartDiskBalancerOnDecommissioningNode() throws Exception { GenericTestUtils.waitFor(() -> { try { HddsProtos.DatanodeDiskBalancerInfoProto status = - diskBalancerManager.getDiskBalancerStatus(Optional.of(dnAddressList), - Optional.empty(), + diskBalancerManager.getDiskBalancerStatus(dnAddressList, null, ClientVersion.CURRENT_VERSION).stream().findFirst().orElse(null); return status != null && status.getRunningStatus() == HddsProtos.DiskBalancerRunningStatus.STOPPED; } catch (IOException e) { @@ -312,11 +308,11 @@ public void testStartDiskBalancerOnDecommissioningNode() throws Exception { // Attempt to start disk balancer on the decommissioning DN diskBalancerManager.startDiskBalancer( - Optional.of(10.0), - Optional.of(10L), - Optional.of(1), - Optional.of(false), - Optional.of(dnAddressList)); + 10.0, + 10L, + 1, + false, + dnAddressList); // Verify disk balancer goes to PAUSED_BY_NODE_STATE final String expectedLogForPause = @@ -332,8 +328,7 @@ public void testStartDiskBalancerOnDecommissioningNode() throws Exception { GenericTestUtils.waitFor(() -> { try { HddsProtos.DatanodeDiskBalancerInfoProto status = - diskBalancerManager.getDiskBalancerStatus(Optional.of(dnAddressList), - Optional.empty(), + diskBalancerManager.getDiskBalancerStatus(dnAddressList, null, ClientVersion.CURRENT_VERSION).stream().findFirst().orElse(null); return status != null && status.getRunningStatus() == HddsProtos.DiskBalancerRunningStatus.RUNNING; } catch (IOException e) { From 154bdb37ef34c4c55690607457c1dda2ad89ff0d Mon Sep 17 00:00:00 2001 From: Gargi Jaiswal Date: Wed, 20 Aug 2025 10:54:04 +0530 Subject: [PATCH 2/2] fixed disk balancer to use ContainerID, getCurrentUsage() and updated unit tests. --- .../hadoop/hdds/scm/client/ScmClient.java | 27 +-- .../StorageContainerLocationProtocol.java | 27 +-- .../hadoop/hdds/utils/SlidingWindow.java | 179 ------------------ .../hadoop/hdds/utils/TestSlidingWindow.java | 141 -------------- .../diskbalancer/DiskBalancerService.java | 14 +- .../policy/ContainerChoosingPolicy.java | 3 +- .../DefaultContainerChoosingPolicy.java | 6 +- .../policy/DefaultVolumeChoosingPolicy.java | 29 +-- .../diskbalancer/TestDiskBalancerTask.java | 57 ++++-- ...ocationProtocolClientSideTranslatorPB.java | 15 +- .../scm/server/SCMClientProtocolServer.java | 17 +- hadoop-ozone/cli-admin/pom.xml | 4 + .../scm/cli/ContainerOperationClient.java | 17 +- .../scm/node/TestContainerChoosingPolicy.java | 11 +- 14 files changed, 133 insertions(+), 414 deletions(-) delete mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/SlidingWindow.java delete mode 100644 hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestSlidingWindow.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java index 19c1fc30ae41..dec42494a337 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.scm.client; +import jakarta.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.List; @@ -496,24 +497,24 @@ List getDiskBalancerReport( * @throws IOException */ List getDiskBalancerStatus( - List hosts, - HddsProtos.DiskBalancerRunningStatus runningStatus) + @Nullable List hosts, + @Nullable HddsProtos.DiskBalancerRunningStatus runningStatus) throws IOException; /** * Start DiskBalancer. */ List startDiskBalancer( - Double threshold, - Long bandwidthInMB, - Integer parallelThread, - Boolean stopAfterDiskEven, - List hosts) throws IOException; + @Nullable Double threshold, + @Nullable Long bandwidthInMB, + @Nullable Integer parallelThread, + @Nullable Boolean stopAfterDiskEven, + @Nullable List hosts) throws IOException; /** * Stop DiskBalancer. */ - List stopDiskBalancer(List hosts) + List stopDiskBalancer(@Nullable List hosts) throws IOException; @@ -521,9 +522,9 @@ List stopDiskBalancer(List hosts) * Update DiskBalancer Configuration. */ List updateDiskBalancerConfiguration( - Double threshold, - Long bandwidth, - Integer parallelThread, - Boolean stopAfterDiskEven, - List hosts) throws IOException; + @Nullable Double threshold, + @Nullable Long bandwidth, + @Nullable Integer parallelThread, + @Nullable Boolean stopAfterDiskEven, + @Nullable List hosts) throws IOException; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index d9fb0ec583d3..84eb62833409 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.scm.protocol; +import jakarta.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.Collections; @@ -499,35 +500,35 @@ List getDiskBalancerReport( * Get DiskBalancer status. */ List getDiskBalancerStatus( - List hosts, - HddsProtos.DiskBalancerRunningStatus runningStatus, + @Nullable List hosts, + @Nullable HddsProtos.DiskBalancerRunningStatus runningStatus, int clientVersion) throws IOException; /** * Start DiskBalancer. */ List startDiskBalancer( - Double threshold, - Long bandwidthInMB, - Integer parallelThread, - Boolean stopAfterDiskEven, - List hosts) throws IOException; + @Nullable Double threshold, + @Nullable Long bandwidthInMB, + @Nullable Integer parallelThread, + @Nullable Boolean stopAfterDiskEven, + @Nullable List hosts) throws IOException; /** * Stop DiskBalancer. */ - List stopDiskBalancer(List hosts) + List stopDiskBalancer(@Nullable List hosts) throws IOException; /** * Update DiskBalancer Configuration. */ List updateDiskBalancerConfiguration( - Double threshold, - Long bandwidthInMB, - Integer parallelThread, - Boolean stopAfterDiskEven, - List hosts) throws IOException; + @Nullable Double threshold, + @Nullable Long bandwidthInMB, + @Nullable Integer parallelThread, + @Nullable Boolean stopAfterDiskEven, + @Nullable List hosts) throws IOException; /** * Trigger a reconcile command to datanodes for the current container ID. diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/SlidingWindow.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/SlidingWindow.java deleted file mode 100644 index 744e842a398a..000000000000 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/SlidingWindow.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.utils; - -import com.google.common.annotations.VisibleForTesting; -import java.time.Clock; -import java.time.Duration; -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.concurrent.TimeUnit; - -/** - * - * A sliding window implementation that combines time-based expiry with a - * maximum size constraint. The window tracks event timestamps and maintains two - * limits: - *
    - *
  • Time-based: Events older than the specified expiry duration are - * automatically removed - *
  • Size-based: The window maintains at most windowSize latest events, removing - * older events when this limit is exceeded - *
- * - * The window is considered full when the number of non-expired events exceeds - * the specified window size. Events are automatically pruned based on both - * their age and the maximum size constraint. - */ -public class SlidingWindow { - private final Object lock = new Object(); - private final int windowSize; - private final Deque timestamps; - private final long expiryDurationMillis; - private final Clock clock; - - /** - * Default constructor that uses a monotonic clock. - * - * @param windowSize the maximum number of events that are tracked - * @param expiryDuration the duration after which an entry in the window expires - */ - public SlidingWindow(int windowSize, Duration expiryDuration) { - this(windowSize, expiryDuration, new MonotonicClock()); - } - - /** - * Constructor with a custom clock for testing. - * - * @param windowSize the maximum number of events that are tracked - * @param expiryDuration the duration after which an entry in the window expires - * @param clock the clock to use for time measurements - */ - public SlidingWindow(int windowSize, Duration expiryDuration, Clock clock) { - if (windowSize < 0) { - throw new IllegalArgumentException("Window size must be greater than 0"); - } - if (expiryDuration.isNegative() || expiryDuration.isZero()) { - throw new IllegalArgumentException("Expiry duration must be greater than 0"); - } - this.windowSize = windowSize; - this.expiryDurationMillis = expiryDuration.toMillis(); - this.clock = clock; - // We limit the initial queue size to 100 to control the memory usage - this.timestamps = new ArrayDeque<>(Math.min(windowSize + 1, 100)); - } - - public void add() { - synchronized (lock) { - if (isExceeded()) { - timestamps.remove(); - } - - timestamps.add(getCurrentTime()); - } - } - - /** - * Checks if the sliding window has exceeded its maximum size. - * This is useful to track if we have encountered more events than the window's defined limit. - * @return true if the number of tracked timestamps in the sliding window - * exceeds the specified window size, false otherwise. - */ - public boolean isExceeded() { - synchronized (lock) { - removeExpired(); - return timestamps.size() > windowSize; - } - } - - /** - * Returns the current number of events that are tracked within the sliding window queue. - * The number of events can exceed the window size. - * This method ensures that expired events are removed before computing the count. - * - * @return the number of valid timestamps currently in the sliding window - */ - @VisibleForTesting - public int getNumEvents() { - synchronized (lock) { - removeExpired(); - return timestamps.size(); - } - } - - /** - * Returns the current number of events that are tracked within the sliding window queue. - * The number of events cannot exceed the window size. - * This method ensures that expired events are removed before computing the count. - * - * @return the number of valid timestamps currently in the sliding window - */ - public int getNumEventsInWindow() { - synchronized (lock) { - removeExpired(); - return Math.min(timestamps.size(), windowSize); - } - } - - private void removeExpired() { - synchronized (lock) { - long currentTime = getCurrentTime(); - long expirationThreshold = currentTime - expiryDurationMillis; - - while (!timestamps.isEmpty() && timestamps.peek() < expirationThreshold) { - timestamps.remove(); - } - } - } - - public int getWindowSize() { - return windowSize; - } - - private long getCurrentTime() { - return clock.millis(); - } - - /** - * A custom monotonic clock implementation. - * Implementation of Clock that uses System.nanoTime() for real usage. - * See {@see org.apache.ozone.test.TestClock} - */ - private static final class MonotonicClock extends Clock { - @Override - public long millis() { - return TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - } - - @Override - public java.time.Instant instant() { - return java.time.Instant.ofEpochMilli(millis()); - } - - @Override - public java.time.ZoneId getZone() { - return java.time.ZoneOffset.UTC; - } - - @Override - public Clock withZone(java.time.ZoneId zone) { - // Ignore zone for monotonic clock - throw new UnsupportedOperationException("Sliding Window class does not allow changing the timezone"); - } - } -} diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestSlidingWindow.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestSlidingWindow.java deleted file mode 100644 index 369426bcfd08..000000000000 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestSlidingWindow.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.utils; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.time.Duration; -import org.apache.ozone.test.TestClock; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** - * Tests for {@link SlidingWindow} class. - */ -class TestSlidingWindow { - - private TestClock testClock; - - @BeforeEach - void setup() { - testClock = TestClock.newInstance(); - } - - @Test - void testConstructorValidation() { - // Test invalid window size - assertThrows(IllegalArgumentException.class, () -> new SlidingWindow(-1, Duration.ofMillis(100))); - - // Test invalid expiry duration - assertThrows(IllegalArgumentException.class, () -> new SlidingWindow(1, Duration.ofMillis(0))); - assertThrows(IllegalArgumentException.class, () -> new SlidingWindow(1, Duration.ofMillis(-1))); - } - - @Test - void testAdd() { - SlidingWindow slidingWindow = new SlidingWindow(3, Duration.ofSeconds(5), testClock); - for (int i = 0; i < slidingWindow.getWindowSize(); i++) { - slidingWindow.add(); - assertEquals(i + 1, slidingWindow.getNumEvents()); - assertFalse(slidingWindow.isExceeded()); - } - - slidingWindow.add(); - assertEquals(slidingWindow.getWindowSize() + 1, slidingWindow.getNumEvents()); - assertTrue(slidingWindow.isExceeded()); - } - - @Test - void testEventExpiration() { - SlidingWindow slidingWindow = new SlidingWindow(2, Duration.ofMillis(500), testClock); - - // Add events to reach threshold - slidingWindow.add(); - slidingWindow.add(); - slidingWindow.add(); - assertEquals(3, slidingWindow.getNumEvents()); - assertTrue(slidingWindow.isExceeded()); - - // Fast forward time to expire events - testClock.fastForward(600); - - assertEquals(0, slidingWindow.getNumEvents()); - assertFalse(slidingWindow.isExceeded()); - - // Add one more event - should not be enough to mark as full - slidingWindow.add(); - assertEquals(1, slidingWindow.getNumEvents()); - assertFalse(slidingWindow.isExceeded()); - } - - @Test - void testPartialExpiration() { - SlidingWindow slidingWindow = new SlidingWindow(3, Duration.ofSeconds(1), testClock); - - slidingWindow.add(); - slidingWindow.add(); - slidingWindow.add(); - slidingWindow.add(); - assertEquals(4, slidingWindow.getNumEvents()); - assertTrue(slidingWindow.isExceeded()); - - testClock.fastForward(600); - slidingWindow.add(); // this will remove the oldest event as the window is full - assertEquals(4, slidingWindow.getNumEvents()); - - // Fast forward time to expire the oldest events - testClock.fastForward(500); - assertEquals(1, slidingWindow.getNumEvents()); - assertFalse(slidingWindow.isExceeded()); - } - - @Test - void testZeroWindowSize() { - SlidingWindow slidingWindow = new SlidingWindow(0, Duration.ofSeconds(5), testClock); - - // Verify initial state - assertEquals(0, slidingWindow.getWindowSize()); - assertEquals(0, slidingWindow.getNumEvents()); - assertFalse(slidingWindow.isExceeded()); - - // Add an event - with window size 0, any event should cause isExceeded to return true - slidingWindow.add(); - assertEquals(1, slidingWindow.getNumEvents()); - assertTrue(slidingWindow.isExceeded()); - - // Add another event - should replace the previous one as window is exceeded - slidingWindow.add(); - assertEquals(1, slidingWindow.getNumEvents()); - assertTrue(slidingWindow.isExceeded()); - - // Test expiration - testClock.fastForward(6000); // Move past expiry time - assertEquals(0, slidingWindow.getNumEvents()); - assertFalse(slidingWindow.isExceeded()); - - // Add multiple events in sequence - should always keep only the latest one - for (int i = 0; i < 5; i++) { - slidingWindow.add(); - assertEquals(1, slidingWindow.getNumEvents()); - assertTrue(slidingWindow.isExceeded()); - } - } -} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java index b91405a42f20..b827d2251f6a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration; import org.apache.hadoop.hdds.server.ServerUtils; @@ -60,6 +61,7 @@ import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; +import org.apache.hadoop.ozone.container.common.volume.StorageVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory; import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy; import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeChoosingPolicy; @@ -83,8 +85,6 @@ public class DiskBalancerService extends BackgroundService { public static final String DISK_BALANCER_DIR = "diskBalancer"; - private static final String DISK_BALANCER_TMP_DIR = "tmp"; - private OzoneContainer ozoneContainer; private final ConfigurationSource conf; @@ -102,7 +102,7 @@ public class DiskBalancerService extends BackgroundService { private AtomicLong balancedBytesInLastWindow = new AtomicLong(0L); private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow()); - private Set inProgressContainers; + private Set inProgressContainers; private static FaultInjector injector; /** @@ -408,7 +408,7 @@ public BackgroundTaskQueue getTasks() { DiskBalancerTask task = new DiskBalancerTask(toBalanceContainer, sourceVolume, destVolume); queue.add(task); - inProgressContainers.add(toBalanceContainer.getContainerID()); + inProgressContainers.add(ContainerID.valueOf(toBalanceContainer.getContainerID())); deltaSizes.put(sourceVolume, deltaSizes.getOrDefault(sourceVolume, 0L) - toBalanceContainer.getBytesUsed()); } else { @@ -620,7 +620,7 @@ public int getPriority() { } private void postCall(boolean success, long startTime) { - inProgressContainers.remove(containerData.getContainerID()); + inProgressContainers.remove(ContainerID.valueOf(containerData.getContainerID())); deltaSizes.put(sourceVolume, deltaSizes.get(sourceVolume) + containerData.getBytesUsed()); destVolume.incCommittedBytes(0 - containerDefaultSize); @@ -677,7 +677,7 @@ public long calculateBytesToMove(MutableVolumeSet inputVolumeSet) { private Path getDiskBalancerTmpDir(HddsVolume hddsVolume) { return Paths.get(hddsVolume.getVolumeRootDir()) - .resolve(DISK_BALANCER_TMP_DIR).resolve(DISK_BALANCER_DIR); + .resolve(StorageVolume.TMP_DIR_NAME).resolve(DISK_BALANCER_DIR); } public DiskBalancerServiceMetrics getMetrics() { @@ -708,7 +708,7 @@ public void setContainerChoosingPolicy(ContainerChoosingPolicy containerChoosing } @VisibleForTesting - public Set getInProgressContainers() { + public Set getInProgressContainers() { return inProgressContainers; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java index c3e89ab93d18..aa80ccdf8679 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.container.diskbalancer.policy; import java.util.Set; +import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; @@ -35,5 +36,5 @@ public interface ContainerChoosingPolicy { * @return a Container */ ContainerData chooseContainer(OzoneContainer ozoneContainer, - HddsVolume volume, Set inProgressContainerIDs); + HddsVolume volume, Set inProgressContainerIDs); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java index 76a0f30c149d..bfc9d15a1daa 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.Set; import java.util.concurrent.ExecutionException; +import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; @@ -48,7 +49,7 @@ public class DefaultContainerChoosingPolicy implements ContainerChoosingPolicy { @Override public ContainerData chooseContainer(OzoneContainer ozoneContainer, - HddsVolume hddsVolume, Set inProgressContainerIDs) { + HddsVolume hddsVolume, Set inProgressContainerIDs) { Iterator> itr; try { itr = CACHE.get().get(hddsVolume, @@ -61,7 +62,8 @@ public ContainerData chooseContainer(OzoneContainer ozoneContainer, while (itr.hasNext()) { ContainerData containerData = itr.next().getContainerData(); if (!inProgressContainerIDs.contains( - containerData.getContainerID()) && (containerData.isClosed() || (test && containerData.isQuasiClosed()))) { + ContainerID.valueOf(containerData.getContainerID())) && + (containerData.isClosed() || (test && containerData.isQuasiClosed()))) { return containerData; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java index cba87740f712..20cd2aef0c47 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java @@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.fs.SpaceUsageSource; import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil; import org.apache.hadoop.ozone.container.common.volume.AvailableSpaceFilter; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; @@ -58,20 +59,26 @@ public Pair chooseVolume(MutableVolumeSet volumeSet, List volumes = StorageVolumeUtil .getHddsVolumesList(volumeSet.getVolumesList()) .stream() - .filter(volume -> - Math.abs( - ((double)((volume.getCurrentUsage().getCapacity() - volume.getCurrentUsage().getAvailable()) + .filter(volume -> { + SpaceUsageSource usage = volume.getCurrentUsage(); + + return Math.abs( + ((double)((usage.getCapacity() - usage.getAvailable()) + deltaMap.getOrDefault(volume, 0L) + volume.getCommittedBytes())) - / volume.getCurrentUsage().getCapacity() - idealUsage) >= normalizedThreshold) - .sorted((v1, v2) -> - Double.compare( - (double) ((v2.getCurrentUsage().getCapacity() - v2.getCurrentUsage().getAvailable()) + / usage.getCapacity() - idealUsage) >= normalizedThreshold; + + }).sorted((v1, v2) -> { + SpaceUsageSource usage1 = v1.getCurrentUsage(); + SpaceUsageSource usage2 = v2.getCurrentUsage(); + + return Double.compare( + (double) ((usage2.getCapacity() - usage2.getAvailable()) + deltaMap.getOrDefault(v2, 0L) + v2.getCommittedBytes()) / - v2.getCurrentUsage().getCapacity(), - (double) ((v1.getCurrentUsage().getCapacity() - v1.getCurrentUsage().getAvailable()) + usage2.getCapacity(), + (double) ((usage1.getCapacity() - usage1.getAvailable()) + deltaMap.getOrDefault(v1, 0L) + v1.getCommittedBytes()) / - v1.getCurrentUsage().getCapacity())) - .collect(Collectors.toList()); + usage1.getCapacity()); + }).collect(Collectors.toList()); // Can not generate DiskBalancerTask if we have less than 2 results if (volumes.size() <= 1) { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java index ee72c347a605..f914d5f1c78c 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration; import org.apache.hadoop.hdds.utils.FaultInjector; @@ -71,6 +72,7 @@ import org.apache.hadoop.ozone.container.common.volume.StorageVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; @@ -84,7 +86,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; @@ -117,6 +118,7 @@ public class TestDiskBalancerTask { private static final long CONTAINER_SIZE = 1024L * 1024L; // 1 MB private final TestFaultInjector kvFaultInjector = new TestFaultInjector(); + private String schemaVersion; /** * A FaultInjector that can be configured to throw an exception on a @@ -306,8 +308,11 @@ public void moveSuccess(State containerState) throws IOException { assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume)); } - @Test - public void moveFailsAfterCopy() throws IOException, InterruptedException, TimeoutException, ExecutionException { + @ContainerTestVersionInfo.ContainerTest + public void moveFailsAfterCopy(ContainerTestVersionInfo versionInfo) + throws IOException, InterruptedException, TimeoutException, ExecutionException { + setLayoutAndSchemaForTest(versionInfo); + Container container = createContainer(CONTAINER_ID, sourceVolume, State.CLOSED); long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace(); long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace(); @@ -335,7 +340,7 @@ public void moveFailsAfterCopy() throws IOException, InterruptedException, Timeo } return false; }, 100, 30000); - assertTrue(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID)); + assertTrue(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID))); serviceFaultInjector.resume(); // wait for task to be completed @@ -351,12 +356,15 @@ public void moveFailsAfterCopy() throws IOException, InterruptedException, Timeo assertFalse(Files.exists(tempContainerDir), "Temp container directory should be cleaned up"); assertEquals(1, diskBalancerService.getMetrics().getFailureCount()); assertEquals(initialDestCommitted, destVolume.getCommittedBytes()); - assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID)); + assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID))); assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume)); } - @Test - public void moveFailsOnAtomicMove() throws IOException, InterruptedException, TimeoutException, ExecutionException { + @ContainerTestVersionInfo.ContainerTest + public void moveFailsOnAtomicMove(ContainerTestVersionInfo versionInfo) + throws IOException, InterruptedException, TimeoutException, ExecutionException { + setLayoutAndSchemaForTest(versionInfo); + Container container = createContainer(CONTAINER_ID, sourceVolume, State.CLOSED); long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace(); long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace(); @@ -394,7 +402,7 @@ public void moveFailsOnAtomicMove() throws IOException, InterruptedException, Ti } return false; }, 100, 30000); - assertTrue(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID)); + assertTrue(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID))); serviceFaultInjector.resume(); completableFuture.get(); @@ -412,13 +420,15 @@ public void moveFailsOnAtomicMove() throws IOException, InterruptedException, Ti assertTrue(testfile.toFile().exists(), "testfile should not be cleaned up"); assertEquals(1, diskBalancerService.getMetrics().getFailureCount()); assertEquals(initialDestCommitted, destVolume.getCommittedBytes()); - assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID)); + assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID))); assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume)); } - @Test - public void moveFailsDuringInMemoryUpdate() + @ContainerTestVersionInfo.ContainerTest + public void moveFailsDuringInMemoryUpdate(ContainerTestVersionInfo versionInfo) throws IOException, InterruptedException, TimeoutException, ExecutionException { + setLayoutAndSchemaForTest(versionInfo); + Container container = createContainer(CONTAINER_ID, sourceVolume, State.QUASI_CLOSED); long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace(); long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace(); @@ -453,7 +463,7 @@ public void moveFailsDuringInMemoryUpdate() } return false; }, 100, 30000); - assertTrue(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID)); + assertTrue(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID))); serviceFaultInjector.resume(); // wait for task to be completed completableFuture.get(); @@ -476,12 +486,14 @@ public void moveFailsDuringInMemoryUpdate() "Moved container at destination should be cleaned up on failure"); assertEquals(1, diskBalancerService.getMetrics().getFailureCount()); assertEquals(initialDestCommitted, destVolume.getCommittedBytes()); - assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID)); + assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID))); assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume)); } - @Test - public void moveFailsDuringOldContainerRemove() throws IOException { + @ContainerTestVersionInfo.ContainerTest + public void moveFailsDuringOldContainerRemove(ContainerTestVersionInfo versionInfo) throws IOException { + setLayoutAndSchemaForTest(versionInfo); + Container container = createContainer(CONTAINER_ID, sourceVolume, State.CLOSED); long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace(); long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace(); @@ -523,13 +535,15 @@ public void moveFailsDuringOldContainerRemove() throws IOException { assertEquals(initialDestUsed + CONTAINER_SIZE, destVolume.getCurrentUsage().getUsedSpace()); assertEquals(initialDestCommitted, destVolume.getCommittedBytes()); - assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID)); + assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID))); assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume)); } } - @Test - public void testDestVolumeCommittedSpaceReleased() throws IOException { + @ContainerTestVersionInfo.ContainerTest + public void testDestVolumeCommittedSpaceReleased(ContainerTestVersionInfo versionInfo) throws IOException { + setLayoutAndSchemaForTest(versionInfo); + createContainer(CONTAINER_ID, sourceVolume, State.CLOSED); long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace(); long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace(); @@ -558,7 +572,7 @@ public void testDestVolumeCommittedSpaceReleased() throws IOException { assertEquals(0, destVolume.getCommittedBytes() - initialDestCommitted); assertEquals(1, diskBalancerService.getMetrics().getFailureCount()); assertEquals(initialDestCommitted, destVolume.getCommittedBytes()); - assertFalse(diskBalancerService.getInProgressContainers().contains(CONTAINER_ID)); + assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID))); assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume)); } @@ -585,4 +599,9 @@ private KeyValueContainer createContainer(long containerId, HddsVolume vol, Stat private DiskBalancerService.DiskBalancerTask getTask() { return (DiskBalancerService.DiskBalancerTask) diskBalancerService.getTasks().poll(); } + + private void setLayoutAndSchemaForTest(ContainerTestVersionInfo versionInfo) { + this.schemaVersion = versionInfo.getSchemaVersion(); + ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf); + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index a58fff467eac..a6658e4c5e96 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; +import jakarta.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; @@ -1213,8 +1214,8 @@ public List getDiskBalancerReport( @Override public List getDiskBalancerStatus( - List hosts, - HddsProtos.DiskBalancerRunningStatus status, + @Nullable List hosts, + @Nullable HddsProtos.DiskBalancerRunningStatus status, int clientVersion) throws IOException { DatanodeDiskBalancerInfoRequestProto.Builder requestBuilder = DatanodeDiskBalancerInfoRequestProto.newBuilder() @@ -1237,8 +1238,8 @@ public List getDiskBalancerStatus( @Override public List startDiskBalancer(Double threshold, - Long bandwidthInMB, Integer parallelThread, - Boolean stopAfterDiskEven, List hosts) + @Nullable Long bandwidthInMB, @Nullable Integer parallelThread, + @Nullable Boolean stopAfterDiskEven, @Nullable List hosts) throws IOException { HddsProtos.DiskBalancerConfigurationProto.Builder confBuilder = HddsProtos.DiskBalancerConfigurationProto.newBuilder(); @@ -1277,7 +1278,7 @@ public List startDiskBalancer(Double threshold, } @Override - public List stopDiskBalancer(List hosts) + public List stopDiskBalancer(@Nullable List hosts) throws IOException { DatanodeDiskBalancerOpRequestProto.Builder requestBuilder = DatanodeDiskBalancerOpRequestProto.newBuilder() @@ -1301,8 +1302,8 @@ public List stopDiskBalancer(List hosts) @Override public List updateDiskBalancerConfiguration( - Double threshold, Long bandwidthInMB, - Integer parallelThread, Boolean stopAfterDiskEven, List hosts) + @Nullable Double threshold, @Nullable Long bandwidthInMB, + @Nullable Integer parallelThread, @Nullable Boolean stopAfterDiskEven, @Nullable List hosts) throws IOException { HddsProtos.DiskBalancerConfigurationProto.Builder confBuilder = HddsProtos.DiskBalancerConfigurationProto.newBuilder(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index f578a295698a..c5efbe1bb8bc 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -35,6 +35,7 @@ import com.google.common.collect.Maps; import com.google.protobuf.BlockingService; import com.google.protobuf.ProtocolMessageEnum; +import jakarta.annotation.Nullable; import java.io.IOException; import java.net.InetSocketAddress; import java.time.Duration; @@ -1530,8 +1531,8 @@ public List getDiskBalancerReport( @Override public List getDiskBalancerStatus( - List hosts, - HddsProtos.DiskBalancerRunningStatus status, + @Nullable List hosts, + @Nullable HddsProtos.DiskBalancerRunningStatus status, int clientVersion) throws IOException { checkDiskBalancerEnabled(); return scm.getDiskBalancerManager().getDiskBalancerStatus(hosts, status, @@ -1539,9 +1540,9 @@ public List getDiskBalancerStatus( } @Override - public List startDiskBalancer(Double threshold, - Long bandwidthInMB, Integer parallelThread, - Boolean stopAfterDiskEven, List hosts) + public List startDiskBalancer(@Nullable Double threshold, + @Nullable Long bandwidthInMB, Integer parallelThread, + @Nullable Boolean stopAfterDiskEven, List hosts) throws IOException { checkDiskBalancerEnabled(); @@ -1557,7 +1558,7 @@ public List startDiskBalancer(Double threshold, } @Override - public List stopDiskBalancer(List hosts) + public List stopDiskBalancer(@Nullable List hosts) throws IOException { checkDiskBalancerEnabled(); @@ -1572,8 +1573,8 @@ public List stopDiskBalancer(List hosts) @Override public List updateDiskBalancerConfiguration( - Double threshold, Long bandwidthInMB, - Integer parallelThread, Boolean stopAfterDiskEven, List hosts) + @Nullable Double threshold, @Nullable Long bandwidthInMB, + @Nullable Integer parallelThread, @Nullable Boolean stopAfterDiskEven, @Nullable List hosts) throws IOException { checkDiskBalancerEnabled(); diff --git a/hadoop-ozone/cli-admin/pom.xml b/hadoop-ozone/cli-admin/pom.xml index 7357cdeb3bff..e10864206e8f 100644 --- a/hadoop-ozone/cli-admin/pom.xml +++ b/hadoop-ozone/cli-admin/pom.xml @@ -60,6 +60,10 @@ info.picocli picocli + + jakarta.annotation + jakarta.annotation-api + org.apache.commons commons-lang3 diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java index d5dbff4dad20..cfca72a67116 100644 --- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT; import com.google.common.base.Preconditions; +import jakarta.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -611,23 +612,23 @@ public List getDiskBalancerReport( } @Override - public List startDiskBalancer(Double threshold, - Long bandwidthInMB, Integer parallelThread, Boolean stopAfterDiskEven, - List hosts) throws IOException { + public List startDiskBalancer(@Nullable Double threshold, + @Nullable Long bandwidthInMB, @Nullable Integer parallelThread, @Nullable Boolean stopAfterDiskEven, + @Nullable List hosts) throws IOException { return storageContainerLocationClient.startDiskBalancer(threshold, bandwidthInMB, parallelThread, stopAfterDiskEven, hosts); } @Override - public List stopDiskBalancer(List hosts) + public List stopDiskBalancer(@Nullable List hosts) throws IOException { return storageContainerLocationClient.stopDiskBalancer(hosts); } @Override public List getDiskBalancerStatus( - List hosts, - HddsProtos.DiskBalancerRunningStatus runningStatus) + @Nullable List hosts, + @Nullable HddsProtos.DiskBalancerRunningStatus runningStatus) throws IOException { return storageContainerLocationClient.getDiskBalancerStatus(hosts, runningStatus, ClientVersion.CURRENT_VERSION); @@ -635,8 +636,8 @@ public List getDiskBalancerStatus( @Override public List updateDiskBalancerConfiguration( - Double threshold, Long bandwidth, - Integer parallelThread, Boolean stopAfterDiskEven, List hosts) + @Nullable Double threshold, @Nullable Long bandwidth, + @Nullable Integer parallelThread, @Nullable Boolean stopAfterDiskEven, @Nullable List hosts) throws IOException { return storageContainerLocationClient.updateDiskBalancerConfiguration( threshold, bandwidth, parallelThread, stopAfterDiskEven, hosts); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java index 68b5aa4e940f..2869e8de90ee 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hdds.fs.SpaceUsagePersistence; import org.apache.hadoop.hdds.fs.SpaceUsageSource; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto; +import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ContainerDataScanOrder; @@ -90,7 +91,7 @@ public class TestContainerChoosingPolicy { private ContainerController containerController; // Simulate containers currently being balanced (in progress) - private Set inProgressContainerIDs = ConcurrentHashMap.newKeySet(); + private Set inProgressContainerIDs = ConcurrentHashMap.newKeySet(); @BeforeEach public void setup() throws Exception { @@ -139,7 +140,7 @@ public void testContainerDeletionAfterIteratorGeneration() throws Exception { ContainerData container = containerChoosingPolicy.chooseContainer(ozoneContainer, volume, inProgressContainerIDs); assertEquals(containerList.get(0).getContainerData().getContainerID(), container.getContainerID()); ozoneContainer.getContainerSet().removeContainer(containerList.get(1).getContainerData().getContainerID()); - inProgressContainerIDs.add(container.getContainerID()); + inProgressContainerIDs.add(ContainerID.valueOf(container.getContainerID())); container = containerChoosingPolicy.chooseContainer(ozoneContainer, volume, inProgressContainerIDs); assertEquals(containerList.get(1).getContainerData().getContainerID(), container.getContainerID()); } @@ -175,7 +176,7 @@ private void testPolicyPerformance(String policyName, ContainerChoosingPolicy po } else { containerChosen++; if (inProgressContainerIDs.size() < MAX_IN_PROGRESS) { - inProgressContainerIDs.add(c.getContainerID()); + inProgressContainerIDs.add(ContainerID.valueOf(c.getContainerID())); } } } catch (Exception e) { @@ -231,7 +232,7 @@ public void createVolumes() throws IOException { } public void createContainers() { - List closedContainerIDs = new ArrayList<>(); + List closedContainerIDs = new ArrayList<>(); Random random = new Random(); long startTime = System.currentTimeMillis(); @@ -256,7 +257,7 @@ public void createContainers() { // Collect IDs of closed containers if (!isOpen) { - closedContainerIDs.add((long) i); + closedContainerIDs.add(ContainerID.valueOf((long) i)); } }