Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,34 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
private final ContainerManager containerManager;
@Nullable
private final DeletedBlockLog deletedBlockLog;
@Nullable
private final DiskBalancerManager diskBalancerManager;

private static final Logger LOG =
LoggerFactory.getLogger(DeadNodeHandler.class);

/**
 * Creates a dead-node handler from a node manager, a pipeline manager and a
 * container manager only; the remaining collaborators are passed as
 * {@code null} to an overloaded constructor.
 */
// NOTE(review): the two consecutive this(...) calls below appear to be the
// removed and added sides of a diff hunk (four-argument vs. five-argument
// delegation). Only one explicit constructor invocation can be valid Java;
// presumably the five-argument form is the current one - confirm.
public DeadNodeHandler(final NodeManager nodeManager,
final PipelineManager pipelineManager,
final ContainerManager containerManager) {
this(nodeManager, pipelineManager, containerManager, null);
this(nodeManager, pipelineManager, containerManager, null, null);
}

/**
 * Creates a dead-node handler that is given a DiskBalancerManager but no
 * DeletedBlockLog.
 *
 * <p>Delegates to the five-argument constructor, passing {@code null} as the
 * deleted block log.
 *
 * @param nodeManager node manager forwarded to the full constructor
 * @param pipelineManager pipeline manager forwarded to the full constructor
 * @param containerManager container manager forwarded to the full constructor
 * @param diskBalancerManager disk balancer manager forwarded to the full
 *     constructor
 */
public DeadNodeHandler(
    final NodeManager nodeManager,
    final PipelineManager pipelineManager,
    final ContainerManager containerManager,
    final DiskBalancerManager diskBalancerManager) {
  this(nodeManager, pipelineManager, containerManager,
      diskBalancerManager, null);
}

/**
 * Creates a dead-node handler with every collaborator supplied explicitly.
 *
 * <p>Both {@code diskBalancerManager} and {@code deletedBlockLog} are
 * optional: the corresponding fields are declared {@code @Nullable} and the
 * shorter constructors pass {@code null} for them.
 *
 * @param nodeManager node manager stored for use by this handler
 * @param pipelineManager pipeline manager stored for use by this handler
 * @param containerManager container manager stored for use by this handler
 * @param diskBalancerManager disk balancer manager, or {@code null}; when
 *     non-null, the handler marks the dead datanode's DiskBalancer status
 *     as UNKNOWN
 * @param deletedBlockLog deleted block log, or {@code null}
 */
public DeadNodeHandler(final NodeManager nodeManager,
    final PipelineManager pipelineManager,
    final ContainerManager containerManager,
    @Nullable final DiskBalancerManager diskBalancerManager,
    @Nullable final DeletedBlockLog deletedBlockLog) {
  this.nodeManager = nodeManager;
  this.pipelineManager = pipelineManager;
  this.containerManager = containerManager;
  // May be null; callers of the field must null-check before use.
  this.diskBalancerManager = diskBalancerManager;
  this.deletedBlockLog = deletedBlockLog;
}

Expand Down Expand Up @@ -120,6 +131,12 @@ public void onMessage(final DatanodeDetails datanodeDetails,
nodeManager.getNode(datanodeDetails.getID())
.getParent() == null);
}

