Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,4 +398,45 @@ StatusAndMessages finalizeScmUpgrade(String upgradeClientID)
StatusAndMessages queryUpgradeFinalizationProgress(
String upgradeClientID, boolean force, boolean readonly)
throws IOException;

/**
* Get DiskBalancer status.
* @param count top datanodes that need balancing
* @return List of DatanodeDiskBalancerInfo.
* @throws IOException
*/
List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
int count) throws IOException;

/**
* Get DiskBalancer status.
* @param hosts If hosts is not null, return status of hosts; If hosts is
* null, return status of all datanodes in balancing.
* @return List of DatanodeDiskBalancerInfo.
* @throws IOException
*/
List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
Optional<List<String>> hosts) throws IOException;

/**
* Start DiskBalancer.
*/
void startDiskBalancer(
Optional<Double> threshold,
Optional<Double> bandwidth,
Optional<List<String>> hosts) throws IOException;

/**
* Stop DiskBalancer.
*/
void stopDiskBalancer(Optional<List<String>> hosts) throws IOException;


/**
* Update DiskBalancer Configuration.
*/
void updateDiskBalancerConfiguration(
Optional<Double> threshold,
Optional<Double> bandwidth,
Optional<List<String>> hosts) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -426,4 +426,34 @@ StatusAndMessages queryUpgradeFinalizationProgress(
Token<?> getContainerToken(ContainerID containerID) throws IOException;

long getContainerCount() throws IOException;

List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
int count, int clientVersion) throws IOException;

/**
* Get DiskBalancer status.
*/
List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
Optional<List<String>> hosts, int clientVersion) throws IOException;

/**
* Start DiskBalancer.
*/
void startDiskBalancer(
Optional<Double> threshold,
Optional<Double> bandwidth,
Optional<List<String>> hosts) throws IOException;

/**
* Stop DiskBalancer.
*/
void stopDiskBalancer(Optional<List<String>> hosts) throws IOException;

/**
* Update DiskBalancer Configuration.
*/
void updateDiskBalancerConfiguration(
Optional<Double> threshold,
Optional<Double> bandwidth,
Optional<List<String>> hosts) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.storage;

