From 146915bd285566847e96c73b77f6b866a53aed28 Mon Sep 17 00:00:00 2001
From: Tsz-Wo Nicholas Sze <szetszwo@apache.org>
Date: Thu, 15 May 2025 10:00:43 -0700
Subject: [PATCH 1/2] HDDS-13051. Use DatanodeID in server-scm.

---
 .../hdds/scm/block/DeletedBlockLogImpl.java   |  35 +++---
 ...MDeletedBlockTransactionStatusManager.java | 103 ++++++++----
 .../balancer/AbstractFindTargetGreedy.java    |   8 +-
 .../container/balancer/FindSourceGreedy.java  |   8 +-
 .../hdds/scm/node/DatanodeUsageInfo.java      |   5 +
 .../scm/pipeline/PipelineManagerImpl.java     |  27 ++---
 .../scm/safemode/DataNodeSafeModeRule.java    |   7 +-
 .../SCMDatanodeHeartbeatDispatcher.java       |   6 +-
 .../hdds/scm/block/TestDeletedBlockLog.java   |  40 +++---
 ...stSCMDeleteBlocksCommandStatusManager.java |  21 ++--
 .../scm/pipeline/TestPipelineManagerImpl.java |  23 ++--
 11 files changed, 123 insertions(+), 160 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 8c39de41d145..6a54f0a9b790 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -30,7 +30,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -326,24 +325,20 @@ public void addTransactions(Map<Long, List<Long>> containerBlocksMap)
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() {
   }
 
   private void getTransaction(DeletedBlocksTransaction tx,
       DatanodeDeletedBlockTransactions transactions,
       Set<DatanodeDetails> dnList, Set<ContainerReplica> replicas,
-      Map<UUID, Map<Long, CmdStatus>> commandStatus) {
-    DeletedBlocksTransaction updatedTxn =
-        DeletedBlocksTransaction.newBuilder(tx)
-            .setCount(transactionStatusManager.getOrDefaultRetryCount(
-                tx.getTxID(), 0))
-            .build();
-
+      Map<DatanodeID, Map<Long, CmdStatus>> commandStatus) {
     for (ContainerReplica replica : replicas) {
-      DatanodeDetails details = replica.getDatanodeDetails();
-      if (!transactionStatusManager.isDuplication(
-          details, updatedTxn.getTxID(), commandStatus)) {
-        transactions.addTransactionToDN(details.getID(), updatedTxn);
+      final DatanodeID datanodeID = replica.getDatanodeDetails().getID();
+      if (!transactionStatusManager.isDuplication(datanodeID, tx.getTxID(), commandStatus)) {
+        final DeletedBlocksTransaction updatedTxn = DeletedBlocksTransaction.newBuilder(tx)
+            .setCount(transactionStatusManager.getRetryCount(tx.getTxID()))
+            .build();
+        transactions.addTransactionToDN(datanodeID, updatedTxn);
         metrics.incrProcessedTransaction();
       }
     }
@@ -373,7 +368,7 @@ private Boolean checkInadequateReplica(Set<ContainerReplica> replicas,
       if (!dnList.contains(datanodeDetails)) {
         DatanodeDetails dnDetail = replica.getDatanodeDetails();
         LOG.debug("Skip Container = {}, because DN = {} is not in dnList.",
-            containerId, dnDetail.getUuid());
+            containerId, dnDetail);
         return true;
       }
     }
@@ -426,10 +421,10 @@ public DatanodeDeletedBlockTransactions getTransactions(
 
     // Get the CmdStatus status of the aggregation, so that the current
     // status of the specified transaction can be found faster
-    Map<UUID, Map<Long, CmdStatus>> commandStatus =
+    final Map<DatanodeID, Map<Long, CmdStatus>> commandStatus =
         getSCMDeletedBlockTransactionStatusManager()
            .getCommandStatusByTxId(dnList.stream().
- map(DatanodeDetails::getUuid).collect(Collectors.toSet())); + map(DatanodeDetails::getID).collect(Collectors.toSet())); ArrayList txIDs = new ArrayList<>(); metrics.setNumBlockDeletionTransactionDataNodes(dnList.size()); Table.KeyValue keyValue = null; @@ -510,7 +505,7 @@ public void setScmCommandTimeoutMs(long scmCommandTimeoutMs) { public void recordTransactionCreated(DatanodeID dnId, long scmCmdId, Set dnTxSet) { getSCMDeletedBlockTransactionStatusManager() - .recordTransactionCreated(dnId.getUuid(), scmCmdId, dnTxSet); + .recordTransactionCreated(dnId, scmCmdId, dnTxSet); } @Override @@ -520,7 +515,7 @@ public int getTransactionToDNsCommitMapSize() { @Override public void onDatanodeDead(DatanodeID dnId) { - getSCMDeletedBlockTransactionStatusManager().onDatanodeDead(dnId.getUuid()); + getSCMDeletedBlockTransactionStatusManager().onDatanodeDead(dnId); } @Override @@ -546,7 +541,7 @@ public void onMessage( ContainerBlocksDeletionACKProto ackProto = commandStatus.getBlockDeletionAck(); getSCMDeletedBlockTransactionStatusManager() - .commitTransactions(ackProto.getResultsList(), dnId.getUuid()); + .commitTransactions(ackProto.getResultsList(), dnId); metrics.incrBlockDeletionCommandSuccess(); metrics.incrDNCommandsSuccess(dnId, 1); } else if (status == CommandStatus.Status.FAILED) { @@ -558,7 +553,7 @@ public void onMessage( } getSCMDeletedBlockTransactionStatusManager() - .commitSCMCommandStatus(deleteBlockStatus.getCmdStatus(), dnId.getUuid()); + .commitSCMCommandStatus(deleteBlockStatus.getCmdStatus(), dnId); } finally { lock.unlock(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java index 4be3a1a57461..f3ca531cc2ad 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java @@ -27,7 +27,6 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -35,10 +34,10 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -58,7 +57,7 @@ public class SCMDeletedBlockTransactionStatusManager { private static final Logger LOG = LoggerFactory.getLogger(SCMDeletedBlockTransactionStatusManager.class); // Maps txId to set of DNs which are successful in committing the transaction - private final Map> transactionToDNsCommitMap; + private final Map> transactionToDNsCommitMap; // Maps txId to its retry counts; private final Map transactionToRetryCountMap; // The access to DeletedBlocksTXTable is protected by @@ -100,11 +99,10 @@ public SCMDeletedBlockTransactionStatusManager( protected static class SCMDeleteBlocksCommandStatusManager { private static final Logger 
LOG = LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class); - private final Map> scmCmdStatusRecord; + private final Map> scmCmdStatusRecord; private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT; - private static final Set STATUSES_REQUIRING_TIMEOUT = - new HashSet<>(Arrays.asList(SENT)); + private static final Set STATUSES_REQUIRING_TIMEOUT = Collections.singleton(SENT); public SCMDeleteBlocksCommandStatusManager() { this.scmCmdStatusRecord = new ConcurrentHashMap<>(); @@ -128,14 +126,13 @@ public enum CmdStatus { } protected static final class CmdStatusData { - private final UUID dnId; + private final DatanodeID dnId; private final long scmCmdId; private final Set deletedBlocksTxIds; private Instant updateTime; private CmdStatus status; - private CmdStatusData( - UUID dnId, long scmTxID, Set deletedBlocksTxIds) { + private CmdStatusData(DatanodeID dnId, long scmTxID, Set deletedBlocksTxIds) { this.dnId = dnId; this.scmCmdId = scmTxID; this.deletedBlocksTxIds = deletedBlocksTxIds; @@ -146,7 +143,7 @@ public Set getDeletedBlocksTxIds() { return Collections.unmodifiableSet(deletedBlocksTxIds); } - public UUID getDnId() { + DatanodeID getDnId() { return dnId; } @@ -180,7 +177,7 @@ public String toString() { } protected static CmdStatusData createScmCmdStatusData( - UUID dnId, long scmCmdId, Set deletedBlocksTxIds) { + DatanodeID dnId, long scmCmdId, Set deletedBlocksTxIds) { return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds); } @@ -190,22 +187,22 @@ protected void recordScmCommand(CmdStatusData statusData) { new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData); } - protected void onSent(UUID dnId, long scmCmdId) { + void onSent(DatanodeID dnId, long scmCmdId) { updateStatus(dnId, scmCmdId, CommandStatus.Status.PENDING); } - protected void onDatanodeDead(UUID dnId) { + void onDatanodeDead(DatanodeID dnId) { LOG.info("Clean SCMCommand record for Datanode: {}", dnId); scmCmdStatusRecord.remove(dnId); } - protected void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId, + void updateStatusByDNCommandStatus(DatanodeID dnId, long scmCmdId, CommandStatus.Status newState) { updateStatus(dnId, scmCmdId, newState); } protected void cleanAllTimeoutSCMCommand(long timeoutMs) { - for (UUID dnId : scmCmdStatusRecord.keySet()) { + for (DatanodeID dnId : scmCmdStatusRecord.keySet()) { for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) { removeTimeoutScmCommand( dnId, getScmCommandIds(dnId, status), timeoutMs); @@ -213,14 +210,14 @@ protected void cleanAllTimeoutSCMCommand(long timeoutMs) { } } - public void cleanTimeoutSCMCommand(UUID dnId, long timeoutMs) { + void cleanTimeoutSCMCommand(DatanodeID dnId, long timeoutMs) { for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) { removeTimeoutScmCommand( dnId, getScmCommandIds(dnId, status), timeoutMs); } } - private Set getScmCommandIds(UUID dnId, CmdStatus status) { + private Set getScmCommandIds(DatanodeID dnId, CmdStatus status) { Set scmCmdIds = new HashSet<>(); Map record = scmCmdStatusRecord.get(dnId); if (record == null) { @@ -234,7 +231,7 @@ private Set getScmCommandIds(UUID dnId, CmdStatus status) { return scmCmdIds; } - private Instant getUpdateTime(UUID dnId, long scmCmdId) { + private Instant getUpdateTime(DatanodeID dnId, long scmCmdId) { Map record = scmCmdStatusRecord.get(dnId); if (record == null || record.get(scmCmdId) == null) { return null; @@ -242,7 +239,7 @@ private Instant getUpdateTime(UUID dnId, long scmCmdId) { return record.get(scmCmdId).getUpdateTime(); } - private void 
updateStatus(UUID dnId, long scmCmdId, + private void updateStatus(DatanodeID dnId, long scmCmdId, CommandStatus.Status newStatus) { Map recordForDn = scmCmdStatusRecord.get(dnId); if (recordForDn == null) { @@ -309,7 +306,7 @@ private void updateStatus(UUID dnId, long scmCmdId, } } - private void removeTimeoutScmCommand(UUID dnId, + private void removeTimeoutScmCommand(DatanodeID dnId, Set scmCmdIds, long timeoutMs) { Instant now = Instant.now(); for (Long scmCmdId : scmCmdIds) { @@ -323,7 +320,7 @@ private void removeTimeoutScmCommand(UUID dnId, } } - private CmdStatusData removeScmCommand(UUID dnId, long scmCmdId) { + private CmdStatusData removeScmCommand(DatanodeID dnId, long scmCmdId) { Map record = scmCmdStatusRecord.get(dnId); if (record == null || record.get(scmCmdId) == null) { return null; @@ -333,12 +330,10 @@ private CmdStatusData removeScmCommand(UUID dnId, long scmCmdId) { return statusData; } - public Map> getCommandStatusByTxId( - Set dnIds) { - Map> result = - new HashMap<>(scmCmdStatusRecord.size()); + Map> getCommandStatusByTxId(Set dnIds) { + final Map> result = new HashMap<>(scmCmdStatusRecord.size()); - for (UUID dnId : dnIds) { + for (DatanodeID dnId : dnIds) { Map record = scmCmdStatusRecord.get(dnId); if (record == null) { continue; @@ -361,7 +356,7 @@ private void clear() { } @VisibleForTesting - Map> getScmCmdStatusRecord() { + Map> getScmCmdStatusRecord() { return scmCmdStatusRecord; } } @@ -395,22 +390,19 @@ public void resetRetryCount(List txIDs) throws IOException { } } - public int getOrDefaultRetryCount(long txID, int defaultValue) { - return transactionToRetryCountMap.getOrDefault(txID, defaultValue); + int getRetryCount(long txID) { + return transactionToRetryCountMap.getOrDefault(txID, 0); } public void onSent(DatanodeDetails dnId, SCMCommand scmCommand) { - scmDeleteBlocksCommandStatusManager.onSent( - dnId.getUuid(), scmCommand.getId()); + scmDeleteBlocksCommandStatusManager.onSent(dnId.getID(), scmCommand.getId()); } - public Map> getCommandStatusByTxId( - Set dnIds) { + Map> getCommandStatusByTxId(Set dnIds) { return scmDeleteBlocksCommandStatusManager.getCommandStatusByTxId(dnIds); } - public void recordTransactionCreated( - UUID dnId, long scmCmdId, Set dnTxSet) { + void recordTransactionCreated(DatanodeID dnId, long scmCmdId, Set dnTxSet) { scmDeleteBlocksCommandStatusManager.recordScmCommand( SCMDeleteBlocksCommandStatusManager .createScmCmdStatusData(dnId, scmCmdId, dnTxSet)); @@ -428,21 +420,19 @@ public void cleanAllTimeoutSCMCommand(long timeoutMs) { scmDeleteBlocksCommandStatusManager.cleanAllTimeoutSCMCommand(timeoutMs); } - public void onDatanodeDead(UUID dnId) { + void onDatanodeDead(DatanodeID dnId) { scmDeleteBlocksCommandStatusManager.onDatanodeDead(dnId); } - public boolean isDuplication(DatanodeDetails dnDetail, long tx, - Map> commandStatus) { - if (alreadyExecuted(dnDetail.getUuid(), tx)) { + boolean isDuplication(DatanodeID datanodeID, long tx, Map> commandStatus) { + if (alreadyExecuted(datanodeID, tx)) { return true; } - return inProcessing(dnDetail.getUuid(), tx, commandStatus); + return inProcessing(datanodeID, tx, commandStatus); } - public boolean alreadyExecuted(UUID dnId, long txId) { - Set dnsWithTransactionCommitted = - transactionToDNsCommitMap.get(txId); + private boolean alreadyExecuted(DatanodeID dnId, long txId) { + final Set dnsWithTransactionCommitted = transactionToDNsCommitMap.get(txId); return dnsWithTransactionCommitted != null && dnsWithTransactionCommitted .contains(dnId); } @@ -455,11 +445,9 @@ public 
boolean alreadyExecuted(UUID dnId, long txId) { * @param dnId - ID of datanode which acknowledges the delete block command. */ @VisibleForTesting - public void commitTransactions( - List transactionResults, UUID dnId) { + public void commitTransactions(List transactionResults, DatanodeID dnId) { ArrayList txIDsToBeDeleted = new ArrayList<>(); - Set dnsWithCommittedTxn; for (DeleteBlockTransactionResult transactionResult : transactionResults) { if (isTransactionFailed(transactionResult)) { @@ -470,7 +458,7 @@ public void commitTransactions( metrics.incrBlockDeletionTransactionSuccess(); long txID = transactionResult.getTxID(); // set of dns which have successfully committed transaction txId. - dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID); + final Set dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID); final ContainerID containerId = ContainerID.valueOf( transactionResult.getContainerID()); if (dnsWithCommittedTxn == null) { @@ -494,9 +482,9 @@ public void commitTransactions( // the nodes returned in the pipeline match the replication factor. if (min(replicas.size(), dnsWithCommittedTxn.size()) >= container.getReplicationConfig().getRequiredNodes()) { - List containerDns = replicas.stream() + final List containerDns = replicas.stream() .map(ContainerReplica::getDatanodeDetails) - .map(DatanodeDetails::getUuid) + .map(DatanodeDetails::getID) .collect(Collectors.toList()); if (dnsWithCommittedTxn.containsAll(containerDns)) { transactionToDNsCommitMap.remove(txID); @@ -526,34 +514,29 @@ public void commitTransactions( } @VisibleForTesting - public void commitSCMCommandStatus(List deleteBlockStatus, - UUID dnId) { + void commitSCMCommandStatus(List deleteBlockStatus, DatanodeID dnId) { processSCMCommandStatus(deleteBlockStatus, dnId); scmDeleteBlocksCommandStatusManager. cleanTimeoutSCMCommand(dnId, scmCommandTimeoutMs); } - private boolean inProcessing(UUID dnId, long deletedBlocksTxId, - Map> commandStatus) { + static boolean inProcessing(DatanodeID dnId, long deletedBlocksTxId, + Map> commandStatus) { Map deletedBlocksTxStatus = commandStatus.get(dnId); return deletedBlocksTxStatus != null && deletedBlocksTxStatus.get(deletedBlocksTxId) != null; } - private void processSCMCommandStatus(List deleteBlockStatus, - UUID dnID) { - Map lastStatus = new HashMap<>(); + private void processSCMCommandStatus(List deleteBlockStatus, DatanodeID dnID) { Map summary = new HashMap<>(); - // The CommandStatus is ordered in the report. So we can focus only on the // last status in the command report. 
deleteBlockStatus.forEach(cmdStatus -> { - lastStatus.put(cmdStatus.getCmdId(), cmdStatus); summary.put(cmdStatus.getCmdId(), cmdStatus.getStatus()); }); LOG.debug("CommandStatus {} from Datanode: {} ", summary, dnID); - for (Map.Entry entry : lastStatus.entrySet()) { - CommandStatus.Status status = entry.getValue().getStatus(); + for (Map.Entry entry : summary.entrySet()) { + final CommandStatus.Status status = entry.getValue(); scmDeleteBlocksCommandStatusManager.updateStatusByDNCommandStatus( dnID, entry.getKey(), status); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/AbstractFindTargetGreedy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/AbstractFindTargetGreedy.java index dad5ce8534ca..0735f6946a66 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/AbstractFindTargetGreedy.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/AbstractFindTargetGreedy.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -81,9 +80,7 @@ protected int compareByUsage(DatanodeUsageInfo a, DatanodeUsageInfo b) { if (ret != 0) { return ret; } - UUID uuidA = a.getDatanodeDetails().getUuid(); - UUID uuidB = b.getDatanodeDetails().getUuid(); - return uuidA.compareTo(uuidB); + return a.getDatanodeID().compareTo(b.getDatanodeID()); } private void setConfiguration(ContainerBalancerConfiguration conf) { @@ -228,8 +225,7 @@ public void increaseSizeEntering(DatanodeDetails target, long size) { } return; } - logger.warn("Cannot find {} in the candidates target nodes", - target.getUuid()); + logger.warn("Cannot find {} in the candidates target nodes", target); } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceGreedy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceGreedy.java index 6388684596eb..b641936bd89b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceGreedy.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/balancer/FindSourceGreedy.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; import java.util.PriorityQueue; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; @@ -56,9 +55,7 @@ public class FindSourceGreedy implements FindSourceStrategy { if (ret != 0) { return ret; } - UUID uuidA = a.getDatanodeDetails().getUuid(); - UUID uuidB = b.getDatanodeDetails().getUuid(); - return uuidA.compareTo(uuidB); + return a.getDatanodeID().compareTo(b.getDatanodeID()); }); this.nodeManager = nodeManager; } @@ -110,8 +107,7 @@ public void increaseSizeLeaving(DatanodeDetails dui, long size) { addBackSourceDataNode(dui); return; } - LOG.warn("Cannot find datanode {} in candidate source datanodes", - dui.getUuid()); + LOG.warn("Cannot find datanode {} in candidate source datanodes", dui); } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeUsageInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeUsageInfo.java index 
03fbba3041ff..f24a56c111aa 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeUsageInfo.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeUsageInfo.java @@ -19,6 +19,7 @@ import java.util.Comparator; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeUsageInfoProto; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; @@ -128,6 +129,10 @@ public DatanodeDetails getDatanodeDetails() { return datanodeDetails; } + public DatanodeID getDatanodeID() { + return datanodeDetails.getID(); + } + /** * Gets SCMNodeStat of this DatanodeUsageInfo. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java index 56a29536b409..925833597337 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java @@ -551,16 +551,15 @@ public void closeStalePipelines(DatanodeDetails datanodeDetails) { @VisibleForTesting List getStalePipelines(DatanodeDetails datanodeDetails) { - List pipelines = getPipelines(); - return pipelines.stream() - .filter(p -> p.getNodes().stream() - .anyMatch(n -> n.getUuid() - .equals(datanodeDetails.getUuid()) - && (!n.getIpAddress() - .equals(datanodeDetails.getIpAddress()) - || !n.getHostName() - .equals(datanodeDetails.getHostName())))) - .collect(Collectors.toList()); + return getPipelines().stream() + .filter(p -> p.getNodes().stream().anyMatch(n -> sameIdDifferentHostOrAddress(n, datanodeDetails))) + .collect(Collectors.toList()); + } + + static boolean sameIdDifferentHostOrAddress(DatanodeDetails left, DatanodeDetails right) { + return left.getID().equals(right.getID()) + && (!left.getIpAddress().equals(right.getIpAddress()) + || !left.getHostName().equals(right.getHostName())); } /** @@ -904,12 +903,8 @@ private void recordMetricsForPipeline(Pipeline pipeline) { metrics.incNumPipelineContainSameDatanodes(); //TODO remove until pipeline allocation is proved equally distributed. 
for (Pipeline overlapPipeline : overlapPipelines) { - LOG.info("Pipeline: " + pipeline.getId().toString() + - " contains same datanodes as previous pipelines: " + - overlapPipeline.getId().toString() + " nodeIds: " + - pipeline.getNodes().get(0).getUuid().toString() + - ", " + pipeline.getNodes().get(1).getUuid().toString() + - ", " + pipeline.getNodes().get(2).getUuid().toString()); + LOG.info("{} and {} have exactly the same datanodes: {}", + pipeline.getId(), overlapPipeline.getId(), pipeline.getNodeSet()); } } return; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/DataNodeSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/DataNodeSafeModeRule.java index cb487ce8a3f4..1389b14ee31b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/DataNodeSafeModeRule.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/DataNodeSafeModeRule.java @@ -18,9 +18,10 @@ package org.apache.hadoop.hdds.scm.safemode; import java.util.HashSet; -import java.util.UUID; +import java.util.Set; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; @@ -41,7 +42,7 @@ public class DataNodeSafeModeRule extends private int requiredDns; private int registeredDns = 0; // Set to track registered DataNodes. - private HashSet registeredDnSet; + private final Set registeredDnSet; private NodeManager nodeManager; public DataNodeSafeModeRule(EventQueue eventQueue, @@ -72,7 +73,7 @@ protected boolean validate() { @Override protected void process(NodeRegistrationContainerReport reportsProto) { - registeredDnSet.add(reportsProto.getDatanodeDetails().getUuid()); + registeredDnSet.add(reportsProto.getDatanodeDetails().getID()); registeredDns = registeredDnSet.size(); if (scmInSafeMode()) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java index dc79a21f8f99..40d60d0a9711 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java @@ -188,7 +188,7 @@ public List> dispatch(SCMHeartbeatRequestProto heartbeat) { } } if (LOG.isDebugEnabled()) { - LOG.debug("Heartbeat dispatched: datanode=" + datanodeDetails.getUuid() + ", Commands= " + commands); + LOG.debug("Heartbeat dispatched: datanode {}, Commands: {}", datanodeDetails, commands); } return commands; @@ -320,7 +320,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return this.getDatanodeDetails().getUuid().hashCode(); + return this.getDatanodeDetails().getID().hashCode(); } @Override @@ -368,7 +368,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return this.getDatanodeDetails().getUuid().hashCode(); + return this.getDatanodeDetails().getID().hashCode(); } @Override diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java index 
140d45790fcb..59607591be9f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java @@ -41,7 +41,6 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -100,8 +99,8 @@ public class TestDeletedBlockLog { private StorageContainerManager scm; private List dnList; private SCMHADBTransactionBuffer scmHADBTransactionBuffer; - private Map containers = new HashMap<>(); - private Map> replicas = new HashMap<>(); + private final Map containers = new HashMap<>(); + private final Map> replicas = new HashMap<>(); private ScmBlockDeletingServiceMetrics metrics; private static final int THREE = ReplicationFactor.THREE_VALUE; private static final int ONE = ReplicationFactor.ONE_VALUE; @@ -146,7 +145,7 @@ private void setupContainerManager() throws IOException { when(containerManager.getContainerReplicas(any())) .thenAnswer(invocationOnMock -> { ContainerID cid = (ContainerID) invocationOnMock.getArguments()[0]; - return replicas.get(cid.getId()); + return replicas.get(cid); }); when(containerManager.getContainer(any())) .thenAnswer(invocationOnMock -> { @@ -159,7 +158,7 @@ private void setupContainerManager() throws IOException { Map map = (Map) invocationOnMock.getArguments()[0]; for (Map.Entry e : map.entrySet()) { - ContainerInfo info = containers.get(e.getKey().getId()); + ContainerInfo info = containers.get(e.getKey()); try { assertThat(e.getValue()).isGreaterThan(info.getDeleteTransactionId()); } catch (AssertionError err) { @@ -191,9 +190,10 @@ private void updateContainerMetadata(long cid, .setDatanodeDetails(datanodeDetails) .build()) .collect(Collectors.toSet()); - containers.put(cid, container); - containerTable.put(ContainerID.valueOf(cid), container); - replicas.put(cid, replicaSet); + final ContainerID containerID = container.containerID(); + containers.put(containerID, container); + containerTable.put(containerID, container); + replicas.put(containerID, replicaSet); } @AfterEach @@ -226,24 +226,21 @@ private Map> generateData(int dataSize, } private void addTransactions(Map> containerBlocksMap, - boolean shouldFlush) - throws IOException, TimeoutException { + boolean shouldFlush) throws IOException { deletedBlockLog.addTransactions(containerBlocksMap); if (shouldFlush) { scmHADBTransactionBuffer.flush(); } } - private void incrementCount(List txIDs) - throws IOException, TimeoutException { + private void incrementCount(List txIDs) throws IOException { deletedBlockLog.incrementCount(txIDs); scmHADBTransactionBuffer.flush(); // mock scmHADBTransactionBuffer does not flush deletedBlockLog deletedBlockLog.onFlush(); } - private void resetCount(List txIDs) - throws IOException, TimeoutException { + private void resetCount(List txIDs) throws IOException { deletedBlockLog.resetCount(txIDs); scmHADBTransactionBuffer.flush(); deletedBlockLog.onFlush(); @@ -254,7 +251,7 @@ private void commitTransactions( DatanodeDetails... 
dns) throws IOException { for (DatanodeDetails dnDetails : dns) { deletedBlockLog.getSCMDeletedBlockTransactionStatusManager() - .commitTransactions(transactionResults, dnDetails.getUuid()); + .commitTransactions(transactionResults, dnDetails.getID()); } scmHADBTransactionBuffer.flush(); } @@ -294,7 +291,7 @@ private List getAllTransactions() throws Exception { } private List getTransactions( - int maximumAllowedBlocksNum) throws IOException, TimeoutException { + int maximumAllowedBlocksNum) throws IOException { DatanodeDeletedBlockTransactions transactions = deletedBlockLog.getTransactions(maximumAllowedBlocksNum, new HashSet<>(dnList)); List txns = new LinkedList<>(); @@ -564,8 +561,7 @@ private void recordScmCommandToStatusManager( } private void sendSCMDeleteBlocksCommand(DatanodeID dnId, SCMCommand scmCommand) { - deletedBlockLog.onSent( - DatanodeDetails.newBuilder().setUuid(dnId.getUuid()).build(), scmCommand); + deletedBlockLog.onSent(DatanodeDetails.newBuilder().setID(dnId).build(), scmCommand); } private void assertNoDuplicateTransactions( @@ -619,7 +615,7 @@ private void commitSCMCommandStatus(Long scmCmdId, DatanodeID dnID, .getProtoBufMessage()); deletedBlockLog.getSCMDeletedBlockTransactionStatusManager() - .commitSCMCommandStatus(deleteBlockStatus, dnID.getUuid()); + .commitSCMCommandStatus(deleteBlockStatus, dnID); } private void createDeleteBlocksCommandAndAction( @@ -833,8 +829,7 @@ public void testPersistence() throws Exception { } @Test - public void testDeletedBlockTransactions() - throws IOException, TimeoutException { + public void testDeletedBlockTransactions() throws IOException { deletedBlockLog.setScmCommandTimeoutMs(Long.MAX_VALUE); mockContainerHealthResult(true); int txNum = 10; @@ -888,8 +883,7 @@ public void testDeletedBlockTransactions() } @Test - public void testDeletedBlockTransactionsOfDeletedContainer() - throws IOException, TimeoutException { + public void testDeletedBlockTransactionsOfDeletedContainer() throws IOException { int txNum = 10; List blocks; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMDeleteBlocksCommandStatusManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMDeleteBlocksCommandStatusManager.java index dfb8ad83b53c..a2f969914951 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMDeleteBlocksCommandStatusManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMDeleteBlocksCommandStatusManager.java @@ -26,11 +26,12 @@ import static org.junit.jupiter.api.Assertions.assertNull; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; +import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -41,8 +42,8 @@ public class TestSCMDeleteBlocksCommandStatusManager { private SCMDeleteBlocksCommandStatusManager manager; - private UUID dnId1; - private UUID dnId2; + private DatanodeID dnId1; + private DatanodeID dnId2; private long scmCmdId1; private long scmCmdId2; private long scmCmdId3; @@ -56,8 +57,8 @@ public class TestSCMDeleteBlocksCommandStatusManager { public void setup() throws Exception { manager = new SCMDeleteBlocksCommandStatusManager(); // Create test data - dnId1 = UUID.randomUUID(); - dnId2 = 
UUID.randomUUID(); + dnId1 = DatanodeID.randomID(); + dnId2 = DatanodeID.randomID(); scmCmdId1 = 1L; scmCmdId2 = 2L; scmCmdId3 = 3L; @@ -208,10 +209,10 @@ public void testCleanAllTimeoutSCMCommand() { // Transactions in states EXECUTED and NEED_RESEND will be cleaned up // directly, while transactions in states PENDING_EXECUTED and SENT // will be cleaned up after timeout - recordAndSentCommand(manager, dnId1, Arrays.asList(scmCmdId1), - Arrays.asList(deletedBlocksTxIds1)); - recordAndSentCommand(manager, dnId2, Arrays.asList(scmCmdId2), - Arrays.asList(deletedBlocksTxIds2)); + recordAndSentCommand(manager, dnId1, Collections.singletonList(scmCmdId1), + Collections.singletonList(deletedBlocksTxIds1)); + recordAndSentCommand(manager, dnId2, Collections.singletonList(scmCmdId2), + Collections.singletonList(deletedBlocksTxIds2)); Map dn1StatusRecord = manager.getScmCmdStatusRecord().get(dnId1); @@ -238,7 +239,7 @@ public void testCleanAllTimeoutSCMCommand() { private void recordAndSentCommand( SCMDeleteBlocksCommandStatusManager statusManager, - UUID dnId, List scmCmdIds, List> txIds) { + DatanodeID dnId, List scmCmdIds, List> txIds) { assertEquals(scmCmdIds.size(), txIds.size()); for (int i = 0; i < scmCmdIds.size(); i++) { long scmCmdId = scmCmdIds.get(i); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java index 8b20e279a049..c7b9aac971e2 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java @@ -59,9 +59,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.RatisReplicationConfig; @@ -715,7 +713,7 @@ public void testAddContainerWithClosedPipeline() throws Exception { } @Test - public void testPipelineCloseFlow() throws IOException, TimeoutException { + public void testPipelineCloseFlow() throws IOException { LogCapturer logCapturer = LogCapturer.captureLogs(PipelineManagerImpl.class); PipelineManagerImpl pipelineManager = createPipelineManager(true); Pipeline pipeline = pipelineManager.createPipeline( @@ -754,17 +752,17 @@ public void testGetStalePipelines() throws IOException { // For existing pipelines List pipelines = new ArrayList<>(); - UUID[] uuids = new UUID[3]; + final DatanodeID[] ids = new DatanodeID[3]; String[] ipAddresses = new String[3]; String[] hostNames = new String[3]; for (int i = 0; i < 3; i++) { - uuids[i] = UUID.randomUUID(); + ids[i] = DatanodeID.randomID(); ipAddresses[i] = "1.2.3." 
+ (i + 1); hostNames[i] = "host" + i; Pipeline pipeline = mock(Pipeline.class); DatanodeDetails datanodeDetails = mock(DatanodeDetails.class); - when(datanodeDetails.getUuid()).thenReturn(uuids[i]); + when(datanodeDetails.getID()).thenReturn(ids[i]); when(datanodeDetails.getIpAddress()).thenReturn(ipAddresses[i]); when(datanodeDetails.getHostName()).thenReturn(hostNames[i]); List nodes = new ArrayList<>(); @@ -785,8 +783,8 @@ public void testGetStalePipelines() throws IOException { // node with changed uuid DatanodeDetails node0 = mock(DatanodeDetails.class); - UUID changedUUID = UUID.randomUUID(); - when(node0.getUuid()).thenReturn(changedUUID); + DatanodeID changedUUID = DatanodeID.randomID(); + when(node0.getID()).thenReturn(changedUUID); when(node0.getIpAddress()).thenReturn(ipAddresses[0]); when(node0.getHostName()).thenReturn(hostNames[0]); @@ -795,7 +793,7 @@ public void testGetStalePipelines() throws IOException { // node with changed IP DatanodeDetails node1 = mock(DatanodeDetails.class); - when(node1.getUuid()).thenReturn(uuids[0]); + when(node1.getID()).thenReturn(ids[0]); when(node1.getIpAddress()).thenReturn("1.2.3.100"); when(node1.getHostName()).thenReturn(hostNames[0]); @@ -807,7 +805,7 @@ public void testGetStalePipelines() throws IOException { // node with changed host name DatanodeDetails node2 = mock(DatanodeDetails.class); - when(node2.getUuid()).thenReturn(uuids[0]); + when(node2.getID()).thenReturn(ids[0]); when(node2.getIpAddress()).thenReturn(ipAddresses[0]); when(node2.getHostName()).thenReturn("host100"); @@ -819,7 +817,7 @@ public void testGetStalePipelines() throws IOException { } @Test - public void testCloseStalePipelines() throws IOException, TimeoutException { + public void testCloseStalePipelines() throws IOException { SCMHADBTransactionBuffer buffer = new SCMHADBTransactionBufferStub(dbStore); PipelineManagerImpl pipelineManager = @@ -842,8 +840,7 @@ public void testCloseStalePipelines() throws IOException, TimeoutException { } @Test - public void testWaitForAllocatedPipeline() - throws IOException, TimeoutException { + public void testWaitForAllocatedPipeline() throws IOException { SCMHADBTransactionBuffer buffer = new SCMHADBTransactionBufferStub(dbStore); PipelineManagerImpl pipelineManager = From 3ebe937bc5c862d508720e5637a343712eed5c28 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 16 May 2025 10:16:13 -0700 Subject: [PATCH 2/2] Address review comments. 
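
Move the DeletedBlocksTransaction builder out of the replica loop in
DeletedBlockLogImpl#getTransaction: updatedTxn depends only on the
transaction, not on the replica, so it is now built once per call instead
of once per non-duplicate replica. The now-unused dnList parameter is
dropped, and the two log statements are simplified.

A minimal, self-contained sketch of the hoisting pattern (the names below
are hypothetical, not code from this patch):

  import java.util.List;

  public class HoistExample {
    public static void main(String[] args) {
      List<String> replicas = List.of("dn1", "dn2", "dn3");
      // Built once before the loop; the previous revision rebuilt the
      // equivalent object inside every non-duplicate iteration.
      String updatedTxn = new StringBuilder("tx-1").append(", retries=0").toString();
      for (String replica : replicas) {
        System.out.println("send " + updatedTxn + " to " + replica);
      }
    }
  }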
---
 .../hdds/scm/block/DeletedBlockLogImpl.java     | 15 ++++++++-------
 .../hdds/scm/pipeline/PipelineManagerImpl.java  |  2 +-
 .../server/SCMDatanodeHeartbeatDispatcher.java  |  4 +---
 3 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 6a54f0a9b790..ed80cdbdb3f6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -330,14 +330,16 @@ public void close() {
 
   private void getTransaction(DeletedBlocksTransaction tx,
       DatanodeDeletedBlockTransactions transactions,
-      Set<DatanodeDetails> dnList, Set<ContainerReplica> replicas,
+      Set<ContainerReplica> replicas,
       Map<DatanodeID, Map<Long, CmdStatus>> commandStatus) {
-    for (ContainerReplica replica : replicas) {
-      final DatanodeID datanodeID = replica.getDatanodeDetails().getID();
-      if (!transactionStatusManager.isDuplication(datanodeID, tx.getTxID(), commandStatus)) {
-        final DeletedBlocksTransaction updatedTxn = DeletedBlocksTransaction.newBuilder(tx)
+    DeletedBlocksTransaction updatedTxn =
+        DeletedBlocksTransaction.newBuilder(tx)
             .setCount(transactionStatusManager.getRetryCount(tx.getTxID()))
             .build();
+    for (ContainerReplica replica : replicas) {
+      final DatanodeID datanodeID = replica.getDatanodeDetails().getID();
+      if (!transactionStatusManager.isDuplication(
+          datanodeID, tx.getTxID(), commandStatus)) {
         transactions.addTransactionToDN(datanodeID, updatedTxn);
         metrics.incrProcessedTransaction();
       }
@@ -453,8 +455,7 @@ public DatanodeDeletedBlockTransactions getTransactions(
             metrics.incrSkippedTransaction();
             continue;
           }
-          getTransaction(
-              txn, transactions, dnList, replicas, commandStatus);
+          getTransaction(txn, transactions, replicas, commandStatus);
         } else if (txn.getCount() >= maxRetry || containerManager.getContainer(id).isOpen()) {
           metrics.incrSkippedTransaction();
         }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 925833597337..2b7b08235052 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -903,7 +903,7 @@ private void recordMetricsForPipeline(Pipeline pipeline) {
       metrics.incNumPipelineContainSameDatanodes();
       //TODO remove until pipeline allocation is proved equally distributed.
       for (Pipeline overlapPipeline : overlapPipelines) {
-        LOG.info("{} and {} have exactly the same datanodes: {}",
+        LOG.info("{} and {} have exactly the same set of datanodes: {}",
             pipeline.getId(), overlapPipeline.getId(), pipeline.getNodeSet());
       }
     }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 40d60d0a9711..1f73de568e43 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -187,9 +187,7 @@ public List<SCMCommand<?>> dispatch(SCMHeartbeatRequestProto heartbeat) {
         }
       }
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Heartbeat dispatched: datanode {}, Commands: {}", datanodeDetails, commands);
-    }
+    LOG.debug("Heartbeat dispatched for datanode {} with commands {}", datanodeDetails, commands);
 
     return commands;
   }
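
The change that repeats across both commits is the same mechanical pattern:
structures keyed or compared by java.util.UUID (scmCmdStatusRecord,
transactionToDNsCommitMap, the balancer comparators, the safemode
registered-node set) now use DatanodeID, a single comparable identity type
with value equality. A self-contained sketch of that pattern (NodeId below
is a hypothetical stand-in for DatanodeID; an illustration, not code from
the patch):

  import java.util.Map;
  import java.util.UUID;
  import java.util.concurrent.ConcurrentHashMap;

  public final class NodeIdExample {
    // Comparable ID wrapper with value equality, usable as a map key.
    static final class NodeId implements Comparable<NodeId> {
      private final UUID uuid;
      NodeId(UUID uuid) { this.uuid = uuid; }
      static NodeId random() { return new NodeId(UUID.randomUUID()); }
      @Override public int compareTo(NodeId that) { return uuid.compareTo(that.uuid); }
      @Override public boolean equals(Object o) {
        return o instanceof NodeId && uuid.equals(((NodeId) o).uuid);
      }
      @Override public int hashCode() { return uuid.hashCode(); }
      @Override public String toString() { return "NodeId(" + uuid + ")"; }
    }

    public static void main(String[] args) {
      // Typed keys instead of raw UUIDs, as in scmCmdStatusRecord.
      Map<NodeId, String> statusByNode = new ConcurrentHashMap<>();
      NodeId a = NodeId.random();
      NodeId b = NodeId.random();
      statusByNode.put(a, "SENT");
      statusByNode.put(b, "TO_BE_SENT");
      // Deterministic tie-break on the ID when the primary comparison is
      // equal, as in AbstractFindTargetGreedy#compareByUsage.
      int primary = 0; // pretend the usage comparison returned "equal"
      int order = primary != 0 ? primary : a.compareTo(b);
      System.out.println(statusByNode.get(a) + ", tie-break=" + order);
    }
  }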