diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java index 8857913c9ad2..17417f8c98d9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java @@ -496,6 +496,7 @@ List startDiskBalancer( Optional threshold, Optional bandwidthInMB, Optional parallelThread, + Optional stopAfterDiskEven, Optional> hosts) throws IOException; /** @@ -512,5 +513,6 @@ List updateDiskBalancerConfiguration( Optional threshold, Optional bandwidth, Optional parallelThread, + Optional stopAfterDiskEven, Optional> hosts) throws IOException; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index 8774e6eb073f..7b9ad4c10ff3 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -508,6 +508,7 @@ List startDiskBalancer( Optional threshold, Optional bandwidthInMB, Optional parallelThread, + Optional stopAfterDiskEven, Optional> hosts) throws IOException; /** @@ -523,5 +524,6 @@ List updateDiskBalancerConfiguration( Optional threshold, Optional bandwidthInMB, Optional parallelThread, + Optional stopAfterDiskEven, Optional> hosts) throws IOException; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java index 951cc62a8a72..6984bb94a95d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java @@ -110,12 +110,20 @@ public final class DiskBalancerConfiguration { "service.") private Class containerChoosingPolicyClass; + @Config(key = "stop.after.disk.even", + type = ConfigType.BOOLEAN, + defaultValue = "true", + tags = {ConfigTag.DISKBALANCER}, + description = "If true, the DiskBalancer will automatically stop once disks are balanced.") + private boolean stopAfterDiskEven = true; + public DiskBalancerConfiguration(Optional threshold, Optional bandwidthInMB, - Optional parallelThread) { + Optional parallelThread, Optional stopAfterDiskEven) { threshold.ifPresent(aDouble -> this.threshold = aDouble); bandwidthInMB.ifPresent(aLong -> this.diskBandwidthInMB = aLong); parallelThread.ifPresent(integer -> this.parallelThread = integer); + stopAfterDiskEven.ifPresent(bool -> this.stopAfterDiskEven = bool); } public DiskBalancerConfiguration() { @@ -157,6 +165,14 @@ public Class getContainerChoosingPolicyClass() { return containerChoosingPolicyClass; } + public boolean isStopAfterDiskEven() { + return stopAfterDiskEven; + } + + public void setStopAfterDiskEven(boolean stopAfterDiskEven) { + this.stopAfterDiskEven = stopAfterDiskEven; + } + /** * Gets the threshold value for DiskBalancer. * @@ -234,10 +250,11 @@ public String toString() { "%-50s %s%n" + "%-50s %s%n" + "%-50s %s%n" + + "%-50s %s%n" + "%-50s %s%n", "Key", "Value", "Threshold", threshold, "Max disk bandwidth", diskBandwidthInMB, - "Parallel Thread", parallelThread); + "Parallel Thread", parallelThread, "Stop After Disk Even", stopAfterDiskEven); } public HddsProtos.DiskBalancerConfigurationProto.Builder toProtobufBuilder() { @@ -246,7 +263,8 @@ public HddsProtos.DiskBalancerConfigurationProto.Builder toProtobufBuilder() { builder.setThreshold(threshold) .setDiskBandwidthInMB(diskBandwidthInMB) - .setParallelThread(parallelThread); + .setParallelThread(parallelThread) + .setStopAfterDiskEven(stopAfterDiskEven); return builder; } @@ -262,6 +280,9 @@ public static DiskBalancerConfiguration fromProtobuf( if (proto.hasParallelThread()) { config.setParallelThread(proto.getParallelThread()); } + if (proto.hasStopAfterDiskEven()) { + config.setStopAfterDiskEven(proto.getStopAfterDiskEven()); + } return config; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java index 236c22063f10..60eabf2376c3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java @@ -31,6 +31,7 @@ public class DiskBalancerInfo { private double threshold; private long bandwidthInMB; private int parallelThread; + private boolean stopAfterDiskEven; private DiskBalancerVersion version; private long successCount; private long failureCount; @@ -38,28 +39,30 @@ public class DiskBalancerInfo { private long balancedBytes; public DiskBalancerInfo(boolean shouldRun, double threshold, - long bandwidthInMB, int parallelThread) { - this(shouldRun, threshold, bandwidthInMB, parallelThread, + long bandwidthInMB, int parallelThread, boolean stopAfterDiskEven) { + this(shouldRun, threshold, bandwidthInMB, parallelThread, stopAfterDiskEven, DiskBalancerVersion.DEFAULT_VERSION); } public DiskBalancerInfo(boolean shouldRun, double threshold, - long bandwidthInMB, int parallelThread, DiskBalancerVersion version) { + long bandwidthInMB, int parallelThread, boolean stopAfterDiskEven, DiskBalancerVersion version) { this.shouldRun = shouldRun; this.threshold = threshold; this.bandwidthInMB = bandwidthInMB; this.parallelThread = parallelThread; + this.stopAfterDiskEven = stopAfterDiskEven; this.version = version; } @SuppressWarnings("checkstyle:ParameterNumber") public DiskBalancerInfo(boolean shouldRun, double threshold, - long bandwidthInMB, int parallelThread, DiskBalancerVersion version, + long bandwidthInMB, int parallelThread, boolean stopAfterDiskEven, DiskBalancerVersion version, long successCount, long failureCount, long bytesToMove, long balancedBytes) { this.shouldRun = shouldRun; this.threshold = threshold; this.bandwidthInMB = bandwidthInMB; this.parallelThread = parallelThread; + this.stopAfterDiskEven = stopAfterDiskEven; this.version = version; this.successCount = successCount; this.failureCount = failureCount; @@ -73,6 +76,7 @@ public DiskBalancerInfo(boolean shouldRun, this.threshold = diskBalancerConf.getThreshold(); this.bandwidthInMB = diskBalancerConf.getDiskBandwidthInMB(); this.parallelThread = diskBalancerConf.getParallelThread(); + this.stopAfterDiskEven = diskBalancerConf.isStopAfterDiskEven(); this.version = DiskBalancerVersion.DEFAULT_VERSION; } @@ -86,11 +90,14 @@ public void updateFromConf(DiskBalancerConfiguration diskBalancerConf) { if (parallelThread != diskBalancerConf.getParallelThread()) { setParallelThread(diskBalancerConf.getParallelThread()); } + if (stopAfterDiskEven != diskBalancerConf.isStopAfterDiskEven()) { + setStopAfterDiskEven(diskBalancerConf.isStopAfterDiskEven()); + } } public StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto toDiskBalancerReportProto() { DiskBalancerConfiguration conf = new DiskBalancerConfiguration(Optional.of(threshold), - Optional.of(bandwidthInMB), Optional.of(parallelThread)); + Optional.of(bandwidthInMB), Optional.of(parallelThread), Optional.of(stopAfterDiskEven)); HddsProtos.DiskBalancerConfigurationProto confProto = conf.toProtobufBuilder().build(); StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto.Builder builder = @@ -136,6 +143,14 @@ public void setParallelThread(int parallelThread) { this.parallelThread = parallelThread; } + public boolean isStopAfterDiskEven() { + return stopAfterDiskEven; + } + + public void setStopAfterDiskEven(boolean stopAfterDiskEven) { + this.stopAfterDiskEven = stopAfterDiskEven; + } + public DiskBalancerVersion getVersion() { return version; } @@ -157,12 +172,13 @@ public boolean equals(Object o) { Double.compare(that.threshold, threshold) == 0 && bandwidthInMB == that.bandwidthInMB && parallelThread == that.parallelThread && + stopAfterDiskEven == that.stopAfterDiskEven && version == that.version; } @Override public int hashCode() { - return Objects.hash(shouldRun, threshold, bandwidthInMB, parallelThread, + return Objects.hash(shouldRun, threshold, bandwidthInMB, parallelThread, stopAfterDiskEven, version); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java index 558b806b3904..610fe816a854 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java @@ -32,6 +32,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -58,7 +59,6 @@ import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures; import org.apache.hadoop.util.Time; -import org.apache.ratis.util.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,6 +82,7 @@ public class DiskBalancerService extends BackgroundService { private double threshold; private long bandwidthInMB; private int parallelThread; + private boolean stopAfterDiskEven; private DiskBalancerVersion version; @@ -161,8 +162,8 @@ private void constructTmpDir() throws IOException { StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())) { Path tmpDir = getDiskBalancerTmpDir(volume); try { - FileUtils.deleteFully(tmpDir); - FileUtils.createDirectories(tmpDir); + FileUtils.deleteDirectory(tmpDir.toFile()); + FileUtils.forceMkdir(tmpDir.toFile()); } catch (IOException ex) { LOG.warn("Can not reconstruct tmp directory under volume {}", volume, ex); @@ -206,6 +207,7 @@ private void applyDiskBalancerInfo(DiskBalancerInfo diskBalancerInfo) setThreshold(diskBalancerInfo.getThreshold()); setBandwidthInMB(diskBalancerInfo.getBandwidthInMB()); setParallelThread(diskBalancerInfo.getParallelThread()); + setStopAfterDiskEven(diskBalancerInfo.isStopAfterDiskEven()); setVersion(diskBalancerInfo.getVersion()); // Default executorService is ScheduledThreadPoolExecutor, so we can @@ -295,6 +297,10 @@ public void setParallelThread(int parallelThread) { this.parallelThread = parallelThread; } + public void setStopAfterDiskEven(boolean stopAfterDiskEven) { + this.stopAfterDiskEven = stopAfterDiskEven; + } + public void setVersion(DiskBalancerVersion version) { this.version = version; } @@ -309,6 +315,7 @@ public DiskBalancerReportProto getDiskBalancerReportProto() { .setThreshold(threshold) .setDiskBandwidthInMB(bandwidthInMB) .setParallelThread(parallelThread) + .setStopAfterDiskEven(stopAfterDiskEven) .build()) .build(); } @@ -354,6 +361,18 @@ public BackgroundTaskQueue getTasks() { } if (queue.isEmpty()) { + bytesToMove = 0; + if (stopAfterDiskEven) { + LOG.info("Disk balancer is stopped due to disk even as" + + " the property StopAfterDiskEven is set to true."); + setShouldRun(false); + try { + // Persist the updated shouldRun status into the YAML file + writeDiskBalancerInfoTo(getDiskBalancerInfo(), diskBalancerInfoFile); + } catch (IOException e) { + LOG.warn("Failed to persist updated DiskBalancerInfo to file.", e); + } + } metrics.incrIdleLoopNoAvailableVolumePairCount(); } else { bytesToMove = calculateBytesToMove(volumeSet); @@ -463,9 +482,11 @@ public BackgroundTaskResult call() { totalBalancedBytes.addAndGet(containerSize); } catch (IOException e) { moveSucceeded = false; + LOG.warn("Failed to move container {}", containerId, e); if (diskBalancerTmpDir != null) { try { - Files.deleteIfExists(diskBalancerTmpDir); + File dir = new File(String.valueOf(diskBalancerTmpDir)); + FileUtils.deleteDirectory(dir); } catch (IOException ex) { LOG.warn("Failed to delete tmp directory {}", diskBalancerTmpDir, ex); @@ -473,10 +494,11 @@ public BackgroundTaskResult call() { } if (diskBalancerDestDir != null) { try { - Files.deleteIfExists(diskBalancerDestDir); + File dir = new File(String.valueOf(diskBalancerDestDir)); + FileUtils.deleteDirectory(dir); } catch (IOException ex) { - LOG.warn("Failed to delete dest directory {}: {}.", - diskBalancerDestDir, ex.getMessage()); + LOG.warn("Failed to delete dest directory {}", + diskBalancerDestDir, ex); } } // Only need to check for destVolume, sourceVolume's usedSpace is @@ -514,7 +536,7 @@ private void postCall() { public DiskBalancerInfo getDiskBalancerInfo() { return new DiskBalancerInfo(shouldRun, threshold, bandwidthInMB, - parallelThread, version, metrics.getSuccessCount(), + parallelThread, stopAfterDiskEven, version, metrics.getSuccessCount(), metrics.getFailureCount(), bytesToMove, metrics.getSuccessBytes()); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerYaml.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerYaml.java index 381efdd08e46..d33e75cded73 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerYaml.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerYaml.java @@ -79,6 +79,7 @@ public static DiskBalancerInfo readDiskBalancerInfoFile(File path) diskBalancerInfoYaml.getThreshold(), diskBalancerInfoYaml.getBandwidthInMB(), diskBalancerInfoYaml.getParallelThread(), + diskBalancerInfoYaml.isStopAfterDiskEven(), DiskBalancerVersion.getDiskBalancerVersion( diskBalancerInfoYaml.version)); } @@ -94,6 +95,7 @@ public static class DiskBalancerInfoYaml { private double threshold; private long bandwidthInMB; private int parallelThread; + private boolean stopAfterDiskEven; private int version; @@ -102,11 +104,12 @@ public DiskBalancerInfoYaml() { } private DiskBalancerInfoYaml(boolean shouldRun, double threshold, - long bandwidthInMB, int parallelThread, int version) { + long bandwidthInMB, int parallelThread, boolean stopAfterDiskEven, int version) { this.shouldRun = shouldRun; this.threshold = threshold; this.bandwidthInMB = bandwidthInMB; this.parallelThread = parallelThread; + this.stopAfterDiskEven = stopAfterDiskEven; this.version = version; } @@ -142,6 +145,14 @@ public int getParallelThread() { return this.parallelThread; } + public boolean isStopAfterDiskEven() { + return stopAfterDiskEven; + } + + public void setStopAfterDiskEven(boolean stopAfterDiskEven) { + this.stopAfterDiskEven = stopAfterDiskEven; + } + public void setVersion(int version) { this.version = version; } @@ -159,6 +170,7 @@ private static DiskBalancerInfoYaml getDiskBalancerInfoYaml( diskBalancerInfo.getThreshold(), diskBalancerInfo.getBandwidthInMB(), diskBalancerInfo.getParallelThread(), + diskBalancerInfo.isStopAfterDiskEven(), diskBalancerInfo.getVersion().getVersion()); } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java index 831d9c47823f..47cd07a9506b 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java @@ -79,7 +79,7 @@ public class TestHeartbeatEndpointTask { public void setup() { datanodeStateMachine = mock(DatanodeStateMachine.class); container = mock(OzoneContainer.class); - when(container.getDiskBalancerInfo()).thenReturn(new DiskBalancerInfo(true, 10, 20, 30)); + when(container.getDiskBalancerInfo()).thenReturn(new DiskBalancerInfo(true, 10, 20, 30, true)); when(datanodeStateMachine.getContainer()).thenReturn(container); PipelineReportsProto pipelineReportsProto = mock(PipelineReportsProto.class); when(pipelineReportsProto.getPipelineReportList()).thenReturn(Collections.emptyList()); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java index 9c9360dc4bec..eb591215d61f 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java @@ -120,6 +120,7 @@ public void testUpdateService(ContainerTestVersionInfo versionInfo) throws Excep svc.setThreshold(10.0d); svc.setBandwidthInMB(1L); svc.setParallelThread(5); + svc.setStopAfterDiskEven(true); svc.setVersion(DiskBalancerVersion.DEFAULT_VERSION); svc.start(); @@ -128,14 +129,16 @@ public void testUpdateService(ContainerTestVersionInfo versionInfo) throws Excep assertEquals(10, svc.getDiskBalancerInfo().getThreshold(), 0.0); assertEquals(1, svc.getDiskBalancerInfo().getBandwidthInMB()); assertEquals(5, svc.getDiskBalancerInfo().getParallelThread()); + assertTrue(svc.getDiskBalancerInfo().isStopAfterDiskEven()); - DiskBalancerInfo newInfo = new DiskBalancerInfo(false, 20.0d, 5L, 10); + DiskBalancerInfo newInfo = new DiskBalancerInfo(false, 20.0d, 5L, 10, false); svc.refresh(newInfo); assertFalse(svc.getDiskBalancerInfo().isShouldRun()); assertEquals(20, svc.getDiskBalancerInfo().getThreshold(), 0.0); assertEquals(5, svc.getDiskBalancerInfo().getBandwidthInMB()); assertEquals(10, svc.getDiskBalancerInfo().getParallelThread()); + assertFalse(svc.getDiskBalancerInfo().isStopAfterDiskEven()); svc.shutdown(); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerYaml.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerYaml.java index 7eccda2e4750..6cdd087097b7 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerYaml.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerYaml.java @@ -39,13 +39,14 @@ public void testCreateYaml() throws IOException { double threshold = 10; long bandwidthInMB = 100; int parallelThread = 5; + boolean stopAfterDiskEven = true; DiskBalancerVersion version = DiskBalancerVersion.DEFAULT_VERSION; File file = new File(tmpDir.toString(), OZONE_SCM_DATANODE_DISK_BALANCER_INFO_FILE_DEFAULT); DiskBalancerInfo info = new DiskBalancerInfo(shouldRun, threshold, - bandwidthInMB, parallelThread, version); + bandwidthInMB, parallelThread, stopAfterDiskEven, version); DiskBalancerYaml.createDiskBalancerInfoFile(info, file); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index a5fd9cfbb7a2..43cff0f52dff 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -1218,12 +1218,14 @@ public List getDiskBalancerStatus( @Override public List startDiskBalancer(Optional threshold, Optional bandwidthInMB, Optional parallelThread, - Optional> hosts) throws IOException { + Optional stopAfterDiskEven, Optional> hosts) + throws IOException { HddsProtos.DiskBalancerConfigurationProto.Builder confBuilder = HddsProtos.DiskBalancerConfigurationProto.newBuilder(); threshold.ifPresent(confBuilder::setThreshold); bandwidthInMB.ifPresent(confBuilder::setDiskBandwidthInMB); parallelThread.ifPresent(confBuilder::setParallelThread); + stopAfterDiskEven.ifPresent(confBuilder::setStopAfterDiskEven); DatanodeDiskBalancerOpRequestProto.Builder requestBuilder = DatanodeDiskBalancerOpRequestProto.newBuilder() @@ -1268,13 +1270,14 @@ public List stopDiskBalancer(Optional> hosts) @Override public List updateDiskBalancerConfiguration( Optional threshold, Optional bandwidthInMB, - Optional parallelThread, Optional> hosts) + Optional parallelThread, Optional stopAfterDiskEven, Optional> hosts) throws IOException { HddsProtos.DiskBalancerConfigurationProto.Builder confBuilder = HddsProtos.DiskBalancerConfigurationProto.newBuilder(); threshold.ifPresent(confBuilder::setThreshold); bandwidthInMB.ifPresent(confBuilder::setDiskBandwidthInMB); parallelThread.ifPresent(confBuilder::setParallelThread); + stopAfterDiskEven.ifPresent(confBuilder::setStopAfterDiskEven); DatanodeDiskBalancerOpRequestProto.Builder requestBuilder = DatanodeDiskBalancerOpRequestProto.newBuilder() diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto index 10f1f197a1f1..da8aedd310d7 100644 --- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto +++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto @@ -552,6 +552,7 @@ message DiskBalancerConfigurationProto { optional double threshold = 1; optional uint64 diskBandwidthInMB = 2; optional int32 parallelThread = 3; + optional bool stopAfterDiskEven = 4; } enum DiskBalancerRunningStatus { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java index b6a703200713..4e18758e8904 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java @@ -140,8 +140,8 @@ public List getDiskBalancerStatus( */ public List startDiskBalancer( Optional threshold, Optional bandwidthInMB, - Optional parallelThread, Optional> hosts) - throws IOException { + Optional parallelThread, Optional stopAfterDiskEven, + Optional> hosts) throws IOException { List dns; if (hosts.isPresent()) { dns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(), @@ -161,7 +161,7 @@ public List startDiskBalancer( // If command doesn't have configuration change, then we reuse the // latest configuration reported from Datnaodes DiskBalancerConfiguration updateConf = attachDiskBalancerConf(dn, - threshold, bandwidthInMB, parallelThread); + threshold, bandwidthInMB, parallelThread, stopAfterDiskEven); DiskBalancerCommand command = new DiskBalancerCommand( HddsProtos.DiskBalancerOpType.START, updateConf); sendCommand(dn, command); @@ -213,7 +213,7 @@ public List stopDiskBalancer(Optional> hosts) */ public List updateDiskBalancerConfiguration( Optional threshold, Optional bandwidthInMB, - Optional parallelThread, Optional> hosts) + Optional parallelThread, Optional stopAfterDiskEven, Optional> hosts) throws IOException { List dns; if (hosts.isPresent()) { @@ -229,7 +229,7 @@ public List updateDiskBalancerConfiguration( // If command doesn't have configuration change, then we reuse the // latest configuration reported from Datnaodes DiskBalancerConfiguration updateConf = attachDiskBalancerConf(dn, - threshold, bandwidthInMB, parallelThread); + threshold, bandwidthInMB, parallelThread, stopAfterDiskEven); DiskBalancerCommand command = new DiskBalancerCommand( HddsProtos.DiskBalancerOpType.UPDATE, updateConf); sendCommand(dn, command); @@ -329,7 +329,7 @@ public void processDiskBalancerReport(DiskBalancerReportProto reportProto, statusMap.put(dn, new DiskBalancerStatus( isRunning ? DiskBalancerRunningStatus.RUNNING : DiskBalancerRunningStatus.STOPPED, diskBalancerConfiguration, successMoveCount, failureMoveCount, bytesToMove, balancedBytes)); - if (reportProto.hasBalancedBytes()) { + if (reportProto.hasBalancedBytes() && balancedBytesMap != null) { balancedBytesMap.put(dn, reportProto.getBalancedBytes()); } } @@ -346,13 +346,14 @@ public void markStatusUnknown(DatanodeDetails dn) { private DiskBalancerConfiguration attachDiskBalancerConf( DatanodeDetails dn, Optional threshold, - Optional bandwidthInMB, Optional parallelThread) { + Optional bandwidthInMB, Optional parallelThread, Optional stopAfterDiskEven) { DiskBalancerConfiguration baseConf = statusMap.containsKey(dn) ? statusMap.get(dn).getDiskBalancerConfiguration() : new DiskBalancerConfiguration(); threshold.ifPresent(baseConf::setThreshold); bandwidthInMB.ifPresent(baseConf::setDiskBandwidthInMB); parallelThread.ifPresent(baseConf::setParallelThread); + stopAfterDiskEven.ifPresent(baseConf::setStopAfterDiskEven); return baseConf; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java index 9b6647924f00..87cbd064010d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -1404,6 +1404,7 @@ public DatanodeDiskBalancerOpResponseProto getDatanodeDiskBalancerOp( conf.hasThreshold() ? Optional.of(conf.getThreshold()) : Optional.empty(), conf.hasDiskBandwidthInMB() ? Optional.of(conf.getDiskBandwidthInMB()) : Optional.empty(), conf.hasParallelThread() ? Optional.of(conf.getParallelThread()) : Optional.empty(), + conf.hasStopAfterDiskEven() ? Optional.of(conf.getStopAfterDiskEven()) : Optional.empty(), request.getHostsList().isEmpty() ? Optional.empty() : Optional.of(request.getHostsList())); break; case UPDATE: @@ -1412,6 +1413,7 @@ public DatanodeDiskBalancerOpResponseProto getDatanodeDiskBalancerOp( conf.hasThreshold() ? Optional.of(conf.getThreshold()) : Optional.empty(), conf.hasDiskBandwidthInMB() ? Optional.of(conf.getDiskBandwidthInMB()) : Optional.empty(), conf.hasParallelThread() ? Optional.of(conf.getParallelThread()) : Optional.empty(), + conf.hasStopAfterDiskEven() ? Optional.of(conf.getStopAfterDiskEven()) : Optional.empty(), request.getHostsList().isEmpty() ? Optional.empty() : Optional.of(request.getHostsList())); break; case STOP: diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 1ab3ede316eb..2a7f259e799b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -1530,7 +1530,8 @@ public List getDiskBalancerStatus( @Override public List startDiskBalancer(Optional threshold, Optional bandwidthInMB, Optional parallelThread, - Optional> hosts) throws IOException { + Optional stopAfterDiskEven, Optional> hosts) + throws IOException { try { getScm().checkAdminAccess(getRemoteUser(), false); } catch (IOException e) { @@ -1539,7 +1540,7 @@ public List startDiskBalancer(Optional threshold, } return scm.getDiskBalancerManager() - .startDiskBalancer(threshold, bandwidthInMB, parallelThread, hosts); + .startDiskBalancer(threshold, bandwidthInMB, parallelThread, stopAfterDiskEven, hosts); } @Override @@ -1557,7 +1558,7 @@ public List stopDiskBalancer(Optional> hosts) @Override public List updateDiskBalancerConfiguration( Optional threshold, Optional bandwidthInMB, - Optional parallelThread, Optional> hosts) + Optional parallelThread, Optional stopAfterDiskEven, Optional> hosts) throws IOException { try { getScm().checkAdminAccess(getRemoteUser(), false); @@ -1567,7 +1568,7 @@ public List updateDiskBalancerConfiguration( } return scm.getDiskBalancerManager().updateDiskBalancerConfiguration( - threshold, bandwidthInMB, parallelThread, hosts); + threshold, bandwidthInMB, parallelThread, stopAfterDiskEven, hosts); } /** diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java index cbe071818c46..7e94a0aebf82 100644 --- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java @@ -607,10 +607,10 @@ public List getDiskBalancerReport( @Override public List startDiskBalancer(Optional threshold, - Optional bandwidthInMB, Optional parallelThread, + Optional bandwidthInMB, Optional parallelThread, Optional stopAfterDiskEven, Optional> hosts) throws IOException { return storageContainerLocationClient.startDiskBalancer(threshold, - bandwidthInMB, parallelThread, hosts); + bandwidthInMB, parallelThread, stopAfterDiskEven, hosts); } @Override @@ -631,9 +631,9 @@ public List getDiskBalancerStatus( @Override public List updateDiskBalancerConfiguration( Optional threshold, Optional bandwidth, - Optional parallelThread, Optional> hosts) + Optional parallelThread, Optional stopAfterDiskEven, Optional> hosts) throws IOException { return storageContainerLocationClient.updateDiskBalancerConfiguration( - threshold, bandwidth, parallelThread, hosts); + threshold, bandwidth, parallelThread, stopAfterDiskEven, hosts); } } diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerCommands.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerCommands.java index eec770f51783..fd8a98468f8d 100644 --- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerCommands.java +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerCommands.java @@ -34,6 +34,7 @@ * [ -t/--threshold {@literal }] * [ -b/--bandwidthInMB {@literal }] * [ -p/--parallelThread {@literal }] + * [ -s/--stop-after-disk-even {@literal }] * [ -a/--all {@literal }] * [ -d/--datanodes {@literal }] * [ {@literal }] @@ -43,13 +44,16 @@ * datanodes * ozone admin datanode diskbalancer start -a * start balancer with default values in the configuration on all - * datanodes in the cluster + * datanodes in the cluster and stops automatically after balancing * ozone admin datanode diskbalancer start -t 5 -d {@literal } * start balancer with a threshold of 5% * ozone admin datanode diskbalancer start -b 20 -d {@literal } * start balancer with maximum 20MB/s diskbandwidth * ozone admin datanode diskbalancer start -p 5 -d {@literal } * start balancer with 5 parallel thread on each datanode + * ozone admin datanode diskbalancer start -s=false -a} + * start balancer on each datanode and will keep running even after + * disks are balanced until stopped by the stop command. * To stop: * ozone admin datanode diskbalancer stop -a * stop diskblancer on all datanodes diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java index 23084b89473a..c37f56019862 100644 --- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java @@ -53,6 +53,10 @@ public class DiskBalancerStartSubcommand extends ScmSubcommand { description = "Max parallelThread for DiskBalancer.") private Optional parallelThread; + @Option(names = {"-s", "--stop-after-disk-even"}, + description = "Stop DiskBalancer automatically after disk utilization is even.") + private Optional stopAfterDiskEven; + @CommandLine.Mixin private DiskBalancerCommonOptions commonOptions = new DiskBalancerCommonOptions(); @@ -63,7 +67,7 @@ public void execute(ScmClient scmClient) throws IOException { return; } List errors = - scmClient.startDiskBalancer(threshold, bandwidthInMB, parallelThread, + scmClient.startDiskBalancer(threshold, bandwidthInMB, parallelThread, stopAfterDiskEven, commonOptions.getSpecifiedDatanodes()); System.out.println("Start DiskBalancer on datanode(s):\n" + diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java index 08deec169b37..6f1b7ea0d1de 100644 --- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java @@ -53,6 +53,10 @@ public class DiskBalancerUpdateSubcommand extends ScmSubcommand { description = "Max parallelThread for DiskBalancer.") private Optional parallelThread; + @Option(names = {"-s", "--stop-after-disk-even"}, + description = "Stop DiskBalancer automatically after disk utilization is even.") + private Optional stopAfterDiskEven; + @CommandLine.Mixin private DiskBalancerCommonOptions commonOptions = new DiskBalancerCommonOptions(); @@ -64,7 +68,7 @@ public void execute(ScmClient scmClient) throws IOException { } List errors = scmClient.updateDiskBalancerConfiguration(threshold, bandwidthInMB, - parallelThread, commonOptions.getSpecifiedDatanodes()); + parallelThread, stopAfterDiskEven, commonOptions.getSpecifiedDatanodes()); System.out.println("Update DiskBalancer Configuration on datanode(s):\n" + commonOptions.getHostString()); diff --git a/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestDiskBalancerSubCommand.java b/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestDiskBalancerSubCommand.java index ac97a27f2530..091ae6815414 100644 --- a/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestDiskBalancerSubCommand.java +++ b/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestDiskBalancerSubCommand.java @@ -116,7 +116,7 @@ public void testDiskBalancerStartSubcommand() throws IOException { // Return error Mockito.when(scmClient.startDiskBalancer(Mockito.any(), Mockito.any(), - Mockito.any(), Mockito.any())) + Mockito.any(), Mockito.any(), Mockito.any())) .thenReturn(generateError(10)); try { @@ -127,7 +127,7 @@ public void testDiskBalancerStartSubcommand() throws IOException { // Do not return error Mockito.when(scmClient.startDiskBalancer(Mockito.any(), Mockito.any(), - Mockito.any(), Mockito.any())) + Mockito.any(), Mockito.any(), Mockito.any())) .thenReturn(generateError(0)); try { @@ -146,7 +146,7 @@ public void testDiskBalancerUpdateSubcommand() throws IOException { // Return error Mockito.when(scmClient.updateDiskBalancerConfiguration(Mockito.any(), - Mockito.any(), Mockito.any(), Mockito.any())) + Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())) .thenReturn(generateError(10)); try { @@ -157,7 +157,7 @@ public void testDiskBalancerUpdateSubcommand() throws IOException { // Do not return error Mockito.when(scmClient.updateDiskBalancerConfiguration(Mockito.any(), - Mockito.any(), Mockito.any(), Mockito.any())) + Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())) .thenReturn(generateError(0)); try { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java index c9083a0b336a..13d361ad2f37 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java @@ -21,7 +21,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; +import java.util.Arrays; import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -33,7 +37,11 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity; import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.node.DiskBalancerManager; +import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerService; +import org.apache.ozone.test.GenericTestUtils; +import org.apache.ozone.test.GenericTestUtils.LogCapturer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -55,6 +63,7 @@ public static void setup() throws Exception { ozoneConf = new OzoneConfiguration(); ozoneConf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, SCMContainerPlacementCapacity.class, PlacementPolicy.class); + ozoneConf.setTimeDuration("hdds.datanode.disk.balancer.service.interval", 3, TimeUnit.SECONDS); cluster = MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(3).build(); storageClient = new ContainerOperationClient(ozoneConf); cluster.waitForClusterToBeReady(); @@ -86,6 +95,40 @@ public void testDatanodeDiskBalancerReport() throws IOException { >= reportProtoList.get(1).getCurrentVolumeDensitySum()); } + @Test + public void testDiskBalancerStopAfterEven() throws IOException, + InterruptedException, TimeoutException { + //capture LOG for DiskBalancerManager and DiskBalancerService + LogCapturer logCapturer = LogCapturer.captureLogs(DiskBalancerManager.LOG); + LogCapturer dnLogCapturer = LogCapturer.captureLogs(DiskBalancerService.class); + + // Start DiskBalancer on all datanodes + diskBalancerManager.startDiskBalancer( + Optional.of(10.0), // threshold + Optional.of(10L), // bandwidth in MB + Optional.of(5), // parallel threads + Optional.of(true), // stopAfterDiskEven + Optional.empty()); // apply to all datanodes + + // verify logs for all DNs has started + String logs = logCapturer.getOutput(); + for (HddsDatanodeService dn : cluster.getHddsDatanodes()) { + String uuid = dn.getDatanodeDetails().getUuidString(); + assertTrue(logs.contains("Sending diskBalancerCommand: opType=START") && + logs.contains(uuid)); + } + + // Wait up to 5 seconds for all DNs to log the stop message + GenericTestUtils.waitFor(() -> { + String dnLogs = dnLogCapturer.getOutput(); + long count = Arrays.stream(dnLogs.split("\n")) + .filter(line -> line.contains("Disk balancer is stopped due to disk even as" + + " the property StopAfterDiskEven is set to true")) + .count(); + return count >= cluster.getHddsDatanodes().size(); + }, 100, 5000); // check every 100ms, timeout after 5s + } + @Test public void testDatanodeDiskBalancerStatus() throws IOException { // TODO: Test status command with datanodes in balancing