import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
import org.apache.hadoop.hdds.conf.ConfigType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class contains configuration values for the DiskBalancer.
*/
@ConfigGroup(prefix = "hdds.datanode.disk.balancer")
public final class DiskBalancerConfiguration {
private static final Logger LOG =
LoggerFactory.getLogger(DiskBalancerConfiguration.class);

@Config(key = "volume.density.threshold", type = ConfigType.AUTO,
defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
description = "Threshold is a percentage in the range of 0 to 100. A " +
"datanode is considered balanced if for each volume, the " +
"utilization of the volume(used space to capacity ratio) differs" +
" from the utilization of the datanode(used space to capacity ratio" +
" of the entire datanode) no more than the threshold.")
private double threshold = 10d;

@Config(key = "max.disk.throughputInMBPerSec", type = ConfigType.AUTO,
defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
description = "The max balance speed.")
private double diskBandwidth = 10;

@Config(key = "parallel.thread", type = ConfigType.AUTO,
defaultValue = "5", tags = {ConfigTag.DISKBALANCER},
description = "The max parallel balance thread count.")
private int parallelThread = 5;

/**
* Gets the threshold value for DiskBalancer.
*
* @return percentage value in the range 0 to 100
*/
public double getThreshold() {
return threshold;
}

public double getThresholdAsRatio() {
return threshold / 100;
}

/**
* Sets the threshold value for Disk Balancer.
*
* @param threshold a percentage value in the range 0 to 100
*/
public void setThreshold(double threshold) {
if (threshold < 0d || threshold >= 100d) {
throw new IllegalArgumentException(
"Threshold must be a percentage(double) in the range 0 to 100.");
}
this.threshold = threshold;
}

/**
* Gets the disk bandwidth value for Disk Balancer.
*
* @return max disk bandwidth per second
*/
public double getDiskBandwidth() {
return diskBandwidth;
}

/**
* Sets the disk bandwidth value for Disk Balancer.
*
* @param diskBandwidth the bandwidth to control balance speed
*/
public void setDiskBandwidth(double diskBandwidth) {
if (diskBandwidth <= 0d) {
throw new IllegalArgumentException(
"diskBandwidth must be a value larger than 0.");
}
this.diskBandwidth = diskBandwidth;
}

/**
* Gets the parallel thread for Disk Balancer.
*
* @return parallel thread
*/
public int getParallelThread() {
return parallelThread;
}

/**
* Sets the parallel thread for Disk Balancer.
*
* @param parallelThread the parallel thread count
*/
public void setParallelThread(int parallelThread) {
if (parallelThread <= 0) {
throw new IllegalArgumentException(
"parallelThread must be a value larger than 0.");
}
this.parallelThread = parallelThread;
}
@Override
public String toString() {
return String.format("Disk Balancer Configuration values:%n" +
"%-50s %s%n" +
"%-50s %s%n" +
"%-50s %s%n" +
"%-50s %s%n",
"Key", "Value",
"Threshold", threshold, "Max disk bandwidth", diskBandwidth,
"Parallel Thread", parallelThread);
}

public HddsProtos.DiskBalancerConfigurationProto.Builder toProtobufBuilder() {
HddsProtos.DiskBalancerConfigurationProto.Builder builder =
HddsProtos.DiskBalancerConfigurationProto.newBuilder();

builder.setThreshold(threshold)
.setDiskBandwidth(diskBandwidth)
.setParallelThread(parallelThread);
return builder;
}

static DiskBalancerConfiguration fromProtobuf(
@NotNull HddsProtos.DiskBalancerConfigurationProto proto,
@NotNull OzoneConfiguration ozoneConfiguration) {
DiskBalancerConfiguration config =
ozoneConfiguration.getObject(DiskBalancerConfiguration.class);
if (proto.hasThreshold()) {
config.setThreshold(proto.getThreshold());
}
if (proto.hasDiskBandwidth()) {
config.setDiskBandwidth(proto.getDiskBandwidth());
}
if (proto.hasParallelThread()) {
config.setParallelThread(proto.getParallelThread());
}
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,6 @@ public enum ConfigTag {
DELETION,
HA,
BALANCER,
UPGRADE
UPGRADE,
DISKBALANCER
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeAdminErrorResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoType;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesResponseProto;
Expand Down Expand Up @@ -1018,6 +1021,59 @@ public long getContainerCount() throws IOException {
return response.getContainerCount();
}

@Override
public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
int count, int clientVersion) throws IOException {
DatanodeDiskBalancerInfoRequestProto request =
DatanodeDiskBalancerInfoRequestProto.newBuilder()
.setInfoType(DatanodeDiskBalancerInfoType.report)
.setCount(count)
.build();

DatanodeDiskBalancerInfoResponseProto response =
submitRequest(Type.DatanodeDiskBalancerInfo,
builder -> builder.setDatanodeDiskBalancerInfoRequest(request))
.getDatanodeDiskBalancerInfoResponse();

return response.getInfoList();
}

@Override
public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
Optional<List<String>> hosts, int clientVersion) throws IOException {
DatanodeDiskBalancerInfoRequestProto.Builder requestBuilder =
DatanodeDiskBalancerInfoRequestProto.newBuilder()
.setInfoType(DatanodeDiskBalancerInfoType.status);
hosts.ifPresent(requestBuilder::addAllHosts);
DatanodeDiskBalancerInfoRequestProto request = requestBuilder.build();

DatanodeDiskBalancerInfoResponseProto response =
submitRequest(Type.DatanodeDiskBalancerInfo,
builder -> builder.setDatanodeDiskBalancerInfoRequest(request))
.getDatanodeDiskBalancerInfoResponse();

return response.getInfoList();
}

@Override
public void startDiskBalancer(Optional<Double> threshold,
Optional<Double> bandwidth, Optional<List<String>> hosts)
throws IOException {

}

@Override
public void stopDiskBalancer(Optional<List<String>> hosts)
throws IOException {

}

@Override
public void updateDiskBalancerConfiguration(Optional<Double> threshold,
Optional<Double> bandwidth, Optional<List<String>> hosts)
throws IOException {
}

@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
Expand Down
21 changes: 21 additions & 0 deletions hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ message ScmContainerLocationRequest {
optional GetContainerReplicasRequestProto getContainerReplicasRequest = 39;
optional ReplicationManagerReportRequestProto replicationManagerReportRequest = 40;
optional ResetDeletedBlockRetryCountRequestProto resetDeletedBlockRetryCountRequest = 41;
optional DatanodeDiskBalancerInfoRequestProto DatanodeDiskBalancerInfoRequest = 42;
}

message ScmContainerLocationResponse {
Expand Down Expand Up @@ -127,6 +128,7 @@ message ScmContainerLocationResponse {
optional GetContainerReplicasResponseProto getContainerReplicasResponse = 39;
optional ReplicationManagerReportResponseProto getReplicationManagerReportResponse = 40;
optional ResetDeletedBlockRetryCountResponseProto resetDeletedBlockRetryCountResponse = 41;
optional DatanodeDiskBalancerInfoResponseProto DatanodeDiskBalancerInfoResponse = 42;

enum Status {
OK = 1;
Expand Down Expand Up @@ -174,6 +176,7 @@ enum Type {
GetContainerReplicas = 34;
GetReplicationManagerReport = 35;
ResetDeletedBlockRetryCount = 36;
DatanodeDiskBalancerInfo= 37;
}

/**
Expand Down Expand Up @@ -325,6 +328,24 @@ message DatanodeUsageInfoResponseProto {
repeated DatanodeUsageInfoProto info = 1;
}

enum DatanodeDiskBalancerInfoType{
report = 1;
status = 2;
}

/*
Datanode disk balancer status request message.
*/
message DatanodeDiskBalancerInfoRequestProto {
required DatanodeDiskBalancerInfoType infoType = 1;
optional uint32 count = 2;
repeated string hosts = 3;
}

message DatanodeDiskBalancerInfoResponseProto {
repeated DatanodeDiskBalancerInfoProto info = 1;
}

/*
Decommission a list of hosts
*/
Expand Down
13 changes: 13 additions & 0 deletions hadoop-hdds/interface-client/src/main/proto/hdds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,16 @@ message ContainerBalancerConfigurationProto {
required bool shouldRun = 18;
optional int32 nextIterationIndex = 19;
}

message DiskBalancerConfigurationProto {
optional double threshold = 1;
optional double diskBandwidth = 2;
optional int32 parallelThread = 3;
}

message DatanodeDiskBalancerInfoProto {
required DatanodeDetailsProto node = 1;
required double currentVolumeDensitySum = 2;
optional bool diskBalancerRunning = 3;
optional DiskBalancerConfigurationProto diskBalancerConf = 4;
}
Loading