Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,7 @@ List<DatanodeAdminError> startDiskBalancer(
Optional<Double> threshold,
Optional<Long> bandwidthInMB,
Optional<Integer> parallelThread,
Optional<Boolean> stopAfterDiskEven,
Optional<List<String>> hosts) throws IOException;

/**
Expand All @@ -512,5 +513,6 @@ List<DatanodeAdminError> updateDiskBalancerConfiguration(
Optional<Double> threshold,
Optional<Long> bandwidth,
Optional<Integer> parallelThread,
Optional<Boolean> stopAfterDiskEven,
Optional<List<String>> hosts) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ List<DatanodeAdminError> startDiskBalancer(
Optional<Double> threshold,
Optional<Long> bandwidthInMB,
Optional<Integer> parallelThread,
Optional<Boolean> stopAfterDiskEven,
Optional<List<String>> hosts) throws IOException;

/**
Expand All @@ -523,5 +524,6 @@ List<DatanodeAdminError> updateDiskBalancerConfiguration(
Optional<Double> threshold,
Optional<Long> bandwidthInMB,
Optional<Integer> parallelThread,
Optional<Boolean> stopAfterDiskEven,
Optional<List<String>> hosts) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,20 @@ public final class DiskBalancerConfiguration {
"service.")
private Class<?> containerChoosingPolicyClass;

/**
 * Whether the DiskBalancer stops automatically once disks are balanced.
 * Bound to the {@code stop.after.disk.even} key of the {@code @Config}
 * annotation below (any group prefix is declared outside this view); both
 * the annotation default and the field initializer are {@code true}.
 */
@Config(key = "stop.after.disk.even",
type = ConfigType.BOOLEAN,
defaultValue = "true",
tags = {ConfigTag.DISKBALANCER},
description = "If true, the DiskBalancer will automatically stop once disks are balanced.")
private boolean stopAfterDiskEven = true;

public DiskBalancerConfiguration(Optional<Double> threshold,
Optional<Long> bandwidthInMB,
Optional<Integer> parallelThread) {
Optional<Integer> parallelThread, Optional<Boolean> stopAfterDiskEven) {
threshold.ifPresent(aDouble -> this.threshold = aDouble);
bandwidthInMB.ifPresent(aLong -> this.diskBandwidthInMB = aLong);
parallelThread.ifPresent(integer -> this.parallelThread = integer);
stopAfterDiskEven.ifPresent(bool -> this.stopAfterDiskEven = bool);
}

