Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -332,7 +331,7 @@ public void close() throws IOException {
private void getTransaction(DeletedBlocksTransaction tx,
DatanodeDeletedBlockTransactions transactions,
Set<DatanodeDetails> dnList, Set<ContainerReplica> replicas,
Map<UUID, Map<Long, CmdStatus>> commandStatus) {
Map<DatanodeID, Map<Long, CmdStatus>> commandStatus) {
DeletedBlocksTransaction updatedTxn =
DeletedBlocksTransaction.newBuilder(tx)
.setCount(transactionStatusManager.getOrDefaultRetryCount(
Expand Down Expand Up @@ -373,7 +372,7 @@ private Boolean checkInadequateReplica(Set<ContainerReplica> replicas,
if (!dnList.contains(datanodeDetails)) {
DatanodeDetails dnDetail = replica.getDatanodeDetails();
LOG.debug("Skip Container = {}, because DN = {} is not in dnList.",
containerId, dnDetail.getUuid());
containerId, dnDetail.getID());
return true;
}
}
Expand Down Expand Up @@ -426,10 +425,10 @@ public DatanodeDeletedBlockTransactions getTransactions(

// Get the CmdStatus status of the aggregation, so that the current
// status of the specified transaction can be found faster
Map<UUID, Map<Long, CmdStatus>> commandStatus =
Map<DatanodeID, Map<Long, CmdStatus>> commandStatus =
getSCMDeletedBlockTransactionStatusManager()
.getCommandStatusByTxId(dnList.stream().
map(DatanodeDetails::getUuid).collect(Collectors.toSet()));
map(DatanodeDetails::getID).collect(Collectors.toSet()));
ArrayList<Long> txIDs = new ArrayList<>();
metrics.setNumBlockDeletionTransactionDataNodes(dnList.size());
Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = null;
Expand Down Expand Up @@ -510,7 +509,7 @@ public void setScmCommandTimeoutMs(long scmCommandTimeoutMs) {
public void recordTransactionCreated(DatanodeID dnId, long scmCmdId,
Set<Long> dnTxSet) {
getSCMDeletedBlockTransactionStatusManager()
.recordTransactionCreated(dnId.getUuid(), scmCmdId, dnTxSet);
.recordTransactionCreated(dnId, scmCmdId, dnTxSet);
}

@Override
Expand All @@ -520,7 +519,7 @@ public int getTransactionToDNsCommitMapSize() {

@Override
public void onDatanodeDead(DatanodeID dnId) {
getSCMDeletedBlockTransactionStatusManager().onDatanodeDead(dnId.getUuid());
getSCMDeletedBlockTransactionStatusManager().onDatanodeDead(dnId);
}

@Override
Expand All @@ -546,7 +545,7 @@ public void onMessage(
ContainerBlocksDeletionACKProto ackProto =
commandStatus.getBlockDeletionAck();
getSCMDeletedBlockTransactionStatusManager()
.commitTransactions(ackProto.getResultsList(), dnId.getUuid());
.commitTransactions(ackProto.getResultsList(), dnId);
metrics.incrBlockDeletionCommandSuccess();
metrics.incrDNCommandsSuccess(dnId, 1);
} else if (status == CommandStatus.Status.FAILED) {
Expand All @@ -558,7 +557,7 @@ public void onMessage(
}

getSCMDeletedBlockTransactionStatusManager()
.commitSCMCommandStatus(deleteBlockStatus.getCmdStatus(), dnId.getUuid());
.commitSCMCommandStatus(deleteBlockStatus.getCmdStatus(), dnId);
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
import org.apache.hadoop.hdds.scm.container.ContainerID;
Expand All @@ -58,7 +58,7 @@ public class SCMDeletedBlockTransactionStatusManager {
private static final Logger LOG =
LoggerFactory.getLogger(SCMDeletedBlockTransactionStatusManager.class);
// Maps txId to set of DNs which are successful in committing the transaction
private final Map<Long, Set<UUID>> transactionToDNsCommitMap;
private final Map<Long, Set<DatanodeID>> transactionToDNsCommitMap;
// Maps txId to its retry counts;
private final Map<Long, Integer> transactionToRetryCountMap;
// The access to DeletedBlocksTXTable is protected by
Expand Down Expand Up @@ -100,7 +100,7 @@ public SCMDeletedBlockTransactionStatusManager(
protected static class SCMDeleteBlocksCommandStatusManager {
private static final Logger LOG =
LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
private final Map<DatanodeID, Map<Long, CmdStatusData>> scmCmdStatusRecord;

private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT;
private static final Set<CmdStatus> STATUSES_REQUIRING_TIMEOUT =
Expand Down Expand Up @@ -128,14 +128,14 @@ public enum CmdStatus {
}

protected static final class CmdStatusData {
private final UUID dnId;
private final DatanodeID dnId;
private final long scmCmdId;
private final Set<Long> deletedBlocksTxIds;
private Instant updateTime;
private CmdStatus status;

private CmdStatusData(
UUID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
DatanodeID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
this.dnId = dnId;
this.scmCmdId = scmTxID;
this.deletedBlocksTxIds = deletedBlocksTxIds;
Expand All @@ -146,7 +146,7 @@ public Set<Long> getDeletedBlocksTxIds() {
return Collections.unmodifiableSet(deletedBlocksTxIds);
}

public UUID getDnId() {
public DatanodeID getDnId() {
return dnId;
}

Expand Down Expand Up @@ -180,7 +180,7 @@ public String toString() {
}

protected static CmdStatusData createScmCmdStatusData(
UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
DatanodeID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds);
}

Expand All @@ -190,37 +190,37 @@ protected void recordScmCommand(CmdStatusData statusData) {
new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData);
}

protected void onSent(UUID dnId, long scmCmdId) {
protected void onSent(DatanodeID dnId, long scmCmdId) {
updateStatus(dnId, scmCmdId, CommandStatus.Status.PENDING);
}

protected void onDatanodeDead(UUID dnId) {
protected void onDatanodeDead(DatanodeID dnId) {
LOG.info("Clean SCMCommand record for Datanode: {}", dnId);
scmCmdStatusRecord.remove(dnId);
}

protected void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId,
protected void updateStatusByDNCommandStatus(DatanodeID dnId, long scmCmdId,
CommandStatus.Status newState) {
updateStatus(dnId, scmCmdId, newState);
}

protected void cleanAllTimeoutSCMCommand(long timeoutMs) {
for (UUID dnId : scmCmdStatusRecord.keySet()) {
for (DatanodeID dnId : scmCmdStatusRecord.keySet()) {
for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
removeTimeoutScmCommand(
dnId, getScmCommandIds(dnId, status), timeoutMs);
}
}
}

public void cleanTimeoutSCMCommand(UUID dnId, long timeoutMs) {
public void cleanTimeoutSCMCommand(DatanodeID dnId, long timeoutMs) {
for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
removeTimeoutScmCommand(
dnId, getScmCommandIds(dnId, status), timeoutMs);
}
}

private Set<Long> getScmCommandIds(UUID dnId, CmdStatus status) {
private Set<Long> getScmCommandIds(DatanodeID dnId, CmdStatus status) {
Set<Long> scmCmdIds = new HashSet<>();
Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
if (record == null) {
Expand All @@ -234,15 +234,15 @@ private Set<Long> getScmCommandIds(UUID dnId, CmdStatus status) {
return scmCmdIds;
}

private Instant getUpdateTime(UUID dnId, long scmCmdId) {
private Instant getUpdateTime(DatanodeID dnId, long scmCmdId) {
Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
if (record == null || record.get(scmCmdId) == null) {
return null;
}
return record.get(scmCmdId).getUpdateTime();
}

private void updateStatus(UUID dnId, long scmCmdId,
private void updateStatus(DatanodeID dnId, long scmCmdId,
CommandStatus.Status newStatus) {
Map<Long, CmdStatusData> recordForDn = scmCmdStatusRecord.get(dnId);
if (recordForDn == null) {
Expand Down Expand Up @@ -309,7 +309,7 @@ private void updateStatus(UUID dnId, long scmCmdId,
}
}

private void removeTimeoutScmCommand(UUID dnId,
private void removeTimeoutScmCommand(DatanodeID dnId,
Set<Long> scmCmdIds, long timeoutMs) {
Instant now = Instant.now();
for (Long scmCmdId : scmCmdIds) {
Expand All @@ -323,7 +323,7 @@ private void removeTimeoutScmCommand(UUID dnId,
}
}

private CmdStatusData removeScmCommand(UUID dnId, long scmCmdId) {
private CmdStatusData removeScmCommand(DatanodeID dnId, long scmCmdId) {
Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
if (record == null || record.get(scmCmdId) == null) {
return null;
Expand All @@ -333,12 +333,12 @@ private CmdStatusData removeScmCommand(UUID dnId, long scmCmdId) {
return statusData;
}

public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
Set<UUID> dnIds) {
Map<UUID, Map<Long, CmdStatus>> result =
public Map<DatanodeID, Map<Long, CmdStatus>> getCommandStatusByTxId(
Set<DatanodeID> dnIds) {
Map<DatanodeID, Map<Long, CmdStatus>> result =
new HashMap<>(scmCmdStatusRecord.size());

for (UUID dnId : dnIds) {
for (DatanodeID dnId : dnIds) {
Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
if (record == null) {
continue;
Expand All @@ -361,7 +361,7 @@ private void clear() {
}

@VisibleForTesting
Map<UUID, Map<Long, CmdStatusData>> getScmCmdStatusRecord() {
Map<DatanodeID, Map<Long, CmdStatusData>> getScmCmdStatusRecord() {
return scmCmdStatusRecord;
}
}
Expand Down Expand Up @@ -401,16 +401,16 @@ public int getOrDefaultRetryCount(long txID, int defaultValue) {

public void onSent(DatanodeDetails dnId, SCMCommand<?> scmCommand) {
scmDeleteBlocksCommandStatusManager.onSent(
dnId.getUuid(), scmCommand.getId());
dnId.getID(), scmCommand.getId());
}

public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
Set<UUID> dnIds) {
public Map<DatanodeID, Map<Long, CmdStatus>> getCommandStatusByTxId(
Set<DatanodeID> dnIds) {
return scmDeleteBlocksCommandStatusManager.getCommandStatusByTxId(dnIds);
}

public void recordTransactionCreated(
UUID dnId, long scmCmdId, Set<Long> dnTxSet) {
DatanodeID dnId, long scmCmdId, Set<Long> dnTxSet) {
scmDeleteBlocksCommandStatusManager.recordScmCommand(
SCMDeleteBlocksCommandStatusManager
.createScmCmdStatusData(dnId, scmCmdId, dnTxSet));
Expand All @@ -428,20 +428,20 @@ public void cleanAllTimeoutSCMCommand(long timeoutMs) {
scmDeleteBlocksCommandStatusManager.cleanAllTimeoutSCMCommand(timeoutMs);
}

public void onDatanodeDead(UUID dnId) {
public void onDatanodeDead(DatanodeID dnId) {
scmDeleteBlocksCommandStatusManager.onDatanodeDead(dnId);
}

public boolean isDuplication(DatanodeDetails dnDetail, long tx,
Map<UUID, Map<Long, CmdStatus>> commandStatus) {
if (alreadyExecuted(dnDetail.getUuid(), tx)) {
Map<DatanodeID, Map<Long, CmdStatus>> commandStatus) {
if (alreadyExecuted(dnDetail.getID(), tx)) {
return true;
}
return inProcessing(dnDetail.getUuid(), tx, commandStatus);
return inProcessing(dnDetail.getID(), tx, commandStatus);
}

public boolean alreadyExecuted(UUID dnId, long txId) {
Set<UUID> dnsWithTransactionCommitted =
public boolean alreadyExecuted(DatanodeID dnId, long txId) {
Set<DatanodeID> dnsWithTransactionCommitted =
transactionToDNsCommitMap.get(txId);
return dnsWithTransactionCommitted != null && dnsWithTransactionCommitted
.contains(dnId);
Expand All @@ -456,10 +456,10 @@ public boolean alreadyExecuted(UUID dnId, long txId) {
*/
@VisibleForTesting
public void commitTransactions(
List<DeleteBlockTransactionResult> transactionResults, UUID dnId) {
List<DeleteBlockTransactionResult> transactionResults, DatanodeID dnId) {

ArrayList<Long> txIDsToBeDeleted = new ArrayList<>();
Set<UUID> dnsWithCommittedTxn;
Set<DatanodeID> dnsWithCommittedTxn;
for (DeleteBlockTransactionResult transactionResult :
transactionResults) {
if (isTransactionFailed(transactionResult)) {
Expand Down Expand Up @@ -494,9 +494,9 @@ public void commitTransactions(
// the nodes returned in the pipeline match the replication factor.
if (min(replicas.size(), dnsWithCommittedTxn.size())
>= container.getReplicationConfig().getRequiredNodes()) {
List<UUID> containerDns = replicas.stream()
List<DatanodeID> containerDns = replicas.stream()
.map(ContainerReplica::getDatanodeDetails)
.map(DatanodeDetails::getUuid)
.map(DatanodeDetails::getID)
.collect(Collectors.toList());
if (dnsWithCommittedTxn.containsAll(containerDns)) {
transactionToDNsCommitMap.remove(txID);
Expand Down Expand Up @@ -527,21 +527,21 @@ public void commitTransactions(

@VisibleForTesting
public void commitSCMCommandStatus(List<CommandStatus> deleteBlockStatus,
UUID dnId) {
DatanodeID dnId) {
processSCMCommandStatus(deleteBlockStatus, dnId);
scmDeleteBlocksCommandStatusManager.
cleanTimeoutSCMCommand(dnId, scmCommandTimeoutMs);
}

private boolean inProcessing(UUID dnId, long deletedBlocksTxId,
Map<UUID, Map<Long, CmdStatus>> commandStatus) {
private boolean inProcessing(DatanodeID dnId, long deletedBlocksTxId,
Map<DatanodeID, Map<Long, CmdStatus>> commandStatus) {
Map<Long, CmdStatus> deletedBlocksTxStatus = commandStatus.get(dnId);
return deletedBlocksTxStatus != null &&
deletedBlocksTxStatus.get(deletedBlocksTxId) != null;
}

private void processSCMCommandStatus(List<CommandStatus> deleteBlockStatus,
UUID dnID) {
DatanodeID dnID) {
Map<Long, CommandStatus> lastStatus = new HashMap<>();
Map<Long, CommandStatus.Status> summary = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy;
import org.apache.hadoop.hdds.scm.container.ContainerID;
Expand Down Expand Up @@ -81,9 +81,9 @@ protected int compareByUsage(DatanodeUsageInfo a, DatanodeUsageInfo b) {
if (ret != 0) {
return ret;
}
UUID uuidA = a.getDatanodeDetails().getUuid();
UUID uuidB = b.getDatanodeDetails().getUuid();
return uuidA.compareTo(uuidB);
DatanodeID idA = a.getDatanodeDetails().getID();
DatanodeID idB = b.getDatanodeDetails().getID();
return idA.compareTo(idB);
}

private void setConfiguration(ContainerBalancerConfiguration conf) {
Expand Down Expand Up @@ -228,8 +228,7 @@ public void increaseSizeEntering(DatanodeDetails target, long size) {
}
return;
}
logger.warn("Cannot find {} in the candidates target nodes",
target.getUuid());
logger.warn("Cannot find {} in the candidates target nodes", target.getID());
}

/**
Expand Down
Loading