// Mark DiskBalancer status as UNKNOWN for the dead datanode
if (diskBalancerManager != null) {
LOG.info("Marking DiskBalancer status UNKNOWN for dead DN {}", datanodeDetails.getUuidString());
diskBalancerManager.markStatusUnknown(datanodeDetails);
}
} catch (NodeNotFoundException ex) {
// This should not happen, we cannot get a dead node event for an
// unregistered datanode!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
.map(dn -> getInfoProto(dn, clientVersion))
.collect(Collectors.toList());
} else {
return nodeManager.getAllNodes().stream()
return nodeManager.getNodes(IN_SERVICE,
HddsProtos.NodeState.HEALTHY).stream()
.filter(dn -> shouldReturnDatanode(filterStatus, dn))
.map(dn -> getInfoProto((DatanodeInfo)dn, clientVersion))
.collect(Collectors.toList());
Expand Down Expand Up @@ -302,7 +303,7 @@ private double getVolumeDataDensitySumForDatanodeDetails(
return volumeDensitySum;
}

/**
 * Returns the DiskBalancer status tracked in {@code statusMap} for the given
 * datanode.
 *
 * <p>If the map has no entry for the datanode, a default status is created
 * with running status UNKNOWN, a fresh DiskBalancerConfiguration and four
 * zero-valued numeric arguments; that default is stored in the map and
 * returned.
 *
 * @param datanodeDetails the datanode whose status is requested
 * @return the existing status, or the newly stored UNKNOWN default
 */
// NOTE(review): the two signature lines below appear to be the removed
// (private) and added (public) sides of a diff hunk; presumably the public
// form is current, widened so the method can be called from tests - confirm.
private DiskBalancerStatus getStatus(DatanodeDetails datanodeDetails) {
public DiskBalancerStatus getStatus(DatanodeDetails datanodeDetails) {
return statusMap.computeIfAbsent(datanodeDetails,
dn -> new DiskBalancerStatus(DiskBalancerRunningStatus.UNKNOWN, new DiskBalancerConfiguration(), 0, 0, 0, 0));
}
Expand Down Expand Up @@ -333,6 +334,16 @@ public void processDiskBalancerReport(DiskBalancerReportProto reportProto,
}
}

/**
 * Resets the tracked DiskBalancer status of the given datanode to UNKNOWN.
 *
 * <p>Nothing happens when the datanode has no entry in {@code statusMap},
 * and an entry whose running status is already UNKNOWN is left untouched.
 * Otherwise the entry is replaced by a new status built from UNKNOWN, a
 * fresh DiskBalancerConfiguration and four zero-valued numeric arguments.
 *
 * <p>The lookup and the replacement are done in a single
 * {@code computeIfPresent} call rather than a separate get followed by put,
 * so a status written in between cannot be silently overwritten when the
 * underlying map performs the remapping atomically (as ConcurrentHashMap
 * does; NOTE(review): confirm the concrete type of statusMap).
 *
 * @param dn the datanode whose status should be marked UNKNOWN
 */
public void markStatusUnknown(DatanodeDetails dn) {
  statusMap.computeIfPresent(dn, (key, current) ->
      current.getRunningStatus() == DiskBalancerRunningStatus.UNKNOWN
          // Already UNKNOWN: keep the existing object unchanged.
          ? current
          : new DiskBalancerStatus(DiskBalancerRunningStatus.UNKNOWN,
              new DiskBalancerConfiguration(), 0, 0, 0, 0));
}

private DiskBalancerConfiguration attachDiskBalancerConf(
DatanodeDetails dn, Optional<Double> threshold,
Optional<Long> bandwidthInMB, Optional<Integer> parallelThread) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ private void initializeEventHandlers() {
StaleNodeHandler staleNodeHandler =
new StaleNodeHandler(scmNodeManager, pipelineManager);
DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager,
pipelineManager, containerManager);
pipelineManager, containerManager, diskBalancerManager);
StartDatanodeAdminHandler datanodeStartAdminHandler =
new StartDatanodeAdminHandler(scmNodeManager, pipelineManager);
ReadOnlyHealthyToHealthyNodeHandler readOnlyHealthyToHealthyNodeHandler =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class TestDeadNodeHandler {
private File storageDir;
private SCMContext scmContext;
private DeletedBlockLog deletedBlockLog;
private DiskBalancerManager diskBalancerManager;

@BeforeEach
public void setup() throws IOException, AuthenticationException {
Expand All @@ -114,8 +115,10 @@ public void setup() throws IOException, AuthenticationException {
mockRatisProvider);
containerManager = scm.getContainerManager();
deletedBlockLog = mock(DeletedBlockLog.class);
diskBalancerManager = new DiskBalancerManager(conf, new EventQueue(),
SCMContext.emptyContext(), null);
deadNodeHandler = new DeadNodeHandler(nodeManager,
mock(PipelineManager.class), containerManager, deletedBlockLog);
mock(PipelineManager.class), containerManager, diskBalancerManager, deletedBlockLog);
healthyReadOnlyNodeHandler =
new HealthyReadOnlyNodeHandler(nodeManager,
pipelineManager);
Expand Down Expand Up @@ -217,6 +220,11 @@ public void testOnMessage(@TempDir File tempDir) throws Exception {
HddsTestUtils.quasiCloseContainer(containerManager,
container3.containerID());

// Start DiskBalancer on all datanodes
diskBalancerManager.addRunningDatanode(datanode1);
diskBalancerManager.addRunningDatanode(datanode2);
diskBalancerManager.addRunningDatanode(datanode3);

// First set the node to IN_MAINTENANCE and ensure the container replicas
// are not removed on the dead event
datanode1 = nodeManager.getNode(datanode1.getID());
Expand All @@ -233,6 +241,15 @@ public void testOnMessage(@TempDir File tempDir) throws Exception {
verify(deletedBlockLog, times(0))
.onDatanodeDead(datanode1.getUuid());

// Verify DiskBalancer status is marked UNKNOWN for the dead datanode
assertEquals(HddsProtos.DiskBalancerRunningStatus.UNKNOWN,
diskBalancerManager.getStatus(datanode1).getRunningStatus());
// Verify DiskBalancer status remains unchanged for other datanodes
assertEquals(HddsProtos.DiskBalancerRunningStatus.RUNNING,
diskBalancerManager.getStatus(datanode2).getRunningStatus());
assertEquals(HddsProtos.DiskBalancerRunningStatus.RUNNING,
diskBalancerManager.getStatus(datanode3).getRunningStatus());

Set<ContainerReplica> container1Replicas = containerManager
.getContainerReplicas(ContainerID.valueOf(container1.getContainerID()));
assertEquals(2, container1Replicas.size());
Expand Down