public DiskBalancerConfiguration() {
Expand Down Expand Up @@ -157,6 +165,14 @@ public Class<?> getContainerChoosingPolicyClass() {
return containerChoosingPolicyClass;
}

/**
 * Returns whether the DiskBalancer is configured to stop automatically
 * once disks are balanced.
 *
 * @return the current value of the {@code stopAfterDiskEven} field
 */
public boolean isStopAfterDiskEven() {
return stopAfterDiskEven;
}

/**
 * Sets whether the DiskBalancer should stop automatically once disks are
 * balanced.
 *
 * @param stopAfterDiskEven the new value of the flag
 */
public void setStopAfterDiskEven(boolean stopAfterDiskEven) {
this.stopAfterDiskEven = stopAfterDiskEven;
}

/**
* Gets the threshold value for DiskBalancer.
*
Expand Down Expand Up @@ -234,10 +250,11 @@ public String toString() {
"%-50s %s%n" +
"%-50s %s%n" +
"%-50s %s%n" +
"%-50s %s%n" +
"%-50s %s%n",
"Key", "Value",
"Threshold", threshold, "Max disk bandwidth", diskBandwidthInMB,
"Parallel Thread", parallelThread);
"Parallel Thread", parallelThread, "Stop After Disk Even", stopAfterDiskEven);
}

public HddsProtos.DiskBalancerConfigurationProto.Builder toProtobufBuilder() {
Expand All @@ -246,7 +263,8 @@ public HddsProtos.DiskBalancerConfigurationProto.Builder toProtobufBuilder() {

builder.setThreshold(threshold)
.setDiskBandwidthInMB(diskBandwidthInMB)
.setParallelThread(parallelThread);
.setParallelThread(parallelThread)
.setStopAfterDiskEven(stopAfterDiskEven);
return builder;
}

Expand All @@ -262,6 +280,9 @@ public static DiskBalancerConfiguration fromProtobuf(
if (proto.hasParallelThread()) {
config.setParallelThread(proto.getParallelThread());
}
if (proto.hasStopAfterDiskEven()) {
config.setStopAfterDiskEven(proto.getStopAfterDiskEven());
}
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,35 +31,38 @@ public class DiskBalancerInfo {
private double threshold;
private long bandwidthInMB;
private int parallelThread;
private boolean stopAfterDiskEven;
private DiskBalancerVersion version;
private long successCount;
private long failureCount;
private long bytesToMove;
private long balancedBytes;

public DiskBalancerInfo(boolean shouldRun, double threshold,
long bandwidthInMB, int parallelThread) {
this(shouldRun, threshold, bandwidthInMB, parallelThread,
long bandwidthInMB, int parallelThread, boolean stopAfterDiskEven) {
this(shouldRun, threshold, bandwidthInMB, parallelThread, stopAfterDiskEven,
DiskBalancerVersion.DEFAULT_VERSION);
}

public DiskBalancerInfo(boolean shouldRun, double threshold,
long bandwidthInMB, int parallelThread, DiskBalancerVersion version) {
long bandwidthInMB, int parallelThread, boolean stopAfterDiskEven, DiskBalancerVersion version) {
this.shouldRun = shouldRun;
this.threshold = threshold;
this.bandwidthInMB = bandwidthInMB;
this.parallelThread = parallelThread;
this.stopAfterDiskEven = stopAfterDiskEven;
this.version = version;
}

@SuppressWarnings("checkstyle:ParameterNumber")
public DiskBalancerInfo(boolean shouldRun, double threshold,
long bandwidthInMB, int parallelThread, DiskBalancerVersion version,
long bandwidthInMB, int parallelThread, boolean stopAfterDiskEven, DiskBalancerVersion version,
long successCount, long failureCount, long bytesToMove, long balancedBytes) {
this.shouldRun = shouldRun;
this.threshold = threshold;
this.bandwidthInMB = bandwidthInMB;
this.parallelThread = parallelThread;
this.stopAfterDiskEven = stopAfterDiskEven;
this.version = version;
this.successCount = successCount;
this.failureCount = failureCount;
Expand All @@ -73,6 +76,7 @@ public DiskBalancerInfo(boolean shouldRun,
this.threshold = diskBalancerConf.getThreshold();
this.bandwidthInMB = diskBalancerConf.getDiskBandwidthInMB();
this.parallelThread = diskBalancerConf.getParallelThread();
this.stopAfterDiskEven = diskBalancerConf.isStopAfterDiskEven();
this.version = DiskBalancerVersion.DEFAULT_VERSION;
}

Expand All @@ -86,11 +90,14 @@ public void updateFromConf(DiskBalancerConfiguration diskBalancerConf) {
if (parallelThread != diskBalancerConf.getParallelThread()) {
setParallelThread(diskBalancerConf.getParallelThread());
}
if (stopAfterDiskEven != diskBalancerConf.isStopAfterDiskEven()) {
setStopAfterDiskEven(diskBalancerConf.isStopAfterDiskEven());
}
}

public StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto toDiskBalancerReportProto() {
DiskBalancerConfiguration conf = new DiskBalancerConfiguration(Optional.of(threshold),
Optional.of(bandwidthInMB), Optional.of(parallelThread));
Optional.of(bandwidthInMB), Optional.of(parallelThread), Optional.of(stopAfterDiskEven));
HddsProtos.DiskBalancerConfigurationProto confProto = conf.toProtobufBuilder().build();

StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto.Builder builder =
Expand Down Expand Up @@ -136,6 +143,14 @@ public void setParallelThread(int parallelThread) {
this.parallelThread = parallelThread;
}

/**
 * Returns the stop-after-disk-even flag carried by this
 * {@code DiskBalancerInfo}.
 *
 * @return the current value of the {@code stopAfterDiskEven} field
 */
public boolean isStopAfterDiskEven() {
return stopAfterDiskEven;
}

/**
 * Replaces the stop-after-disk-even flag carried by this
 * {@code DiskBalancerInfo}.
 *
 * @param stopAfterDiskEven the new value of the flag
 */
public void setStopAfterDiskEven(boolean stopAfterDiskEven) {
this.stopAfterDiskEven = stopAfterDiskEven;
}

/**
 * Returns the {@code DiskBalancerVersion} held by this object.
 *
 * @return the current value of the {@code version} field
 */
public DiskBalancerVersion getVersion() {
return version;
}
Expand All @@ -157,12 +172,13 @@ public boolean equals(Object o) {
Double.compare(that.threshold, threshold) == 0 &&
bandwidthInMB == that.bandwidthInMB &&
parallelThread == that.parallelThread &&
stopAfterDiskEven == that.stopAfterDiskEven &&
version == that.version;
}

@Override
public int hashCode() {
return Objects.hash(shouldRun, threshold, bandwidthInMB, parallelThread,
return Objects.hash(shouldRun, threshold, bandwidthInMB, parallelThread, stopAfterDiskEven,
version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
Expand All @@ -58,7 +59,6 @@
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
import org.apache.hadoop.util.Time;
import org.apache.ratis.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -82,6 +82,7 @@ public class DiskBalancerService extends BackgroundService {
private double threshold;
private long bandwidthInMB;
private int parallelThread;
private boolean stopAfterDiskEven;

private DiskBalancerVersion version;

Expand Down Expand Up @@ -161,8 +162,8 @@ private void constructTmpDir() throws IOException {
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())) {
Path tmpDir = getDiskBalancerTmpDir(volume);
try {
FileUtils.deleteFully(tmpDir);
FileUtils.createDirectories(tmpDir);
FileUtils.deleteDirectory(tmpDir.toFile());
FileUtils.forceMkdir(tmpDir.toFile());
} catch (IOException ex) {
LOG.warn("Can not reconstruct tmp directory under volume {}", volume,
ex);
Expand Down Expand Up @@ -206,6 +207,7 @@ private void applyDiskBalancerInfo(DiskBalancerInfo diskBalancerInfo)
setThreshold(diskBalancerInfo.getThreshold());
setBandwidthInMB(diskBalancerInfo.getBandwidthInMB());
setParallelThread(diskBalancerInfo.getParallelThread());
setStopAfterDiskEven(diskBalancerInfo.isStopAfterDiskEven());
setVersion(diskBalancerInfo.getVersion());

// Default executorService is ScheduledThreadPoolExecutor, so we can
Expand Down Expand Up @@ -295,6 +297,10 @@ public void setParallelThread(int parallelThread) {
this.parallelThread = parallelThread;
}

/**
 * Sets the flag that controls whether this service stops itself once the
 * disks are even.
 *
 * @param stopAfterDiskEven the new value of the flag
 */
public void setStopAfterDiskEven(boolean stopAfterDiskEven) {
this.stopAfterDiskEven = stopAfterDiskEven;
}

/**
 * Replaces the {@code DiskBalancerVersion} recorded by this service.
 *
 * @param version the version to store
 */
public void setVersion(DiskBalancerVersion version) {
this.version = version;
}
Expand All @@ -309,6 +315,7 @@ public DiskBalancerReportProto getDiskBalancerReportProto() {
.setThreshold(threshold)
.setDiskBandwidthInMB(bandwidthInMB)
.setParallelThread(parallelThread)
.setStopAfterDiskEven(stopAfterDiskEven)
.build())
.build();
}
Expand Down Expand Up @@ -354,6 +361,18 @@ public BackgroundTaskQueue getTasks() {
}

if (queue.isEmpty()) {
bytesToMove = 0;
if (stopAfterDiskEven) {
LOG.info("Disk balancer is stopped due to disk even as" +
" the property StopAfterDiskEven is set to true.");
setShouldRun(false);
try {
// Persist the updated shouldRun status into the YAML file
writeDiskBalancerInfoTo(getDiskBalancerInfo(), diskBalancerInfoFile);
} catch (IOException e) {
LOG.warn("Failed to persist updated DiskBalancerInfo to file.", e);
}
}
metrics.incrIdleLoopNoAvailableVolumePairCount();
} else {
bytesToMove = calculateBytesToMove(volumeSet);
Expand Down Expand Up @@ -463,20 +482,23 @@ public BackgroundTaskResult call() {
totalBalancedBytes.addAndGet(containerSize);
} catch (IOException e) {
moveSucceeded = false;
LOG.warn("Failed to move container {}", containerId, e);
if (diskBalancerTmpDir != null) {
try {
Files.deleteIfExists(diskBalancerTmpDir);
File dir = new File(String.valueOf(diskBalancerTmpDir));
FileUtils.deleteDirectory(dir);
} catch (IOException ex) {
LOG.warn("Failed to delete tmp directory {}", diskBalancerTmpDir,
ex);
}
}
if (diskBalancerDestDir != null) {
try {
Files.deleteIfExists(diskBalancerDestDir);
File dir = new File(String.valueOf(diskBalancerDestDir));
FileUtils.deleteDirectory(dir);
} catch (IOException ex) {
LOG.warn("Failed to delete dest directory {}: {}.",
diskBalancerDestDir, ex.getMessage());
LOG.warn("Failed to delete dest directory {}",
diskBalancerDestDir, ex);
}
}
// Only need to check for destVolume, sourceVolume's usedSpace is
Expand Down Expand Up @@ -514,7 +536,7 @@ private void postCall() {

public DiskBalancerInfo getDiskBalancerInfo() {
return new DiskBalancerInfo(shouldRun, threshold, bandwidthInMB,
parallelThread, version, metrics.getSuccessCount(),
parallelThread, stopAfterDiskEven, version, metrics.getSuccessCount(),
metrics.getFailureCount(), bytesToMove, metrics.getSuccessBytes());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public static DiskBalancerInfo readDiskBalancerInfoFile(File path)
diskBalancerInfoYaml.getThreshold(),
diskBalancerInfoYaml.getBandwidthInMB(),
diskBalancerInfoYaml.getParallelThread(),
diskBalancerInfoYaml.isStopAfterDiskEven(),
DiskBalancerVersion.getDiskBalancerVersion(
diskBalancerInfoYaml.version));
}
Expand All @@ -94,6 +95,7 @@ public static class DiskBalancerInfoYaml {
private double threshold;
private long bandwidthInMB;
private int parallelThread;
private boolean stopAfterDiskEven;

private int version;

Expand All @@ -102,11 +104,12 @@ public DiskBalancerInfoYaml() {
}

private DiskBalancerInfoYaml(boolean shouldRun, double threshold,
long bandwidthInMB, int parallelThread, int version) {
long bandwidthInMB, int parallelThread, boolean stopAfterDiskEven, int version) {
this.shouldRun = shouldRun;
this.threshold = threshold;
this.bandwidthInMB = bandwidthInMB;
this.parallelThread = parallelThread;
this.stopAfterDiskEven = stopAfterDiskEven;
this.version = version;
}

Expand Down Expand Up @@ -142,6 +145,14 @@ public int getParallelThread() {
return this.parallelThread;
}

/**
 * Returns the stop-after-disk-even flag stored in this
 * {@code DiskBalancerInfoYaml}.
 *
 * @return the current value of the {@code stopAfterDiskEven} field
 */
public boolean isStopAfterDiskEven() {
return stopAfterDiskEven;
}

/**
 * Replaces the stop-after-disk-even flag stored in this
 * {@code DiskBalancerInfoYaml}.
 *
 * @param stopAfterDiskEven the new value of the flag
 */
public void setStopAfterDiskEven(boolean stopAfterDiskEven) {
this.stopAfterDiskEven = stopAfterDiskEven;
}

/**
 * Replaces the integer version stored in this {@code DiskBalancerInfoYaml}.
 *
 * @param version the integer version value to store
 */
public void setVersion(int version) {
this.version = version;
}
Expand All @@ -159,6 +170,7 @@ private static DiskBalancerInfoYaml getDiskBalancerInfoYaml(
diskBalancerInfo.getThreshold(),
diskBalancerInfo.getBandwidthInMB(),
diskBalancerInfo.getParallelThread(),
diskBalancerInfo.isStopAfterDiskEven(),
diskBalancerInfo.getVersion().getVersion());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public class TestHeartbeatEndpointTask {
public void setup() {
datanodeStateMachine = mock(DatanodeStateMachine.class);
container = mock(OzoneContainer.class);
when(container.getDiskBalancerInfo()).thenReturn(new DiskBalancerInfo(true, 10, 20, 30));
when(container.getDiskBalancerInfo()).thenReturn(new DiskBalancerInfo(true, 10, 20, 30, true));
when(datanodeStateMachine.getContainer()).thenReturn(container);
PipelineReportsProto pipelineReportsProto = mock(PipelineReportsProto.class);
when(pipelineReportsProto.getPipelineReportList()).thenReturn(Collections.emptyList());
Expand Down
Loading