Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -326,24 +325,22 @@ public void addTransactions(Map<Long, List<Long>> containerBlocksMap)
}

@Override
public void close() throws IOException {
public void close() {
}

private void getTransaction(DeletedBlocksTransaction tx,
DatanodeDeletedBlockTransactions transactions,
Set<DatanodeDetails> dnList, Set<ContainerReplica> replicas,
Map<UUID, Map<Long, CmdStatus>> commandStatus) {
Set<ContainerReplica> replicas,
Map<DatanodeID, Map<Long, CmdStatus>> commandStatus) {
DeletedBlocksTransaction updatedTxn =
DeletedBlocksTransaction.newBuilder(tx)
.setCount(transactionStatusManager.getOrDefaultRetryCount(
tx.getTxID(), 0))
.setCount(transactionStatusManager.getRetryCount(tx.getTxID()))
.build();

for (ContainerReplica replica : replicas) {
DatanodeDetails details = replica.getDatanodeDetails();
final DatanodeID datanodeID = replica.getDatanodeDetails().getID();
if (!transactionStatusManager.isDuplication(
details, updatedTxn.getTxID(), commandStatus)) {
transactions.addTransactionToDN(details.getID(), updatedTxn);
datanodeID, tx.getTxID(), commandStatus)) {
transactions.addTransactionToDN(datanodeID, updatedTxn);
metrics.incrProcessedTransaction();
}
}
Expand Down Expand Up @@ -373,7 +370,7 @@ private Boolean checkInadequateReplica(Set<ContainerReplica> replicas,
if (!dnList.contains(datanodeDetails)) {
DatanodeDetails dnDetail = replica.getDatanodeDetails();
LOG.debug("Skip Container = {}, because DN = {} is not in dnList.",
containerId, dnDetail.getUuid());
containerId, dnDetail);
return true;
}
}
Expand Down Expand Up @@ -426,10 +423,10 @@ public DatanodeDeletedBlockTransactions getTransactions(

// Get the CmdStatus status of the aggregation, so that the current
// status of the specified transaction can be found faster
Map<UUID, Map<Long, CmdStatus>> commandStatus =
final Map<DatanodeID, Map<Long, CmdStatus>> commandStatus =
getSCMDeletedBlockTransactionStatusManager()
.getCommandStatusByTxId(dnList.stream().
map(DatanodeDetails::getUuid).collect(Collectors.toSet()));
map(DatanodeDetails::getID).collect(Collectors.toSet()));
ArrayList<Long> txIDs = new ArrayList<>();
metrics.setNumBlockDeletionTransactionDataNodes(dnList.size());
Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = null;
Expand Down Expand Up @@ -458,8 +455,7 @@ public DatanodeDeletedBlockTransactions getTransactions(
metrics.incrSkippedTransaction();
continue;
}
getTransaction(
txn, transactions, dnList, replicas, commandStatus);
getTransaction(txn, transactions, replicas, commandStatus);
} else if (txn.getCount() >= maxRetry || containerManager.getContainer(id).isOpen()) {
metrics.incrSkippedTransaction();
}
Expand Down Expand Up @@ -510,7 +506,7 @@ public void setScmCommandTimeoutMs(long scmCommandTimeoutMs) {
public void recordTransactionCreated(DatanodeID dnId, long scmCmdId,
Set<Long> dnTxSet) {
getSCMDeletedBlockTransactionStatusManager()
.recordTransactionCreated(dnId.getUuid(), scmCmdId, dnTxSet);
.recordTransactionCreated(dnId, scmCmdId, dnTxSet);
}

@Override
Expand All @@ -520,7 +516,7 @@ public int getTransactionToDNsCommitMapSize() {

@Override
public void onDatanodeDead(DatanodeID dnId) {
getSCMDeletedBlockTransactionStatusManager().onDatanodeDead(dnId.getUuid());
getSCMDeletedBlockTransactionStatusManager().onDatanodeDead(dnId);
}

@Override
Expand All @@ -546,7 +542,7 @@ public void onMessage(
ContainerBlocksDeletionACKProto ackProto =
commandStatus.getBlockDeletionAck();
getSCMDeletedBlockTransactionStatusManager()
.commitTransactions(ackProto.getResultsList(), dnId.getUuid());
.commitTransactions(ackProto.getResultsList(), dnId);
metrics.incrBlockDeletionCommandSuccess();
metrics.incrDNCommandsSuccess(dnId, 1);
} else if (status == CommandStatus.Status.FAILED) {
Expand All @@ -558,7 +554,7 @@ public void onMessage(
}

getSCMDeletedBlockTransactionStatusManager()
.commitSCMCommandStatus(deleteBlockStatus.getCmdStatus(), dnId.getUuid());
.commitSCMCommandStatus(deleteBlockStatus.getCmdStatus(), dnId);
} finally {
lock.unlock();
}
Expand Down
Loading